xref: /netbsd-src/external/bsd/libevent/dist/bufferevent_async.c (revision 657871a79c9a2060a6255a242fa1a1ef76b56ec6)
1*657871a7Schristos /*	$NetBSD: bufferevent_async.c,v 1.1.1.3 2021/04/07 02:43:13 christos Exp $	*/
26ecf6635Schristos /*
36ecf6635Schristos  * Copyright (c) 2009-2012 Niels Provos and Nick Mathewson
46ecf6635Schristos  *
56ecf6635Schristos  * All rights reserved.
66ecf6635Schristos  *
76ecf6635Schristos  * Redistribution and use in source and binary forms, with or without
86ecf6635Schristos  * modification, are permitted provided that the following conditions
96ecf6635Schristos  * are met:
106ecf6635Schristos  * 1. Redistributions of source code must retain the above copyright
116ecf6635Schristos  *    notice, this list of conditions and the following disclaimer.
126ecf6635Schristos  * 2. Redistributions in binary form must reproduce the above copyright
136ecf6635Schristos  *    notice, this list of conditions and the following disclaimer in the
146ecf6635Schristos  *    documentation and/or other materials provided with the distribution.
156ecf6635Schristos  * 3. The name of the author may not be used to endorse or promote products
166ecf6635Schristos  *    derived from this software without specific prior written permission.
176ecf6635Schristos  *
186ecf6635Schristos  * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
196ecf6635Schristos  * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
206ecf6635Schristos  * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
216ecf6635Schristos  * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,
226ecf6635Schristos  * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
236ecf6635Schristos  * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
246ecf6635Schristos  * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
256ecf6635Schristos  * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
266ecf6635Schristos  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
276ecf6635Schristos  * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
286ecf6635Schristos  */
296ecf6635Schristos 
306ecf6635Schristos #include "event2/event-config.h"
316ecf6635Schristos #include <sys/cdefs.h>
32*657871a7Schristos __RCSID("$NetBSD: bufferevent_async.c,v 1.1.1.3 2021/04/07 02:43:13 christos Exp $");
33805a1ce9Schristos #include "evconfig-private.h"
346ecf6635Schristos 
35805a1ce9Schristos #ifdef EVENT__HAVE_SYS_TIME_H
366ecf6635Schristos #include <sys/time.h>
376ecf6635Schristos #endif
386ecf6635Schristos 
396ecf6635Schristos #include <errno.h>
406ecf6635Schristos #include <stdio.h>
416ecf6635Schristos #include <stdlib.h>
426ecf6635Schristos #include <string.h>
43805a1ce9Schristos #ifdef EVENT__HAVE_STDARG_H
446ecf6635Schristos #include <stdarg.h>
456ecf6635Schristos #endif
46805a1ce9Schristos #ifdef EVENT__HAVE_UNISTD_H
476ecf6635Schristos #include <unistd.h>
486ecf6635Schristos #endif
496ecf6635Schristos 
50805a1ce9Schristos #ifdef _WIN32
516ecf6635Schristos #include <winsock2.h>
52*657871a7Schristos #include <winerror.h>
536ecf6635Schristos #include <ws2tcpip.h>
546ecf6635Schristos #endif
556ecf6635Schristos 
566ecf6635Schristos #include <sys/queue.h>
576ecf6635Schristos 
586ecf6635Schristos #include "event2/util.h"
596ecf6635Schristos #include "event2/bufferevent.h"
606ecf6635Schristos #include "event2/buffer.h"
616ecf6635Schristos #include "event2/bufferevent_struct.h"
626ecf6635Schristos #include "event2/event.h"
636ecf6635Schristos #include "event2/util.h"
646ecf6635Schristos #include "event-internal.h"
656ecf6635Schristos #include "log-internal.h"
666ecf6635Schristos #include "mm-internal.h"
676ecf6635Schristos #include "bufferevent-internal.h"
686ecf6635Schristos #include "util-internal.h"
696ecf6635Schristos #include "iocp-internal.h"
706ecf6635Schristos 
716ecf6635Schristos #ifndef SO_UPDATE_CONNECT_CONTEXT
726ecf6635Schristos /* Mingw is sometimes missing this */
736ecf6635Schristos #define SO_UPDATE_CONNECT_CONTEXT 0x7010
746ecf6635Schristos #endif
756ecf6635Schristos 
766ecf6635Schristos /* prototypes */
776ecf6635Schristos static int be_async_enable(struct bufferevent *, short);
786ecf6635Schristos static int be_async_disable(struct bufferevent *, short);
796ecf6635Schristos static void be_async_destruct(struct bufferevent *);
806ecf6635Schristos static int be_async_flush(struct bufferevent *, short, enum bufferevent_flush_mode);
816ecf6635Schristos static int be_async_ctrl(struct bufferevent *, enum bufferevent_ctrl_op, union bufferevent_ctrl_data *);
826ecf6635Schristos 
836ecf6635Schristos struct bufferevent_async {
846ecf6635Schristos 	struct bufferevent_private bev;
856ecf6635Schristos 	struct event_overlapped connect_overlapped;
866ecf6635Schristos 	struct event_overlapped read_overlapped;
876ecf6635Schristos 	struct event_overlapped write_overlapped;
886ecf6635Schristos 	size_t read_in_progress;
896ecf6635Schristos 	size_t write_in_progress;
906ecf6635Schristos 	unsigned ok : 1;
916ecf6635Schristos 	unsigned read_added : 1;
926ecf6635Schristos 	unsigned write_added : 1;
936ecf6635Schristos };
946ecf6635Schristos 
956ecf6635Schristos const struct bufferevent_ops bufferevent_ops_async = {
966ecf6635Schristos 	"socket_async",
976ecf6635Schristos 	evutil_offsetof(struct bufferevent_async, bev.bev),
986ecf6635Schristos 	be_async_enable,
996ecf6635Schristos 	be_async_disable,
100805a1ce9Schristos 	NULL, /* Unlink */
1016ecf6635Schristos 	be_async_destruct,
102805a1ce9Schristos 	bufferevent_generic_adj_timeouts_,
1036ecf6635Schristos 	be_async_flush,
1046ecf6635Schristos 	be_async_ctrl,
1056ecf6635Schristos };
1066ecf6635Schristos 
107*657871a7Schristos static inline void
be_async_run_eventcb(struct bufferevent * bev,short what,int options)108*657871a7Schristos be_async_run_eventcb(struct bufferevent *bev, short what, int options)
109*657871a7Schristos { bufferevent_run_eventcb_(bev, what, options|BEV_TRIG_DEFER_CALLBACKS); }
110*657871a7Schristos 
111*657871a7Schristos static inline void
be_async_trigger_nolock(struct bufferevent * bev,short what,int options)112*657871a7Schristos be_async_trigger_nolock(struct bufferevent *bev, short what, int options)
113*657871a7Schristos { bufferevent_trigger_nolock_(bev, what, options|BEV_TRIG_DEFER_CALLBACKS); }
114*657871a7Schristos 
115*657871a7Schristos static inline int
fatal_error(int err)116*657871a7Schristos fatal_error(int err)
117*657871a7Schristos {
118*657871a7Schristos 	switch (err) {
119*657871a7Schristos 		/* We may have already associated this fd with a port.
120*657871a7Schristos 		 * Let's hope it's this port, and that the error code
121*657871a7Schristos 		 * for doing this neer changes. */
122*657871a7Schristos 		case ERROR_INVALID_PARAMETER:
123*657871a7Schristos 			return 0;
124*657871a7Schristos 	}
125*657871a7Schristos 	return 1;
126*657871a7Schristos }
127*657871a7Schristos 
1286ecf6635Schristos static inline struct bufferevent_async *
upcast(struct bufferevent * bev)1296ecf6635Schristos upcast(struct bufferevent *bev)
1306ecf6635Schristos {
1316ecf6635Schristos 	struct bufferevent_async *bev_a;
132*657871a7Schristos 	if (!BEV_IS_ASYNC(bev))
1336ecf6635Schristos 		return NULL;
1346ecf6635Schristos 	bev_a = EVUTIL_UPCAST(bev, struct bufferevent_async, bev.bev);
1356ecf6635Schristos 	return bev_a;
1366ecf6635Schristos }
1376ecf6635Schristos 
1386ecf6635Schristos static inline struct bufferevent_async *
upcast_connect(struct event_overlapped * eo)1396ecf6635Schristos upcast_connect(struct event_overlapped *eo)
1406ecf6635Schristos {
1416ecf6635Schristos 	struct bufferevent_async *bev_a;
1426ecf6635Schristos 	bev_a = EVUTIL_UPCAST(eo, struct bufferevent_async, connect_overlapped);
1436ecf6635Schristos 	EVUTIL_ASSERT(BEV_IS_ASYNC(&bev_a->bev.bev));
1446ecf6635Schristos 	return bev_a;
1456ecf6635Schristos }
1466ecf6635Schristos 
1476ecf6635Schristos static inline struct bufferevent_async *
upcast_read(struct event_overlapped * eo)1486ecf6635Schristos upcast_read(struct event_overlapped *eo)
1496ecf6635Schristos {
1506ecf6635Schristos 	struct bufferevent_async *bev_a;
1516ecf6635Schristos 	bev_a = EVUTIL_UPCAST(eo, struct bufferevent_async, read_overlapped);
1526ecf6635Schristos 	EVUTIL_ASSERT(BEV_IS_ASYNC(&bev_a->bev.bev));
1536ecf6635Schristos 	return bev_a;
1546ecf6635Schristos }
1556ecf6635Schristos 
1566ecf6635Schristos static inline struct bufferevent_async *
upcast_write(struct event_overlapped * eo)1576ecf6635Schristos upcast_write(struct event_overlapped *eo)
1586ecf6635Schristos {
1596ecf6635Schristos 	struct bufferevent_async *bev_a;
1606ecf6635Schristos 	bev_a = EVUTIL_UPCAST(eo, struct bufferevent_async, write_overlapped);
1616ecf6635Schristos 	EVUTIL_ASSERT(BEV_IS_ASYNC(&bev_a->bev.bev));
1626ecf6635Schristos 	return bev_a;
1636ecf6635Schristos }
1646ecf6635Schristos 
1656ecf6635Schristos static void
bev_async_del_write(struct bufferevent_async * beva)1666ecf6635Schristos bev_async_del_write(struct bufferevent_async *beva)
1676ecf6635Schristos {
1686ecf6635Schristos 	struct bufferevent *bev = &beva->bev.bev;
1696ecf6635Schristos 
1706ecf6635Schristos 	if (beva->write_added) {
1716ecf6635Schristos 		beva->write_added = 0;
172805a1ce9Schristos 		event_base_del_virtual_(bev->ev_base);
1736ecf6635Schristos 	}
1746ecf6635Schristos }
1756ecf6635Schristos 
1766ecf6635Schristos static void
bev_async_del_read(struct bufferevent_async * beva)1776ecf6635Schristos bev_async_del_read(struct bufferevent_async *beva)
1786ecf6635Schristos {
1796ecf6635Schristos 	struct bufferevent *bev = &beva->bev.bev;
1806ecf6635Schristos 
1816ecf6635Schristos 	if (beva->read_added) {
1826ecf6635Schristos 		beva->read_added = 0;
183805a1ce9Schristos 		event_base_del_virtual_(bev->ev_base);
1846ecf6635Schristos 	}
1856ecf6635Schristos }
1866ecf6635Schristos 
1876ecf6635Schristos static void
bev_async_add_write(struct bufferevent_async * beva)1886ecf6635Schristos bev_async_add_write(struct bufferevent_async *beva)
1896ecf6635Schristos {
1906ecf6635Schristos 	struct bufferevent *bev = &beva->bev.bev;
1916ecf6635Schristos 
1926ecf6635Schristos 	if (!beva->write_added) {
1936ecf6635Schristos 		beva->write_added = 1;
194805a1ce9Schristos 		event_base_add_virtual_(bev->ev_base);
1956ecf6635Schristos 	}
1966ecf6635Schristos }
1976ecf6635Schristos 
1986ecf6635Schristos static void
bev_async_add_read(struct bufferevent_async * beva)1996ecf6635Schristos bev_async_add_read(struct bufferevent_async *beva)
2006ecf6635Schristos {
2016ecf6635Schristos 	struct bufferevent *bev = &beva->bev.bev;
2026ecf6635Schristos 
2036ecf6635Schristos 	if (!beva->read_added) {
2046ecf6635Schristos 		beva->read_added = 1;
205805a1ce9Schristos 		event_base_add_virtual_(bev->ev_base);
2066ecf6635Schristos 	}
2076ecf6635Schristos }
2086ecf6635Schristos 
2096ecf6635Schristos static void
bev_async_consider_writing(struct bufferevent_async * beva)2106ecf6635Schristos bev_async_consider_writing(struct bufferevent_async *beva)
2116ecf6635Schristos {
2126ecf6635Schristos 	size_t at_most;
2136ecf6635Schristos 	int limit;
2146ecf6635Schristos 	struct bufferevent *bev = &beva->bev.bev;
2156ecf6635Schristos 
2166ecf6635Schristos 	/* Don't write if there's a write in progress, or we do not
2176ecf6635Schristos 	 * want to write, or when there's nothing left to write. */
2186ecf6635Schristos 	if (beva->write_in_progress || beva->bev.connecting)
2196ecf6635Schristos 		return;
2206ecf6635Schristos 	if (!beva->ok || !(bev->enabled&EV_WRITE) ||
2216ecf6635Schristos 	    !evbuffer_get_length(bev->output)) {
2226ecf6635Schristos 		bev_async_del_write(beva);
2236ecf6635Schristos 		return;
2246ecf6635Schristos 	}
2256ecf6635Schristos 
2266ecf6635Schristos 	at_most = evbuffer_get_length(bev->output);
2276ecf6635Schristos 
2286ecf6635Schristos 	/* This is safe so long as bufferevent_get_write_max never returns
2296ecf6635Schristos 	 * more than INT_MAX.  That's true for now. XXXX */
230805a1ce9Schristos 	limit = (int)bufferevent_get_write_max_(&beva->bev);
2316ecf6635Schristos 	if (at_most >= (size_t)limit && limit >= 0)
2326ecf6635Schristos 		at_most = limit;
2336ecf6635Schristos 
2346ecf6635Schristos 	if (beva->bev.write_suspended) {
2356ecf6635Schristos 		bev_async_del_write(beva);
2366ecf6635Schristos 		return;
2376ecf6635Schristos 	}
2386ecf6635Schristos 
2396ecf6635Schristos 	/*  XXXX doesn't respect low-water mark very well. */
240805a1ce9Schristos 	bufferevent_incref_(bev);
241805a1ce9Schristos 	if (evbuffer_launch_write_(bev->output, at_most,
2426ecf6635Schristos 	    &beva->write_overlapped)) {
243805a1ce9Schristos 		bufferevent_decref_(bev);
2446ecf6635Schristos 		beva->ok = 0;
245*657871a7Schristos 		be_async_run_eventcb(bev, BEV_EVENT_ERROR, 0);
2466ecf6635Schristos 	} else {
2476ecf6635Schristos 		beva->write_in_progress = at_most;
248805a1ce9Schristos 		bufferevent_decrement_write_buckets_(&beva->bev, at_most);
2496ecf6635Schristos 		bev_async_add_write(beva);
2506ecf6635Schristos 	}
2516ecf6635Schristos }
2526ecf6635Schristos 
2536ecf6635Schristos static void
bev_async_consider_reading(struct bufferevent_async * beva)2546ecf6635Schristos bev_async_consider_reading(struct bufferevent_async *beva)
2556ecf6635Schristos {
2566ecf6635Schristos 	size_t cur_size;
2576ecf6635Schristos 	size_t read_high;
2586ecf6635Schristos 	size_t at_most;
2596ecf6635Schristos 	int limit;
2606ecf6635Schristos 	struct bufferevent *bev = &beva->bev.bev;
2616ecf6635Schristos 
2626ecf6635Schristos 	/* Don't read if there is a read in progress, or we do not
2636ecf6635Schristos 	 * want to read. */
2646ecf6635Schristos 	if (beva->read_in_progress || beva->bev.connecting)
2656ecf6635Schristos 		return;
2666ecf6635Schristos 	if (!beva->ok || !(bev->enabled&EV_READ)) {
2676ecf6635Schristos 		bev_async_del_read(beva);
2686ecf6635Schristos 		return;
2696ecf6635Schristos 	}
2706ecf6635Schristos 
2716ecf6635Schristos 	/* Don't read if we're full */
2726ecf6635Schristos 	cur_size = evbuffer_get_length(bev->input);
2736ecf6635Schristos 	read_high = bev->wm_read.high;
2746ecf6635Schristos 	if (read_high) {
2756ecf6635Schristos 		if (cur_size >= read_high) {
2766ecf6635Schristos 			bev_async_del_read(beva);
2776ecf6635Schristos 			return;
2786ecf6635Schristos 		}
2796ecf6635Schristos 		at_most = read_high - cur_size;
2806ecf6635Schristos 	} else {
2816ecf6635Schristos 		at_most = 16384; /* FIXME totally magic. */
2826ecf6635Schristos 	}
2836ecf6635Schristos 
2846ecf6635Schristos 	/* XXXX This over-commits. */
285805a1ce9Schristos 	/* XXXX see also not above on cast on bufferevent_get_write_max_() */
286805a1ce9Schristos 	limit = (int)bufferevent_get_read_max_(&beva->bev);
2876ecf6635Schristos 	if (at_most >= (size_t)limit && limit >= 0)
2886ecf6635Schristos 		at_most = limit;
2896ecf6635Schristos 
2906ecf6635Schristos 	if (beva->bev.read_suspended) {
2916ecf6635Schristos 		bev_async_del_read(beva);
2926ecf6635Schristos 		return;
2936ecf6635Schristos 	}
2946ecf6635Schristos 
295805a1ce9Schristos 	bufferevent_incref_(bev);
296805a1ce9Schristos 	if (evbuffer_launch_read_(bev->input, at_most, &beva->read_overlapped)) {
2976ecf6635Schristos 		beva->ok = 0;
298*657871a7Schristos 		be_async_run_eventcb(bev, BEV_EVENT_ERROR, 0);
299805a1ce9Schristos 		bufferevent_decref_(bev);
3006ecf6635Schristos 	} else {
3016ecf6635Schristos 		beva->read_in_progress = at_most;
302805a1ce9Schristos 		bufferevent_decrement_read_buckets_(&beva->bev, at_most);
3036ecf6635Schristos 		bev_async_add_read(beva);
3046ecf6635Schristos 	}
3056ecf6635Schristos 
3066ecf6635Schristos 	return;
3076ecf6635Schristos }
3086ecf6635Schristos 
3096ecf6635Schristos static void
be_async_outbuf_callback(struct evbuffer * buf,const struct evbuffer_cb_info * cbinfo,void * arg)3106ecf6635Schristos be_async_outbuf_callback(struct evbuffer *buf,
3116ecf6635Schristos     const struct evbuffer_cb_info *cbinfo,
3126ecf6635Schristos     void *arg)
3136ecf6635Schristos {
3146ecf6635Schristos 	struct bufferevent *bev = arg;
3156ecf6635Schristos 	struct bufferevent_async *bev_async = upcast(bev);
3166ecf6635Schristos 
3176ecf6635Schristos 	/* If we added data to the outbuf and were not writing before,
3186ecf6635Schristos 	 * we may want to write now. */
3196ecf6635Schristos 
320805a1ce9Schristos 	bufferevent_incref_and_lock_(bev);
3216ecf6635Schristos 
3226ecf6635Schristos 	if (cbinfo->n_added)
3236ecf6635Schristos 		bev_async_consider_writing(bev_async);
3246ecf6635Schristos 
325805a1ce9Schristos 	bufferevent_decref_and_unlock_(bev);
3266ecf6635Schristos }
3276ecf6635Schristos 
3286ecf6635Schristos static void
be_async_inbuf_callback(struct evbuffer * buf,const struct evbuffer_cb_info * cbinfo,void * arg)3296ecf6635Schristos be_async_inbuf_callback(struct evbuffer *buf,
3306ecf6635Schristos     const struct evbuffer_cb_info *cbinfo,
3316ecf6635Schristos     void *arg)
3326ecf6635Schristos {
3336ecf6635Schristos 	struct bufferevent *bev = arg;
3346ecf6635Schristos 	struct bufferevent_async *bev_async = upcast(bev);
3356ecf6635Schristos 
3366ecf6635Schristos 	/* If we drained data from the inbuf and were not reading before,
3376ecf6635Schristos 	 * we may want to read now */
3386ecf6635Schristos 
339805a1ce9Schristos 	bufferevent_incref_and_lock_(bev);
3406ecf6635Schristos 
3416ecf6635Schristos 	if (cbinfo->n_deleted)
3426ecf6635Schristos 		bev_async_consider_reading(bev_async);
3436ecf6635Schristos 
344805a1ce9Schristos 	bufferevent_decref_and_unlock_(bev);
3456ecf6635Schristos }
3466ecf6635Schristos 
3476ecf6635Schristos static int
be_async_enable(struct bufferevent * buf,short what)3486ecf6635Schristos be_async_enable(struct bufferevent *buf, short what)
3496ecf6635Schristos {
3506ecf6635Schristos 	struct bufferevent_async *bev_async = upcast(buf);
3516ecf6635Schristos 
3526ecf6635Schristos 	if (!bev_async->ok)
3536ecf6635Schristos 		return -1;
3546ecf6635Schristos 
3556ecf6635Schristos 	if (bev_async->bev.connecting) {
3566ecf6635Schristos 		/* Don't launch anything during connection attempts. */
3576ecf6635Schristos 		return 0;
3586ecf6635Schristos 	}
3596ecf6635Schristos 
3606ecf6635Schristos 	if (what & EV_READ)
3616ecf6635Schristos 		BEV_RESET_GENERIC_READ_TIMEOUT(buf);
3626ecf6635Schristos 	if (what & EV_WRITE)
3636ecf6635Schristos 		BEV_RESET_GENERIC_WRITE_TIMEOUT(buf);
3646ecf6635Schristos 
3656ecf6635Schristos 	/* If we newly enable reading or writing, and we aren't reading or
3666ecf6635Schristos 	   writing already, consider launching a new read or write. */
3676ecf6635Schristos 
3686ecf6635Schristos 	if (what & EV_READ)
3696ecf6635Schristos 		bev_async_consider_reading(bev_async);
3706ecf6635Schristos 	if (what & EV_WRITE)
3716ecf6635Schristos 		bev_async_consider_writing(bev_async);
3726ecf6635Schristos 	return 0;
3736ecf6635Schristos }
3746ecf6635Schristos 
3756ecf6635Schristos static int
be_async_disable(struct bufferevent * bev,short what)3766ecf6635Schristos be_async_disable(struct bufferevent *bev, short what)
3776ecf6635Schristos {
3786ecf6635Schristos 	struct bufferevent_async *bev_async = upcast(bev);
3796ecf6635Schristos 	/* XXXX If we disable reading or writing, we may want to consider
3806ecf6635Schristos 	 * canceling any in-progress read or write operation, though it might
3816ecf6635Schristos 	 * not work. */
3826ecf6635Schristos 
3836ecf6635Schristos 	if (what & EV_READ) {
3846ecf6635Schristos 		BEV_DEL_GENERIC_READ_TIMEOUT(bev);
3856ecf6635Schristos 		bev_async_del_read(bev_async);
3866ecf6635Schristos 	}
3876ecf6635Schristos 	if (what & EV_WRITE) {
3886ecf6635Schristos 		BEV_DEL_GENERIC_WRITE_TIMEOUT(bev);
3896ecf6635Schristos 		bev_async_del_write(bev_async);
3906ecf6635Schristos 	}
3916ecf6635Schristos 
3926ecf6635Schristos 	return 0;
3936ecf6635Schristos }
3946ecf6635Schristos 
3956ecf6635Schristos static void
be_async_destruct(struct bufferevent * bev)3966ecf6635Schristos be_async_destruct(struct bufferevent *bev)
3976ecf6635Schristos {
3986ecf6635Schristos 	struct bufferevent_async *bev_async = upcast(bev);
3996ecf6635Schristos 	struct bufferevent_private *bev_p = BEV_UPCAST(bev);
4006ecf6635Schristos 	evutil_socket_t fd;
4016ecf6635Schristos 
4026ecf6635Schristos 	EVUTIL_ASSERT(!upcast(bev)->write_in_progress &&
4036ecf6635Schristos 			!upcast(bev)->read_in_progress);
4046ecf6635Schristos 
4056ecf6635Schristos 	bev_async_del_read(bev_async);
4066ecf6635Schristos 	bev_async_del_write(bev_async);
4076ecf6635Schristos 
408805a1ce9Schristos 	fd = evbuffer_overlapped_get_fd_(bev->input);
409*657871a7Schristos 	if (fd != (evutil_socket_t)EVUTIL_INVALID_SOCKET &&
410805a1ce9Schristos 		(bev_p->options & BEV_OPT_CLOSE_ON_FREE)) {
4116ecf6635Schristos 		evutil_closesocket(fd);
412*657871a7Schristos 		evbuffer_overlapped_set_fd_(bev->input, EVUTIL_INVALID_SOCKET);
4136ecf6635Schristos 	}
4146ecf6635Schristos }
4156ecf6635Schristos 
4166ecf6635Schristos /* GetQueuedCompletionStatus doesn't reliably yield WSA error codes, so
4176ecf6635Schristos  * we use WSAGetOverlappedResult to translate. */
4186ecf6635Schristos static void
bev_async_set_wsa_error(struct bufferevent * bev,struct event_overlapped * eo)4196ecf6635Schristos bev_async_set_wsa_error(struct bufferevent *bev, struct event_overlapped *eo)
4206ecf6635Schristos {
4216ecf6635Schristos 	DWORD bytes, flags;
4226ecf6635Schristos 	evutil_socket_t fd;
4236ecf6635Schristos 
424805a1ce9Schristos 	fd = evbuffer_overlapped_get_fd_(bev->input);
4256ecf6635Schristos 	WSAGetOverlappedResult(fd, &eo->overlapped, &bytes, FALSE, &flags);
4266ecf6635Schristos }
4276ecf6635Schristos 
4286ecf6635Schristos static int
be_async_flush(struct bufferevent * bev,short what,enum bufferevent_flush_mode mode)4296ecf6635Schristos be_async_flush(struct bufferevent *bev, short what,
4306ecf6635Schristos     enum bufferevent_flush_mode mode)
4316ecf6635Schristos {
4326ecf6635Schristos 	return 0;
4336ecf6635Schristos }
4346ecf6635Schristos 
4356ecf6635Schristos static void
connect_complete(struct event_overlapped * eo,ev_uintptr_t key,ev_ssize_t nbytes,int ok)4366ecf6635Schristos connect_complete(struct event_overlapped *eo, ev_uintptr_t key,
4376ecf6635Schristos     ev_ssize_t nbytes, int ok)
4386ecf6635Schristos {
4396ecf6635Schristos 	struct bufferevent_async *bev_a = upcast_connect(eo);
4406ecf6635Schristos 	struct bufferevent *bev = &bev_a->bev.bev;
4416ecf6635Schristos 	evutil_socket_t sock;
4426ecf6635Schristos 
4436ecf6635Schristos 	BEV_LOCK(bev);
4446ecf6635Schristos 
4456ecf6635Schristos 	EVUTIL_ASSERT(bev_a->bev.connecting);
4466ecf6635Schristos 	bev_a->bev.connecting = 0;
447805a1ce9Schristos 	sock = evbuffer_overlapped_get_fd_(bev_a->bev.bev.input);
4486ecf6635Schristos 	/* XXXX Handle error? */
4496ecf6635Schristos 	setsockopt(sock, SOL_SOCKET, SO_UPDATE_CONNECT_CONTEXT, NULL, 0);
4506ecf6635Schristos 
4516ecf6635Schristos 	if (ok)
452805a1ce9Schristos 		bufferevent_async_set_connected_(bev);
4536ecf6635Schristos 	else
4546ecf6635Schristos 		bev_async_set_wsa_error(bev, eo);
4556ecf6635Schristos 
456*657871a7Schristos 	be_async_run_eventcb(bev, ok ? BEV_EVENT_CONNECTED : BEV_EVENT_ERROR, 0);
4576ecf6635Schristos 
458805a1ce9Schristos 	event_base_del_virtual_(bev->ev_base);
4596ecf6635Schristos 
460805a1ce9Schristos 	bufferevent_decref_and_unlock_(bev);
4616ecf6635Schristos }
4626ecf6635Schristos 
4636ecf6635Schristos static void
read_complete(struct event_overlapped * eo,ev_uintptr_t key,ev_ssize_t nbytes,int ok)4646ecf6635Schristos read_complete(struct event_overlapped *eo, ev_uintptr_t key,
4656ecf6635Schristos     ev_ssize_t nbytes, int ok)
4666ecf6635Schristos {
4676ecf6635Schristos 	struct bufferevent_async *bev_a = upcast_read(eo);
4686ecf6635Schristos 	struct bufferevent *bev = &bev_a->bev.bev;
4696ecf6635Schristos 	short what = BEV_EVENT_READING;
4706ecf6635Schristos 	ev_ssize_t amount_unread;
4716ecf6635Schristos 	BEV_LOCK(bev);
4726ecf6635Schristos 	EVUTIL_ASSERT(bev_a->read_in_progress);
4736ecf6635Schristos 
4746ecf6635Schristos 	amount_unread = bev_a->read_in_progress - nbytes;
475805a1ce9Schristos 	evbuffer_commit_read_(bev->input, nbytes);
4766ecf6635Schristos 	bev_a->read_in_progress = 0;
4776ecf6635Schristos 	if (amount_unread)
478805a1ce9Schristos 		bufferevent_decrement_read_buckets_(&bev_a->bev, -amount_unread);
4796ecf6635Schristos 
4806ecf6635Schristos 	if (!ok)
4816ecf6635Schristos 		bev_async_set_wsa_error(bev, eo);
4826ecf6635Schristos 
4836ecf6635Schristos 	if (bev_a->ok) {
4846ecf6635Schristos 		if (ok && nbytes) {
4856ecf6635Schristos 			BEV_RESET_GENERIC_READ_TIMEOUT(bev);
486*657871a7Schristos 			be_async_trigger_nolock(bev, EV_READ, 0);
4876ecf6635Schristos 			bev_async_consider_reading(bev_a);
4886ecf6635Schristos 		} else if (!ok) {
4896ecf6635Schristos 			what |= BEV_EVENT_ERROR;
4906ecf6635Schristos 			bev_a->ok = 0;
491*657871a7Schristos 			be_async_run_eventcb(bev, what, 0);
4926ecf6635Schristos 		} else if (!nbytes) {
4936ecf6635Schristos 			what |= BEV_EVENT_EOF;
4946ecf6635Schristos 			bev_a->ok = 0;
495*657871a7Schristos 			be_async_run_eventcb(bev, what, 0);
4966ecf6635Schristos 		}
4976ecf6635Schristos 	}
4986ecf6635Schristos 
499805a1ce9Schristos 	bufferevent_decref_and_unlock_(bev);
5006ecf6635Schristos }
5016ecf6635Schristos 
5026ecf6635Schristos static void
write_complete(struct event_overlapped * eo,ev_uintptr_t key,ev_ssize_t nbytes,int ok)5036ecf6635Schristos write_complete(struct event_overlapped *eo, ev_uintptr_t key,
5046ecf6635Schristos     ev_ssize_t nbytes, int ok)
5056ecf6635Schristos {
5066ecf6635Schristos 	struct bufferevent_async *bev_a = upcast_write(eo);
5076ecf6635Schristos 	struct bufferevent *bev = &bev_a->bev.bev;
5086ecf6635Schristos 	short what = BEV_EVENT_WRITING;
5096ecf6635Schristos 	ev_ssize_t amount_unwritten;
5106ecf6635Schristos 
5116ecf6635Schristos 	BEV_LOCK(bev);
5126ecf6635Schristos 	EVUTIL_ASSERT(bev_a->write_in_progress);
5136ecf6635Schristos 
5146ecf6635Schristos 	amount_unwritten = bev_a->write_in_progress - nbytes;
515805a1ce9Schristos 	evbuffer_commit_write_(bev->output, nbytes);
5166ecf6635Schristos 	bev_a->write_in_progress = 0;
5176ecf6635Schristos 
5186ecf6635Schristos 	if (amount_unwritten)
519805a1ce9Schristos 		bufferevent_decrement_write_buckets_(&bev_a->bev,
5206ecf6635Schristos 		                                     -amount_unwritten);
5216ecf6635Schristos 
5226ecf6635Schristos 
5236ecf6635Schristos 	if (!ok)
5246ecf6635Schristos 		bev_async_set_wsa_error(bev, eo);
5256ecf6635Schristos 
5266ecf6635Schristos 	if (bev_a->ok) {
5276ecf6635Schristos 		if (ok && nbytes) {
5286ecf6635Schristos 			BEV_RESET_GENERIC_WRITE_TIMEOUT(bev);
529*657871a7Schristos 			be_async_trigger_nolock(bev, EV_WRITE, 0);
5306ecf6635Schristos 			bev_async_consider_writing(bev_a);
5316ecf6635Schristos 		} else if (!ok) {
5326ecf6635Schristos 			what |= BEV_EVENT_ERROR;
5336ecf6635Schristos 			bev_a->ok = 0;
534*657871a7Schristos 			be_async_run_eventcb(bev, what, 0);
5356ecf6635Schristos 		} else if (!nbytes) {
5366ecf6635Schristos 			what |= BEV_EVENT_EOF;
5376ecf6635Schristos 			bev_a->ok = 0;
538*657871a7Schristos 			be_async_run_eventcb(bev, what, 0);
5396ecf6635Schristos 		}
5406ecf6635Schristos 	}
5416ecf6635Schristos 
542805a1ce9Schristos 	bufferevent_decref_and_unlock_(bev);
5436ecf6635Schristos }
5446ecf6635Schristos 
5456ecf6635Schristos struct bufferevent *
bufferevent_async_new_(struct event_base * base,evutil_socket_t fd,int options)546805a1ce9Schristos bufferevent_async_new_(struct event_base *base,
5476ecf6635Schristos     evutil_socket_t fd, int options)
5486ecf6635Schristos {
5496ecf6635Schristos 	struct bufferevent_async *bev_a;
5506ecf6635Schristos 	struct bufferevent *bev;
5516ecf6635Schristos 	struct event_iocp_port *iocp;
5526ecf6635Schristos 
5536ecf6635Schristos 	options |= BEV_OPT_THREADSAFE;
5546ecf6635Schristos 
555805a1ce9Schristos 	if (!(iocp = event_base_get_iocp_(base)))
5566ecf6635Schristos 		return NULL;
5576ecf6635Schristos 
558805a1ce9Schristos 	if (fd >= 0 && event_iocp_port_associate_(iocp, fd, 1)<0) {
559*657871a7Schristos 		if (fatal_error(GetLastError()))
5606ecf6635Schristos 			return NULL;
5616ecf6635Schristos 	}
5626ecf6635Schristos 
5636ecf6635Schristos 	if (!(bev_a = mm_calloc(1, sizeof(struct bufferevent_async))))
5646ecf6635Schristos 		return NULL;
5656ecf6635Schristos 
5666ecf6635Schristos 	bev = &bev_a->bev.bev;
567805a1ce9Schristos 	if (!(bev->input = evbuffer_overlapped_new_(fd))) {
5686ecf6635Schristos 		mm_free(bev_a);
5696ecf6635Schristos 		return NULL;
5706ecf6635Schristos 	}
571805a1ce9Schristos 	if (!(bev->output = evbuffer_overlapped_new_(fd))) {
5726ecf6635Schristos 		evbuffer_free(bev->input);
5736ecf6635Schristos 		mm_free(bev_a);
5746ecf6635Schristos 		return NULL;
5756ecf6635Schristos 	}
5766ecf6635Schristos 
577805a1ce9Schristos 	if (bufferevent_init_common_(&bev_a->bev, base, &bufferevent_ops_async,
5786ecf6635Schristos 		options)<0)
5796ecf6635Schristos 		goto err;
5806ecf6635Schristos 
5816ecf6635Schristos 	evbuffer_add_cb(bev->input, be_async_inbuf_callback, bev);
5826ecf6635Schristos 	evbuffer_add_cb(bev->output, be_async_outbuf_callback, bev);
5836ecf6635Schristos 
584805a1ce9Schristos 	event_overlapped_init_(&bev_a->connect_overlapped, connect_complete);
585805a1ce9Schristos 	event_overlapped_init_(&bev_a->read_overlapped, read_complete);
586805a1ce9Schristos 	event_overlapped_init_(&bev_a->write_overlapped, write_complete);
587805a1ce9Schristos 
588805a1ce9Schristos 	bufferevent_init_generic_timeout_cbs_(bev);
5896ecf6635Schristos 
5906ecf6635Schristos 	bev_a->ok = fd >= 0;
5916ecf6635Schristos 
5926ecf6635Schristos 	return bev;
5936ecf6635Schristos err:
5946ecf6635Schristos 	bufferevent_free(&bev_a->bev.bev);
5956ecf6635Schristos 	return NULL;
5966ecf6635Schristos }
5976ecf6635Schristos 
5986ecf6635Schristos void
bufferevent_async_set_connected_(struct bufferevent * bev)599805a1ce9Schristos bufferevent_async_set_connected_(struct bufferevent *bev)
6006ecf6635Schristos {
6016ecf6635Schristos 	struct bufferevent_async *bev_async = upcast(bev);
6026ecf6635Schristos 	bev_async->ok = 1;
6036ecf6635Schristos 	/* Now's a good time to consider reading/writing */
6046ecf6635Schristos 	be_async_enable(bev, bev->enabled);
6056ecf6635Schristos }
6066ecf6635Schristos 
6076ecf6635Schristos int
bufferevent_async_can_connect_(struct bufferevent * bev)608805a1ce9Schristos bufferevent_async_can_connect_(struct bufferevent *bev)
6096ecf6635Schristos {
6106ecf6635Schristos 	const struct win32_extension_fns *ext =
611805a1ce9Schristos 	    event_get_win32_extension_fns_();
6126ecf6635Schristos 
6136ecf6635Schristos 	if (BEV_IS_ASYNC(bev) &&
614805a1ce9Schristos 	    event_base_get_iocp_(bev->ev_base) &&
6156ecf6635Schristos 	    ext && ext->ConnectEx)
6166ecf6635Schristos 		return 1;
6176ecf6635Schristos 
6186ecf6635Schristos 	return 0;
6196ecf6635Schristos }
6206ecf6635Schristos 
6216ecf6635Schristos int
bufferevent_async_connect_(struct bufferevent * bev,evutil_socket_t fd,const struct sockaddr * sa,int socklen)622805a1ce9Schristos bufferevent_async_connect_(struct bufferevent *bev, evutil_socket_t fd,
6236ecf6635Schristos 	const struct sockaddr *sa, int socklen)
6246ecf6635Schristos {
6256ecf6635Schristos 	BOOL rc;
6266ecf6635Schristos 	struct bufferevent_async *bev_async = upcast(bev);
6276ecf6635Schristos 	struct sockaddr_storage ss;
6286ecf6635Schristos 	const struct win32_extension_fns *ext =
629805a1ce9Schristos 	    event_get_win32_extension_fns_();
6306ecf6635Schristos 
6316ecf6635Schristos 	EVUTIL_ASSERT(ext && ext->ConnectEx && fd >= 0 && sa != NULL);
6326ecf6635Schristos 
6336ecf6635Schristos 	/* ConnectEx() requires that the socket be bound to an address
6346ecf6635Schristos 	 * with bind() before using, otherwise it will fail. We attempt
6356ecf6635Schristos 	 * to issue a bind() here, taking into account that the error
6366ecf6635Schristos 	 * code is set to WSAEINVAL when the socket is already bound. */
6376ecf6635Schristos 	memset(&ss, 0, sizeof(ss));
6386ecf6635Schristos 	if (sa->sa_family == AF_INET) {
6396ecf6635Schristos 		struct sockaddr_in *sin = (struct sockaddr_in *)&ss;
6406ecf6635Schristos 		sin->sin_family = AF_INET;
6416ecf6635Schristos 		sin->sin_addr.s_addr = INADDR_ANY;
6426ecf6635Schristos 	} else if (sa->sa_family == AF_INET6) {
6436ecf6635Schristos 		struct sockaddr_in6 *sin6 = (struct sockaddr_in6 *)&ss;
6446ecf6635Schristos 		sin6->sin6_family = AF_INET6;
6456ecf6635Schristos 		sin6->sin6_addr = in6addr_any;
6466ecf6635Schristos 	} else {
6476ecf6635Schristos 		/* Well, the user will have to bind() */
6486ecf6635Schristos 		return -1;
6496ecf6635Schristos 	}
6506ecf6635Schristos 	if (bind(fd, (struct sockaddr *)&ss, sizeof(ss)) < 0 &&
6516ecf6635Schristos 	    WSAGetLastError() != WSAEINVAL)
6526ecf6635Schristos 		return -1;
6536ecf6635Schristos 
654805a1ce9Schristos 	event_base_add_virtual_(bev->ev_base);
655805a1ce9Schristos 	bufferevent_incref_(bev);
6566ecf6635Schristos 	rc = ext->ConnectEx(fd, sa, socklen, NULL, 0, NULL,
6576ecf6635Schristos 			    &bev_async->connect_overlapped.overlapped);
6586ecf6635Schristos 	if (rc || WSAGetLastError() == ERROR_IO_PENDING)
6596ecf6635Schristos 		return 0;
6606ecf6635Schristos 
661805a1ce9Schristos 	event_base_del_virtual_(bev->ev_base);
662805a1ce9Schristos 	bufferevent_decref_(bev);
6636ecf6635Schristos 
6646ecf6635Schristos 	return -1;
6656ecf6635Schristos }
6666ecf6635Schristos 
6676ecf6635Schristos static int
be_async_ctrl(struct bufferevent * bev,enum bufferevent_ctrl_op op,union bufferevent_ctrl_data * data)6686ecf6635Schristos be_async_ctrl(struct bufferevent *bev, enum bufferevent_ctrl_op op,
6696ecf6635Schristos     union bufferevent_ctrl_data *data)
6706ecf6635Schristos {
6716ecf6635Schristos 	switch (op) {
6726ecf6635Schristos 	case BEV_CTRL_GET_FD:
673805a1ce9Schristos 		data->fd = evbuffer_overlapped_get_fd_(bev->input);
6746ecf6635Schristos 		return 0;
6756ecf6635Schristos 	case BEV_CTRL_SET_FD: {
676*657871a7Schristos 		struct bufferevent_async *bev_a = upcast(bev);
6776ecf6635Schristos 		struct event_iocp_port *iocp;
6786ecf6635Schristos 
679805a1ce9Schristos 		if (data->fd == evbuffer_overlapped_get_fd_(bev->input))
6806ecf6635Schristos 			return 0;
681805a1ce9Schristos 		if (!(iocp = event_base_get_iocp_(bev->ev_base)))
6826ecf6635Schristos 			return -1;
683*657871a7Schristos 		if (event_iocp_port_associate_(iocp, data->fd, 1) < 0) {
684*657871a7Schristos 			if (fatal_error(GetLastError()))
6856ecf6635Schristos 				return -1;
686*657871a7Schristos 		}
687805a1ce9Schristos 		evbuffer_overlapped_set_fd_(bev->input, data->fd);
688805a1ce9Schristos 		evbuffer_overlapped_set_fd_(bev->output, data->fd);
689*657871a7Schristos 		bev_a->ok = data->fd >= 0;
6906ecf6635Schristos 		return 0;
6916ecf6635Schristos 	}
6926ecf6635Schristos 	case BEV_CTRL_CANCEL_ALL: {
6936ecf6635Schristos 		struct bufferevent_async *bev_a = upcast(bev);
694805a1ce9Schristos 		evutil_socket_t fd = evbuffer_overlapped_get_fd_(bev->input);
695*657871a7Schristos 		if (fd != (evutil_socket_t)EVUTIL_INVALID_SOCKET &&
6966ecf6635Schristos 		    (bev_a->bev.options & BEV_OPT_CLOSE_ON_FREE)) {
6976ecf6635Schristos 			closesocket(fd);
698*657871a7Schristos 			evbuffer_overlapped_set_fd_(bev->input, EVUTIL_INVALID_SOCKET);
6996ecf6635Schristos 		}
7006ecf6635Schristos 		bev_a->ok = 0;
7016ecf6635Schristos 		return 0;
7026ecf6635Schristos 	}
7036ecf6635Schristos 	case BEV_CTRL_GET_UNDERLYING:
7046ecf6635Schristos 	default:
7056ecf6635Schristos 		return -1;
7066ecf6635Schristos 	}
7076ecf6635Schristos }
7086ecf6635Schristos 
7096ecf6635Schristos 
710