1*eabc0478Schristos /* $NetBSD: bufferevent_async.c,v 1.7 2024/08/18 20:47:20 christos Exp $ */ 28585484eSchristos 38585484eSchristos /* 48585484eSchristos * Copyright (c) 2009-2012 Niels Provos and Nick Mathewson 58585484eSchristos * 68585484eSchristos * All rights reserved. 78585484eSchristos * 88585484eSchristos * Redistribution and use in source and binary forms, with or without 98585484eSchristos * modification, are permitted provided that the following conditions 108585484eSchristos * are met: 118585484eSchristos * 1. Redistributions of source code must retain the above copyright 128585484eSchristos * notice, this list of conditions and the following disclaimer. 138585484eSchristos * 2. Redistributions in binary form must reproduce the above copyright 148585484eSchristos * notice, this list of conditions and the following disclaimer in the 158585484eSchristos * documentation and/or other materials provided with the distribution. 168585484eSchristos * 3. The name of the author may not be used to endorse or promote products 178585484eSchristos * derived from this software without specific prior written permission. 188585484eSchristos * 198585484eSchristos * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR 208585484eSchristos * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES 218585484eSchristos * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. 228585484eSchristos * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, 238585484eSchristos * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT 248585484eSchristos * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, 258585484eSchristos * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY 268585484eSchristos * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT 278585484eSchristos * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF 288585484eSchristos * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 298585484eSchristos */ 308585484eSchristos 318585484eSchristos #include "event2/event-config.h" 328585484eSchristos #include "evconfig-private.h" 338585484eSchristos 348585484eSchristos #ifdef EVENT__HAVE_SYS_TIME_H 358585484eSchristos #include <sys/time.h> 368585484eSchristos #endif 378585484eSchristos 388585484eSchristos #include <errno.h> 398585484eSchristos #include <stdio.h> 408585484eSchristos #include <stdlib.h> 418585484eSchristos #include <string.h> 428585484eSchristos #ifdef EVENT__HAVE_STDARG_H 438585484eSchristos #include <stdarg.h> 448585484eSchristos #endif 458585484eSchristos #ifdef EVENT__HAVE_UNISTD_H 468585484eSchristos #include <unistd.h> 478585484eSchristos #endif 488585484eSchristos 498585484eSchristos #ifdef _WIN32 508585484eSchristos #include <winsock2.h> 51*eabc0478Schristos #include <winerror.h> 528585484eSchristos #include <ws2tcpip.h> 538585484eSchristos #endif 548585484eSchristos 558585484eSchristos #include <sys/queue.h> 568585484eSchristos 578585484eSchristos #include "event2/util.h" 588585484eSchristos #include "event2/bufferevent.h" 598585484eSchristos #include "event2/buffer.h" 608585484eSchristos #include "event2/bufferevent_struct.h" 618585484eSchristos #include "event2/event.h" 628585484eSchristos #include "event2/util.h" 638585484eSchristos #include "event-internal.h" 648585484eSchristos #include "log-internal.h" 658585484eSchristos #include "mm-internal.h" 668585484eSchristos #include "bufferevent-internal.h" 678585484eSchristos #include "util-internal.h" 688585484eSchristos #include "iocp-internal.h" 698585484eSchristos 708585484eSchristos #ifndef SO_UPDATE_CONNECT_CONTEXT 718585484eSchristos /* Mingw is sometimes missing this */ 728585484eSchristos #define SO_UPDATE_CONNECT_CONTEXT 0x7010 738585484eSchristos #endif 748585484eSchristos 758585484eSchristos /* prototypes */ 768585484eSchristos static int be_async_enable(struct bufferevent *, short); 778585484eSchristos static int be_async_disable(struct bufferevent *, short); 788585484eSchristos static void be_async_destruct(struct bufferevent *); 798585484eSchristos static int be_async_flush(struct bufferevent *, short, enum bufferevent_flush_mode); 808585484eSchristos static int be_async_ctrl(struct bufferevent *, enum bufferevent_ctrl_op, union bufferevent_ctrl_data *); 818585484eSchristos 828585484eSchristos struct bufferevent_async { 838585484eSchristos struct bufferevent_private bev; 848585484eSchristos struct event_overlapped connect_overlapped; 858585484eSchristos struct event_overlapped read_overlapped; 868585484eSchristos struct event_overlapped write_overlapped; 878585484eSchristos size_t read_in_progress; 888585484eSchristos size_t write_in_progress; 898585484eSchristos unsigned ok : 1; 908585484eSchristos unsigned read_added : 1; 918585484eSchristos unsigned write_added : 1; 928585484eSchristos }; 938585484eSchristos 948585484eSchristos const struct bufferevent_ops bufferevent_ops_async = { 958585484eSchristos "socket_async", 968585484eSchristos evutil_offsetof(struct bufferevent_async, bev.bev), 978585484eSchristos be_async_enable, 988585484eSchristos be_async_disable, 99b8ecfcfeSchristos NULL, /* Unlink */ 1008585484eSchristos be_async_destruct, 1018585484eSchristos bufferevent_generic_adj_timeouts_, 1028585484eSchristos be_async_flush, 1038585484eSchristos be_async_ctrl, 1048585484eSchristos }; 1058585484eSchristos 106*eabc0478Schristos static inline void 107*eabc0478Schristos be_async_run_eventcb(struct bufferevent *bev, short what, int options) 108*eabc0478Schristos { bufferevent_run_eventcb_(bev, what, options|BEV_TRIG_DEFER_CALLBACKS); } 109*eabc0478Schristos 110*eabc0478Schristos static inline void 111*eabc0478Schristos be_async_trigger_nolock(struct bufferevent *bev, short what, int options) 112*eabc0478Schristos { bufferevent_trigger_nolock_(bev, what, options|BEV_TRIG_DEFER_CALLBACKS); } 113*eabc0478Schristos 114*eabc0478Schristos static inline int 115*eabc0478Schristos fatal_error(int err) 116*eabc0478Schristos { 117*eabc0478Schristos switch (err) { 118*eabc0478Schristos /* We may have already associated this fd with a port. 119*eabc0478Schristos * Let's hope it's this port, and that the error code 120*eabc0478Schristos * for doing this neer changes. */ 121*eabc0478Schristos case ERROR_INVALID_PARAMETER: 122*eabc0478Schristos return 0; 123*eabc0478Schristos } 124*eabc0478Schristos return 1; 125*eabc0478Schristos } 126*eabc0478Schristos 1278585484eSchristos static inline struct bufferevent_async * 1288585484eSchristos upcast(struct bufferevent *bev) 1298585484eSchristos { 1308585484eSchristos struct bufferevent_async *bev_a; 131*eabc0478Schristos if (!BEV_IS_ASYNC(bev)) 1328585484eSchristos return NULL; 1338585484eSchristos bev_a = EVUTIL_UPCAST(bev, struct bufferevent_async, bev.bev); 1348585484eSchristos return bev_a; 1358585484eSchristos } 1368585484eSchristos 1378585484eSchristos static inline struct bufferevent_async * 1388585484eSchristos upcast_connect(struct event_overlapped *eo) 1398585484eSchristos { 1408585484eSchristos struct bufferevent_async *bev_a; 1418585484eSchristos bev_a = EVUTIL_UPCAST(eo, struct bufferevent_async, connect_overlapped); 1428585484eSchristos EVUTIL_ASSERT(BEV_IS_ASYNC(&bev_a->bev.bev)); 1438585484eSchristos return bev_a; 1448585484eSchristos } 1458585484eSchristos 1468585484eSchristos static inline struct bufferevent_async * 1478585484eSchristos upcast_read(struct event_overlapped *eo) 1488585484eSchristos { 1498585484eSchristos struct bufferevent_async *bev_a; 1508585484eSchristos bev_a = EVUTIL_UPCAST(eo, struct bufferevent_async, read_overlapped); 1518585484eSchristos EVUTIL_ASSERT(BEV_IS_ASYNC(&bev_a->bev.bev)); 1528585484eSchristos return bev_a; 1538585484eSchristos } 1548585484eSchristos 1558585484eSchristos static inline struct bufferevent_async * 1568585484eSchristos upcast_write(struct event_overlapped *eo) 1578585484eSchristos { 1588585484eSchristos struct bufferevent_async *bev_a; 1598585484eSchristos bev_a = EVUTIL_UPCAST(eo, struct bufferevent_async, write_overlapped); 1608585484eSchristos EVUTIL_ASSERT(BEV_IS_ASYNC(&bev_a->bev.bev)); 1618585484eSchristos return bev_a; 1628585484eSchristos } 1638585484eSchristos 1648585484eSchristos static void 1658585484eSchristos bev_async_del_write(struct bufferevent_async *beva) 1668585484eSchristos { 1678585484eSchristos struct bufferevent *bev = &beva->bev.bev; 1688585484eSchristos 1698585484eSchristos if (beva->write_added) { 1708585484eSchristos beva->write_added = 0; 1718585484eSchristos event_base_del_virtual_(bev->ev_base); 1728585484eSchristos } 1738585484eSchristos } 1748585484eSchristos 1758585484eSchristos static void 1768585484eSchristos bev_async_del_read(struct bufferevent_async *beva) 1778585484eSchristos { 1788585484eSchristos struct bufferevent *bev = &beva->bev.bev; 1798585484eSchristos 1808585484eSchristos if (beva->read_added) { 1818585484eSchristos beva->read_added = 0; 1828585484eSchristos event_base_del_virtual_(bev->ev_base); 1838585484eSchristos } 1848585484eSchristos } 1858585484eSchristos 1868585484eSchristos static void 1878585484eSchristos bev_async_add_write(struct bufferevent_async *beva) 1888585484eSchristos { 1898585484eSchristos struct bufferevent *bev = &beva->bev.bev; 1908585484eSchristos 1918585484eSchristos if (!beva->write_added) { 1928585484eSchristos beva->write_added = 1; 1938585484eSchristos event_base_add_virtual_(bev->ev_base); 1948585484eSchristos } 1958585484eSchristos } 1968585484eSchristos 1978585484eSchristos static void 1988585484eSchristos bev_async_add_read(struct bufferevent_async *beva) 1998585484eSchristos { 2008585484eSchristos struct bufferevent *bev = &beva->bev.bev; 2018585484eSchristos 2028585484eSchristos if (!beva->read_added) { 2038585484eSchristos beva->read_added = 1; 2048585484eSchristos event_base_add_virtual_(bev->ev_base); 2058585484eSchristos } 2068585484eSchristos } 2078585484eSchristos 2088585484eSchristos static void 2098585484eSchristos bev_async_consider_writing(struct bufferevent_async *beva) 2108585484eSchristos { 2118585484eSchristos size_t at_most; 2128585484eSchristos int limit; 2138585484eSchristos struct bufferevent *bev = &beva->bev.bev; 2148585484eSchristos 2158585484eSchristos /* Don't write if there's a write in progress, or we do not 2168585484eSchristos * want to write, or when there's nothing left to write. */ 2178585484eSchristos if (beva->write_in_progress || beva->bev.connecting) 2188585484eSchristos return; 2198585484eSchristos if (!beva->ok || !(bev->enabled&EV_WRITE) || 2208585484eSchristos !evbuffer_get_length(bev->output)) { 2218585484eSchristos bev_async_del_write(beva); 2228585484eSchristos return; 2238585484eSchristos } 2248585484eSchristos 2258585484eSchristos at_most = evbuffer_get_length(bev->output); 2268585484eSchristos 2278585484eSchristos /* This is safe so long as bufferevent_get_write_max never returns 2288585484eSchristos * more than INT_MAX. That's true for now. XXXX */ 2298585484eSchristos limit = (int)bufferevent_get_write_max_(&beva->bev); 2308585484eSchristos if (at_most >= (size_t)limit && limit >= 0) 2318585484eSchristos at_most = limit; 2328585484eSchristos 2338585484eSchristos if (beva->bev.write_suspended) { 2348585484eSchristos bev_async_del_write(beva); 2358585484eSchristos return; 2368585484eSchristos } 2378585484eSchristos 2388585484eSchristos /* XXXX doesn't respect low-water mark very well. */ 2398585484eSchristos bufferevent_incref_(bev); 2408585484eSchristos if (evbuffer_launch_write_(bev->output, at_most, 2418585484eSchristos &beva->write_overlapped)) { 2428585484eSchristos bufferevent_decref_(bev); 2438585484eSchristos beva->ok = 0; 244*eabc0478Schristos be_async_run_eventcb(bev, BEV_EVENT_ERROR, 0); 2458585484eSchristos } else { 2468585484eSchristos beva->write_in_progress = at_most; 2478585484eSchristos bufferevent_decrement_write_buckets_(&beva->bev, at_most); 2488585484eSchristos bev_async_add_write(beva); 2498585484eSchristos } 2508585484eSchristos } 2518585484eSchristos 2528585484eSchristos static void 2538585484eSchristos bev_async_consider_reading(struct bufferevent_async *beva) 2548585484eSchristos { 2558585484eSchristos size_t cur_size; 2568585484eSchristos size_t read_high; 2578585484eSchristos size_t at_most; 2588585484eSchristos int limit; 2598585484eSchristos struct bufferevent *bev = &beva->bev.bev; 2608585484eSchristos 2618585484eSchristos /* Don't read if there is a read in progress, or we do not 2628585484eSchristos * want to read. */ 2638585484eSchristos if (beva->read_in_progress || beva->bev.connecting) 2648585484eSchristos return; 2658585484eSchristos if (!beva->ok || !(bev->enabled&EV_READ)) { 2668585484eSchristos bev_async_del_read(beva); 2678585484eSchristos return; 2688585484eSchristos } 2698585484eSchristos 2708585484eSchristos /* Don't read if we're full */ 2718585484eSchristos cur_size = evbuffer_get_length(bev->input); 2728585484eSchristos read_high = bev->wm_read.high; 2738585484eSchristos if (read_high) { 2748585484eSchristos if (cur_size >= read_high) { 2758585484eSchristos bev_async_del_read(beva); 2768585484eSchristos return; 2778585484eSchristos } 2788585484eSchristos at_most = read_high - cur_size; 2798585484eSchristos } else { 2808585484eSchristos at_most = 16384; /* FIXME totally magic. */ 2818585484eSchristos } 2828585484eSchristos 2838585484eSchristos /* XXXX This over-commits. */ 2848585484eSchristos /* XXXX see also not above on cast on bufferevent_get_write_max_() */ 2858585484eSchristos limit = (int)bufferevent_get_read_max_(&beva->bev); 2868585484eSchristos if (at_most >= (size_t)limit && limit >= 0) 2878585484eSchristos at_most = limit; 2888585484eSchristos 2898585484eSchristos if (beva->bev.read_suspended) { 2908585484eSchristos bev_async_del_read(beva); 2918585484eSchristos return; 2928585484eSchristos } 2938585484eSchristos 2948585484eSchristos bufferevent_incref_(bev); 2958585484eSchristos if (evbuffer_launch_read_(bev->input, at_most, &beva->read_overlapped)) { 2968585484eSchristos beva->ok = 0; 297*eabc0478Schristos be_async_run_eventcb(bev, BEV_EVENT_ERROR, 0); 2988585484eSchristos bufferevent_decref_(bev); 2998585484eSchristos } else { 3008585484eSchristos beva->read_in_progress = at_most; 3018585484eSchristos bufferevent_decrement_read_buckets_(&beva->bev, at_most); 3028585484eSchristos bev_async_add_read(beva); 3038585484eSchristos } 3048585484eSchristos 3058585484eSchristos return; 3068585484eSchristos } 3078585484eSchristos 3088585484eSchristos static void 3098585484eSchristos be_async_outbuf_callback(struct evbuffer *buf, 3108585484eSchristos const struct evbuffer_cb_info *cbinfo, 3118585484eSchristos void *arg) 3128585484eSchristos { 3138585484eSchristos struct bufferevent *bev = arg; 3148585484eSchristos struct bufferevent_async *bev_async = upcast(bev); 3158585484eSchristos 3168585484eSchristos /* If we added data to the outbuf and were not writing before, 3178585484eSchristos * we may want to write now. */ 3188585484eSchristos 3198585484eSchristos bufferevent_incref_and_lock_(bev); 3208585484eSchristos 3218585484eSchristos if (cbinfo->n_added) 3228585484eSchristos bev_async_consider_writing(bev_async); 3238585484eSchristos 3248585484eSchristos bufferevent_decref_and_unlock_(bev); 3258585484eSchristos } 3268585484eSchristos 3278585484eSchristos static void 3288585484eSchristos be_async_inbuf_callback(struct evbuffer *buf, 3298585484eSchristos const struct evbuffer_cb_info *cbinfo, 3308585484eSchristos void *arg) 3318585484eSchristos { 3328585484eSchristos struct bufferevent *bev = arg; 3338585484eSchristos struct bufferevent_async *bev_async = upcast(bev); 3348585484eSchristos 3358585484eSchristos /* If we drained data from the inbuf and were not reading before, 3368585484eSchristos * we may want to read now */ 3378585484eSchristos 3388585484eSchristos bufferevent_incref_and_lock_(bev); 3398585484eSchristos 3408585484eSchristos if (cbinfo->n_deleted) 3418585484eSchristos bev_async_consider_reading(bev_async); 3428585484eSchristos 3438585484eSchristos bufferevent_decref_and_unlock_(bev); 3448585484eSchristos } 3458585484eSchristos 3468585484eSchristos static int 3478585484eSchristos be_async_enable(struct bufferevent *buf, short what) 3488585484eSchristos { 3498585484eSchristos struct bufferevent_async *bev_async = upcast(buf); 3508585484eSchristos 3518585484eSchristos if (!bev_async->ok) 3528585484eSchristos return -1; 3538585484eSchristos 3548585484eSchristos if (bev_async->bev.connecting) { 3558585484eSchristos /* Don't launch anything during connection attempts. */ 3568585484eSchristos return 0; 3578585484eSchristos } 3588585484eSchristos 3598585484eSchristos if (what & EV_READ) 3608585484eSchristos BEV_RESET_GENERIC_READ_TIMEOUT(buf); 3618585484eSchristos if (what & EV_WRITE) 3628585484eSchristos BEV_RESET_GENERIC_WRITE_TIMEOUT(buf); 3638585484eSchristos 3648585484eSchristos /* If we newly enable reading or writing, and we aren't reading or 3658585484eSchristos writing already, consider launching a new read or write. */ 3668585484eSchristos 3678585484eSchristos if (what & EV_READ) 3688585484eSchristos bev_async_consider_reading(bev_async); 3698585484eSchristos if (what & EV_WRITE) 3708585484eSchristos bev_async_consider_writing(bev_async); 3718585484eSchristos return 0; 3728585484eSchristos } 3738585484eSchristos 3748585484eSchristos static int 3758585484eSchristos be_async_disable(struct bufferevent *bev, short what) 3768585484eSchristos { 3778585484eSchristos struct bufferevent_async *bev_async = upcast(bev); 3788585484eSchristos /* XXXX If we disable reading or writing, we may want to consider 3798585484eSchristos * canceling any in-progress read or write operation, though it might 3808585484eSchristos * not work. */ 3818585484eSchristos 3828585484eSchristos if (what & EV_READ) { 3838585484eSchristos BEV_DEL_GENERIC_READ_TIMEOUT(bev); 3848585484eSchristos bev_async_del_read(bev_async); 3858585484eSchristos } 3868585484eSchristos if (what & EV_WRITE) { 3878585484eSchristos BEV_DEL_GENERIC_WRITE_TIMEOUT(bev); 3888585484eSchristos bev_async_del_write(bev_async); 3898585484eSchristos } 3908585484eSchristos 3918585484eSchristos return 0; 3928585484eSchristos } 3938585484eSchristos 3948585484eSchristos static void 3958585484eSchristos be_async_destruct(struct bufferevent *bev) 3968585484eSchristos { 3978585484eSchristos struct bufferevent_async *bev_async = upcast(bev); 3988585484eSchristos struct bufferevent_private *bev_p = BEV_UPCAST(bev); 3998585484eSchristos evutil_socket_t fd; 4008585484eSchristos 4018585484eSchristos EVUTIL_ASSERT(!upcast(bev)->write_in_progress && 4028585484eSchristos !upcast(bev)->read_in_progress); 4038585484eSchristos 4048585484eSchristos bev_async_del_read(bev_async); 4058585484eSchristos bev_async_del_write(bev_async); 4068585484eSchristos 4078585484eSchristos fd = evbuffer_overlapped_get_fd_(bev->input); 408*eabc0478Schristos if (fd != (evutil_socket_t)EVUTIL_INVALID_SOCKET && 4097476e6e4Schristos (bev_p->options & BEV_OPT_CLOSE_ON_FREE)) { 4108585484eSchristos evutil_closesocket(fd); 411*eabc0478Schristos evbuffer_overlapped_set_fd_(bev->input, EVUTIL_INVALID_SOCKET); 4128585484eSchristos } 4138585484eSchristos } 4148585484eSchristos 4158585484eSchristos /* GetQueuedCompletionStatus doesn't reliably yield WSA error codes, so 4168585484eSchristos * we use WSAGetOverlappedResult to translate. */ 4178585484eSchristos static void 4188585484eSchristos bev_async_set_wsa_error(struct bufferevent *bev, struct event_overlapped *eo) 4198585484eSchristos { 4208585484eSchristos DWORD bytes, flags; 4218585484eSchristos evutil_socket_t fd; 4228585484eSchristos 4238585484eSchristos fd = evbuffer_overlapped_get_fd_(bev->input); 4248585484eSchristos WSAGetOverlappedResult(fd, &eo->overlapped, &bytes, FALSE, &flags); 4258585484eSchristos } 4268585484eSchristos 4278585484eSchristos static int 4288585484eSchristos be_async_flush(struct bufferevent *bev, short what, 4298585484eSchristos enum bufferevent_flush_mode mode) 4308585484eSchristos { 4318585484eSchristos return 0; 4328585484eSchristos } 4338585484eSchristos 4348585484eSchristos static void 4358585484eSchristos connect_complete(struct event_overlapped *eo, ev_uintptr_t key, 4368585484eSchristos ev_ssize_t nbytes, int ok) 4378585484eSchristos { 4388585484eSchristos struct bufferevent_async *bev_a = upcast_connect(eo); 4398585484eSchristos struct bufferevent *bev = &bev_a->bev.bev; 4408585484eSchristos evutil_socket_t sock; 4418585484eSchristos 4428585484eSchristos BEV_LOCK(bev); 4438585484eSchristos 4448585484eSchristos EVUTIL_ASSERT(bev_a->bev.connecting); 4458585484eSchristos bev_a->bev.connecting = 0; 4468585484eSchristos sock = evbuffer_overlapped_get_fd_(bev_a->bev.bev.input); 4478585484eSchristos /* XXXX Handle error? */ 4488585484eSchristos setsockopt(sock, SOL_SOCKET, SO_UPDATE_CONNECT_CONTEXT, NULL, 0); 4498585484eSchristos 4508585484eSchristos if (ok) 4518585484eSchristos bufferevent_async_set_connected_(bev); 4528585484eSchristos else 4538585484eSchristos bev_async_set_wsa_error(bev, eo); 4548585484eSchristos 455*eabc0478Schristos be_async_run_eventcb(bev, ok ? BEV_EVENT_CONNECTED : BEV_EVENT_ERROR, 0); 4568585484eSchristos 4578585484eSchristos event_base_del_virtual_(bev->ev_base); 4588585484eSchristos 4598585484eSchristos bufferevent_decref_and_unlock_(bev); 4608585484eSchristos } 4618585484eSchristos 4628585484eSchristos static void 4638585484eSchristos read_complete(struct event_overlapped *eo, ev_uintptr_t key, 4648585484eSchristos ev_ssize_t nbytes, int ok) 4658585484eSchristos { 4668585484eSchristos struct bufferevent_async *bev_a = upcast_read(eo); 4678585484eSchristos struct bufferevent *bev = &bev_a->bev.bev; 4688585484eSchristos short what = BEV_EVENT_READING; 4698585484eSchristos ev_ssize_t amount_unread; 4708585484eSchristos BEV_LOCK(bev); 4718585484eSchristos EVUTIL_ASSERT(bev_a->read_in_progress); 4728585484eSchristos 4738585484eSchristos amount_unread = bev_a->read_in_progress - nbytes; 4748585484eSchristos evbuffer_commit_read_(bev->input, nbytes); 4758585484eSchristos bev_a->read_in_progress = 0; 4768585484eSchristos if (amount_unread) 4778585484eSchristos bufferevent_decrement_read_buckets_(&bev_a->bev, -amount_unread); 4788585484eSchristos 4798585484eSchristos if (!ok) 4808585484eSchristos bev_async_set_wsa_error(bev, eo); 4818585484eSchristos 4828585484eSchristos if (bev_a->ok) { 4838585484eSchristos if (ok && nbytes) { 4848585484eSchristos BEV_RESET_GENERIC_READ_TIMEOUT(bev); 485*eabc0478Schristos be_async_trigger_nolock(bev, EV_READ, 0); 4868585484eSchristos bev_async_consider_reading(bev_a); 4878585484eSchristos } else if (!ok) { 4888585484eSchristos what |= BEV_EVENT_ERROR; 4898585484eSchristos bev_a->ok = 0; 490*eabc0478Schristos be_async_run_eventcb(bev, what, 0); 4918585484eSchristos } else if (!nbytes) { 4928585484eSchristos what |= BEV_EVENT_EOF; 4938585484eSchristos bev_a->ok = 0; 494*eabc0478Schristos be_async_run_eventcb(bev, what, 0); 4958585484eSchristos } 4968585484eSchristos } 4978585484eSchristos 4988585484eSchristos bufferevent_decref_and_unlock_(bev); 4998585484eSchristos } 5008585484eSchristos 5018585484eSchristos static void 5028585484eSchristos write_complete(struct event_overlapped *eo, ev_uintptr_t key, 5038585484eSchristos ev_ssize_t nbytes, int ok) 5048585484eSchristos { 5058585484eSchristos struct bufferevent_async *bev_a = upcast_write(eo); 5068585484eSchristos struct bufferevent *bev = &bev_a->bev.bev; 5078585484eSchristos short what = BEV_EVENT_WRITING; 5088585484eSchristos ev_ssize_t amount_unwritten; 5098585484eSchristos 5108585484eSchristos BEV_LOCK(bev); 5118585484eSchristos EVUTIL_ASSERT(bev_a->write_in_progress); 5128585484eSchristos 5138585484eSchristos amount_unwritten = bev_a->write_in_progress - nbytes; 5148585484eSchristos evbuffer_commit_write_(bev->output, nbytes); 5158585484eSchristos bev_a->write_in_progress = 0; 5168585484eSchristos 5178585484eSchristos if (amount_unwritten) 5188585484eSchristos bufferevent_decrement_write_buckets_(&bev_a->bev, 5198585484eSchristos -amount_unwritten); 5208585484eSchristos 5218585484eSchristos 5228585484eSchristos if (!ok) 5238585484eSchristos bev_async_set_wsa_error(bev, eo); 5248585484eSchristos 5258585484eSchristos if (bev_a->ok) { 5268585484eSchristos if (ok && nbytes) { 5278585484eSchristos BEV_RESET_GENERIC_WRITE_TIMEOUT(bev); 528*eabc0478Schristos be_async_trigger_nolock(bev, EV_WRITE, 0); 5298585484eSchristos bev_async_consider_writing(bev_a); 5308585484eSchristos } else if (!ok) { 5318585484eSchristos what |= BEV_EVENT_ERROR; 5328585484eSchristos bev_a->ok = 0; 533*eabc0478Schristos be_async_run_eventcb(bev, what, 0); 5348585484eSchristos } else if (!nbytes) { 5358585484eSchristos what |= BEV_EVENT_EOF; 5368585484eSchristos bev_a->ok = 0; 537*eabc0478Schristos be_async_run_eventcb(bev, what, 0); 5388585484eSchristos } 5398585484eSchristos } 5408585484eSchristos 5418585484eSchristos bufferevent_decref_and_unlock_(bev); 5428585484eSchristos } 5438585484eSchristos 5448585484eSchristos struct bufferevent * 5458585484eSchristos bufferevent_async_new_(struct event_base *base, 5468585484eSchristos evutil_socket_t fd, int options) 5478585484eSchristos { 5488585484eSchristos struct bufferevent_async *bev_a; 5498585484eSchristos struct bufferevent *bev; 5508585484eSchristos struct event_iocp_port *iocp; 5518585484eSchristos 5528585484eSchristos options |= BEV_OPT_THREADSAFE; 5538585484eSchristos 5548585484eSchristos if (!(iocp = event_base_get_iocp_(base))) 5558585484eSchristos return NULL; 5568585484eSchristos 5578585484eSchristos if (fd >= 0 && event_iocp_port_associate_(iocp, fd, 1)<0) { 558*eabc0478Schristos if (fatal_error(GetLastError())) 5598585484eSchristos return NULL; 5608585484eSchristos } 5618585484eSchristos 5628585484eSchristos if (!(bev_a = mm_calloc(1, sizeof(struct bufferevent_async)))) 5638585484eSchristos return NULL; 5648585484eSchristos 5658585484eSchristos bev = &bev_a->bev.bev; 5668585484eSchristos if (!(bev->input = evbuffer_overlapped_new_(fd))) { 5678585484eSchristos mm_free(bev_a); 5688585484eSchristos return NULL; 5698585484eSchristos } 5708585484eSchristos if (!(bev->output = evbuffer_overlapped_new_(fd))) { 5718585484eSchristos evbuffer_free(bev->input); 5728585484eSchristos mm_free(bev_a); 5738585484eSchristos return NULL; 5748585484eSchristos } 5758585484eSchristos 5768585484eSchristos if (bufferevent_init_common_(&bev_a->bev, base, &bufferevent_ops_async, 5778585484eSchristos options)<0) 5788585484eSchristos goto err; 5798585484eSchristos 5808585484eSchristos evbuffer_add_cb(bev->input, be_async_inbuf_callback, bev); 5818585484eSchristos evbuffer_add_cb(bev->output, be_async_outbuf_callback, bev); 5828585484eSchristos 5838585484eSchristos event_overlapped_init_(&bev_a->connect_overlapped, connect_complete); 5848585484eSchristos event_overlapped_init_(&bev_a->read_overlapped, read_complete); 5858585484eSchristos event_overlapped_init_(&bev_a->write_overlapped, write_complete); 5868585484eSchristos 5878585484eSchristos bufferevent_init_generic_timeout_cbs_(bev); 5888585484eSchristos 5897476e6e4Schristos bev_a->ok = fd >= 0; 5907476e6e4Schristos 5918585484eSchristos return bev; 5928585484eSchristos err: 5938585484eSchristos bufferevent_free(&bev_a->bev.bev); 5948585484eSchristos return NULL; 5958585484eSchristos } 5968585484eSchristos 5978585484eSchristos void 5988585484eSchristos bufferevent_async_set_connected_(struct bufferevent *bev) 5998585484eSchristos { 6008585484eSchristos struct bufferevent_async *bev_async = upcast(bev); 6018585484eSchristos bev_async->ok = 1; 6028585484eSchristos /* Now's a good time to consider reading/writing */ 6038585484eSchristos be_async_enable(bev, bev->enabled); 6048585484eSchristos } 6058585484eSchristos 6068585484eSchristos int 6078585484eSchristos bufferevent_async_can_connect_(struct bufferevent *bev) 6088585484eSchristos { 6098585484eSchristos const struct win32_extension_fns *ext = 6108585484eSchristos event_get_win32_extension_fns_(); 6118585484eSchristos 6128585484eSchristos if (BEV_IS_ASYNC(bev) && 6138585484eSchristos event_base_get_iocp_(bev->ev_base) && 6148585484eSchristos ext && ext->ConnectEx) 6158585484eSchristos return 1; 6168585484eSchristos 6178585484eSchristos return 0; 6188585484eSchristos } 6198585484eSchristos 6208585484eSchristos int 6218585484eSchristos bufferevent_async_connect_(struct bufferevent *bev, evutil_socket_t fd, 6228585484eSchristos const struct sockaddr *sa, int socklen) 6238585484eSchristos { 6248585484eSchristos BOOL rc; 6258585484eSchristos struct bufferevent_async *bev_async = upcast(bev); 6268585484eSchristos struct sockaddr_storage ss; 6278585484eSchristos const struct win32_extension_fns *ext = 6288585484eSchristos event_get_win32_extension_fns_(); 6298585484eSchristos 6308585484eSchristos EVUTIL_ASSERT(ext && ext->ConnectEx && fd >= 0 && sa != NULL); 6318585484eSchristos 6328585484eSchristos /* ConnectEx() requires that the socket be bound to an address 6338585484eSchristos * with bind() before using, otherwise it will fail. We attempt 6348585484eSchristos * to issue a bind() here, taking into account that the error 6358585484eSchristos * code is set to WSAEINVAL when the socket is already bound. */ 6368585484eSchristos memset(&ss, 0, sizeof(ss)); 6378585484eSchristos if (sa->sa_family == AF_INET) { 6388585484eSchristos struct sockaddr_in *sin = (struct sockaddr_in *)&ss; 6398585484eSchristos sin->sin_family = AF_INET; 6408585484eSchristos sin->sin_addr.s_addr = INADDR_ANY; 6418585484eSchristos } else if (sa->sa_family == AF_INET6) { 6428585484eSchristos struct sockaddr_in6 *sin6 = (struct sockaddr_in6 *)&ss; 6438585484eSchristos sin6->sin6_family = AF_INET6; 6448585484eSchristos sin6->sin6_addr = in6addr_any; 6458585484eSchristos } else { 6468585484eSchristos /* Well, the user will have to bind() */ 6478585484eSchristos return -1; 6488585484eSchristos } 6498585484eSchristos if (bind(fd, (struct sockaddr *)&ss, sizeof(ss)) < 0 && 6508585484eSchristos WSAGetLastError() != WSAEINVAL) 6518585484eSchristos return -1; 6528585484eSchristos 6538585484eSchristos event_base_add_virtual_(bev->ev_base); 6548585484eSchristos bufferevent_incref_(bev); 6558585484eSchristos rc = ext->ConnectEx(fd, sa, socklen, NULL, 0, NULL, 6568585484eSchristos &bev_async->connect_overlapped.overlapped); 6578585484eSchristos if (rc || WSAGetLastError() == ERROR_IO_PENDING) 6588585484eSchristos return 0; 6598585484eSchristos 6608585484eSchristos event_base_del_virtual_(bev->ev_base); 6618585484eSchristos bufferevent_decref_(bev); 6628585484eSchristos 6638585484eSchristos return -1; 6648585484eSchristos } 6658585484eSchristos 6668585484eSchristos static int 6678585484eSchristos be_async_ctrl(struct bufferevent *bev, enum bufferevent_ctrl_op op, 6688585484eSchristos union bufferevent_ctrl_data *data) 6698585484eSchristos { 6708585484eSchristos switch (op) { 6718585484eSchristos case BEV_CTRL_GET_FD: 6728585484eSchristos data->fd = evbuffer_overlapped_get_fd_(bev->input); 6738585484eSchristos return 0; 6748585484eSchristos case BEV_CTRL_SET_FD: { 675*eabc0478Schristos struct bufferevent_async *bev_a = upcast(bev); 6768585484eSchristos struct event_iocp_port *iocp; 6778585484eSchristos 6788585484eSchristos if (data->fd == evbuffer_overlapped_get_fd_(bev->input)) 6798585484eSchristos return 0; 6808585484eSchristos if (!(iocp = event_base_get_iocp_(bev->ev_base))) 6818585484eSchristos return -1; 682*eabc0478Schristos if (event_iocp_port_associate_(iocp, data->fd, 1) < 0) { 683*eabc0478Schristos if (fatal_error(GetLastError())) 6848585484eSchristos return -1; 685*eabc0478Schristos } 6868585484eSchristos evbuffer_overlapped_set_fd_(bev->input, data->fd); 6878585484eSchristos evbuffer_overlapped_set_fd_(bev->output, data->fd); 688*eabc0478Schristos bev_a->ok = data->fd >= 0; 6898585484eSchristos return 0; 6908585484eSchristos } 6918585484eSchristos case BEV_CTRL_CANCEL_ALL: { 6928585484eSchristos struct bufferevent_async *bev_a = upcast(bev); 6938585484eSchristos evutil_socket_t fd = evbuffer_overlapped_get_fd_(bev->input); 694*eabc0478Schristos if (fd != (evutil_socket_t)EVUTIL_INVALID_SOCKET && 6958585484eSchristos (bev_a->bev.options & BEV_OPT_CLOSE_ON_FREE)) { 6968585484eSchristos closesocket(fd); 697*eabc0478Schristos evbuffer_overlapped_set_fd_(bev->input, EVUTIL_INVALID_SOCKET); 6988585484eSchristos } 6998585484eSchristos bev_a->ok = 0; 7008585484eSchristos return 0; 7018585484eSchristos } 7028585484eSchristos case BEV_CTRL_GET_UNDERLYING: 7038585484eSchristos default: 7048585484eSchristos return -1; 7058585484eSchristos } 7068585484eSchristos } 7078585484eSchristos 7088585484eSchristos 709