1ae8c6e27Sflorian /*
2ae8c6e27Sflorian * util/tube.c - pipe service
3ae8c6e27Sflorian *
4ae8c6e27Sflorian * Copyright (c) 2008, NLnet Labs. All rights reserved.
5ae8c6e27Sflorian *
6ae8c6e27Sflorian * This software is open source.
7ae8c6e27Sflorian *
8ae8c6e27Sflorian * Redistribution and use in source and binary forms, with or without
9ae8c6e27Sflorian * modification, are permitted provided that the following conditions
10ae8c6e27Sflorian * are met:
11ae8c6e27Sflorian *
12ae8c6e27Sflorian * Redistributions of source code must retain the above copyright notice,
13ae8c6e27Sflorian * this list of conditions and the following disclaimer.
14ae8c6e27Sflorian *
15ae8c6e27Sflorian * Redistributions in binary form must reproduce the above copyright notice,
16ae8c6e27Sflorian * this list of conditions and the following disclaimer in the documentation
17ae8c6e27Sflorian * and/or other materials provided with the distribution.
18ae8c6e27Sflorian *
19ae8c6e27Sflorian * Neither the name of the NLNET LABS nor the names of its contributors may
20ae8c6e27Sflorian * be used to endorse or promote products derived from this software without
21ae8c6e27Sflorian * specific prior written permission.
22ae8c6e27Sflorian *
23ae8c6e27Sflorian * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
24ae8c6e27Sflorian * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
25ae8c6e27Sflorian * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
26ae8c6e27Sflorian * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
27ae8c6e27Sflorian * HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
28ae8c6e27Sflorian * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED
29ae8c6e27Sflorian * TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
30ae8c6e27Sflorian * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
31ae8c6e27Sflorian * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
32ae8c6e27Sflorian * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
33ae8c6e27Sflorian * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
34ae8c6e27Sflorian */
35ae8c6e27Sflorian
36ae8c6e27Sflorian /**
37ae8c6e27Sflorian * \file
38ae8c6e27Sflorian *
39ae8c6e27Sflorian * This file contains pipe service functions.
40ae8c6e27Sflorian */
41ae8c6e27Sflorian #include "config.h"
42ae8c6e27Sflorian #include "util/tube.h"
43ae8c6e27Sflorian #include "util/log.h"
44ae8c6e27Sflorian #include "util/net_help.h"
45ae8c6e27Sflorian #include "util/netevent.h"
46ae8c6e27Sflorian #include "util/fptr_wlist.h"
47ae8c6e27Sflorian #include "util/ub_event.h"
48*d500c338Sflorian #ifdef HAVE_POLL_H
49*d500c338Sflorian #include <poll.h>
50*d500c338Sflorian #endif
51ae8c6e27Sflorian
52ae8c6e27Sflorian #ifndef USE_WINSOCK
53ae8c6e27Sflorian /* on unix */
54ae8c6e27Sflorian
55ae8c6e27Sflorian #ifndef HAVE_SOCKETPAIR
56ae8c6e27Sflorian /** no socketpair() available, like on Minix 3.1.7, use pipe */
57ae8c6e27Sflorian #define socketpair(f, t, p, sv) pipe(sv)
58ae8c6e27Sflorian #endif /* HAVE_SOCKETPAIR */
59ae8c6e27Sflorian
tube_create(void)60ae8c6e27Sflorian struct tube* tube_create(void)
61ae8c6e27Sflorian {
62ae8c6e27Sflorian struct tube* tube = (struct tube*)calloc(1, sizeof(*tube));
63ae8c6e27Sflorian int sv[2];
64ae8c6e27Sflorian if(!tube) {
65ae8c6e27Sflorian int err = errno;
66ae8c6e27Sflorian log_err("tube_create: out of memory");
67ae8c6e27Sflorian errno = err;
68ae8c6e27Sflorian return NULL;
69ae8c6e27Sflorian }
70ae8c6e27Sflorian tube->sr = -1;
71ae8c6e27Sflorian tube->sw = -1;
72ae8c6e27Sflorian if(socketpair(AF_UNIX, SOCK_STREAM, 0, sv) == -1) {
73ae8c6e27Sflorian int err = errno;
74ae8c6e27Sflorian log_err("socketpair: %s", strerror(errno));
75ae8c6e27Sflorian free(tube);
76ae8c6e27Sflorian errno = err;
77ae8c6e27Sflorian return NULL;
78ae8c6e27Sflorian }
79ae8c6e27Sflorian tube->sr = sv[0];
80ae8c6e27Sflorian tube->sw = sv[1];
81ae8c6e27Sflorian if(!fd_set_nonblock(tube->sr) || !fd_set_nonblock(tube->sw)) {
82ae8c6e27Sflorian int err = errno;
83ae8c6e27Sflorian log_err("tube: cannot set nonblocking");
84ae8c6e27Sflorian tube_delete(tube);
85ae8c6e27Sflorian errno = err;
86ae8c6e27Sflorian return NULL;
87ae8c6e27Sflorian }
88ae8c6e27Sflorian return tube;
89ae8c6e27Sflorian }
90ae8c6e27Sflorian
/**
 * Tear down a tube and release its memory.
 * @param tube: the tube to delete, NULL is accepted and ignored.
 */
void tube_delete(struct tube* tube)
{
	if(tube != NULL) {
		/* the commpoints are removed first and the fds are closed
		 * afterwards, to be sure; epoll also does not like an fd
		 * being closed before its event_del */
		tube_remove_bg_listen(tube);
		tube_remove_bg_write(tube);
		tube_close_read(tube);
		tube_close_write(tube);
		free(tube);
	}
}
102ae8c6e27Sflorian
tube_close_read(struct tube * tube)103ae8c6e27Sflorian void tube_close_read(struct tube* tube)
104ae8c6e27Sflorian {
105ae8c6e27Sflorian if(tube->sr != -1) {
106ae8c6e27Sflorian close(tube->sr);
107ae8c6e27Sflorian tube->sr = -1;
108ae8c6e27Sflorian }
109ae8c6e27Sflorian }
110ae8c6e27Sflorian
tube_close_write(struct tube * tube)111ae8c6e27Sflorian void tube_close_write(struct tube* tube)
112ae8c6e27Sflorian {
113ae8c6e27Sflorian if(tube->sw != -1) {
114ae8c6e27Sflorian close(tube->sw);
115ae8c6e27Sflorian tube->sw = -1;
116ae8c6e27Sflorian }
117ae8c6e27Sflorian }
118ae8c6e27Sflorian
tube_remove_bg_listen(struct tube * tube)119ae8c6e27Sflorian void tube_remove_bg_listen(struct tube* tube)
120ae8c6e27Sflorian {
121ae8c6e27Sflorian if(tube->listen_com) {
122ae8c6e27Sflorian comm_point_delete(tube->listen_com);
123ae8c6e27Sflorian tube->listen_com = NULL;
124ae8c6e27Sflorian }
125ae8c6e27Sflorian free(tube->cmd_msg);
126ae8c6e27Sflorian tube->cmd_msg = NULL;
127ae8c6e27Sflorian }
128ae8c6e27Sflorian
tube_remove_bg_write(struct tube * tube)129ae8c6e27Sflorian void tube_remove_bg_write(struct tube* tube)
130ae8c6e27Sflorian {
131ae8c6e27Sflorian if(tube->res_com) {
132ae8c6e27Sflorian comm_point_delete(tube->res_com);
133ae8c6e27Sflorian tube->res_com = NULL;
134ae8c6e27Sflorian }
135ae8c6e27Sflorian if(tube->res_list) {
136ae8c6e27Sflorian struct tube_res_list* np, *p = tube->res_list;
137ae8c6e27Sflorian tube->res_list = NULL;
138ae8c6e27Sflorian tube->res_last = NULL;
139ae8c6e27Sflorian while(p) {
140ae8c6e27Sflorian np = p->next;
141ae8c6e27Sflorian free(p->buf);
142ae8c6e27Sflorian free(p);
143ae8c6e27Sflorian p = np;
144ae8c6e27Sflorian }
145ae8c6e27Sflorian }
146ae8c6e27Sflorian }
147ae8c6e27Sflorian
148ae8c6e27Sflorian int
tube_handle_listen(struct comm_point * c,void * arg,int error,struct comm_reply * ATTR_UNUSED (reply_info))149ae8c6e27Sflorian tube_handle_listen(struct comm_point* c, void* arg, int error,
150ae8c6e27Sflorian struct comm_reply* ATTR_UNUSED(reply_info))
151ae8c6e27Sflorian {
152ae8c6e27Sflorian struct tube* tube = (struct tube*)arg;
153ae8c6e27Sflorian ssize_t r;
154ae8c6e27Sflorian if(error != NETEVENT_NOERROR) {
155ae8c6e27Sflorian fptr_ok(fptr_whitelist_tube_listen(tube->listen_cb));
156ae8c6e27Sflorian (*tube->listen_cb)(tube, NULL, 0, error, tube->listen_arg);
157ae8c6e27Sflorian return 0;
158ae8c6e27Sflorian }
159ae8c6e27Sflorian
160ae8c6e27Sflorian if(tube->cmd_read < sizeof(tube->cmd_len)) {
161ae8c6e27Sflorian /* complete reading the length of control msg */
162ae8c6e27Sflorian r = read(c->fd, ((uint8_t*)&tube->cmd_len) + tube->cmd_read,
163ae8c6e27Sflorian sizeof(tube->cmd_len) - tube->cmd_read);
164ae8c6e27Sflorian if(r==0) {
165ae8c6e27Sflorian /* error has happened or */
166ae8c6e27Sflorian /* parent closed pipe, must have exited somehow */
167ae8c6e27Sflorian fptr_ok(fptr_whitelist_tube_listen(tube->listen_cb));
168ae8c6e27Sflorian (*tube->listen_cb)(tube, NULL, 0, NETEVENT_CLOSED,
169ae8c6e27Sflorian tube->listen_arg);
170ae8c6e27Sflorian return 0;
171ae8c6e27Sflorian }
172ae8c6e27Sflorian if(r==-1) {
173ae8c6e27Sflorian if(errno != EAGAIN && errno != EINTR) {
174ae8c6e27Sflorian log_err("rpipe error: %s", strerror(errno));
175ae8c6e27Sflorian }
176ae8c6e27Sflorian /* nothing to read now, try later */
177ae8c6e27Sflorian return 0;
178ae8c6e27Sflorian }
179ae8c6e27Sflorian tube->cmd_read += r;
180ae8c6e27Sflorian if(tube->cmd_read < sizeof(tube->cmd_len)) {
181ae8c6e27Sflorian /* not complete, try later */
182ae8c6e27Sflorian return 0;
183ae8c6e27Sflorian }
184ae8c6e27Sflorian tube->cmd_msg = (uint8_t*)calloc(1, tube->cmd_len);
185ae8c6e27Sflorian if(!tube->cmd_msg) {
186ae8c6e27Sflorian log_err("malloc failure");
187ae8c6e27Sflorian tube->cmd_read = 0;
188ae8c6e27Sflorian return 0;
189ae8c6e27Sflorian }
190ae8c6e27Sflorian }
191ae8c6e27Sflorian /* cmd_len has been read, read remainder */
192ae8c6e27Sflorian r = read(c->fd, tube->cmd_msg+tube->cmd_read-sizeof(tube->cmd_len),
193ae8c6e27Sflorian tube->cmd_len - (tube->cmd_read - sizeof(tube->cmd_len)));
194ae8c6e27Sflorian if(r==0) {
195ae8c6e27Sflorian /* error has happened or */
196ae8c6e27Sflorian /* parent closed pipe, must have exited somehow */
197ae8c6e27Sflorian fptr_ok(fptr_whitelist_tube_listen(tube->listen_cb));
198ae8c6e27Sflorian (*tube->listen_cb)(tube, NULL, 0, NETEVENT_CLOSED,
199ae8c6e27Sflorian tube->listen_arg);
200ae8c6e27Sflorian return 0;
201ae8c6e27Sflorian }
202ae8c6e27Sflorian if(r==-1) {
203ae8c6e27Sflorian /* nothing to read now, try later */
204ae8c6e27Sflorian if(errno != EAGAIN && errno != EINTR) {
205ae8c6e27Sflorian log_err("rpipe error: %s", strerror(errno));
206ae8c6e27Sflorian }
207ae8c6e27Sflorian return 0;
208ae8c6e27Sflorian }
209ae8c6e27Sflorian tube->cmd_read += r;
210ae8c6e27Sflorian if(tube->cmd_read < sizeof(tube->cmd_len) + tube->cmd_len) {
211ae8c6e27Sflorian /* not complete, try later */
212ae8c6e27Sflorian return 0;
213ae8c6e27Sflorian }
214ae8c6e27Sflorian tube->cmd_read = 0;
215ae8c6e27Sflorian
216ae8c6e27Sflorian fptr_ok(fptr_whitelist_tube_listen(tube->listen_cb));
217ae8c6e27Sflorian (*tube->listen_cb)(tube, tube->cmd_msg, tube->cmd_len,
218ae8c6e27Sflorian NETEVENT_NOERROR, tube->listen_arg);
219ae8c6e27Sflorian /* also frees the buf */
220ae8c6e27Sflorian tube->cmd_msg = NULL;
221ae8c6e27Sflorian return 0;
222ae8c6e27Sflorian }
223ae8c6e27Sflorian
224ae8c6e27Sflorian int
tube_handle_write(struct comm_point * c,void * arg,int error,struct comm_reply * ATTR_UNUSED (reply_info))225ae8c6e27Sflorian tube_handle_write(struct comm_point* c, void* arg, int error,
226ae8c6e27Sflorian struct comm_reply* ATTR_UNUSED(reply_info))
227ae8c6e27Sflorian {
228ae8c6e27Sflorian struct tube* tube = (struct tube*)arg;
229ae8c6e27Sflorian struct tube_res_list* item = tube->res_list;
230ae8c6e27Sflorian ssize_t r;
231ae8c6e27Sflorian if(error != NETEVENT_NOERROR) {
232ae8c6e27Sflorian log_err("tube_handle_write net error %d", error);
233ae8c6e27Sflorian return 0;
234ae8c6e27Sflorian }
235ae8c6e27Sflorian
236ae8c6e27Sflorian if(!item) {
237ae8c6e27Sflorian comm_point_stop_listening(c);
238ae8c6e27Sflorian return 0;
239ae8c6e27Sflorian }
240ae8c6e27Sflorian
241ae8c6e27Sflorian if(tube->res_write < sizeof(item->len)) {
242ae8c6e27Sflorian r = write(c->fd, ((uint8_t*)&item->len) + tube->res_write,
243ae8c6e27Sflorian sizeof(item->len) - tube->res_write);
244ae8c6e27Sflorian if(r == -1) {
245ae8c6e27Sflorian if(errno != EAGAIN && errno != EINTR) {
246ae8c6e27Sflorian log_err("wpipe error: %s", strerror(errno));
247ae8c6e27Sflorian }
248ae8c6e27Sflorian return 0; /* try again later */
249ae8c6e27Sflorian }
250ae8c6e27Sflorian if(r == 0) {
251ae8c6e27Sflorian /* error on pipe, must have exited somehow */
252ae8c6e27Sflorian /* cannot signal this to pipe user */
253ae8c6e27Sflorian return 0;
254ae8c6e27Sflorian }
255ae8c6e27Sflorian tube->res_write += r;
256ae8c6e27Sflorian if(tube->res_write < sizeof(item->len))
257ae8c6e27Sflorian return 0;
258ae8c6e27Sflorian }
259ae8c6e27Sflorian r = write(c->fd, item->buf + tube->res_write - sizeof(item->len),
260ae8c6e27Sflorian item->len - (tube->res_write - sizeof(item->len)));
261ae8c6e27Sflorian if(r == -1) {
262ae8c6e27Sflorian if(errno != EAGAIN && errno != EINTR) {
263ae8c6e27Sflorian log_err("wpipe error: %s", strerror(errno));
264ae8c6e27Sflorian }
265ae8c6e27Sflorian return 0; /* try again later */
266ae8c6e27Sflorian }
267ae8c6e27Sflorian if(r == 0) {
268ae8c6e27Sflorian /* error on pipe, must have exited somehow */
269ae8c6e27Sflorian /* cannot signal this to pipe user */
270ae8c6e27Sflorian return 0;
271ae8c6e27Sflorian }
272ae8c6e27Sflorian tube->res_write += r;
273ae8c6e27Sflorian if(tube->res_write < sizeof(item->len) + item->len)
274ae8c6e27Sflorian return 0;
275ae8c6e27Sflorian /* done this result, remove it */
276ae8c6e27Sflorian free(item->buf);
277ae8c6e27Sflorian item->buf = NULL;
278ae8c6e27Sflorian tube->res_list = tube->res_list->next;
279ae8c6e27Sflorian free(item);
280ae8c6e27Sflorian if(!tube->res_list) {
281ae8c6e27Sflorian tube->res_last = NULL;
282ae8c6e27Sflorian comm_point_stop_listening(c);
283ae8c6e27Sflorian }
284ae8c6e27Sflorian tube->res_write = 0;
285ae8c6e27Sflorian return 0;
286ae8c6e27Sflorian }
287ae8c6e27Sflorian
tube_write_msg(struct tube * tube,uint8_t * buf,uint32_t len,int nonblock)288ae8c6e27Sflorian int tube_write_msg(struct tube* tube, uint8_t* buf, uint32_t len,
289ae8c6e27Sflorian int nonblock)
290ae8c6e27Sflorian {
291ae8c6e27Sflorian ssize_t r, d;
292ae8c6e27Sflorian int fd = tube->sw;
293ae8c6e27Sflorian
294ae8c6e27Sflorian /* test */
295ae8c6e27Sflorian if(nonblock) {
296ae8c6e27Sflorian r = write(fd, &len, sizeof(len));
297ae8c6e27Sflorian if(r == -1) {
298ae8c6e27Sflorian if(errno==EINTR || errno==EAGAIN)
299ae8c6e27Sflorian return -1;
300ae8c6e27Sflorian log_err("tube msg write failed: %s", strerror(errno));
301ae8c6e27Sflorian return -1; /* can still continue, perhaps */
302ae8c6e27Sflorian }
303ae8c6e27Sflorian } else r = 0;
304ae8c6e27Sflorian if(!fd_set_block(fd))
305ae8c6e27Sflorian return 0;
306ae8c6e27Sflorian /* write remainder */
307ae8c6e27Sflorian d = r;
308ae8c6e27Sflorian while(d != (ssize_t)sizeof(len)) {
309ae8c6e27Sflorian if((r=write(fd, ((char*)&len)+d, sizeof(len)-d)) == -1) {
310ae8c6e27Sflorian if(errno == EAGAIN)
311ae8c6e27Sflorian continue; /* temporarily unavail: try again*/
312ae8c6e27Sflorian log_err("tube msg write failed: %s", strerror(errno));
313ae8c6e27Sflorian (void)fd_set_nonblock(fd);
314ae8c6e27Sflorian return 0;
315ae8c6e27Sflorian }
316ae8c6e27Sflorian d += r;
317ae8c6e27Sflorian }
318ae8c6e27Sflorian d = 0;
319ae8c6e27Sflorian while(d != (ssize_t)len) {
320ae8c6e27Sflorian if((r=write(fd, buf+d, len-d)) == -1) {
321ae8c6e27Sflorian if(errno == EAGAIN)
322ae8c6e27Sflorian continue; /* temporarily unavail: try again*/
323ae8c6e27Sflorian log_err("tube msg write failed: %s", strerror(errno));
324ae8c6e27Sflorian (void)fd_set_nonblock(fd);
325ae8c6e27Sflorian return 0;
326ae8c6e27Sflorian }
327ae8c6e27Sflorian d += r;
328ae8c6e27Sflorian }
329ae8c6e27Sflorian if(!fd_set_nonblock(fd))
330ae8c6e27Sflorian return 0;
331ae8c6e27Sflorian return 1;
332ae8c6e27Sflorian }
333ae8c6e27Sflorian
tube_read_msg(struct tube * tube,uint8_t ** buf,uint32_t * len,int nonblock)334ae8c6e27Sflorian int tube_read_msg(struct tube* tube, uint8_t** buf, uint32_t* len,
335ae8c6e27Sflorian int nonblock)
336ae8c6e27Sflorian {
337ae8c6e27Sflorian ssize_t r, d;
338ae8c6e27Sflorian int fd = tube->sr;
339ae8c6e27Sflorian
340ae8c6e27Sflorian /* test */
341ae8c6e27Sflorian *len = 0;
342ae8c6e27Sflorian if(nonblock) {
343ae8c6e27Sflorian r = read(fd, len, sizeof(*len));
344ae8c6e27Sflorian if(r == -1) {
345ae8c6e27Sflorian if(errno==EINTR || errno==EAGAIN)
346ae8c6e27Sflorian return -1;
347ae8c6e27Sflorian log_err("tube msg read failed: %s", strerror(errno));
348ae8c6e27Sflorian return -1; /* we can still continue, perhaps */
349ae8c6e27Sflorian }
350ae8c6e27Sflorian if(r == 0) /* EOF */
351ae8c6e27Sflorian return 0;
352ae8c6e27Sflorian } else r = 0;
353ae8c6e27Sflorian if(!fd_set_block(fd))
354ae8c6e27Sflorian return 0;
355ae8c6e27Sflorian /* read remainder */
356ae8c6e27Sflorian d = r;
357ae8c6e27Sflorian while(d != (ssize_t)sizeof(*len)) {
358ae8c6e27Sflorian if((r=read(fd, ((char*)len)+d, sizeof(*len)-d)) == -1) {
359ae8c6e27Sflorian log_err("tube msg read failed: %s", strerror(errno));
360ae8c6e27Sflorian (void)fd_set_nonblock(fd);
361ae8c6e27Sflorian return 0;
362ae8c6e27Sflorian }
363ae8c6e27Sflorian if(r == 0) /* EOF */ {
364ae8c6e27Sflorian (void)fd_set_nonblock(fd);
365ae8c6e27Sflorian return 0;
366ae8c6e27Sflorian }
367ae8c6e27Sflorian d += r;
368ae8c6e27Sflorian }
369a1a7ba80Sflorian if (*len >= 65536*2) {
370a1a7ba80Sflorian log_err("tube msg length %u is too big", (unsigned)*len);
371a1a7ba80Sflorian (void)fd_set_nonblock(fd);
372a1a7ba80Sflorian return 0;
373a1a7ba80Sflorian }
374ae8c6e27Sflorian *buf = (uint8_t*)malloc(*len);
375ae8c6e27Sflorian if(!*buf) {
376ae8c6e27Sflorian log_err("tube read out of memory");
377ae8c6e27Sflorian (void)fd_set_nonblock(fd);
378ae8c6e27Sflorian return 0;
379ae8c6e27Sflorian }
380ae8c6e27Sflorian d = 0;
381ae8c6e27Sflorian while(d < (ssize_t)*len) {
382ae8c6e27Sflorian if((r=read(fd, (*buf)+d, (size_t)((ssize_t)*len)-d)) == -1) {
383ae8c6e27Sflorian log_err("tube msg read failed: %s", strerror(errno));
384ae8c6e27Sflorian (void)fd_set_nonblock(fd);
385ae8c6e27Sflorian free(*buf);
386ae8c6e27Sflorian return 0;
387ae8c6e27Sflorian }
388ae8c6e27Sflorian if(r == 0) { /* EOF */
389ae8c6e27Sflorian (void)fd_set_nonblock(fd);
390ae8c6e27Sflorian free(*buf);
391ae8c6e27Sflorian return 0;
392ae8c6e27Sflorian }
393ae8c6e27Sflorian d += r;
394ae8c6e27Sflorian }
395ae8c6e27Sflorian if(!fd_set_nonblock(fd)) {
396ae8c6e27Sflorian free(*buf);
397ae8c6e27Sflorian return 0;
398ae8c6e27Sflorian }
399ae8c6e27Sflorian return 1;
400ae8c6e27Sflorian }
401ae8c6e27Sflorian
/**
 * Perform poll() on the fd, for readability, error and hangup.
 * @param fd: the file descriptor to poll.
 * @param t: maximum time to wait (converted to milliseconds), or NULL to
 *	wait without a timeout.
 * @return 1 if poll() reported an event for the fd, 0 if the wait timed
 *	out or poll() failed.
 */
static int
pollit(int fd, struct timeval* t)
{
	struct pollfd pfd;
	int timeout_ms = -1;
	int nready;

	memset(&pfd, 0, sizeof(pfd));
	pfd.events = POLLIN | POLLERR | POLLHUP;
	pfd.fd = fd;
#ifndef S_SPLINT_S
	if(t != NULL)
		timeout_ms = t->tv_sec*1000 + t->tv_usec/1000;
#endif

	nready = poll(&pfd, 1, timeout_ms);
	if(nready == -1 || nready == 0)
		return 0;
	return 1;
}
425ae8c6e27Sflorian
tube_poll(struct tube * tube)426ae8c6e27Sflorian int tube_poll(struct tube* tube)
427ae8c6e27Sflorian {
428ae8c6e27Sflorian struct timeval t;
429ae8c6e27Sflorian memset(&t, 0, sizeof(t));
430ae8c6e27Sflorian return pollit(tube->sr, &t);
431ae8c6e27Sflorian }
432ae8c6e27Sflorian
tube_wait(struct tube * tube)433ae8c6e27Sflorian int tube_wait(struct tube* tube)
434ae8c6e27Sflorian {
435ae8c6e27Sflorian return pollit(tube->sr, NULL);
436ae8c6e27Sflorian }
437ae8c6e27Sflorian
tube_wait_timeout(struct tube * tube,int msec)4385c45b740Sflorian int tube_wait_timeout(struct tube* tube, int msec)
4395c45b740Sflorian {
440*d500c338Sflorian int ret = 0;
441*d500c338Sflorian
4425c45b740Sflorian while(1) {
443*d500c338Sflorian struct pollfd fds;
444*d500c338Sflorian memset(&fds, 0, sizeof(fds));
445*d500c338Sflorian
446*d500c338Sflorian fds.fd = tube->sr;
447*d500c338Sflorian fds.events = POLLIN | POLLERR | POLLHUP;
448*d500c338Sflorian ret = poll(&fds, 1, msec);
449*d500c338Sflorian
450*d500c338Sflorian if(ret == -1) {
4515c45b740Sflorian if(errno == EAGAIN || errno == EINTR)
4525c45b740Sflorian continue;
4535c45b740Sflorian return -1;
4545c45b740Sflorian }
4555c45b740Sflorian break;
4565c45b740Sflorian }
457*d500c338Sflorian
458*d500c338Sflorian if(ret != 0)
459*d500c338Sflorian return 1;
460*d500c338Sflorian return 0;
4615c45b740Sflorian }
4625c45b740Sflorian
tube_read_fd(struct tube * tube)463ae8c6e27Sflorian int tube_read_fd(struct tube* tube)
464ae8c6e27Sflorian {
465ae8c6e27Sflorian return tube->sr;
466ae8c6e27Sflorian }
467ae8c6e27Sflorian
/**
 * Start background listening on the read end of the tube.
 * Stores the callback and its argument, and creates a raw commpoint for
 * the read fd with tube_handle_listen as its handler.
 * @param tube: the tube.
 * @param base: the comm base the commpoint is created in.
 * @param cb: callback stored as the listen callback.
 * @param arg: user argument stored for the callback.
 * @return 1 on success, 0 if the commpoint could not be created; errno
 *	is preserved across the log call.
 */
int tube_setup_bg_listen(struct tube* tube, struct comm_base* base,
	tube_callback_type* cb, void* arg)
{
	int saved_errno;

	tube->listen_cb = cb;
	tube->listen_arg = arg;
	tube->listen_com = comm_point_create_raw(base, tube->sr, 0,
		tube_handle_listen, tube);
	if(tube->listen_com != NULL)
		return 1;

	saved_errno = errno;
	log_err("tube_setup_bg_l: commpoint creation failed");
	errno = saved_errno;
	return 0;
}
482ae8c6e27Sflorian
tube_setup_bg_write(struct tube * tube,struct comm_base * base)483ae8c6e27Sflorian int tube_setup_bg_write(struct tube* tube, struct comm_base* base)
484ae8c6e27Sflorian {
485ae8c6e27Sflorian if(!(tube->res_com = comm_point_create_raw(base, tube->sw,
486ae8c6e27Sflorian 1, tube_handle_write, tube))) {
487ae8c6e27Sflorian int err = errno;
488ae8c6e27Sflorian log_err("tube_setup_bg_w: commpoint creation failed");
489ae8c6e27Sflorian errno = err;
490ae8c6e27Sflorian return 0;
491ae8c6e27Sflorian }
492ae8c6e27Sflorian return 1;
493ae8c6e27Sflorian }
494ae8c6e27Sflorian
tube_queue_item(struct tube * tube,uint8_t * msg,size_t len)495ae8c6e27Sflorian int tube_queue_item(struct tube* tube, uint8_t* msg, size_t len)
496ae8c6e27Sflorian {
497ae8c6e27Sflorian struct tube_res_list* item;
498ae8c6e27Sflorian if(!tube || !tube->res_com) return 0;
499ae8c6e27Sflorian item = (struct tube_res_list*)malloc(sizeof(*item));
500ae8c6e27Sflorian if(!item) {
501ae8c6e27Sflorian free(msg);
502ae8c6e27Sflorian log_err("out of memory for async answer");
503ae8c6e27Sflorian return 0;
504ae8c6e27Sflorian }
505ae8c6e27Sflorian item->buf = msg;
506ae8c6e27Sflorian item->len = len;
507ae8c6e27Sflorian item->next = NULL;
508ae8c6e27Sflorian /* add at back of list, since the first one may be partially written */
509ae8c6e27Sflorian if(tube->res_last)
510ae8c6e27Sflorian tube->res_last->next = item;
511ae8c6e27Sflorian else tube->res_list = item;
512ae8c6e27Sflorian tube->res_last = item;
513ae8c6e27Sflorian if(tube->res_list == tube->res_last) {
514ae8c6e27Sflorian /* first added item, start the write process */
515ae8c6e27Sflorian comm_point_start_listening(tube->res_com, -1, -1);
516ae8c6e27Sflorian }
517ae8c6e27Sflorian return 1;
518ae8c6e27Sflorian }
519ae8c6e27Sflorian
/**
 * Signal handler stub for the unix build.
 * The body is an assertion that always fails: this routine is not
 * expected to be invoked here.
 * @param fd: not used.
 * @param events: not used.
 * @param arg: not used.
 */
void tube_handle_signal(int ATTR_UNUSED(fd), short ATTR_UNUSED(events),
	void* ATTR_UNUSED(arg))
{
	/* must not be reached */
	log_assert(0);
}
525ae8c6e27Sflorian
526ae8c6e27Sflorian #else /* USE_WINSOCK */
527ae8c6e27Sflorian /* on windows */
528ae8c6e27Sflorian
529ae8c6e27Sflorian
tube_create(void)530ae8c6e27Sflorian struct tube* tube_create(void)
531ae8c6e27Sflorian {
532ae8c6e27Sflorian /* windows does not have forks like unix, so we only support
533ae8c6e27Sflorian * threads on windows. And thus the pipe need only connect
534ae8c6e27Sflorian * threads. We use a mutex and a list of datagrams. */
535ae8c6e27Sflorian struct tube* tube = (struct tube*)calloc(1, sizeof(*tube));
536ae8c6e27Sflorian if(!tube) {
537ae8c6e27Sflorian int err = errno;
538ae8c6e27Sflorian log_err("tube_create: out of memory");
539ae8c6e27Sflorian errno = err;
540ae8c6e27Sflorian return NULL;
541ae8c6e27Sflorian }
542ae8c6e27Sflorian tube->event = WSACreateEvent();
543ae8c6e27Sflorian if(tube->event == WSA_INVALID_EVENT) {
544ae8c6e27Sflorian free(tube);
545ae8c6e27Sflorian log_err("WSACreateEvent: %s", wsa_strerror(WSAGetLastError()));
546*d500c338Sflorian return NULL;
547ae8c6e27Sflorian }
548ae8c6e27Sflorian if(!WSAResetEvent(tube->event)) {
549ae8c6e27Sflorian log_err("WSAResetEvent: %s", wsa_strerror(WSAGetLastError()));
550ae8c6e27Sflorian }
551ae8c6e27Sflorian lock_basic_init(&tube->res_lock);
552ae8c6e27Sflorian verbose(VERB_ALGO, "tube created");
553ae8c6e27Sflorian return tube;
554ae8c6e27Sflorian }
555ae8c6e27Sflorian
tube_delete(struct tube * tube)556ae8c6e27Sflorian void tube_delete(struct tube* tube)
557ae8c6e27Sflorian {
558ae8c6e27Sflorian if(!tube) return;
559ae8c6e27Sflorian tube_remove_bg_listen(tube);
560ae8c6e27Sflorian tube_remove_bg_write(tube);
561ae8c6e27Sflorian tube_close_read(tube);
562ae8c6e27Sflorian tube_close_write(tube);
563ae8c6e27Sflorian if(!WSACloseEvent(tube->event))
564ae8c6e27Sflorian log_err("WSACloseEvent: %s", wsa_strerror(WSAGetLastError()));
565ae8c6e27Sflorian lock_basic_destroy(&tube->res_lock);
566ae8c6e27Sflorian verbose(VERB_ALGO, "tube deleted");
567ae8c6e27Sflorian free(tube);
568ae8c6e27Sflorian }
569ae8c6e27Sflorian
tube_close_read(struct tube * ATTR_UNUSED (tube))570ae8c6e27Sflorian void tube_close_read(struct tube* ATTR_UNUSED(tube))
571ae8c6e27Sflorian {
572ae8c6e27Sflorian verbose(VERB_ALGO, "tube close_read");
573ae8c6e27Sflorian }
574ae8c6e27Sflorian
tube_close_write(struct tube * ATTR_UNUSED (tube))575ae8c6e27Sflorian void tube_close_write(struct tube* ATTR_UNUSED(tube))
576ae8c6e27Sflorian {
577ae8c6e27Sflorian verbose(VERB_ALGO, "tube close_write");
578ae8c6e27Sflorian /* wake up waiting reader with an empty queue */
579ae8c6e27Sflorian if(!WSASetEvent(tube->event)) {
580ae8c6e27Sflorian log_err("WSASetEvent: %s", wsa_strerror(WSAGetLastError()));
581ae8c6e27Sflorian }
582ae8c6e27Sflorian }
583ae8c6e27Sflorian
tube_remove_bg_listen(struct tube * tube)584ae8c6e27Sflorian void tube_remove_bg_listen(struct tube* tube)
585ae8c6e27Sflorian {
586ae8c6e27Sflorian verbose(VERB_ALGO, "tube remove_bg_listen");
587ae8c6e27Sflorian ub_winsock_unregister_wsaevent(tube->ev_listen);
588ae8c6e27Sflorian }
589ae8c6e27Sflorian
tube_remove_bg_write(struct tube * tube)590ae8c6e27Sflorian void tube_remove_bg_write(struct tube* tube)
591ae8c6e27Sflorian {
592ae8c6e27Sflorian verbose(VERB_ALGO, "tube remove_bg_write");
593ae8c6e27Sflorian if(tube->res_list) {
594ae8c6e27Sflorian struct tube_res_list* np, *p = tube->res_list;
595ae8c6e27Sflorian tube->res_list = NULL;
596ae8c6e27Sflorian tube->res_last = NULL;
597ae8c6e27Sflorian while(p) {
598ae8c6e27Sflorian np = p->next;
599ae8c6e27Sflorian free(p->buf);
600ae8c6e27Sflorian free(p);
601ae8c6e27Sflorian p = np;
602ae8c6e27Sflorian }
603ae8c6e27Sflorian }
604ae8c6e27Sflorian }
605ae8c6e27Sflorian
tube_write_msg(struct tube * tube,uint8_t * buf,uint32_t len,int ATTR_UNUSED (nonblock))606ae8c6e27Sflorian int tube_write_msg(struct tube* tube, uint8_t* buf, uint32_t len,
607ae8c6e27Sflorian int ATTR_UNUSED(nonblock))
608ae8c6e27Sflorian {
609ae8c6e27Sflorian uint8_t* a;
610ae8c6e27Sflorian verbose(VERB_ALGO, "tube write_msg len %d", (int)len);
611ae8c6e27Sflorian a = (uint8_t*)memdup(buf, len);
612ae8c6e27Sflorian if(!a) {
613ae8c6e27Sflorian log_err("out of memory in tube_write_msg");
614ae8c6e27Sflorian return 0;
615ae8c6e27Sflorian }
616ae8c6e27Sflorian /* always nonblocking, this pipe cannot get full */
617ae8c6e27Sflorian return tube_queue_item(tube, a, len);
618ae8c6e27Sflorian }
619ae8c6e27Sflorian
tube_read_msg(struct tube * tube,uint8_t ** buf,uint32_t * len,int nonblock)620ae8c6e27Sflorian int tube_read_msg(struct tube* tube, uint8_t** buf, uint32_t* len,
621ae8c6e27Sflorian int nonblock)
622ae8c6e27Sflorian {
623ae8c6e27Sflorian struct tube_res_list* item = NULL;
624ae8c6e27Sflorian verbose(VERB_ALGO, "tube read_msg %s", nonblock?"nonblock":"blocking");
625ae8c6e27Sflorian *buf = NULL;
626ae8c6e27Sflorian if(!tube_poll(tube)) {
627ae8c6e27Sflorian verbose(VERB_ALGO, "tube read_msg nodata");
628ae8c6e27Sflorian /* nothing ready right now, wait if we want to */
629ae8c6e27Sflorian if(nonblock)
630ae8c6e27Sflorian return -1; /* would block waiting for items */
631ae8c6e27Sflorian if(!tube_wait(tube))
632ae8c6e27Sflorian return 0;
633ae8c6e27Sflorian }
634ae8c6e27Sflorian lock_basic_lock(&tube->res_lock);
635ae8c6e27Sflorian if(tube->res_list) {
636ae8c6e27Sflorian item = tube->res_list;
637ae8c6e27Sflorian tube->res_list = item->next;
638ae8c6e27Sflorian if(tube->res_last == item) {
639ae8c6e27Sflorian /* the list is now empty */
640ae8c6e27Sflorian tube->res_last = NULL;
641ae8c6e27Sflorian verbose(VERB_ALGO, "tube read_msg lastdata");
642ae8c6e27Sflorian if(!WSAResetEvent(tube->event)) {
643ae8c6e27Sflorian log_err("WSAResetEvent: %s",
644ae8c6e27Sflorian wsa_strerror(WSAGetLastError()));
645ae8c6e27Sflorian }
646ae8c6e27Sflorian }
647ae8c6e27Sflorian }
648ae8c6e27Sflorian lock_basic_unlock(&tube->res_lock);
649ae8c6e27Sflorian if(!item)
650ae8c6e27Sflorian return 0; /* would block waiting for items */
651ae8c6e27Sflorian *buf = item->buf;
652ae8c6e27Sflorian *len = item->len;
653ae8c6e27Sflorian free(item);
654ae8c6e27Sflorian verbose(VERB_ALGO, "tube read_msg len %d", (int)*len);
655ae8c6e27Sflorian return 1;
656ae8c6e27Sflorian }
657ae8c6e27Sflorian
tube_poll(struct tube * tube)658ae8c6e27Sflorian int tube_poll(struct tube* tube)
659ae8c6e27Sflorian {
660ae8c6e27Sflorian struct tube_res_list* item = NULL;
661ae8c6e27Sflorian lock_basic_lock(&tube->res_lock);
662ae8c6e27Sflorian item = tube->res_list;
663ae8c6e27Sflorian lock_basic_unlock(&tube->res_lock);
664ae8c6e27Sflorian if(item)
665ae8c6e27Sflorian return 1;
666ae8c6e27Sflorian return 0;
667ae8c6e27Sflorian }
668ae8c6e27Sflorian
tube_wait(struct tube * tube)669ae8c6e27Sflorian int tube_wait(struct tube* tube)
670ae8c6e27Sflorian {
671ae8c6e27Sflorian /* block on eventhandle */
672ae8c6e27Sflorian DWORD res = WSAWaitForMultipleEvents(
673ae8c6e27Sflorian 1 /* one event in array */,
674ae8c6e27Sflorian &tube->event /* the event to wait for, our pipe signal */,
675ae8c6e27Sflorian 0 /* wait for all events is false */,
676ae8c6e27Sflorian WSA_INFINITE /* wait, no timeout */,
677ae8c6e27Sflorian 0 /* we are not alertable for IO completion routines */
678ae8c6e27Sflorian );
679ae8c6e27Sflorian if(res == WSA_WAIT_TIMEOUT) {
680ae8c6e27Sflorian return 0;
681ae8c6e27Sflorian }
682ae8c6e27Sflorian if(res == WSA_WAIT_IO_COMPLETION) {
683ae8c6e27Sflorian /* a bit unexpected, since we were not alertable */
684ae8c6e27Sflorian return 0;
685ae8c6e27Sflorian }
686ae8c6e27Sflorian return 1;
687ae8c6e27Sflorian }
688ae8c6e27Sflorian
tube_wait_timeout(struct tube * tube,int msec)6895c45b740Sflorian int tube_wait_timeout(struct tube* tube, int msec)
6905c45b740Sflorian {
6915c45b740Sflorian /* block on eventhandle */
6925c45b740Sflorian DWORD res = WSAWaitForMultipleEvents(
6935c45b740Sflorian 1 /* one event in array */,
6945c45b740Sflorian &tube->event /* the event to wait for, our pipe signal */,
6955c45b740Sflorian 0 /* wait for all events is false */,
6965c45b740Sflorian msec /* wait for timeout */,
6975c45b740Sflorian 0 /* we are not alertable for IO completion routines */
6985c45b740Sflorian );
6995c45b740Sflorian if(res == WSA_WAIT_TIMEOUT) {
7005c45b740Sflorian return 0;
7015c45b740Sflorian }
7025c45b740Sflorian if(res == WSA_WAIT_IO_COMPLETION) {
7035c45b740Sflorian /* a bit unexpected, since we were not alertable */
7045c45b740Sflorian return -1;
7055c45b740Sflorian }
7065c45b740Sflorian return 1;
7075c45b740Sflorian }
7085c45b740Sflorian
/**
 * Get the file descriptor of the read end of the tube (windows).
 * @param tube: not used.
 * @return always -1, there is no sensible fd to return on Windows.
 */
int tube_read_fd(struct tube* ATTR_UNUSED(tube))
{
	const int no_fd = -1;
	return no_fd;
}
714ae8c6e27Sflorian
715ae8c6e27Sflorian int
tube_handle_listen(struct comm_point * ATTR_UNUSED (c),void * ATTR_UNUSED (arg),int ATTR_UNUSED (error),struct comm_reply * ATTR_UNUSED (reply_info))716ae8c6e27Sflorian tube_handle_listen(struct comm_point* ATTR_UNUSED(c), void* ATTR_UNUSED(arg),
717ae8c6e27Sflorian int ATTR_UNUSED(error), struct comm_reply* ATTR_UNUSED(reply_info))
718ae8c6e27Sflorian {
719ae8c6e27Sflorian log_assert(0);
720ae8c6e27Sflorian return 0;
721ae8c6e27Sflorian }
722ae8c6e27Sflorian
723ae8c6e27Sflorian int
tube_handle_write(struct comm_point * ATTR_UNUSED (c),void * ATTR_UNUSED (arg),int ATTR_UNUSED (error),struct comm_reply * ATTR_UNUSED (reply_info))724ae8c6e27Sflorian tube_handle_write(struct comm_point* ATTR_UNUSED(c), void* ATTR_UNUSED(arg),
725ae8c6e27Sflorian int ATTR_UNUSED(error), struct comm_reply* ATTR_UNUSED(reply_info))
726ae8c6e27Sflorian {
727ae8c6e27Sflorian log_assert(0);
728ae8c6e27Sflorian return 0;
729ae8c6e27Sflorian }
730ae8c6e27Sflorian
/**
 * Install the listen callback on the tube and, when the comm base has an
 * internal event base, register the tube event with it so that
 * tube_handle_signal is invoked for it.
 * @param tube: the tube to listen on.
 * @param base: comm base to register the event with.
 * @param cb: callback stored as the tube listen callback.
 * @param arg: user argument stored for the callback.
 * @return 1 on success (also when there is no internal base),
 *	0 if registering the event failed.
 */
int tube_setup_bg_listen(struct tube* tube, struct comm_base* base,
	tube_callback_type* cb, void* arg)
{
	tube->listen_cb = cb;
	tube->listen_arg = arg;
	if(comm_base_internal(base)) {
		tube->ev_listen = ub_winsock_register_wsaevent(
			comm_base_internal(base), tube->event,
			&tube_handle_signal, tube);
		if(!tube->ev_listen)
			return 0;
	}
	/* registered, or no comm base to register with (testing) */
	return 1;
}
742ae8c6e27Sflorian
/**
 * Set up background writing on the tube.
 * Nothing needs to be prepared in the winsock build, because queueing an
 * item is what signals the event.
 * @param tube: unused.
 * @param base: unused.
 * @return always 1.
 */
int tube_setup_bg_write(struct tube* ATTR_UNUSED(tube),
	struct comm_base* ATTR_UNUSED(base))
{
	/* signaling is done when an item is queued, no setup here */
	const int ok = 1;
	return ok;
}
749ae8c6e27Sflorian
tube_queue_item(struct tube * tube,uint8_t * msg,size_t len)750ae8c6e27Sflorian int tube_queue_item(struct tube* tube, uint8_t* msg, size_t len)
751ae8c6e27Sflorian {
752ae8c6e27Sflorian struct tube_res_list* item;
753ae8c6e27Sflorian if(!tube) return 0;
754ae8c6e27Sflorian item = (struct tube_res_list*)malloc(sizeof(*item));
755ae8c6e27Sflorian verbose(VERB_ALGO, "tube queue_item len %d", (int)len);
756ae8c6e27Sflorian if(!item) {
757ae8c6e27Sflorian free(msg);
758ae8c6e27Sflorian log_err("out of memory for async answer");
759ae8c6e27Sflorian return 0;
760ae8c6e27Sflorian }
761ae8c6e27Sflorian item->buf = msg;
762ae8c6e27Sflorian item->len = len;
763ae8c6e27Sflorian item->next = NULL;
764ae8c6e27Sflorian lock_basic_lock(&tube->res_lock);
765ae8c6e27Sflorian /* add at back of list, since the first one may be partially written */
766ae8c6e27Sflorian if(tube->res_last)
767ae8c6e27Sflorian tube->res_last->next = item;
768ae8c6e27Sflorian else tube->res_list = item;
769ae8c6e27Sflorian tube->res_last = item;
770ae8c6e27Sflorian /* signal the eventhandle */
771ae8c6e27Sflorian if(!WSASetEvent(tube->event)) {
772ae8c6e27Sflorian log_err("WSASetEvent: %s", wsa_strerror(WSAGetLastError()));
773ae8c6e27Sflorian }
774ae8c6e27Sflorian lock_basic_unlock(&tube->res_lock);
775ae8c6e27Sflorian return 1;
776ae8c6e27Sflorian }
777ae8c6e27Sflorian
tube_handle_signal(int ATTR_UNUSED (fd),short ATTR_UNUSED (events),void * arg)778ae8c6e27Sflorian void tube_handle_signal(int ATTR_UNUSED(fd), short ATTR_UNUSED(events),
779ae8c6e27Sflorian void* arg)
780ae8c6e27Sflorian {
781ae8c6e27Sflorian struct tube* tube = (struct tube*)arg;
782ae8c6e27Sflorian uint8_t* buf;
783ae8c6e27Sflorian uint32_t len = 0;
784ae8c6e27Sflorian verbose(VERB_ALGO, "tube handle_signal");
785ae8c6e27Sflorian while(tube_poll(tube)) {
786ae8c6e27Sflorian if(tube_read_msg(tube, &buf, &len, 1)) {
787ae8c6e27Sflorian fptr_ok(fptr_whitelist_tube_listen(tube->listen_cb));
788ae8c6e27Sflorian (*tube->listen_cb)(tube, buf, len, NETEVENT_NOERROR,
789ae8c6e27Sflorian tube->listen_arg);
790ae8c6e27Sflorian }
791ae8c6e27Sflorian }
792ae8c6e27Sflorian }
793ae8c6e27Sflorian
794ae8c6e27Sflorian #endif /* USE_WINSOCK */
795