1*eabc0478Schristos /* $NetBSD: bufferevent_pair.c,v 1.7 2024/08/18 20:47:21 christos Exp $ */ 28585484eSchristos 38585484eSchristos /* 48585484eSchristos * Copyright (c) 2009-2012 Niels Provos, Nick Mathewson 58585484eSchristos * 68585484eSchristos * Redistribution and use in source and binary forms, with or without 78585484eSchristos * modification, are permitted provided that the following conditions 88585484eSchristos * are met: 98585484eSchristos * 1. Redistributions of source code must retain the above copyright 108585484eSchristos * notice, this list of conditions and the following disclaimer. 118585484eSchristos * 2. Redistributions in binary form must reproduce the above copyright 128585484eSchristos * notice, this list of conditions and the following disclaimer in the 138585484eSchristos * documentation and/or other materials provided with the distribution. 148585484eSchristos * 3. The name of the author may not be used to endorse or promote products 158585484eSchristos * derived from this software without specific prior written permission. 168585484eSchristos * 178585484eSchristos * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR 188585484eSchristos * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES 198585484eSchristos * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. 208585484eSchristos * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, 218585484eSchristos * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT 228585484eSchristos * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, 238585484eSchristos * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY 248585484eSchristos * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT 258585484eSchristos * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF 268585484eSchristos * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 278585484eSchristos */ 288585484eSchristos #include "event2/event-config.h" 298585484eSchristos #include "evconfig-private.h" 308585484eSchristos 318585484eSchristos #include <sys/types.h> 328585484eSchristos 338585484eSchristos #ifdef _WIN32 348585484eSchristos #include <winsock2.h> 358585484eSchristos #endif 368585484eSchristos 378585484eSchristos #include "event2/util.h" 388585484eSchristos #include "event2/buffer.h" 398585484eSchristos #include "event2/bufferevent.h" 408585484eSchristos #include "event2/bufferevent_struct.h" 418585484eSchristos #include "event2/event.h" 428585484eSchristos #include "defer-internal.h" 438585484eSchristos #include "bufferevent-internal.h" 448585484eSchristos #include "mm-internal.h" 458585484eSchristos #include "util-internal.h" 468585484eSchristos 478585484eSchristos struct bufferevent_pair { 488585484eSchristos struct bufferevent_private bev; 498585484eSchristos struct bufferevent_pair *partner; 507476e6e4Schristos /* For ->destruct() lock checking */ 517476e6e4Schristos struct bufferevent_pair *unlinked_partner; 528585484eSchristos }; 538585484eSchristos 548585484eSchristos 558585484eSchristos /* Given a bufferevent that's really a bev part of a bufferevent_pair, 568585484eSchristos * return that bufferevent_filtered. Returns NULL otherwise.*/ 578585484eSchristos static inline struct bufferevent_pair * 588585484eSchristos upcast(struct bufferevent *bev) 598585484eSchristos { 608585484eSchristos struct bufferevent_pair *bev_p; 61*eabc0478Schristos if (!BEV_IS_PAIR(bev)) 628585484eSchristos return NULL; 638585484eSchristos bev_p = EVUTIL_UPCAST(bev, struct bufferevent_pair, bev.bev); 64*eabc0478Schristos EVUTIL_ASSERT(BEV_IS_PAIR(&bev_p->bev.bev)); 658585484eSchristos return bev_p; 668585484eSchristos } 678585484eSchristos 688585484eSchristos #define downcast(bev_pair) (&(bev_pair)->bev.bev) 698585484eSchristos 708585484eSchristos static inline void 718585484eSchristos incref_and_lock(struct bufferevent *b) 728585484eSchristos { 738585484eSchristos struct bufferevent_pair *bevp; 748585484eSchristos bufferevent_incref_and_lock_(b); 758585484eSchristos bevp = upcast(b); 768585484eSchristos if (bevp->partner) 778585484eSchristos bufferevent_incref_and_lock_(downcast(bevp->partner)); 788585484eSchristos } 798585484eSchristos 808585484eSchristos static inline void 818585484eSchristos decref_and_unlock(struct bufferevent *b) 828585484eSchristos { 838585484eSchristos struct bufferevent_pair *bevp = upcast(b); 848585484eSchristos if (bevp->partner) 858585484eSchristos bufferevent_decref_and_unlock_(downcast(bevp->partner)); 868585484eSchristos bufferevent_decref_and_unlock_(b); 878585484eSchristos } 888585484eSchristos 898585484eSchristos /* XXX Handle close */ 908585484eSchristos 918585484eSchristos static void be_pair_outbuf_cb(struct evbuffer *, 928585484eSchristos const struct evbuffer_cb_info *, void *); 938585484eSchristos 948585484eSchristos static struct bufferevent_pair * 958585484eSchristos bufferevent_pair_elt_new(struct event_base *base, 968585484eSchristos int options) 978585484eSchristos { 988585484eSchristos struct bufferevent_pair *bufev; 998585484eSchristos if (! (bufev = mm_calloc(1, sizeof(struct bufferevent_pair)))) 1008585484eSchristos return NULL; 1018585484eSchristos if (bufferevent_init_common_(&bufev->bev, base, &bufferevent_ops_pair, 1028585484eSchristos options)) { 1038585484eSchristos mm_free(bufev); 1048585484eSchristos return NULL; 1058585484eSchristos } 1068585484eSchristos if (!evbuffer_add_cb(bufev->bev.bev.output, be_pair_outbuf_cb, bufev)) { 1078585484eSchristos bufferevent_free(downcast(bufev)); 1088585484eSchristos return NULL; 1098585484eSchristos } 1108585484eSchristos 1118585484eSchristos bufferevent_init_generic_timeout_cbs_(&bufev->bev.bev); 1128585484eSchristos 1138585484eSchristos return bufev; 1148585484eSchristos } 1158585484eSchristos 1168585484eSchristos int 1178585484eSchristos bufferevent_pair_new(struct event_base *base, int options, 1188585484eSchristos struct bufferevent *pair[2]) 1198585484eSchristos { 1208585484eSchristos struct bufferevent_pair *bufev1 = NULL, *bufev2 = NULL; 1218585484eSchristos int tmp_options; 1228585484eSchristos 1238585484eSchristos options |= BEV_OPT_DEFER_CALLBACKS; 1248585484eSchristos tmp_options = options & ~BEV_OPT_THREADSAFE; 1258585484eSchristos 1268585484eSchristos bufev1 = bufferevent_pair_elt_new(base, options); 1278585484eSchristos if (!bufev1) 1288585484eSchristos return -1; 1298585484eSchristos bufev2 = bufferevent_pair_elt_new(base, tmp_options); 1308585484eSchristos if (!bufev2) { 1318585484eSchristos bufferevent_free(downcast(bufev1)); 1328585484eSchristos return -1; 1338585484eSchristos } 1348585484eSchristos 1358585484eSchristos if (options & BEV_OPT_THREADSAFE) { 1368585484eSchristos /*XXXX check return */ 1378585484eSchristos bufferevent_enable_locking_(downcast(bufev2), bufev1->bev.lock); 1388585484eSchristos } 1398585484eSchristos 1408585484eSchristos bufev1->partner = bufev2; 1418585484eSchristos bufev2->partner = bufev1; 1428585484eSchristos 1438585484eSchristos evbuffer_freeze(downcast(bufev1)->input, 0); 1448585484eSchristos evbuffer_freeze(downcast(bufev1)->output, 1); 1458585484eSchristos evbuffer_freeze(downcast(bufev2)->input, 0); 1468585484eSchristos evbuffer_freeze(downcast(bufev2)->output, 1); 1478585484eSchristos 1488585484eSchristos pair[0] = downcast(bufev1); 1498585484eSchristos pair[1] = downcast(bufev2); 1508585484eSchristos 1518585484eSchristos return 0; 1528585484eSchristos } 1538585484eSchristos 1548585484eSchristos static void 1558585484eSchristos be_pair_transfer(struct bufferevent *src, struct bufferevent *dst, 1568585484eSchristos int ignore_wm) 1578585484eSchristos { 158b8ecfcfeSchristos size_t dst_size; 1598585484eSchristos size_t n; 1608585484eSchristos 1618585484eSchristos evbuffer_unfreeze(src->output, 1); 1628585484eSchristos evbuffer_unfreeze(dst->input, 0); 1638585484eSchristos 1648585484eSchristos if (dst->wm_read.high) { 1658585484eSchristos dst_size = evbuffer_get_length(dst->input); 1668585484eSchristos if (dst_size < dst->wm_read.high) { 1678585484eSchristos n = dst->wm_read.high - dst_size; 1688585484eSchristos evbuffer_remove_buffer(src->output, dst->input, n); 1698585484eSchristos } else { 1708585484eSchristos if (!ignore_wm) 1718585484eSchristos goto done; 1728585484eSchristos n = evbuffer_get_length(src->output); 1738585484eSchristos evbuffer_add_buffer(dst->input, src->output); 1748585484eSchristos } 1758585484eSchristos } else { 1768585484eSchristos n = evbuffer_get_length(src->output); 1778585484eSchristos evbuffer_add_buffer(dst->input, src->output); 1788585484eSchristos } 1798585484eSchristos 1808585484eSchristos if (n) { 1818585484eSchristos BEV_RESET_GENERIC_READ_TIMEOUT(dst); 1828585484eSchristos 1838585484eSchristos if (evbuffer_get_length(dst->output)) 1848585484eSchristos BEV_RESET_GENERIC_WRITE_TIMEOUT(dst); 1858585484eSchristos else 1868585484eSchristos BEV_DEL_GENERIC_WRITE_TIMEOUT(dst); 1878585484eSchristos } 1888585484eSchristos 189b8ecfcfeSchristos bufferevent_trigger_nolock_(dst, EV_READ, 0); 190b8ecfcfeSchristos bufferevent_trigger_nolock_(src, EV_WRITE, 0); 1918585484eSchristos done: 1928585484eSchristos evbuffer_freeze(src->output, 1); 1938585484eSchristos evbuffer_freeze(dst->input, 0); 1948585484eSchristos } 1958585484eSchristos 1968585484eSchristos static inline int 1978585484eSchristos be_pair_wants_to_talk(struct bufferevent_pair *src, 1988585484eSchristos struct bufferevent_pair *dst) 1998585484eSchristos { 2008585484eSchristos return (downcast(src)->enabled & EV_WRITE) && 2018585484eSchristos (downcast(dst)->enabled & EV_READ) && 2028585484eSchristos !dst->bev.read_suspended && 2038585484eSchristos evbuffer_get_length(downcast(src)->output); 2048585484eSchristos } 2058585484eSchristos 2068585484eSchristos static void 2078585484eSchristos be_pair_outbuf_cb(struct evbuffer *outbuf, 2088585484eSchristos const struct evbuffer_cb_info *info, void *arg) 2098585484eSchristos { 2108585484eSchristos struct bufferevent_pair *bev_pair = arg; 2118585484eSchristos struct bufferevent_pair *partner = bev_pair->partner; 2128585484eSchristos 2138585484eSchristos incref_and_lock(downcast(bev_pair)); 2148585484eSchristos 2158585484eSchristos if (info->n_added > info->n_deleted && partner) { 2168585484eSchristos /* We got more data. If the other side's reading, then 2178585484eSchristos hand it over. */ 2188585484eSchristos if (be_pair_wants_to_talk(bev_pair, partner)) { 2198585484eSchristos be_pair_transfer(downcast(bev_pair), downcast(partner), 0); 2208585484eSchristos } 2218585484eSchristos } 2228585484eSchristos 2238585484eSchristos decref_and_unlock(downcast(bev_pair)); 2248585484eSchristos } 2258585484eSchristos 2268585484eSchristos static int 2278585484eSchristos be_pair_enable(struct bufferevent *bufev, short events) 2288585484eSchristos { 2298585484eSchristos struct bufferevent_pair *bev_p = upcast(bufev); 2308585484eSchristos struct bufferevent_pair *partner = bev_p->partner; 2318585484eSchristos 2328585484eSchristos incref_and_lock(bufev); 2338585484eSchristos 2348585484eSchristos if (events & EV_READ) { 2358585484eSchristos BEV_RESET_GENERIC_READ_TIMEOUT(bufev); 2368585484eSchristos } 2378585484eSchristos if ((events & EV_WRITE) && evbuffer_get_length(bufev->output)) 2388585484eSchristos BEV_RESET_GENERIC_WRITE_TIMEOUT(bufev); 2398585484eSchristos 2408585484eSchristos /* We're starting to read! Does the other side have anything to write?*/ 2418585484eSchristos if ((events & EV_READ) && partner && 2428585484eSchristos be_pair_wants_to_talk(partner, bev_p)) { 2438585484eSchristos be_pair_transfer(downcast(partner), bufev, 0); 2448585484eSchristos } 2458585484eSchristos /* We're starting to write! Does the other side want to read? */ 2468585484eSchristos if ((events & EV_WRITE) && partner && 2478585484eSchristos be_pair_wants_to_talk(bev_p, partner)) { 2488585484eSchristos be_pair_transfer(bufev, downcast(partner), 0); 2498585484eSchristos } 2508585484eSchristos decref_and_unlock(bufev); 2518585484eSchristos return 0; 2528585484eSchristos } 2538585484eSchristos 2548585484eSchristos static int 2558585484eSchristos be_pair_disable(struct bufferevent *bev, short events) 2568585484eSchristos { 2578585484eSchristos if (events & EV_READ) { 2588585484eSchristos BEV_DEL_GENERIC_READ_TIMEOUT(bev); 2598585484eSchristos } 2608585484eSchristos if (events & EV_WRITE) { 2618585484eSchristos BEV_DEL_GENERIC_WRITE_TIMEOUT(bev); 2628585484eSchristos } 2638585484eSchristos return 0; 2648585484eSchristos } 2658585484eSchristos 2668585484eSchristos static void 267b8ecfcfeSchristos be_pair_unlink(struct bufferevent *bev) 2688585484eSchristos { 2698585484eSchristos struct bufferevent_pair *bev_p = upcast(bev); 2708585484eSchristos 2718585484eSchristos if (bev_p->partner) { 2727476e6e4Schristos bev_p->unlinked_partner = bev_p->partner; 2738585484eSchristos bev_p->partner->partner = NULL; 2748585484eSchristos bev_p->partner = NULL; 2758585484eSchristos } 2768585484eSchristos } 2778585484eSchristos 2787476e6e4Schristos /* Free *shared* lock in the latest be (since we share it between two of them). */ 2797476e6e4Schristos static void 2807476e6e4Schristos be_pair_destruct(struct bufferevent *bev) 2817476e6e4Schristos { 2827476e6e4Schristos struct bufferevent_pair *bev_p = upcast(bev); 2837476e6e4Schristos 2847476e6e4Schristos /* Transfer ownership of the lock into partner, otherwise we will use 2857476e6e4Schristos * already free'd lock during freeing second bev, see next example: 2867476e6e4Schristos * 2877476e6e4Schristos * bev1->own_lock = 1 2887476e6e4Schristos * bev2->own_lock = 0 2897476e6e4Schristos * bev2->lock = bev1->lock 2907476e6e4Schristos * 2917476e6e4Schristos * bufferevent_free(bev1) # refcnt == 0 -> unlink 2927476e6e4Schristos * bufferevent_free(bev2) # refcnt == 0 -> unlink 2937476e6e4Schristos * 2947476e6e4Schristos * event_base_free() -> finilizers -> EVTHREAD_FREE_LOCK(bev1->lock) 2957476e6e4Schristos * -> BEV_LOCK(bev2->lock) <-- already freed 2967476e6e4Schristos * 2977476e6e4Schristos * Where bev1 == pair[0], bev2 == pair[1]. 2987476e6e4Schristos */ 2997476e6e4Schristos if (bev_p->unlinked_partner && bev_p->bev.own_lock) { 3007476e6e4Schristos bev_p->unlinked_partner->bev.own_lock = 1; 3017476e6e4Schristos bev_p->bev.own_lock = 0; 3027476e6e4Schristos } 3037476e6e4Schristos bev_p->unlinked_partner = NULL; 3047476e6e4Schristos } 3057476e6e4Schristos 3068585484eSchristos static int 3078585484eSchristos be_pair_flush(struct bufferevent *bev, short iotype, 3088585484eSchristos enum bufferevent_flush_mode mode) 3098585484eSchristos { 3108585484eSchristos struct bufferevent_pair *bev_p = upcast(bev); 3118585484eSchristos struct bufferevent *partner; 312*eabc0478Schristos 3138585484eSchristos if (!bev_p->partner) 3148585484eSchristos return -1; 3158585484eSchristos 3168585484eSchristos if (mode == BEV_NORMAL) 3178585484eSchristos return 0; 3188585484eSchristos 319*eabc0478Schristos incref_and_lock(bev); 320*eabc0478Schristos 321*eabc0478Schristos partner = downcast(bev_p->partner); 322*eabc0478Schristos 3238585484eSchristos if ((iotype & EV_READ) != 0) 3248585484eSchristos be_pair_transfer(partner, bev, 1); 3258585484eSchristos 3268585484eSchristos if ((iotype & EV_WRITE) != 0) 3278585484eSchristos be_pair_transfer(bev, partner, 1); 3288585484eSchristos 3298585484eSchristos if (mode == BEV_FINISHED) { 330*eabc0478Schristos short what = BEV_EVENT_EOF; 331*eabc0478Schristos if (iotype & EV_READ) 332*eabc0478Schristos what |= BEV_EVENT_WRITING; 333*eabc0478Schristos if (iotype & EV_WRITE) 334*eabc0478Schristos what |= BEV_EVENT_READING; 335*eabc0478Schristos bufferevent_run_eventcb_(partner, what, 0); 3368585484eSchristos } 3378585484eSchristos decref_and_unlock(bev); 3388585484eSchristos return 0; 3398585484eSchristos } 3408585484eSchristos 3418585484eSchristos struct bufferevent * 3428585484eSchristos bufferevent_pair_get_partner(struct bufferevent *bev) 3438585484eSchristos { 3448585484eSchristos struct bufferevent_pair *bev_p; 345b8ecfcfeSchristos struct bufferevent *partner = NULL; 3468585484eSchristos bev_p = upcast(bev); 3478585484eSchristos if (! bev_p) 3488585484eSchristos return NULL; 3498585484eSchristos 3508585484eSchristos incref_and_lock(bev); 351b8ecfcfeSchristos if (bev_p->partner) 3528585484eSchristos partner = downcast(bev_p->partner); 3538585484eSchristos decref_and_unlock(bev); 3548585484eSchristos return partner; 3558585484eSchristos } 3568585484eSchristos 3578585484eSchristos const struct bufferevent_ops bufferevent_ops_pair = { 3588585484eSchristos "pair_elt", 3598585484eSchristos evutil_offsetof(struct bufferevent_pair, bev.bev), 3608585484eSchristos be_pair_enable, 3618585484eSchristos be_pair_disable, 362b8ecfcfeSchristos be_pair_unlink, 3637476e6e4Schristos be_pair_destruct, 3648585484eSchristos bufferevent_generic_adj_timeouts_, 3658585484eSchristos be_pair_flush, 3668585484eSchristos NULL, /* ctrl */ 3678585484eSchristos }; 368