/*
 * util/tube.c - pipe service
 *
 * Copyright (c) 2008, NLnet Labs. All rights reserved.
 *
 * This software is open source.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 *
 * Redistributions of source code must retain the above copyright notice,
 * this list of conditions and the following disclaimer.
 *
 * Redistributions in binary form must reproduce the above copyright notice,
 * this list of conditions and the following disclaimer in the documentation
 * and/or other materials provided with the distribution.
 *
 * Neither the name of the NLNET LABS nor the names of its contributors may
 * be used to endorse or promote products derived from this software without
 * specific prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 * HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED
 * TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
 * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
 * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
 * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
 * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */
353b6c3722Schristos
/**
 * \file
 *
 * This file contains pipe service functions.
 */
413b6c3722Schristos #include "config.h"
423b6c3722Schristos #include "util/tube.h"
433b6c3722Schristos #include "util/log.h"
443b6c3722Schristos #include "util/net_help.h"
453b6c3722Schristos #include "util/netevent.h"
463b6c3722Schristos #include "util/fptr_wlist.h"
473b6c3722Schristos #include "util/ub_event.h"
48*91f7d55fSchristos #ifdef HAVE_POLL_H
49*91f7d55fSchristos #include <poll.h>
50*91f7d55fSchristos #endif
513b6c3722Schristos
523b6c3722Schristos #ifndef USE_WINSOCK
533b6c3722Schristos /* on unix */
543b6c3722Schristos
553b6c3722Schristos #ifndef HAVE_SOCKETPAIR
563b6c3722Schristos /** no socketpair() available, like on Minix 3.1.7, use pipe */
573b6c3722Schristos #define socketpair(f, t, p, sv) pipe(sv)
583b6c3722Schristos #endif /* HAVE_SOCKETPAIR */
593b6c3722Schristos
tube_create(void)603b6c3722Schristos struct tube* tube_create(void)
613b6c3722Schristos {
623b6c3722Schristos struct tube* tube = (struct tube*)calloc(1, sizeof(*tube));
633b6c3722Schristos int sv[2];
643b6c3722Schristos if(!tube) {
653b6c3722Schristos int err = errno;
663b6c3722Schristos log_err("tube_create: out of memory");
673b6c3722Schristos errno = err;
683b6c3722Schristos return NULL;
693b6c3722Schristos }
703b6c3722Schristos tube->sr = -1;
713b6c3722Schristos tube->sw = -1;
723b6c3722Schristos if(socketpair(AF_UNIX, SOCK_STREAM, 0, sv) == -1) {
733b6c3722Schristos int err = errno;
743b6c3722Schristos log_err("socketpair: %s", strerror(errno));
753b6c3722Schristos free(tube);
763b6c3722Schristos errno = err;
773b6c3722Schristos return NULL;
783b6c3722Schristos }
793b6c3722Schristos tube->sr = sv[0];
803b6c3722Schristos tube->sw = sv[1];
813b6c3722Schristos if(!fd_set_nonblock(tube->sr) || !fd_set_nonblock(tube->sw)) {
823b6c3722Schristos int err = errno;
833b6c3722Schristos log_err("tube: cannot set nonblocking");
843b6c3722Schristos tube_delete(tube);
853b6c3722Schristos errno = err;
863b6c3722Schristos return NULL;
873b6c3722Schristos }
883b6c3722Schristos return tube;
893b6c3722Schristos }
903b6c3722Schristos
/**
 * Destroy a tube: remove the background listen and write state, close
 * both descriptors and free the structure.
 * @param tube: the tube to delete; NULL is accepted and ignored.
 */
void tube_delete(struct tube* tube)
{
	if(tube == NULL)
		return;
	/* the commpoints are deleted before the fds are closed, to be sure;
	 * epoll also does not like an fd being closed before event_del */
	tube_remove_bg_listen(tube);
	tube_remove_bg_write(tube);
	tube_close_read(tube);
	tube_close_write(tube);
	free(tube);
}
1023b6c3722Schristos
tube_close_read(struct tube * tube)1033b6c3722Schristos void tube_close_read(struct tube* tube)
1043b6c3722Schristos {
1053b6c3722Schristos if(tube->sr != -1) {
1063b6c3722Schristos close(tube->sr);
1073b6c3722Schristos tube->sr = -1;
1083b6c3722Schristos }
1093b6c3722Schristos }
1103b6c3722Schristos
tube_close_write(struct tube * tube)1113b6c3722Schristos void tube_close_write(struct tube* tube)
1123b6c3722Schristos {
1133b6c3722Schristos if(tube->sw != -1) {
1143b6c3722Schristos close(tube->sw);
1153b6c3722Schristos tube->sw = -1;
1163b6c3722Schristos }
1173b6c3722Schristos }
1183b6c3722Schristos
tube_remove_bg_listen(struct tube * tube)1193b6c3722Schristos void tube_remove_bg_listen(struct tube* tube)
1203b6c3722Schristos {
1213b6c3722Schristos if(tube->listen_com) {
1223b6c3722Schristos comm_point_delete(tube->listen_com);
1233b6c3722Schristos tube->listen_com = NULL;
1243b6c3722Schristos }
1253b6c3722Schristos free(tube->cmd_msg);
1263b6c3722Schristos tube->cmd_msg = NULL;
1273b6c3722Schristos }
1283b6c3722Schristos
tube_remove_bg_write(struct tube * tube)1293b6c3722Schristos void tube_remove_bg_write(struct tube* tube)
1303b6c3722Schristos {
1313b6c3722Schristos if(tube->res_com) {
1323b6c3722Schristos comm_point_delete(tube->res_com);
1333b6c3722Schristos tube->res_com = NULL;
1343b6c3722Schristos }
1353b6c3722Schristos if(tube->res_list) {
1363b6c3722Schristos struct tube_res_list* np, *p = tube->res_list;
1373b6c3722Schristos tube->res_list = NULL;
1383b6c3722Schristos tube->res_last = NULL;
1393b6c3722Schristos while(p) {
1403b6c3722Schristos np = p->next;
1413b6c3722Schristos free(p->buf);
1423b6c3722Schristos free(p);
1433b6c3722Schristos p = np;
1443b6c3722Schristos }
1453b6c3722Schristos }
1463b6c3722Schristos }
1473b6c3722Schristos
1483b6c3722Schristos int
tube_handle_listen(struct comm_point * c,void * arg,int error,struct comm_reply * ATTR_UNUSED (reply_info))1493b6c3722Schristos tube_handle_listen(struct comm_point* c, void* arg, int error,
1503b6c3722Schristos struct comm_reply* ATTR_UNUSED(reply_info))
1513b6c3722Schristos {
1523b6c3722Schristos struct tube* tube = (struct tube*)arg;
1533b6c3722Schristos ssize_t r;
1543b6c3722Schristos if(error != NETEVENT_NOERROR) {
1553b6c3722Schristos fptr_ok(fptr_whitelist_tube_listen(tube->listen_cb));
1563b6c3722Schristos (*tube->listen_cb)(tube, NULL, 0, error, tube->listen_arg);
1573b6c3722Schristos return 0;
1583b6c3722Schristos }
1593b6c3722Schristos
1603b6c3722Schristos if(tube->cmd_read < sizeof(tube->cmd_len)) {
1613b6c3722Schristos /* complete reading the length of control msg */
1623b6c3722Schristos r = read(c->fd, ((uint8_t*)&tube->cmd_len) + tube->cmd_read,
1633b6c3722Schristos sizeof(tube->cmd_len) - tube->cmd_read);
1643b6c3722Schristos if(r==0) {
1653b6c3722Schristos /* error has happened or */
1663b6c3722Schristos /* parent closed pipe, must have exited somehow */
1673b6c3722Schristos fptr_ok(fptr_whitelist_tube_listen(tube->listen_cb));
1683b6c3722Schristos (*tube->listen_cb)(tube, NULL, 0, NETEVENT_CLOSED,
1693b6c3722Schristos tube->listen_arg);
1703b6c3722Schristos return 0;
1713b6c3722Schristos }
1723b6c3722Schristos if(r==-1) {
1733b6c3722Schristos if(errno != EAGAIN && errno != EINTR) {
1743b6c3722Schristos log_err("rpipe error: %s", strerror(errno));
1753b6c3722Schristos }
1763b6c3722Schristos /* nothing to read now, try later */
1773b6c3722Schristos return 0;
1783b6c3722Schristos }
1793b6c3722Schristos tube->cmd_read += r;
1803b6c3722Schristos if(tube->cmd_read < sizeof(tube->cmd_len)) {
1813b6c3722Schristos /* not complete, try later */
1823b6c3722Schristos return 0;
1833b6c3722Schristos }
1843b6c3722Schristos tube->cmd_msg = (uint8_t*)calloc(1, tube->cmd_len);
1853b6c3722Schristos if(!tube->cmd_msg) {
1863b6c3722Schristos log_err("malloc failure");
1873b6c3722Schristos tube->cmd_read = 0;
1883b6c3722Schristos return 0;
1893b6c3722Schristos }
1903b6c3722Schristos }
1913b6c3722Schristos /* cmd_len has been read, read remainder */
1923b6c3722Schristos r = read(c->fd, tube->cmd_msg+tube->cmd_read-sizeof(tube->cmd_len),
1933b6c3722Schristos tube->cmd_len - (tube->cmd_read - sizeof(tube->cmd_len)));
1943b6c3722Schristos if(r==0) {
1953b6c3722Schristos /* error has happened or */
1963b6c3722Schristos /* parent closed pipe, must have exited somehow */
1973b6c3722Schristos fptr_ok(fptr_whitelist_tube_listen(tube->listen_cb));
1983b6c3722Schristos (*tube->listen_cb)(tube, NULL, 0, NETEVENT_CLOSED,
1993b6c3722Schristos tube->listen_arg);
2003b6c3722Schristos return 0;
2013b6c3722Schristos }
2023b6c3722Schristos if(r==-1) {
2033b6c3722Schristos /* nothing to read now, try later */
2043b6c3722Schristos if(errno != EAGAIN && errno != EINTR) {
2053b6c3722Schristos log_err("rpipe error: %s", strerror(errno));
2063b6c3722Schristos }
2073b6c3722Schristos return 0;
2083b6c3722Schristos }
2093b6c3722Schristos tube->cmd_read += r;
2103b6c3722Schristos if(tube->cmd_read < sizeof(tube->cmd_len) + tube->cmd_len) {
2113b6c3722Schristos /* not complete, try later */
2123b6c3722Schristos return 0;
2133b6c3722Schristos }
2143b6c3722Schristos tube->cmd_read = 0;
2153b6c3722Schristos
2163b6c3722Schristos fptr_ok(fptr_whitelist_tube_listen(tube->listen_cb));
2173b6c3722Schristos (*tube->listen_cb)(tube, tube->cmd_msg, tube->cmd_len,
2183b6c3722Schristos NETEVENT_NOERROR, tube->listen_arg);
2193b6c3722Schristos /* also frees the buf */
2203b6c3722Schristos tube->cmd_msg = NULL;
2213b6c3722Schristos return 0;
2223b6c3722Schristos }
2233b6c3722Schristos
2243b6c3722Schristos int
tube_handle_write(struct comm_point * c,void * arg,int error,struct comm_reply * ATTR_UNUSED (reply_info))2253b6c3722Schristos tube_handle_write(struct comm_point* c, void* arg, int error,
2263b6c3722Schristos struct comm_reply* ATTR_UNUSED(reply_info))
2273b6c3722Schristos {
2283b6c3722Schristos struct tube* tube = (struct tube*)arg;
2293b6c3722Schristos struct tube_res_list* item = tube->res_list;
2303b6c3722Schristos ssize_t r;
2313b6c3722Schristos if(error != NETEVENT_NOERROR) {
2323b6c3722Schristos log_err("tube_handle_write net error %d", error);
2333b6c3722Schristos return 0;
2343b6c3722Schristos }
2353b6c3722Schristos
2363b6c3722Schristos if(!item) {
2373b6c3722Schristos comm_point_stop_listening(c);
2383b6c3722Schristos return 0;
2393b6c3722Schristos }
2403b6c3722Schristos
2413b6c3722Schristos if(tube->res_write < sizeof(item->len)) {
2423b6c3722Schristos r = write(c->fd, ((uint8_t*)&item->len) + tube->res_write,
2433b6c3722Schristos sizeof(item->len) - tube->res_write);
2443b6c3722Schristos if(r == -1) {
2453b6c3722Schristos if(errno != EAGAIN && errno != EINTR) {
2463b6c3722Schristos log_err("wpipe error: %s", strerror(errno));
2473b6c3722Schristos }
2483b6c3722Schristos return 0; /* try again later */
2493b6c3722Schristos }
2503b6c3722Schristos if(r == 0) {
2513b6c3722Schristos /* error on pipe, must have exited somehow */
2523b6c3722Schristos /* cannot signal this to pipe user */
2533b6c3722Schristos return 0;
2543b6c3722Schristos }
2553b6c3722Schristos tube->res_write += r;
2563b6c3722Schristos if(tube->res_write < sizeof(item->len))
2573b6c3722Schristos return 0;
2583b6c3722Schristos }
2593b6c3722Schristos r = write(c->fd, item->buf + tube->res_write - sizeof(item->len),
2603b6c3722Schristos item->len - (tube->res_write - sizeof(item->len)));
2613b6c3722Schristos if(r == -1) {
2623b6c3722Schristos if(errno != EAGAIN && errno != EINTR) {
2633b6c3722Schristos log_err("wpipe error: %s", strerror(errno));
2643b6c3722Schristos }
2653b6c3722Schristos return 0; /* try again later */
2663b6c3722Schristos }
2673b6c3722Schristos if(r == 0) {
2683b6c3722Schristos /* error on pipe, must have exited somehow */
2693b6c3722Schristos /* cannot signal this to pipe user */
2703b6c3722Schristos return 0;
2713b6c3722Schristos }
2723b6c3722Schristos tube->res_write += r;
2733b6c3722Schristos if(tube->res_write < sizeof(item->len) + item->len)
2743b6c3722Schristos return 0;
2753b6c3722Schristos /* done this result, remove it */
2763b6c3722Schristos free(item->buf);
2773b6c3722Schristos item->buf = NULL;
2783b6c3722Schristos tube->res_list = tube->res_list->next;
2793b6c3722Schristos free(item);
2803b6c3722Schristos if(!tube->res_list) {
2813b6c3722Schristos tube->res_last = NULL;
2823b6c3722Schristos comm_point_stop_listening(c);
2833b6c3722Schristos }
2843b6c3722Schristos tube->res_write = 0;
2853b6c3722Schristos return 0;
2863b6c3722Schristos }
2873b6c3722Schristos
/**
 * Write the rest of a buffer to a descriptor that is in blocking mode.
 * Retries when write() fails with EAGAIN or EINTR: neither means the
 * pipe is broken, and an interrupting signal must not abort the message
 * halfway.
 * @param fd: descriptor to write to.
 * @param data: start of the buffer.
 * @param total: size of the buffer in bytes.
 * @param done: number of leading bytes that were already written.
 * @return 1 when all bytes are written, 0 on a write error (logged).
 */
static int
tube_write_remainder(int fd, const uint8_t* data, size_t total, size_t done)
{
	while(done < total) {
		ssize_t r = write(fd, data+done, total-done);
		if(r == -1) {
			if(errno == EAGAIN || errno == EINTR)
				continue; /* temporarily unavail: try again*/
			log_err("tube msg write failed: %s", strerror(errno));
			return 0;
		}
		done += (size_t)r;
	}
	return 1;
}

/**
 * Write a message to the tube: the 32-bit length followed by the data.
 * @param tube: the tube; its write descriptor is used.
 * @param buf: the message data.
 * @param len: number of bytes in buf.
 * @param nonblock: if nonzero, the first write of the length is attempted
 *	while the descriptor is still nonblocking, and -1 is returned when
 *	that attempt fails. Once any part of the length has been written
 *	the remainder is sent in blocking mode.
 * @return 1 on success, 0 on failure, -1 when nonblock is set and the
 *	first write failed (would block, interrupted, or another error).
 */
int tube_write_msg(struct tube* tube, uint8_t* buf, uint32_t len,
	int nonblock)
{
	ssize_t r = 0;
	int fd = tube->sw;

	/* test */
	if(nonblock) {
		r = write(fd, &len, sizeof(len));
		if(r == -1) {
			if(errno==EINTR || errno==EAGAIN)
				return -1;
			log_err("tube msg write failed: %s", strerror(errno));
			return -1; /* can still continue, perhaps */
		}
	}
	if(!fd_set_block(fd))
		return 0;
	/* write remainder of the length, then the data itself */
	if(!tube_write_remainder(fd, (const uint8_t*)&len, sizeof(len),
		(size_t)r) ||
		!tube_write_remainder(fd, buf, (size_t)len, 0)) {
		/* put the descriptor back the way it was found */
		(void)fd_set_nonblock(fd);
		return 0;
	}
	if(!fd_set_nonblock(fd))
		return 0;
	return 1;
}
3333b6c3722Schristos
/**
 * Read the rest of a buffer from a descriptor that is in blocking mode.
 * Retries when read() is interrupted by a signal (EINTR), so that a
 * signal does not abort a message halfway.
 * @param fd: descriptor to read from.
 * @param data: start of the buffer.
 * @param total: size of the buffer in bytes.
 * @param done: number of leading bytes that were already read.
 * @return 1 when the buffer is full, 0 on EOF or a read error (the
 *	error is logged).
 */
static int
tube_read_remainder(int fd, uint8_t* data, size_t total, size_t done)
{
	while(done < total) {
		ssize_t r = read(fd, data+done, total-done);
		if(r == -1) {
			if(errno == EINTR)
				continue; /* interrupted: try again */
			log_err("tube msg read failed: %s", strerror(errno));
			return 0;
		}
		if(r == 0) /* EOF */
			return 0;
		done += (size_t)r;
	}
	return 1;
}

/**
 * Read a message from the tube: the 32-bit length followed by the data.
 * @param tube: the tube; its read descriptor is used.
 * @param buf: on success set to a malloc'ed buffer holding the data, which
 *	the caller must free. Set to NULL on every other return.
 * @param len: set to the length of the message; messages of 65536*2
 *	bytes or more are refused.
 * @param nonblock: if nonzero, the first read of the length is attempted
 *	while the descriptor is still nonblocking, and -1 is returned when
 *	that attempt fails. Once any part of the length has arrived the
 *	remainder is read in blocking mode.
 * @return 1 on success, 0 on EOF or failure, -1 when nonblock is set and
 *	the first read failed (would block, interrupted, or another error).
 */
int tube_read_msg(struct tube* tube, uint8_t** buf, uint32_t* len,
	int nonblock)
{
	ssize_t r = 0;
	int fd = tube->sr;

	/* never hand back a stale or dangling pointer on failure */
	*buf = NULL;
	/* test */
	*len = 0;
	if(nonblock) {
		r = read(fd, len, sizeof(*len));
		if(r == -1) {
			if(errno==EINTR || errno==EAGAIN)
				return -1;
			log_err("tube msg read failed: %s", strerror(errno));
			return -1; /* we can still continue, perhaps */
		}
		if(r == 0) /* EOF */
			return 0;
	}
	if(!fd_set_block(fd))
		return 0;
	/* read remainder of the length */
	if(!tube_read_remainder(fd, (uint8_t*)len, sizeof(*len), (size_t)r)) {
		(void)fd_set_nonblock(fd);
		return 0;
	}
	if (*len >= 65536*2) {
		log_err("tube msg length %u is too big", (unsigned)*len);
		(void)fd_set_nonblock(fd);
		return 0;
	}
	/* malloc(0) may return NULL; ask for one byte so that an empty
	 * message is not mistaken for an allocation failure */
	*buf = (uint8_t*)malloc(*len ? (size_t)*len : 1);
	if(!*buf) {
		log_err("tube read out of memory");
		(void)fd_set_nonblock(fd);
		return 0;
	}
	if(!tube_read_remainder(fd, *buf, (size_t)*len, 0)) {
		(void)fd_set_nonblock(fd);
		free(*buf);
		*buf = NULL;
		return 0;
	}
	if(!fd_set_nonblock(fd)) {
		free(*buf);
		*buf = NULL;
		return 0;
	}
	return 1;
}
4013b6c3722Schristos
/**
 * Perform poll() on the fd, waiting for it to become readable or to
 * report an error or hangup.
 * @param fd: descriptor to watch.
 * @param t: maximum time to wait, converted to milliseconds; NULL means
 *	wait without a timeout.
 * @return 1 if poll() reported an event, 0 on timeout or poll() failure.
 */
static int
pollit(int fd, struct timeval* t)
{
	struct pollfd pfd;
	int timeout_ms = -1;
	int n;
	memset(&pfd, 0, sizeof(pfd));
	pfd.fd = fd;
	pfd.events = POLLIN | POLLERR | POLLHUP;
#ifndef S_SPLINT_S
	if(t != NULL)
		timeout_ms = t->tv_sec*1000 + t->tv_usec/1000;
#endif

	n = poll(&pfd, 1, timeout_ms);

	/* both -1 (failure) and 0 (timeout) are reported as 0 */
	return (n != -1 && n != 0) ? 1 : 0;
}
4253b6c3722Schristos
tube_poll(struct tube * tube)4263b6c3722Schristos int tube_poll(struct tube* tube)
4273b6c3722Schristos {
4283b6c3722Schristos struct timeval t;
4293b6c3722Schristos memset(&t, 0, sizeof(t));
4303b6c3722Schristos return pollit(tube->sr, &t);
4313b6c3722Schristos }
4323b6c3722Schristos
tube_wait(struct tube * tube)4333b6c3722Schristos int tube_wait(struct tube* tube)
4343b6c3722Schristos {
4353b6c3722Schristos return pollit(tube->sr, NULL);
4363b6c3722Schristos }
4373b6c3722Schristos
tube_wait_timeout(struct tube * tube,int msec)438*91f7d55fSchristos int tube_wait_timeout(struct tube* tube, int msec)
439*91f7d55fSchristos {
440*91f7d55fSchristos int ret = 0;
441*91f7d55fSchristos
442*91f7d55fSchristos while(1) {
443*91f7d55fSchristos struct pollfd fds;
444*91f7d55fSchristos memset(&fds, 0, sizeof(fds));
445*91f7d55fSchristos
446*91f7d55fSchristos fds.fd = tube->sr;
447*91f7d55fSchristos fds.events = POLLIN | POLLERR | POLLHUP;
448*91f7d55fSchristos ret = poll(&fds, 1, msec);
449*91f7d55fSchristos
450*91f7d55fSchristos if(ret == -1) {
451*91f7d55fSchristos if(errno == EAGAIN || errno == EINTR)
452*91f7d55fSchristos continue;
453*91f7d55fSchristos return -1;
454*91f7d55fSchristos }
455*91f7d55fSchristos break;
456*91f7d55fSchristos }
457*91f7d55fSchristos
458*91f7d55fSchristos if(ret != 0)
459*91f7d55fSchristos return 1;
460*91f7d55fSchristos return 0;
461*91f7d55fSchristos }
462*91f7d55fSchristos
tube_read_fd(struct tube * tube)4633b6c3722Schristos int tube_read_fd(struct tube* tube)
4643b6c3722Schristos {
4653b6c3722Schristos return tube->sr;
4663b6c3722Schristos }
4673b6c3722Schristos
/**
 * Start listening for commands in the background: store the callback and
 * its argument and create a raw commpoint on the read descriptor that is
 * serviced by tube_handle_listen.
 * @param tube: the tube; must not be NULL.
 * @param base: comm base the commpoint is created on.
 * @param cb: callback for received commands and errors.
 * @param arg: user argument passed to cb.
 * @return 1 on success, 0 if the commpoint could not be created (errno is
 *	preserved across the log call).
 */
int tube_setup_bg_listen(struct tube* tube, struct comm_base* base,
	tube_callback_type* cb, void* arg)
{
	int saved;
	tube->listen_cb = cb;
	tube->listen_arg = arg;
	tube->listen_com = comm_point_create_raw(base, tube->sr,
		0, tube_handle_listen, tube);
	if(tube->listen_com != NULL)
		return 1;
	saved = errno;
	log_err("tube_setup_bg_l: commpoint creation failed");
	errno = saved;
	return 0;
}
4823b6c3722Schristos
tube_setup_bg_write(struct tube * tube,struct comm_base * base)4833b6c3722Schristos int tube_setup_bg_write(struct tube* tube, struct comm_base* base)
4843b6c3722Schristos {
4853b6c3722Schristos if(!(tube->res_com = comm_point_create_raw(base, tube->sw,
4863b6c3722Schristos 1, tube_handle_write, tube))) {
4873b6c3722Schristos int err = errno;
4883b6c3722Schristos log_err("tube_setup_bg_w: commpoint creation failed");
4893b6c3722Schristos errno = err;
4903b6c3722Schristos return 0;
4913b6c3722Schristos }
4923b6c3722Schristos return 1;
4933b6c3722Schristos }
4943b6c3722Schristos
/**
 * Append a result to the background write queue of the tube.
 * The function takes ownership of msg on every path: it is either stored
 * in the queue (and freed once written or when the queue is torn down)
 * or freed here when queueing fails.
 * @param tube: the tube; NULL, or a tube without a write commpoint, is a
 *	failure.
 * @param msg: malloc'ed message buffer.
 * @param len: length of msg in bytes.
 * @return 1 if queued, 0 on failure (msg has been freed).
 */
int tube_queue_item(struct tube* tube, uint8_t* msg, size_t len)
{
	struct tube_res_list* item;
	if(!tube || !tube->res_com) {
		/* msg is owned by this function, like on the allocation
		 * failure below; without this the buffer would leak */
		free(msg);
		return 0;
	}
	item = (struct tube_res_list*)malloc(sizeof(*item));
	if(!item) {
		free(msg);
		log_err("out of memory for async answer");
		return 0;
	}
	item->buf = msg;
	item->len = len;
	item->next = NULL;
	/* add at back of list, since the first one may be partially written */
	if(tube->res_last)
		tube->res_last->next = item;
	else tube->res_list = item;
	tube->res_last = item;
	if(tube->res_list == tube->res_last) {
		/* first added item, start the write process */
		comm_point_start_listening(tube->res_com, -1, -1);
	}
	return 1;
}
5193b6c3722Schristos
/**
 * Signal handler entry for the tube. In this (non-winsock) build it has
 * no work to do and asserts if it is ever reached.
 * @param fd: unused.
 * @param events: unused.
 * @param arg: unused.
 */
void tube_handle_signal(int ATTR_UNUSED(fd), short ATTR_UNUSED(events),
	void* ATTR_UNUSED(arg))
{
	/* reaching this point is a programming error */
	log_assert(0);
}
5253b6c3722Schristos
5263b6c3722Schristos #else /* USE_WINSOCK */
5273b6c3722Schristos /* on windows */
5283b6c3722Schristos
5293b6c3722Schristos
tube_create(void)5303b6c3722Schristos struct tube* tube_create(void)
5313b6c3722Schristos {
5323b6c3722Schristos /* windows does not have forks like unix, so we only support
5333b6c3722Schristos * threads on windows. And thus the pipe need only connect
5343b6c3722Schristos * threads. We use a mutex and a list of datagrams. */
5353b6c3722Schristos struct tube* tube = (struct tube*)calloc(1, sizeof(*tube));
5363b6c3722Schristos if(!tube) {
5373b6c3722Schristos int err = errno;
5383b6c3722Schristos log_err("tube_create: out of memory");
5393b6c3722Schristos errno = err;
5403b6c3722Schristos return NULL;
5413b6c3722Schristos }
5423b6c3722Schristos tube->event = WSACreateEvent();
5433b6c3722Schristos if(tube->event == WSA_INVALID_EVENT) {
5443b6c3722Schristos free(tube);
5453b6c3722Schristos log_err("WSACreateEvent: %s", wsa_strerror(WSAGetLastError()));
546*91f7d55fSchristos return NULL;
5473b6c3722Schristos }
5483b6c3722Schristos if(!WSAResetEvent(tube->event)) {
5493b6c3722Schristos log_err("WSAResetEvent: %s", wsa_strerror(WSAGetLastError()));
5503b6c3722Schristos }
5513b6c3722Schristos lock_basic_init(&tube->res_lock);
5523b6c3722Schristos verbose(VERB_ALGO, "tube created");
5533b6c3722Schristos return tube;
5543b6c3722Schristos }
5553b6c3722Schristos
tube_delete(struct tube * tube)5563b6c3722Schristos void tube_delete(struct tube* tube)
5573b6c3722Schristos {
5583b6c3722Schristos if(!tube) return;
5593b6c3722Schristos tube_remove_bg_listen(tube);
5603b6c3722Schristos tube_remove_bg_write(tube);
5613b6c3722Schristos tube_close_read(tube);
5623b6c3722Schristos tube_close_write(tube);
5633b6c3722Schristos if(!WSACloseEvent(tube->event))
5643b6c3722Schristos log_err("WSACloseEvent: %s", wsa_strerror(WSAGetLastError()));
5653b6c3722Schristos lock_basic_destroy(&tube->res_lock);
5663b6c3722Schristos verbose(VERB_ALGO, "tube deleted");
5673b6c3722Schristos free(tube);
5683b6c3722Schristos }
5693b6c3722Schristos
tube_close_read(struct tube * ATTR_UNUSED (tube))5703b6c3722Schristos void tube_close_read(struct tube* ATTR_UNUSED(tube))
5713b6c3722Schristos {
5723b6c3722Schristos verbose(VERB_ALGO, "tube close_read");
5733b6c3722Schristos }
5743b6c3722Schristos
tube_close_write(struct tube * ATTR_UNUSED (tube))5753b6c3722Schristos void tube_close_write(struct tube* ATTR_UNUSED(tube))
5763b6c3722Schristos {
5773b6c3722Schristos verbose(VERB_ALGO, "tube close_write");
5783b6c3722Schristos /* wake up waiting reader with an empty queue */
5793b6c3722Schristos if(!WSASetEvent(tube->event)) {
5803b6c3722Schristos log_err("WSASetEvent: %s", wsa_strerror(WSAGetLastError()));
5813b6c3722Schristos }
5823b6c3722Schristos }
5833b6c3722Schristos
tube_remove_bg_listen(struct tube * tube)5843b6c3722Schristos void tube_remove_bg_listen(struct tube* tube)
5853b6c3722Schristos {
5863b6c3722Schristos verbose(VERB_ALGO, "tube remove_bg_listen");
5873b6c3722Schristos ub_winsock_unregister_wsaevent(tube->ev_listen);
5883b6c3722Schristos }
5893b6c3722Schristos
tube_remove_bg_write(struct tube * tube)5903b6c3722Schristos void tube_remove_bg_write(struct tube* tube)
5913b6c3722Schristos {
5923b6c3722Schristos verbose(VERB_ALGO, "tube remove_bg_write");
5933b6c3722Schristos if(tube->res_list) {
5943b6c3722Schristos struct tube_res_list* np, *p = tube->res_list;
5953b6c3722Schristos tube->res_list = NULL;
5963b6c3722Schristos tube->res_last = NULL;
5973b6c3722Schristos while(p) {
5983b6c3722Schristos np = p->next;
5993b6c3722Schristos free(p->buf);
6003b6c3722Schristos free(p);
6013b6c3722Schristos p = np;
6023b6c3722Schristos }
6033b6c3722Schristos }
6043b6c3722Schristos }
6053b6c3722Schristos
/**
 * Write a message to the tube (windows version): duplicate the data and
 * hand the copy to tube_queue_item.
 * @param tube: the tube to queue on.
 * @param buf: message data; it is copied, the caller keeps its buffer.
 * @param len: number of bytes in buf.
 * @param nonblock: unused.
 * @return 0 if the copy cannot be allocated, otherwise the result of
 *	tube_queue_item.
 */
int tube_write_msg(struct tube* tube, uint8_t* buf, uint32_t len,
	int ATTR_UNUSED(nonblock))
{
	uint8_t* copy;
	verbose(VERB_ALGO, "tube write_msg len %d", (int)len);
	copy = (uint8_t*)memdup(buf, len);
	if(copy != NULL) {
		/* always nonblocking, this pipe cannot get full */
		return tube_queue_item(tube, copy, len);
	}
	log_err("out of memory in tube_write_msg");
	return 0;
}
6193b6c3722Schristos
/**
 * Read a message from the tube (windows version): take the first item
 * off the result list, waiting for one first when asked to block.
 * @param tube: the tube; must not be NULL.
 * @param buf: set to the buffer of the dequeued item on success, NULL
 *	otherwise. The item itself is freed here, the buffer is not.
 * @param len: set to the item length on success; left untouched
 *	otherwise.
 * @param nonblock: if nonzero, do not wait when no item is queued.
 * @return 1 if a message was dequeued, -1 if nonblock is set and nothing
 *	was queued, 0 if waiting failed or the list turned out to be empty.
 */
int tube_read_msg(struct tube* tube, uint8_t** buf, uint32_t* len,
	int nonblock)
{
	struct tube_res_list* head;
	verbose(VERB_ALGO, "tube read_msg %s", nonblock?"nonblock":"blocking");
	*buf = NULL;
	if(!tube_poll(tube)) {
		verbose(VERB_ALGO, "tube read_msg nodata");
		/* nothing ready right now, wait if we want to */
		if(nonblock)
			return -1; /* would block waiting for items */
		if(!tube_wait(tube))
			return 0;
	}

	lock_basic_lock(&tube->res_lock);
	head = tube->res_list;
	if(head != NULL) {
		tube->res_list = head->next;
		if(tube->res_last == head) {
			/* the list is now empty */
			tube->res_last = NULL;
			verbose(VERB_ALGO, "tube read_msg lastdata");
			if(!WSAResetEvent(tube->event)) {
				log_err("WSAResetEvent: %s",
					wsa_strerror(WSAGetLastError()));
			}
		}
	}
	lock_basic_unlock(&tube->res_lock);

	if(head == NULL)
		return 0; /* would block waiting for items */
	*buf = head->buf;
	*len = head->len;
	free(head);
	verbose(VERB_ALGO, "tube read_msg len %d", (int)*len);
	return 1;
}
6573b6c3722Schristos
tube_poll(struct tube * tube)6583b6c3722Schristos int tube_poll(struct tube* tube)
6593b6c3722Schristos {
6603b6c3722Schristos struct tube_res_list* item = NULL;
6613b6c3722Schristos lock_basic_lock(&tube->res_lock);
6623b6c3722Schristos item = tube->res_list;
6633b6c3722Schristos lock_basic_unlock(&tube->res_lock);
6643b6c3722Schristos if(item)
6653b6c3722Schristos return 1;
6663b6c3722Schristos return 0;
6673b6c3722Schristos }
6683b6c3722Schristos
tube_wait(struct tube * tube)6693b6c3722Schristos int tube_wait(struct tube* tube)
6703b6c3722Schristos {
6713b6c3722Schristos /* block on eventhandle */
6723b6c3722Schristos DWORD res = WSAWaitForMultipleEvents(
6733b6c3722Schristos 1 /* one event in array */,
6743b6c3722Schristos &tube->event /* the event to wait for, our pipe signal */,
6753b6c3722Schristos 0 /* wait for all events is false */,
6763b6c3722Schristos WSA_INFINITE /* wait, no timeout */,
6773b6c3722Schristos 0 /* we are not alertable for IO completion routines */
6783b6c3722Schristos );
6793b6c3722Schristos if(res == WSA_WAIT_TIMEOUT) {
6803b6c3722Schristos return 0;
6813b6c3722Schristos }
6823b6c3722Schristos if(res == WSA_WAIT_IO_COMPLETION) {
6833b6c3722Schristos /* a bit unexpected, since we were not alertable */
6843b6c3722Schristos return 0;
6853b6c3722Schristos }
6863b6c3722Schristos return 1;
6873b6c3722Schristos }
6883b6c3722Schristos
tube_wait_timeout(struct tube * tube,int msec)689*91f7d55fSchristos int tube_wait_timeout(struct tube* tube, int msec)
690*91f7d55fSchristos {
691*91f7d55fSchristos /* block on eventhandle */
692*91f7d55fSchristos DWORD res = WSAWaitForMultipleEvents(
693*91f7d55fSchristos 1 /* one event in array */,
694*91f7d55fSchristos &tube->event /* the event to wait for, our pipe signal */,
695*91f7d55fSchristos 0 /* wait for all events is false */,
696*91f7d55fSchristos msec /* wait for timeout */,
697*91f7d55fSchristos 0 /* we are not alertable for IO completion routines */
698*91f7d55fSchristos );
699*91f7d55fSchristos if(res == WSA_WAIT_TIMEOUT) {
700*91f7d55fSchristos return 0;
701*91f7d55fSchristos }
702*91f7d55fSchristos if(res == WSA_WAIT_IO_COMPLETION) {
703*91f7d55fSchristos /* a bit unexpected, since we were not alertable */
704*91f7d55fSchristos return -1;
705*91f7d55fSchristos }
706*91f7d55fSchristos return 1;
707*91f7d55fSchristos }
708*91f7d55fSchristos
/**
 * Return the read file descriptor of the tube.
 * There is nothing sensible to return on Windows, so this always
 * yields -1 and ignores its argument.
 * @param tube: unused.
 * @return -1 always.
 */
int tube_read_fd(struct tube* ATTR_UNUSED(tube))
{
	const int no_fd = -1;
	return no_fd;
}
7143b6c3722Schristos
7153b6c3722Schristos int
tube_handle_listen(struct comm_point * ATTR_UNUSED (c),void * ATTR_UNUSED (arg),int ATTR_UNUSED (error),struct comm_reply * ATTR_UNUSED (reply_info))7163b6c3722Schristos tube_handle_listen(struct comm_point* ATTR_UNUSED(c), void* ATTR_UNUSED(arg),
7173b6c3722Schristos int ATTR_UNUSED(error), struct comm_reply* ATTR_UNUSED(reply_info))
7183b6c3722Schristos {
7193b6c3722Schristos log_assert(0);
7203b6c3722Schristos return 0;
7213b6c3722Schristos }
7223b6c3722Schristos
7233b6c3722Schristos int
tube_handle_write(struct comm_point * ATTR_UNUSED (c),void * ATTR_UNUSED (arg),int ATTR_UNUSED (error),struct comm_reply * ATTR_UNUSED (reply_info))7243b6c3722Schristos tube_handle_write(struct comm_point* ATTR_UNUSED(c), void* ATTR_UNUSED(arg),
7253b6c3722Schristos int ATTR_UNUSED(error), struct comm_reply* ATTR_UNUSED(reply_info))
7263b6c3722Schristos {
7273b6c3722Schristos log_assert(0);
7283b6c3722Schristos return 0;
7293b6c3722Schristos }
7303b6c3722Schristos
/**
 * Install the listen callback on the tube and, when the comm base has an
 * internal base, register the tube's event with it so that
 * tube_handle_signal is called with the tube as argument.
 * @param tube: the tube; listen_cb, listen_arg and possibly ev_listen
 *	are written.
 * @param base: comm base to register the event with.
 * @param cb: callback stored in tube->listen_cb.
 * @param arg: user argument stored in tube->listen_arg.
 * @return 1 on success, or when there is no internal base to register
 *	with; 0 when the registration returned NULL.
 */
int tube_setup_bg_listen(struct tube* tube, struct comm_base* base,
	tube_callback_type* cb, void* arg)
{
	tube->listen_cb = cb;
	tube->listen_arg = arg;
	if(comm_base_internal(base)) {
		tube->ev_listen = ub_winsock_register_wsaevent(
			comm_base_internal(base), tube->event,
			&tube_handle_signal, tube);
		if(!tube->ev_listen)
			return 0;
	}
	/* registered, or ignored when no comm base - testing */
	return 1;
}
7423b6c3722Schristos
/**
 * Set up background writing for the tube.
 * Nothing needs to be prepared in this build: the queue item routine
 * performs the signaling itself.
 * @param tube: unused.
 * @param base: unused.
 * @return 1 always.
 */
int tube_setup_bg_write(struct tube* ATTR_UNUSED(tube),
	struct comm_base* ATTR_UNUSED(base))
{
	const int ok = 1;
	return ok;
}
7493b6c3722Schristos
tube_queue_item(struct tube * tube,uint8_t * msg,size_t len)7503b6c3722Schristos int tube_queue_item(struct tube* tube, uint8_t* msg, size_t len)
7513b6c3722Schristos {
7527cd94d69Schristos struct tube_res_list* item;
7537cd94d69Schristos if(!tube) return 0;
7547cd94d69Schristos item = (struct tube_res_list*)malloc(sizeof(*item));
7553b6c3722Schristos verbose(VERB_ALGO, "tube queue_item len %d", (int)len);
7563b6c3722Schristos if(!item) {
7573b6c3722Schristos free(msg);
7583b6c3722Schristos log_err("out of memory for async answer");
7593b6c3722Schristos return 0;
7603b6c3722Schristos }
7613b6c3722Schristos item->buf = msg;
7623b6c3722Schristos item->len = len;
7633b6c3722Schristos item->next = NULL;
7643b6c3722Schristos lock_basic_lock(&tube->res_lock);
7653b6c3722Schristos /* add at back of list, since the first one may be partially written */
7663b6c3722Schristos if(tube->res_last)
7673b6c3722Schristos tube->res_last->next = item;
7683b6c3722Schristos else tube->res_list = item;
7693b6c3722Schristos tube->res_last = item;
7703b6c3722Schristos /* signal the eventhandle */
7713b6c3722Schristos if(!WSASetEvent(tube->event)) {
7723b6c3722Schristos log_err("WSASetEvent: %s", wsa_strerror(WSAGetLastError()));
7733b6c3722Schristos }
7743b6c3722Schristos lock_basic_unlock(&tube->res_lock);
7753b6c3722Schristos return 1;
7763b6c3722Schristos }
7773b6c3722Schristos
tube_handle_signal(int ATTR_UNUSED (fd),short ATTR_UNUSED (events),void * arg)7783b6c3722Schristos void tube_handle_signal(int ATTR_UNUSED(fd), short ATTR_UNUSED(events),
7793b6c3722Schristos void* arg)
7803b6c3722Schristos {
7813b6c3722Schristos struct tube* tube = (struct tube*)arg;
7823b6c3722Schristos uint8_t* buf;
7833b6c3722Schristos uint32_t len = 0;
7843b6c3722Schristos verbose(VERB_ALGO, "tube handle_signal");
7853b6c3722Schristos while(tube_poll(tube)) {
7863b6c3722Schristos if(tube_read_msg(tube, &buf, &len, 1)) {
7873b6c3722Schristos fptr_ok(fptr_whitelist_tube_listen(tube->listen_cb));
7883b6c3722Schristos (*tube->listen_cb)(tube, buf, len, NETEVENT_NOERROR,
7893b6c3722Schristos tube->listen_arg);
7903b6c3722Schristos }
7913b6c3722Schristos }
7923b6c3722Schristos }
7933b6c3722Schristos
7943b6c3722Schristos #endif /* USE_WINSOCK */
795