xref: /netbsd-src/external/bsd/ntp/dist/sntp/libevent/bufferevent_async.c (revision eabc0478de71e4e011a5b4e0392741e01d491794)
1*eabc0478Schristos /*	$NetBSD: bufferevent_async.c,v 1.7 2024/08/18 20:47:20 christos Exp $	*/
28585484eSchristos 
38585484eSchristos /*
48585484eSchristos  * Copyright (c) 2009-2012 Niels Provos and Nick Mathewson
58585484eSchristos  *
68585484eSchristos  * All rights reserved.
78585484eSchristos  *
88585484eSchristos  * Redistribution and use in source and binary forms, with or without
98585484eSchristos  * modification, are permitted provided that the following conditions
108585484eSchristos  * are met:
118585484eSchristos  * 1. Redistributions of source code must retain the above copyright
128585484eSchristos  *    notice, this list of conditions and the following disclaimer.
138585484eSchristos  * 2. Redistributions in binary form must reproduce the above copyright
148585484eSchristos  *    notice, this list of conditions and the following disclaimer in the
158585484eSchristos  *    documentation and/or other materials provided with the distribution.
168585484eSchristos  * 3. The name of the author may not be used to endorse or promote products
178585484eSchristos  *    derived from this software without specific prior written permission.
188585484eSchristos  *
198585484eSchristos  * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
208585484eSchristos  * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
218585484eSchristos  * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
228585484eSchristos  * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,
238585484eSchristos  * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
248585484eSchristos  * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
258585484eSchristos  * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
268585484eSchristos  * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
278585484eSchristos  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
288585484eSchristos  * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
298585484eSchristos  */
308585484eSchristos 
318585484eSchristos #include "event2/event-config.h"
328585484eSchristos #include "evconfig-private.h"
338585484eSchristos 
348585484eSchristos #ifdef EVENT__HAVE_SYS_TIME_H
358585484eSchristos #include <sys/time.h>
368585484eSchristos #endif
378585484eSchristos 
388585484eSchristos #include <errno.h>
398585484eSchristos #include <stdio.h>
408585484eSchristos #include <stdlib.h>
418585484eSchristos #include <string.h>
428585484eSchristos #ifdef EVENT__HAVE_STDARG_H
438585484eSchristos #include <stdarg.h>
448585484eSchristos #endif
458585484eSchristos #ifdef EVENT__HAVE_UNISTD_H
468585484eSchristos #include <unistd.h>
478585484eSchristos #endif
488585484eSchristos 
498585484eSchristos #ifdef _WIN32
508585484eSchristos #include <winsock2.h>
51*eabc0478Schristos #include <winerror.h>
528585484eSchristos #include <ws2tcpip.h>
538585484eSchristos #endif
548585484eSchristos 
558585484eSchristos #include <sys/queue.h>
568585484eSchristos 
578585484eSchristos #include "event2/util.h"
588585484eSchristos #include "event2/bufferevent.h"
598585484eSchristos #include "event2/buffer.h"
608585484eSchristos #include "event2/bufferevent_struct.h"
618585484eSchristos #include "event2/event.h"
628585484eSchristos #include "event2/util.h"
638585484eSchristos #include "event-internal.h"
648585484eSchristos #include "log-internal.h"
658585484eSchristos #include "mm-internal.h"
668585484eSchristos #include "bufferevent-internal.h"
678585484eSchristos #include "util-internal.h"
688585484eSchristos #include "iocp-internal.h"
698585484eSchristos 
708585484eSchristos #ifndef SO_UPDATE_CONNECT_CONTEXT
718585484eSchristos /* Mingw is sometimes missing this */
728585484eSchristos #define SO_UPDATE_CONNECT_CONTEXT 0x7010
738585484eSchristos #endif
748585484eSchristos 
758585484eSchristos /* prototypes */
768585484eSchristos static int be_async_enable(struct bufferevent *, short);
778585484eSchristos static int be_async_disable(struct bufferevent *, short);
788585484eSchristos static void be_async_destruct(struct bufferevent *);
798585484eSchristos static int be_async_flush(struct bufferevent *, short, enum bufferevent_flush_mode);
808585484eSchristos static int be_async_ctrl(struct bufferevent *, enum bufferevent_ctrl_op, union bufferevent_ctrl_data *);
818585484eSchristos 
828585484eSchristos struct bufferevent_async {
838585484eSchristos 	struct bufferevent_private bev;
848585484eSchristos 	struct event_overlapped connect_overlapped;
858585484eSchristos 	struct event_overlapped read_overlapped;
868585484eSchristos 	struct event_overlapped write_overlapped;
878585484eSchristos 	size_t read_in_progress;
888585484eSchristos 	size_t write_in_progress;
898585484eSchristos 	unsigned ok : 1;
908585484eSchristos 	unsigned read_added : 1;
918585484eSchristos 	unsigned write_added : 1;
928585484eSchristos };
938585484eSchristos 
948585484eSchristos const struct bufferevent_ops bufferevent_ops_async = {
958585484eSchristos 	"socket_async",
968585484eSchristos 	evutil_offsetof(struct bufferevent_async, bev.bev),
978585484eSchristos 	be_async_enable,
988585484eSchristos 	be_async_disable,
99b8ecfcfeSchristos 	NULL, /* Unlink */
1008585484eSchristos 	be_async_destruct,
1018585484eSchristos 	bufferevent_generic_adj_timeouts_,
1028585484eSchristos 	be_async_flush,
1038585484eSchristos 	be_async_ctrl,
1048585484eSchristos };
1058585484eSchristos 
106*eabc0478Schristos static inline void
107*eabc0478Schristos be_async_run_eventcb(struct bufferevent *bev, short what, int options)
108*eabc0478Schristos { bufferevent_run_eventcb_(bev, what, options|BEV_TRIG_DEFER_CALLBACKS); }
109*eabc0478Schristos 
110*eabc0478Schristos static inline void
111*eabc0478Schristos be_async_trigger_nolock(struct bufferevent *bev, short what, int options)
112*eabc0478Schristos { bufferevent_trigger_nolock_(bev, what, options|BEV_TRIG_DEFER_CALLBACKS); }
113*eabc0478Schristos 
114*eabc0478Schristos static inline int
115*eabc0478Schristos fatal_error(int err)
116*eabc0478Schristos {
117*eabc0478Schristos 	switch (err) {
118*eabc0478Schristos 		/* We may have already associated this fd with a port.
119*eabc0478Schristos 		 * Let's hope it's this port, and that the error code
120*eabc0478Schristos 		 * for doing this neer changes. */
121*eabc0478Schristos 		case ERROR_INVALID_PARAMETER:
122*eabc0478Schristos 			return 0;
123*eabc0478Schristos 	}
124*eabc0478Schristos 	return 1;
125*eabc0478Schristos }
126*eabc0478Schristos 
1278585484eSchristos static inline struct bufferevent_async *
1288585484eSchristos upcast(struct bufferevent *bev)
1298585484eSchristos {
1308585484eSchristos 	struct bufferevent_async *bev_a;
131*eabc0478Schristos 	if (!BEV_IS_ASYNC(bev))
1328585484eSchristos 		return NULL;
1338585484eSchristos 	bev_a = EVUTIL_UPCAST(bev, struct bufferevent_async, bev.bev);
1348585484eSchristos 	return bev_a;
1358585484eSchristos }
1368585484eSchristos 
1378585484eSchristos static inline struct bufferevent_async *
1388585484eSchristos upcast_connect(struct event_overlapped *eo)
1398585484eSchristos {
1408585484eSchristos 	struct bufferevent_async *bev_a;
1418585484eSchristos 	bev_a = EVUTIL_UPCAST(eo, struct bufferevent_async, connect_overlapped);
1428585484eSchristos 	EVUTIL_ASSERT(BEV_IS_ASYNC(&bev_a->bev.bev));
1438585484eSchristos 	return bev_a;
1448585484eSchristos }
1458585484eSchristos 
1468585484eSchristos static inline struct bufferevent_async *
1478585484eSchristos upcast_read(struct event_overlapped *eo)
1488585484eSchristos {
1498585484eSchristos 	struct bufferevent_async *bev_a;
1508585484eSchristos 	bev_a = EVUTIL_UPCAST(eo, struct bufferevent_async, read_overlapped);
1518585484eSchristos 	EVUTIL_ASSERT(BEV_IS_ASYNC(&bev_a->bev.bev));
1528585484eSchristos 	return bev_a;
1538585484eSchristos }
1548585484eSchristos 
1558585484eSchristos static inline struct bufferevent_async *
1568585484eSchristos upcast_write(struct event_overlapped *eo)
1578585484eSchristos {
1588585484eSchristos 	struct bufferevent_async *bev_a;
1598585484eSchristos 	bev_a = EVUTIL_UPCAST(eo, struct bufferevent_async, write_overlapped);
1608585484eSchristos 	EVUTIL_ASSERT(BEV_IS_ASYNC(&bev_a->bev.bev));
1618585484eSchristos 	return bev_a;
1628585484eSchristos }
1638585484eSchristos 
1648585484eSchristos static void
1658585484eSchristos bev_async_del_write(struct bufferevent_async *beva)
1668585484eSchristos {
1678585484eSchristos 	struct bufferevent *bev = &beva->bev.bev;
1688585484eSchristos 
1698585484eSchristos 	if (beva->write_added) {
1708585484eSchristos 		beva->write_added = 0;
1718585484eSchristos 		event_base_del_virtual_(bev->ev_base);
1728585484eSchristos 	}
1738585484eSchristos }
1748585484eSchristos 
1758585484eSchristos static void
1768585484eSchristos bev_async_del_read(struct bufferevent_async *beva)
1778585484eSchristos {
1788585484eSchristos 	struct bufferevent *bev = &beva->bev.bev;
1798585484eSchristos 
1808585484eSchristos 	if (beva->read_added) {
1818585484eSchristos 		beva->read_added = 0;
1828585484eSchristos 		event_base_del_virtual_(bev->ev_base);
1838585484eSchristos 	}
1848585484eSchristos }
1858585484eSchristos 
1868585484eSchristos static void
1878585484eSchristos bev_async_add_write(struct bufferevent_async *beva)
1888585484eSchristos {
1898585484eSchristos 	struct bufferevent *bev = &beva->bev.bev;
1908585484eSchristos 
1918585484eSchristos 	if (!beva->write_added) {
1928585484eSchristos 		beva->write_added = 1;
1938585484eSchristos 		event_base_add_virtual_(bev->ev_base);
1948585484eSchristos 	}
1958585484eSchristos }
1968585484eSchristos 
1978585484eSchristos static void
1988585484eSchristos bev_async_add_read(struct bufferevent_async *beva)
1998585484eSchristos {
2008585484eSchristos 	struct bufferevent *bev = &beva->bev.bev;
2018585484eSchristos 
2028585484eSchristos 	if (!beva->read_added) {
2038585484eSchristos 		beva->read_added = 1;
2048585484eSchristos 		event_base_add_virtual_(bev->ev_base);
2058585484eSchristos 	}
2068585484eSchristos }
2078585484eSchristos 
2088585484eSchristos static void
2098585484eSchristos bev_async_consider_writing(struct bufferevent_async *beva)
2108585484eSchristos {
2118585484eSchristos 	size_t at_most;
2128585484eSchristos 	int limit;
2138585484eSchristos 	struct bufferevent *bev = &beva->bev.bev;
2148585484eSchristos 
2158585484eSchristos 	/* Don't write if there's a write in progress, or we do not
2168585484eSchristos 	 * want to write, or when there's nothing left to write. */
2178585484eSchristos 	if (beva->write_in_progress || beva->bev.connecting)
2188585484eSchristos 		return;
2198585484eSchristos 	if (!beva->ok || !(bev->enabled&EV_WRITE) ||
2208585484eSchristos 	    !evbuffer_get_length(bev->output)) {
2218585484eSchristos 		bev_async_del_write(beva);
2228585484eSchristos 		return;
2238585484eSchristos 	}
2248585484eSchristos 
2258585484eSchristos 	at_most = evbuffer_get_length(bev->output);
2268585484eSchristos 
2278585484eSchristos 	/* This is safe so long as bufferevent_get_write_max never returns
2288585484eSchristos 	 * more than INT_MAX.  That's true for now. XXXX */
2298585484eSchristos 	limit = (int)bufferevent_get_write_max_(&beva->bev);
2308585484eSchristos 	if (at_most >= (size_t)limit && limit >= 0)
2318585484eSchristos 		at_most = limit;
2328585484eSchristos 
2338585484eSchristos 	if (beva->bev.write_suspended) {
2348585484eSchristos 		bev_async_del_write(beva);
2358585484eSchristos 		return;
2368585484eSchristos 	}
2378585484eSchristos 
2388585484eSchristos 	/*  XXXX doesn't respect low-water mark very well. */
2398585484eSchristos 	bufferevent_incref_(bev);
2408585484eSchristos 	if (evbuffer_launch_write_(bev->output, at_most,
2418585484eSchristos 	    &beva->write_overlapped)) {
2428585484eSchristos 		bufferevent_decref_(bev);
2438585484eSchristos 		beva->ok = 0;
244*eabc0478Schristos 		be_async_run_eventcb(bev, BEV_EVENT_ERROR, 0);
2458585484eSchristos 	} else {
2468585484eSchristos 		beva->write_in_progress = at_most;
2478585484eSchristos 		bufferevent_decrement_write_buckets_(&beva->bev, at_most);
2488585484eSchristos 		bev_async_add_write(beva);
2498585484eSchristos 	}
2508585484eSchristos }
2518585484eSchristos 
2528585484eSchristos static void
2538585484eSchristos bev_async_consider_reading(struct bufferevent_async *beva)
2548585484eSchristos {
2558585484eSchristos 	size_t cur_size;
2568585484eSchristos 	size_t read_high;
2578585484eSchristos 	size_t at_most;
2588585484eSchristos 	int limit;
2598585484eSchristos 	struct bufferevent *bev = &beva->bev.bev;
2608585484eSchristos 
2618585484eSchristos 	/* Don't read if there is a read in progress, or we do not
2628585484eSchristos 	 * want to read. */
2638585484eSchristos 	if (beva->read_in_progress || beva->bev.connecting)
2648585484eSchristos 		return;
2658585484eSchristos 	if (!beva->ok || !(bev->enabled&EV_READ)) {
2668585484eSchristos 		bev_async_del_read(beva);
2678585484eSchristos 		return;
2688585484eSchristos 	}
2698585484eSchristos 
2708585484eSchristos 	/* Don't read if we're full */
2718585484eSchristos 	cur_size = evbuffer_get_length(bev->input);
2728585484eSchristos 	read_high = bev->wm_read.high;
2738585484eSchristos 	if (read_high) {
2748585484eSchristos 		if (cur_size >= read_high) {
2758585484eSchristos 			bev_async_del_read(beva);
2768585484eSchristos 			return;
2778585484eSchristos 		}
2788585484eSchristos 		at_most = read_high - cur_size;
2798585484eSchristos 	} else {
2808585484eSchristos 		at_most = 16384; /* FIXME totally magic. */
2818585484eSchristos 	}
2828585484eSchristos 
2838585484eSchristos 	/* XXXX This over-commits. */
2848585484eSchristos 	/* XXXX see also not above on cast on bufferevent_get_write_max_() */
2858585484eSchristos 	limit = (int)bufferevent_get_read_max_(&beva->bev);
2868585484eSchristos 	if (at_most >= (size_t)limit && limit >= 0)
2878585484eSchristos 		at_most = limit;
2888585484eSchristos 
2898585484eSchristos 	if (beva->bev.read_suspended) {
2908585484eSchristos 		bev_async_del_read(beva);
2918585484eSchristos 		return;
2928585484eSchristos 	}
2938585484eSchristos 
2948585484eSchristos 	bufferevent_incref_(bev);
2958585484eSchristos 	if (evbuffer_launch_read_(bev->input, at_most, &beva->read_overlapped)) {
2968585484eSchristos 		beva->ok = 0;
297*eabc0478Schristos 		be_async_run_eventcb(bev, BEV_EVENT_ERROR, 0);
2988585484eSchristos 		bufferevent_decref_(bev);
2998585484eSchristos 	} else {
3008585484eSchristos 		beva->read_in_progress = at_most;
3018585484eSchristos 		bufferevent_decrement_read_buckets_(&beva->bev, at_most);
3028585484eSchristos 		bev_async_add_read(beva);
3038585484eSchristos 	}
3048585484eSchristos 
3058585484eSchristos 	return;
3068585484eSchristos }
3078585484eSchristos 
3088585484eSchristos static void
3098585484eSchristos be_async_outbuf_callback(struct evbuffer *buf,
3108585484eSchristos     const struct evbuffer_cb_info *cbinfo,
3118585484eSchristos     void *arg)
3128585484eSchristos {
3138585484eSchristos 	struct bufferevent *bev = arg;
3148585484eSchristos 	struct bufferevent_async *bev_async = upcast(bev);
3158585484eSchristos 
3168585484eSchristos 	/* If we added data to the outbuf and were not writing before,
3178585484eSchristos 	 * we may want to write now. */
3188585484eSchristos 
3198585484eSchristos 	bufferevent_incref_and_lock_(bev);
3208585484eSchristos 
3218585484eSchristos 	if (cbinfo->n_added)
3228585484eSchristos 		bev_async_consider_writing(bev_async);
3238585484eSchristos 
3248585484eSchristos 	bufferevent_decref_and_unlock_(bev);
3258585484eSchristos }
3268585484eSchristos 
3278585484eSchristos static void
3288585484eSchristos be_async_inbuf_callback(struct evbuffer *buf,
3298585484eSchristos     const struct evbuffer_cb_info *cbinfo,
3308585484eSchristos     void *arg)
3318585484eSchristos {
3328585484eSchristos 	struct bufferevent *bev = arg;
3338585484eSchristos 	struct bufferevent_async *bev_async = upcast(bev);
3348585484eSchristos 
3358585484eSchristos 	/* If we drained data from the inbuf and were not reading before,
3368585484eSchristos 	 * we may want to read now */
3378585484eSchristos 
3388585484eSchristos 	bufferevent_incref_and_lock_(bev);
3398585484eSchristos 
3408585484eSchristos 	if (cbinfo->n_deleted)
3418585484eSchristos 		bev_async_consider_reading(bev_async);
3428585484eSchristos 
3438585484eSchristos 	bufferevent_decref_and_unlock_(bev);
3448585484eSchristos }
3458585484eSchristos 
3468585484eSchristos static int
3478585484eSchristos be_async_enable(struct bufferevent *buf, short what)
3488585484eSchristos {
3498585484eSchristos 	struct bufferevent_async *bev_async = upcast(buf);
3508585484eSchristos 
3518585484eSchristos 	if (!bev_async->ok)
3528585484eSchristos 		return -1;
3538585484eSchristos 
3548585484eSchristos 	if (bev_async->bev.connecting) {
3558585484eSchristos 		/* Don't launch anything during connection attempts. */
3568585484eSchristos 		return 0;
3578585484eSchristos 	}
3588585484eSchristos 
3598585484eSchristos 	if (what & EV_READ)
3608585484eSchristos 		BEV_RESET_GENERIC_READ_TIMEOUT(buf);
3618585484eSchristos 	if (what & EV_WRITE)
3628585484eSchristos 		BEV_RESET_GENERIC_WRITE_TIMEOUT(buf);
3638585484eSchristos 
3648585484eSchristos 	/* If we newly enable reading or writing, and we aren't reading or
3658585484eSchristos 	   writing already, consider launching a new read or write. */
3668585484eSchristos 
3678585484eSchristos 	if (what & EV_READ)
3688585484eSchristos 		bev_async_consider_reading(bev_async);
3698585484eSchristos 	if (what & EV_WRITE)
3708585484eSchristos 		bev_async_consider_writing(bev_async);
3718585484eSchristos 	return 0;
3728585484eSchristos }
3738585484eSchristos 
3748585484eSchristos static int
3758585484eSchristos be_async_disable(struct bufferevent *bev, short what)
3768585484eSchristos {
3778585484eSchristos 	struct bufferevent_async *bev_async = upcast(bev);
3788585484eSchristos 	/* XXXX If we disable reading or writing, we may want to consider
3798585484eSchristos 	 * canceling any in-progress read or write operation, though it might
3808585484eSchristos 	 * not work. */
3818585484eSchristos 
3828585484eSchristos 	if (what & EV_READ) {
3838585484eSchristos 		BEV_DEL_GENERIC_READ_TIMEOUT(bev);
3848585484eSchristos 		bev_async_del_read(bev_async);
3858585484eSchristos 	}
3868585484eSchristos 	if (what & EV_WRITE) {
3878585484eSchristos 		BEV_DEL_GENERIC_WRITE_TIMEOUT(bev);
3888585484eSchristos 		bev_async_del_write(bev_async);
3898585484eSchristos 	}
3908585484eSchristos 
3918585484eSchristos 	return 0;
3928585484eSchristos }
3938585484eSchristos 
3948585484eSchristos static void
3958585484eSchristos be_async_destruct(struct bufferevent *bev)
3968585484eSchristos {
3978585484eSchristos 	struct bufferevent_async *bev_async = upcast(bev);
3988585484eSchristos 	struct bufferevent_private *bev_p = BEV_UPCAST(bev);
3998585484eSchristos 	evutil_socket_t fd;
4008585484eSchristos 
4018585484eSchristos 	EVUTIL_ASSERT(!upcast(bev)->write_in_progress &&
4028585484eSchristos 			!upcast(bev)->read_in_progress);
4038585484eSchristos 
4048585484eSchristos 	bev_async_del_read(bev_async);
4058585484eSchristos 	bev_async_del_write(bev_async);
4068585484eSchristos 
4078585484eSchristos 	fd = evbuffer_overlapped_get_fd_(bev->input);
408*eabc0478Schristos 	if (fd != (evutil_socket_t)EVUTIL_INVALID_SOCKET &&
4097476e6e4Schristos 		(bev_p->options & BEV_OPT_CLOSE_ON_FREE)) {
4108585484eSchristos 		evutil_closesocket(fd);
411*eabc0478Schristos 		evbuffer_overlapped_set_fd_(bev->input, EVUTIL_INVALID_SOCKET);
4128585484eSchristos 	}
4138585484eSchristos }
4148585484eSchristos 
4158585484eSchristos /* GetQueuedCompletionStatus doesn't reliably yield WSA error codes, so
4168585484eSchristos  * we use WSAGetOverlappedResult to translate. */
4178585484eSchristos static void
4188585484eSchristos bev_async_set_wsa_error(struct bufferevent *bev, struct event_overlapped *eo)
4198585484eSchristos {
4208585484eSchristos 	DWORD bytes, flags;
4218585484eSchristos 	evutil_socket_t fd;
4228585484eSchristos 
4238585484eSchristos 	fd = evbuffer_overlapped_get_fd_(bev->input);
4248585484eSchristos 	WSAGetOverlappedResult(fd, &eo->overlapped, &bytes, FALSE, &flags);
4258585484eSchristos }
4268585484eSchristos 
4278585484eSchristos static int
4288585484eSchristos be_async_flush(struct bufferevent *bev, short what,
4298585484eSchristos     enum bufferevent_flush_mode mode)
4308585484eSchristos {
4318585484eSchristos 	return 0;
4328585484eSchristos }
4338585484eSchristos 
4348585484eSchristos static void
4358585484eSchristos connect_complete(struct event_overlapped *eo, ev_uintptr_t key,
4368585484eSchristos     ev_ssize_t nbytes, int ok)
4378585484eSchristos {
4388585484eSchristos 	struct bufferevent_async *bev_a = upcast_connect(eo);
4398585484eSchristos 	struct bufferevent *bev = &bev_a->bev.bev;
4408585484eSchristos 	evutil_socket_t sock;
4418585484eSchristos 
4428585484eSchristos 	BEV_LOCK(bev);
4438585484eSchristos 
4448585484eSchristos 	EVUTIL_ASSERT(bev_a->bev.connecting);
4458585484eSchristos 	bev_a->bev.connecting = 0;
4468585484eSchristos 	sock = evbuffer_overlapped_get_fd_(bev_a->bev.bev.input);
4478585484eSchristos 	/* XXXX Handle error? */
4488585484eSchristos 	setsockopt(sock, SOL_SOCKET, SO_UPDATE_CONNECT_CONTEXT, NULL, 0);
4498585484eSchristos 
4508585484eSchristos 	if (ok)
4518585484eSchristos 		bufferevent_async_set_connected_(bev);
4528585484eSchristos 	else
4538585484eSchristos 		bev_async_set_wsa_error(bev, eo);
4548585484eSchristos 
455*eabc0478Schristos 	be_async_run_eventcb(bev, ok ? BEV_EVENT_CONNECTED : BEV_EVENT_ERROR, 0);
4568585484eSchristos 
4578585484eSchristos 	event_base_del_virtual_(bev->ev_base);
4588585484eSchristos 
4598585484eSchristos 	bufferevent_decref_and_unlock_(bev);
4608585484eSchristos }
4618585484eSchristos 
4628585484eSchristos static void
4638585484eSchristos read_complete(struct event_overlapped *eo, ev_uintptr_t key,
4648585484eSchristos     ev_ssize_t nbytes, int ok)
4658585484eSchristos {
4668585484eSchristos 	struct bufferevent_async *bev_a = upcast_read(eo);
4678585484eSchristos 	struct bufferevent *bev = &bev_a->bev.bev;
4688585484eSchristos 	short what = BEV_EVENT_READING;
4698585484eSchristos 	ev_ssize_t amount_unread;
4708585484eSchristos 	BEV_LOCK(bev);
4718585484eSchristos 	EVUTIL_ASSERT(bev_a->read_in_progress);
4728585484eSchristos 
4738585484eSchristos 	amount_unread = bev_a->read_in_progress - nbytes;
4748585484eSchristos 	evbuffer_commit_read_(bev->input, nbytes);
4758585484eSchristos 	bev_a->read_in_progress = 0;
4768585484eSchristos 	if (amount_unread)
4778585484eSchristos 		bufferevent_decrement_read_buckets_(&bev_a->bev, -amount_unread);
4788585484eSchristos 
4798585484eSchristos 	if (!ok)
4808585484eSchristos 		bev_async_set_wsa_error(bev, eo);
4818585484eSchristos 
4828585484eSchristos 	if (bev_a->ok) {
4838585484eSchristos 		if (ok && nbytes) {
4848585484eSchristos 			BEV_RESET_GENERIC_READ_TIMEOUT(bev);
485*eabc0478Schristos 			be_async_trigger_nolock(bev, EV_READ, 0);
4868585484eSchristos 			bev_async_consider_reading(bev_a);
4878585484eSchristos 		} else if (!ok) {
4888585484eSchristos 			what |= BEV_EVENT_ERROR;
4898585484eSchristos 			bev_a->ok = 0;
490*eabc0478Schristos 			be_async_run_eventcb(bev, what, 0);
4918585484eSchristos 		} else if (!nbytes) {
4928585484eSchristos 			what |= BEV_EVENT_EOF;
4938585484eSchristos 			bev_a->ok = 0;
494*eabc0478Schristos 			be_async_run_eventcb(bev, what, 0);
4958585484eSchristos 		}
4968585484eSchristos 	}
4978585484eSchristos 
4988585484eSchristos 	bufferevent_decref_and_unlock_(bev);
4998585484eSchristos }
5008585484eSchristos 
5018585484eSchristos static void
5028585484eSchristos write_complete(struct event_overlapped *eo, ev_uintptr_t key,
5038585484eSchristos     ev_ssize_t nbytes, int ok)
5048585484eSchristos {
5058585484eSchristos 	struct bufferevent_async *bev_a = upcast_write(eo);
5068585484eSchristos 	struct bufferevent *bev = &bev_a->bev.bev;
5078585484eSchristos 	short what = BEV_EVENT_WRITING;
5088585484eSchristos 	ev_ssize_t amount_unwritten;
5098585484eSchristos 
5108585484eSchristos 	BEV_LOCK(bev);
5118585484eSchristos 	EVUTIL_ASSERT(bev_a->write_in_progress);
5128585484eSchristos 
5138585484eSchristos 	amount_unwritten = bev_a->write_in_progress - nbytes;
5148585484eSchristos 	evbuffer_commit_write_(bev->output, nbytes);
5158585484eSchristos 	bev_a->write_in_progress = 0;
5168585484eSchristos 
5178585484eSchristos 	if (amount_unwritten)
5188585484eSchristos 		bufferevent_decrement_write_buckets_(&bev_a->bev,
5198585484eSchristos 		                                     -amount_unwritten);
5208585484eSchristos 
5218585484eSchristos 
5228585484eSchristos 	if (!ok)
5238585484eSchristos 		bev_async_set_wsa_error(bev, eo);
5248585484eSchristos 
5258585484eSchristos 	if (bev_a->ok) {
5268585484eSchristos 		if (ok && nbytes) {
5278585484eSchristos 			BEV_RESET_GENERIC_WRITE_TIMEOUT(bev);
528*eabc0478Schristos 			be_async_trigger_nolock(bev, EV_WRITE, 0);
5298585484eSchristos 			bev_async_consider_writing(bev_a);
5308585484eSchristos 		} else if (!ok) {
5318585484eSchristos 			what |= BEV_EVENT_ERROR;
5328585484eSchristos 			bev_a->ok = 0;
533*eabc0478Schristos 			be_async_run_eventcb(bev, what, 0);
5348585484eSchristos 		} else if (!nbytes) {
5358585484eSchristos 			what |= BEV_EVENT_EOF;
5368585484eSchristos 			bev_a->ok = 0;
537*eabc0478Schristos 			be_async_run_eventcb(bev, what, 0);
5388585484eSchristos 		}
5398585484eSchristos 	}
5408585484eSchristos 
5418585484eSchristos 	bufferevent_decref_and_unlock_(bev);
5428585484eSchristos }
5438585484eSchristos 
5448585484eSchristos struct bufferevent *
5458585484eSchristos bufferevent_async_new_(struct event_base *base,
5468585484eSchristos     evutil_socket_t fd, int options)
5478585484eSchristos {
5488585484eSchristos 	struct bufferevent_async *bev_a;
5498585484eSchristos 	struct bufferevent *bev;
5508585484eSchristos 	struct event_iocp_port *iocp;
5518585484eSchristos 
5528585484eSchristos 	options |= BEV_OPT_THREADSAFE;
5538585484eSchristos 
5548585484eSchristos 	if (!(iocp = event_base_get_iocp_(base)))
5558585484eSchristos 		return NULL;
5568585484eSchristos 
5578585484eSchristos 	if (fd >= 0 && event_iocp_port_associate_(iocp, fd, 1)<0) {
558*eabc0478Schristos 		if (fatal_error(GetLastError()))
5598585484eSchristos 			return NULL;
5608585484eSchristos 	}
5618585484eSchristos 
5628585484eSchristos 	if (!(bev_a = mm_calloc(1, sizeof(struct bufferevent_async))))
5638585484eSchristos 		return NULL;
5648585484eSchristos 
5658585484eSchristos 	bev = &bev_a->bev.bev;
5668585484eSchristos 	if (!(bev->input = evbuffer_overlapped_new_(fd))) {
5678585484eSchristos 		mm_free(bev_a);
5688585484eSchristos 		return NULL;
5698585484eSchristos 	}
5708585484eSchristos 	if (!(bev->output = evbuffer_overlapped_new_(fd))) {
5718585484eSchristos 		evbuffer_free(bev->input);
5728585484eSchristos 		mm_free(bev_a);
5738585484eSchristos 		return NULL;
5748585484eSchristos 	}
5758585484eSchristos 
5768585484eSchristos 	if (bufferevent_init_common_(&bev_a->bev, base, &bufferevent_ops_async,
5778585484eSchristos 		options)<0)
5788585484eSchristos 		goto err;
5798585484eSchristos 
5808585484eSchristos 	evbuffer_add_cb(bev->input, be_async_inbuf_callback, bev);
5818585484eSchristos 	evbuffer_add_cb(bev->output, be_async_outbuf_callback, bev);
5828585484eSchristos 
5838585484eSchristos 	event_overlapped_init_(&bev_a->connect_overlapped, connect_complete);
5848585484eSchristos 	event_overlapped_init_(&bev_a->read_overlapped, read_complete);
5858585484eSchristos 	event_overlapped_init_(&bev_a->write_overlapped, write_complete);
5868585484eSchristos 
5878585484eSchristos 	bufferevent_init_generic_timeout_cbs_(bev);
5888585484eSchristos 
5897476e6e4Schristos 	bev_a->ok = fd >= 0;
5907476e6e4Schristos 
5918585484eSchristos 	return bev;
5928585484eSchristos err:
5938585484eSchristos 	bufferevent_free(&bev_a->bev.bev);
5948585484eSchristos 	return NULL;
5958585484eSchristos }
5968585484eSchristos 
5978585484eSchristos void
5988585484eSchristos bufferevent_async_set_connected_(struct bufferevent *bev)
5998585484eSchristos {
6008585484eSchristos 	struct bufferevent_async *bev_async = upcast(bev);
6018585484eSchristos 	bev_async->ok = 1;
6028585484eSchristos 	/* Now's a good time to consider reading/writing */
6038585484eSchristos 	be_async_enable(bev, bev->enabled);
6048585484eSchristos }
6058585484eSchristos 
6068585484eSchristos int
6078585484eSchristos bufferevent_async_can_connect_(struct bufferevent *bev)
6088585484eSchristos {
6098585484eSchristos 	const struct win32_extension_fns *ext =
6108585484eSchristos 	    event_get_win32_extension_fns_();
6118585484eSchristos 
6128585484eSchristos 	if (BEV_IS_ASYNC(bev) &&
6138585484eSchristos 	    event_base_get_iocp_(bev->ev_base) &&
6148585484eSchristos 	    ext && ext->ConnectEx)
6158585484eSchristos 		return 1;
6168585484eSchristos 
6178585484eSchristos 	return 0;
6188585484eSchristos }
6198585484eSchristos 
6208585484eSchristos int
6218585484eSchristos bufferevent_async_connect_(struct bufferevent *bev, evutil_socket_t fd,
6228585484eSchristos 	const struct sockaddr *sa, int socklen)
6238585484eSchristos {
6248585484eSchristos 	BOOL rc;
6258585484eSchristos 	struct bufferevent_async *bev_async = upcast(bev);
6268585484eSchristos 	struct sockaddr_storage ss;
6278585484eSchristos 	const struct win32_extension_fns *ext =
6288585484eSchristos 	    event_get_win32_extension_fns_();
6298585484eSchristos 
6308585484eSchristos 	EVUTIL_ASSERT(ext && ext->ConnectEx && fd >= 0 && sa != NULL);
6318585484eSchristos 
6328585484eSchristos 	/* ConnectEx() requires that the socket be bound to an address
6338585484eSchristos 	 * with bind() before using, otherwise it will fail. We attempt
6348585484eSchristos 	 * to issue a bind() here, taking into account that the error
6358585484eSchristos 	 * code is set to WSAEINVAL when the socket is already bound. */
6368585484eSchristos 	memset(&ss, 0, sizeof(ss));
6378585484eSchristos 	if (sa->sa_family == AF_INET) {
6388585484eSchristos 		struct sockaddr_in *sin = (struct sockaddr_in *)&ss;
6398585484eSchristos 		sin->sin_family = AF_INET;
6408585484eSchristos 		sin->sin_addr.s_addr = INADDR_ANY;
6418585484eSchristos 	} else if (sa->sa_family == AF_INET6) {
6428585484eSchristos 		struct sockaddr_in6 *sin6 = (struct sockaddr_in6 *)&ss;
6438585484eSchristos 		sin6->sin6_family = AF_INET6;
6448585484eSchristos 		sin6->sin6_addr = in6addr_any;
6458585484eSchristos 	} else {
6468585484eSchristos 		/* Well, the user will have to bind() */
6478585484eSchristos 		return -1;
6488585484eSchristos 	}
6498585484eSchristos 	if (bind(fd, (struct sockaddr *)&ss, sizeof(ss)) < 0 &&
6508585484eSchristos 	    WSAGetLastError() != WSAEINVAL)
6518585484eSchristos 		return -1;
6528585484eSchristos 
6538585484eSchristos 	event_base_add_virtual_(bev->ev_base);
6548585484eSchristos 	bufferevent_incref_(bev);
6558585484eSchristos 	rc = ext->ConnectEx(fd, sa, socklen, NULL, 0, NULL,
6568585484eSchristos 			    &bev_async->connect_overlapped.overlapped);
6578585484eSchristos 	if (rc || WSAGetLastError() == ERROR_IO_PENDING)
6588585484eSchristos 		return 0;
6598585484eSchristos 
6608585484eSchristos 	event_base_del_virtual_(bev->ev_base);
6618585484eSchristos 	bufferevent_decref_(bev);
6628585484eSchristos 
6638585484eSchristos 	return -1;
6648585484eSchristos }
6658585484eSchristos 
6668585484eSchristos static int
6678585484eSchristos be_async_ctrl(struct bufferevent *bev, enum bufferevent_ctrl_op op,
6688585484eSchristos     union bufferevent_ctrl_data *data)
6698585484eSchristos {
6708585484eSchristos 	switch (op) {
6718585484eSchristos 	case BEV_CTRL_GET_FD:
6728585484eSchristos 		data->fd = evbuffer_overlapped_get_fd_(bev->input);
6738585484eSchristos 		return 0;
6748585484eSchristos 	case BEV_CTRL_SET_FD: {
675*eabc0478Schristos 		struct bufferevent_async *bev_a = upcast(bev);
6768585484eSchristos 		struct event_iocp_port *iocp;
6778585484eSchristos 
6788585484eSchristos 		if (data->fd == evbuffer_overlapped_get_fd_(bev->input))
6798585484eSchristos 			return 0;
6808585484eSchristos 		if (!(iocp = event_base_get_iocp_(bev->ev_base)))
6818585484eSchristos 			return -1;
682*eabc0478Schristos 		if (event_iocp_port_associate_(iocp, data->fd, 1) < 0) {
683*eabc0478Schristos 			if (fatal_error(GetLastError()))
6848585484eSchristos 				return -1;
685*eabc0478Schristos 		}
6868585484eSchristos 		evbuffer_overlapped_set_fd_(bev->input, data->fd);
6878585484eSchristos 		evbuffer_overlapped_set_fd_(bev->output, data->fd);
688*eabc0478Schristos 		bev_a->ok = data->fd >= 0;
6898585484eSchristos 		return 0;
6908585484eSchristos 	}
6918585484eSchristos 	case BEV_CTRL_CANCEL_ALL: {
6928585484eSchristos 		struct bufferevent_async *bev_a = upcast(bev);
6938585484eSchristos 		evutil_socket_t fd = evbuffer_overlapped_get_fd_(bev->input);
694*eabc0478Schristos 		if (fd != (evutil_socket_t)EVUTIL_INVALID_SOCKET &&
6958585484eSchristos 		    (bev_a->bev.options & BEV_OPT_CLOSE_ON_FREE)) {
6968585484eSchristos 			closesocket(fd);
697*eabc0478Schristos 			evbuffer_overlapped_set_fd_(bev->input, EVUTIL_INVALID_SOCKET);
6988585484eSchristos 		}
6998585484eSchristos 		bev_a->ok = 0;
7008585484eSchristos 		return 0;
7018585484eSchristos 	}
7028585484eSchristos 	case BEV_CTRL_GET_UNDERLYING:
7038585484eSchristos 	default:
7048585484eSchristos 		return -1;
7058585484eSchristos 	}
7068585484eSchristos }
7078585484eSchristos 
7088585484eSchristos 
709