/*	$NetBSD: bufferevent_async.c,v 1.1.1.3 2021/04/07 02:43:13 christos Exp $	*/
/*
 * Copyright (c) 2009-2012 Niels Provos and Nick Mathewson
 *
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 * 1. Redistributions of source code must retain the above copyright
 *    notice, this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 * 3. The name of the author may not be used to endorse or promote products
 *    derived from this software without specific prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
 * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
 * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
 * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,
 * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
 * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
 * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */
296ecf6635Schristos
306ecf6635Schristos #include "event2/event-config.h"
316ecf6635Schristos #include <sys/cdefs.h>
32*657871a7Schristos __RCSID("$NetBSD: bufferevent_async.c,v 1.1.1.3 2021/04/07 02:43:13 christos Exp $");
33805a1ce9Schristos #include "evconfig-private.h"
346ecf6635Schristos
35805a1ce9Schristos #ifdef EVENT__HAVE_SYS_TIME_H
366ecf6635Schristos #include <sys/time.h>
376ecf6635Schristos #endif
386ecf6635Schristos
396ecf6635Schristos #include <errno.h>
406ecf6635Schristos #include <stdio.h>
416ecf6635Schristos #include <stdlib.h>
426ecf6635Schristos #include <string.h>
43805a1ce9Schristos #ifdef EVENT__HAVE_STDARG_H
446ecf6635Schristos #include <stdarg.h>
456ecf6635Schristos #endif
46805a1ce9Schristos #ifdef EVENT__HAVE_UNISTD_H
476ecf6635Schristos #include <unistd.h>
486ecf6635Schristos #endif
496ecf6635Schristos
50805a1ce9Schristos #ifdef _WIN32
516ecf6635Schristos #include <winsock2.h>
52*657871a7Schristos #include <winerror.h>
536ecf6635Schristos #include <ws2tcpip.h>
546ecf6635Schristos #endif
556ecf6635Schristos
566ecf6635Schristos #include <sys/queue.h>
576ecf6635Schristos
586ecf6635Schristos #include "event2/util.h"
596ecf6635Schristos #include "event2/bufferevent.h"
606ecf6635Schristos #include "event2/buffer.h"
616ecf6635Schristos #include "event2/bufferevent_struct.h"
626ecf6635Schristos #include "event2/event.h"
636ecf6635Schristos #include "event2/util.h"
646ecf6635Schristos #include "event-internal.h"
656ecf6635Schristos #include "log-internal.h"
666ecf6635Schristos #include "mm-internal.h"
676ecf6635Schristos #include "bufferevent-internal.h"
686ecf6635Schristos #include "util-internal.h"
696ecf6635Schristos #include "iocp-internal.h"
706ecf6635Schristos
716ecf6635Schristos #ifndef SO_UPDATE_CONNECT_CONTEXT
726ecf6635Schristos /* Mingw is sometimes missing this */
736ecf6635Schristos #define SO_UPDATE_CONNECT_CONTEXT 0x7010
746ecf6635Schristos #endif
756ecf6635Schristos
/* Forward declarations of the handlers installed in bufferevent_ops_async. */
static int be_async_enable(struct bufferevent *bev, short what);
static int be_async_disable(struct bufferevent *bev, short what);
static void be_async_destruct(struct bufferevent *bev);
static int be_async_flush(struct bufferevent *bev, short what,
    enum bufferevent_flush_mode mode);
static int be_async_ctrl(struct bufferevent *bev, enum bufferevent_ctrl_op op,
    union bufferevent_ctrl_data *data);
826ecf6635Schristos
836ecf6635Schristos struct bufferevent_async {
846ecf6635Schristos struct bufferevent_private bev;
856ecf6635Schristos struct event_overlapped connect_overlapped;
866ecf6635Schristos struct event_overlapped read_overlapped;
876ecf6635Schristos struct event_overlapped write_overlapped;
886ecf6635Schristos size_t read_in_progress;
896ecf6635Schristos size_t write_in_progress;
906ecf6635Schristos unsigned ok : 1;
916ecf6635Schristos unsigned read_added : 1;
926ecf6635Schristos unsigned write_added : 1;
936ecf6635Schristos };
946ecf6635Schristos
956ecf6635Schristos const struct bufferevent_ops bufferevent_ops_async = {
966ecf6635Schristos "socket_async",
976ecf6635Schristos evutil_offsetof(struct bufferevent_async, bev.bev),
986ecf6635Schristos be_async_enable,
996ecf6635Schristos be_async_disable,
100805a1ce9Schristos NULL, /* Unlink */
1016ecf6635Schristos be_async_destruct,
102805a1ce9Schristos bufferevent_generic_adj_timeouts_,
1036ecf6635Schristos be_async_flush,
1046ecf6635Schristos be_async_ctrl,
1056ecf6635Schristos };
1066ecf6635Schristos
107*657871a7Schristos static inline void
be_async_run_eventcb(struct bufferevent * bev,short what,int options)108*657871a7Schristos be_async_run_eventcb(struct bufferevent *bev, short what, int options)
109*657871a7Schristos { bufferevent_run_eventcb_(bev, what, options|BEV_TRIG_DEFER_CALLBACKS); }
110*657871a7Schristos
111*657871a7Schristos static inline void
be_async_trigger_nolock(struct bufferevent * bev,short what,int options)112*657871a7Schristos be_async_trigger_nolock(struct bufferevent *bev, short what, int options)
113*657871a7Schristos { bufferevent_trigger_nolock_(bev, what, options|BEV_TRIG_DEFER_CALLBACKS); }
114*657871a7Schristos
/*
 * Classify an error code obtained after a failed attempt to associate a
 * socket with the completion port.
 *
 * Returns 0 for ERROR_INVALID_PARAMETER: the socket may already have been
 * associated with a port, and we hope both that it is this port and that
 * the error code used for that situation never changes.  Returns 1 for
 * every other code.
 */
static inline int
fatal_error(int err)
{
	return err != ERROR_INVALID_PARAMETER;
}
127*657871a7Schristos
1286ecf6635Schristos static inline struct bufferevent_async *
upcast(struct bufferevent * bev)1296ecf6635Schristos upcast(struct bufferevent *bev)
1306ecf6635Schristos {
1316ecf6635Schristos struct bufferevent_async *bev_a;
132*657871a7Schristos if (!BEV_IS_ASYNC(bev))
1336ecf6635Schristos return NULL;
1346ecf6635Schristos bev_a = EVUTIL_UPCAST(bev, struct bufferevent_async, bev.bev);
1356ecf6635Schristos return bev_a;
1366ecf6635Schristos }
1376ecf6635Schristos
1386ecf6635Schristos static inline struct bufferevent_async *
upcast_connect(struct event_overlapped * eo)1396ecf6635Schristos upcast_connect(struct event_overlapped *eo)
1406ecf6635Schristos {
1416ecf6635Schristos struct bufferevent_async *bev_a;
1426ecf6635Schristos bev_a = EVUTIL_UPCAST(eo, struct bufferevent_async, connect_overlapped);
1436ecf6635Schristos EVUTIL_ASSERT(BEV_IS_ASYNC(&bev_a->bev.bev));
1446ecf6635Schristos return bev_a;
1456ecf6635Schristos }
1466ecf6635Schristos
1476ecf6635Schristos static inline struct bufferevent_async *
upcast_read(struct event_overlapped * eo)1486ecf6635Schristos upcast_read(struct event_overlapped *eo)
1496ecf6635Schristos {
1506ecf6635Schristos struct bufferevent_async *bev_a;
1516ecf6635Schristos bev_a = EVUTIL_UPCAST(eo, struct bufferevent_async, read_overlapped);
1526ecf6635Schristos EVUTIL_ASSERT(BEV_IS_ASYNC(&bev_a->bev.bev));
1536ecf6635Schristos return bev_a;
1546ecf6635Schristos }
1556ecf6635Schristos
1566ecf6635Schristos static inline struct bufferevent_async *
upcast_write(struct event_overlapped * eo)1576ecf6635Schristos upcast_write(struct event_overlapped *eo)
1586ecf6635Schristos {
1596ecf6635Schristos struct bufferevent_async *bev_a;
1606ecf6635Schristos bev_a = EVUTIL_UPCAST(eo, struct bufferevent_async, write_overlapped);
1616ecf6635Schristos EVUTIL_ASSERT(BEV_IS_ASYNC(&bev_a->bev.bev));
1626ecf6635Schristos return bev_a;
1636ecf6635Schristos }
1646ecf6635Schristos
1656ecf6635Schristos static void
bev_async_del_write(struct bufferevent_async * beva)1666ecf6635Schristos bev_async_del_write(struct bufferevent_async *beva)
1676ecf6635Schristos {
1686ecf6635Schristos struct bufferevent *bev = &beva->bev.bev;
1696ecf6635Schristos
1706ecf6635Schristos if (beva->write_added) {
1716ecf6635Schristos beva->write_added = 0;
172805a1ce9Schristos event_base_del_virtual_(bev->ev_base);
1736ecf6635Schristos }
1746ecf6635Schristos }
1756ecf6635Schristos
1766ecf6635Schristos static void
bev_async_del_read(struct bufferevent_async * beva)1776ecf6635Schristos bev_async_del_read(struct bufferevent_async *beva)
1786ecf6635Schristos {
1796ecf6635Schristos struct bufferevent *bev = &beva->bev.bev;
1806ecf6635Schristos
1816ecf6635Schristos if (beva->read_added) {
1826ecf6635Schristos beva->read_added = 0;
183805a1ce9Schristos event_base_del_virtual_(bev->ev_base);
1846ecf6635Schristos }
1856ecf6635Schristos }
1866ecf6635Schristos
1876ecf6635Schristos static void
bev_async_add_write(struct bufferevent_async * beva)1886ecf6635Schristos bev_async_add_write(struct bufferevent_async *beva)
1896ecf6635Schristos {
1906ecf6635Schristos struct bufferevent *bev = &beva->bev.bev;
1916ecf6635Schristos
1926ecf6635Schristos if (!beva->write_added) {
1936ecf6635Schristos beva->write_added = 1;
194805a1ce9Schristos event_base_add_virtual_(bev->ev_base);
1956ecf6635Schristos }
1966ecf6635Schristos }
1976ecf6635Schristos
1986ecf6635Schristos static void
bev_async_add_read(struct bufferevent_async * beva)1996ecf6635Schristos bev_async_add_read(struct bufferevent_async *beva)
2006ecf6635Schristos {
2016ecf6635Schristos struct bufferevent *bev = &beva->bev.bev;
2026ecf6635Schristos
2036ecf6635Schristos if (!beva->read_added) {
2046ecf6635Schristos beva->read_added = 1;
205805a1ce9Schristos event_base_add_virtual_(bev->ev_base);
2066ecf6635Schristos }
2076ecf6635Schristos }
2086ecf6635Schristos
2096ecf6635Schristos static void
bev_async_consider_writing(struct bufferevent_async * beva)2106ecf6635Schristos bev_async_consider_writing(struct bufferevent_async *beva)
2116ecf6635Schristos {
2126ecf6635Schristos size_t at_most;
2136ecf6635Schristos int limit;
2146ecf6635Schristos struct bufferevent *bev = &beva->bev.bev;
2156ecf6635Schristos
2166ecf6635Schristos /* Don't write if there's a write in progress, or we do not
2176ecf6635Schristos * want to write, or when there's nothing left to write. */
2186ecf6635Schristos if (beva->write_in_progress || beva->bev.connecting)
2196ecf6635Schristos return;
2206ecf6635Schristos if (!beva->ok || !(bev->enabled&EV_WRITE) ||
2216ecf6635Schristos !evbuffer_get_length(bev->output)) {
2226ecf6635Schristos bev_async_del_write(beva);
2236ecf6635Schristos return;
2246ecf6635Schristos }
2256ecf6635Schristos
2266ecf6635Schristos at_most = evbuffer_get_length(bev->output);
2276ecf6635Schristos
2286ecf6635Schristos /* This is safe so long as bufferevent_get_write_max never returns
2296ecf6635Schristos * more than INT_MAX. That's true for now. XXXX */
230805a1ce9Schristos limit = (int)bufferevent_get_write_max_(&beva->bev);
2316ecf6635Schristos if (at_most >= (size_t)limit && limit >= 0)
2326ecf6635Schristos at_most = limit;
2336ecf6635Schristos
2346ecf6635Schristos if (beva->bev.write_suspended) {
2356ecf6635Schristos bev_async_del_write(beva);
2366ecf6635Schristos return;
2376ecf6635Schristos }
2386ecf6635Schristos
2396ecf6635Schristos /* XXXX doesn't respect low-water mark very well. */
240805a1ce9Schristos bufferevent_incref_(bev);
241805a1ce9Schristos if (evbuffer_launch_write_(bev->output, at_most,
2426ecf6635Schristos &beva->write_overlapped)) {
243805a1ce9Schristos bufferevent_decref_(bev);
2446ecf6635Schristos beva->ok = 0;
245*657871a7Schristos be_async_run_eventcb(bev, BEV_EVENT_ERROR, 0);
2466ecf6635Schristos } else {
2476ecf6635Schristos beva->write_in_progress = at_most;
248805a1ce9Schristos bufferevent_decrement_write_buckets_(&beva->bev, at_most);
2496ecf6635Schristos bev_async_add_write(beva);
2506ecf6635Schristos }
2516ecf6635Schristos }
2526ecf6635Schristos
2536ecf6635Schristos static void
bev_async_consider_reading(struct bufferevent_async * beva)2546ecf6635Schristos bev_async_consider_reading(struct bufferevent_async *beva)
2556ecf6635Schristos {
2566ecf6635Schristos size_t cur_size;
2576ecf6635Schristos size_t read_high;
2586ecf6635Schristos size_t at_most;
2596ecf6635Schristos int limit;
2606ecf6635Schristos struct bufferevent *bev = &beva->bev.bev;
2616ecf6635Schristos
2626ecf6635Schristos /* Don't read if there is a read in progress, or we do not
2636ecf6635Schristos * want to read. */
2646ecf6635Schristos if (beva->read_in_progress || beva->bev.connecting)
2656ecf6635Schristos return;
2666ecf6635Schristos if (!beva->ok || !(bev->enabled&EV_READ)) {
2676ecf6635Schristos bev_async_del_read(beva);
2686ecf6635Schristos return;
2696ecf6635Schristos }
2706ecf6635Schristos
2716ecf6635Schristos /* Don't read if we're full */
2726ecf6635Schristos cur_size = evbuffer_get_length(bev->input);
2736ecf6635Schristos read_high = bev->wm_read.high;
2746ecf6635Schristos if (read_high) {
2756ecf6635Schristos if (cur_size >= read_high) {
2766ecf6635Schristos bev_async_del_read(beva);
2776ecf6635Schristos return;
2786ecf6635Schristos }
2796ecf6635Schristos at_most = read_high - cur_size;
2806ecf6635Schristos } else {
2816ecf6635Schristos at_most = 16384; /* FIXME totally magic. */
2826ecf6635Schristos }
2836ecf6635Schristos
2846ecf6635Schristos /* XXXX This over-commits. */
285805a1ce9Schristos /* XXXX see also not above on cast on bufferevent_get_write_max_() */
286805a1ce9Schristos limit = (int)bufferevent_get_read_max_(&beva->bev);
2876ecf6635Schristos if (at_most >= (size_t)limit && limit >= 0)
2886ecf6635Schristos at_most = limit;
2896ecf6635Schristos
2906ecf6635Schristos if (beva->bev.read_suspended) {
2916ecf6635Schristos bev_async_del_read(beva);
2926ecf6635Schristos return;
2936ecf6635Schristos }
2946ecf6635Schristos
295805a1ce9Schristos bufferevent_incref_(bev);
296805a1ce9Schristos if (evbuffer_launch_read_(bev->input, at_most, &beva->read_overlapped)) {
2976ecf6635Schristos beva->ok = 0;
298*657871a7Schristos be_async_run_eventcb(bev, BEV_EVENT_ERROR, 0);
299805a1ce9Schristos bufferevent_decref_(bev);
3006ecf6635Schristos } else {
3016ecf6635Schristos beva->read_in_progress = at_most;
302805a1ce9Schristos bufferevent_decrement_read_buckets_(&beva->bev, at_most);
3036ecf6635Schristos bev_async_add_read(beva);
3046ecf6635Schristos }
3056ecf6635Schristos
3066ecf6635Schristos return;
3076ecf6635Schristos }
3086ecf6635Schristos
3096ecf6635Schristos static void
be_async_outbuf_callback(struct evbuffer * buf,const struct evbuffer_cb_info * cbinfo,void * arg)3106ecf6635Schristos be_async_outbuf_callback(struct evbuffer *buf,
3116ecf6635Schristos const struct evbuffer_cb_info *cbinfo,
3126ecf6635Schristos void *arg)
3136ecf6635Schristos {
3146ecf6635Schristos struct bufferevent *bev = arg;
3156ecf6635Schristos struct bufferevent_async *bev_async = upcast(bev);
3166ecf6635Schristos
3176ecf6635Schristos /* If we added data to the outbuf and were not writing before,
3186ecf6635Schristos * we may want to write now. */
3196ecf6635Schristos
320805a1ce9Schristos bufferevent_incref_and_lock_(bev);
3216ecf6635Schristos
3226ecf6635Schristos if (cbinfo->n_added)
3236ecf6635Schristos bev_async_consider_writing(bev_async);
3246ecf6635Schristos
325805a1ce9Schristos bufferevent_decref_and_unlock_(bev);
3266ecf6635Schristos }
3276ecf6635Schristos
3286ecf6635Schristos static void
be_async_inbuf_callback(struct evbuffer * buf,const struct evbuffer_cb_info * cbinfo,void * arg)3296ecf6635Schristos be_async_inbuf_callback(struct evbuffer *buf,
3306ecf6635Schristos const struct evbuffer_cb_info *cbinfo,
3316ecf6635Schristos void *arg)
3326ecf6635Schristos {
3336ecf6635Schristos struct bufferevent *bev = arg;
3346ecf6635Schristos struct bufferevent_async *bev_async = upcast(bev);
3356ecf6635Schristos
3366ecf6635Schristos /* If we drained data from the inbuf and were not reading before,
3376ecf6635Schristos * we may want to read now */
3386ecf6635Schristos
339805a1ce9Schristos bufferevent_incref_and_lock_(bev);
3406ecf6635Schristos
3416ecf6635Schristos if (cbinfo->n_deleted)
3426ecf6635Schristos bev_async_consider_reading(bev_async);
3436ecf6635Schristos
344805a1ce9Schristos bufferevent_decref_and_unlock_(bev);
3456ecf6635Schristos }
3466ecf6635Schristos
3476ecf6635Schristos static int
be_async_enable(struct bufferevent * buf,short what)3486ecf6635Schristos be_async_enable(struct bufferevent *buf, short what)
3496ecf6635Schristos {
3506ecf6635Schristos struct bufferevent_async *bev_async = upcast(buf);
3516ecf6635Schristos
3526ecf6635Schristos if (!bev_async->ok)
3536ecf6635Schristos return -1;
3546ecf6635Schristos
3556ecf6635Schristos if (bev_async->bev.connecting) {
3566ecf6635Schristos /* Don't launch anything during connection attempts. */
3576ecf6635Schristos return 0;
3586ecf6635Schristos }
3596ecf6635Schristos
3606ecf6635Schristos if (what & EV_READ)
3616ecf6635Schristos BEV_RESET_GENERIC_READ_TIMEOUT(buf);
3626ecf6635Schristos if (what & EV_WRITE)
3636ecf6635Schristos BEV_RESET_GENERIC_WRITE_TIMEOUT(buf);
3646ecf6635Schristos
3656ecf6635Schristos /* If we newly enable reading or writing, and we aren't reading or
3666ecf6635Schristos writing already, consider launching a new read or write. */
3676ecf6635Schristos
3686ecf6635Schristos if (what & EV_READ)
3696ecf6635Schristos bev_async_consider_reading(bev_async);
3706ecf6635Schristos if (what & EV_WRITE)
3716ecf6635Schristos bev_async_consider_writing(bev_async);
3726ecf6635Schristos return 0;
3736ecf6635Schristos }
3746ecf6635Schristos
3756ecf6635Schristos static int
be_async_disable(struct bufferevent * bev,short what)3766ecf6635Schristos be_async_disable(struct bufferevent *bev, short what)
3776ecf6635Schristos {
3786ecf6635Schristos struct bufferevent_async *bev_async = upcast(bev);
3796ecf6635Schristos /* XXXX If we disable reading or writing, we may want to consider
3806ecf6635Schristos * canceling any in-progress read or write operation, though it might
3816ecf6635Schristos * not work. */
3826ecf6635Schristos
3836ecf6635Schristos if (what & EV_READ) {
3846ecf6635Schristos BEV_DEL_GENERIC_READ_TIMEOUT(bev);
3856ecf6635Schristos bev_async_del_read(bev_async);
3866ecf6635Schristos }
3876ecf6635Schristos if (what & EV_WRITE) {
3886ecf6635Schristos BEV_DEL_GENERIC_WRITE_TIMEOUT(bev);
3896ecf6635Schristos bev_async_del_write(bev_async);
3906ecf6635Schristos }
3916ecf6635Schristos
3926ecf6635Schristos return 0;
3936ecf6635Schristos }
3946ecf6635Schristos
3956ecf6635Schristos static void
be_async_destruct(struct bufferevent * bev)3966ecf6635Schristos be_async_destruct(struct bufferevent *bev)
3976ecf6635Schristos {
3986ecf6635Schristos struct bufferevent_async *bev_async = upcast(bev);
3996ecf6635Schristos struct bufferevent_private *bev_p = BEV_UPCAST(bev);
4006ecf6635Schristos evutil_socket_t fd;
4016ecf6635Schristos
4026ecf6635Schristos EVUTIL_ASSERT(!upcast(bev)->write_in_progress &&
4036ecf6635Schristos !upcast(bev)->read_in_progress);
4046ecf6635Schristos
4056ecf6635Schristos bev_async_del_read(bev_async);
4066ecf6635Schristos bev_async_del_write(bev_async);
4076ecf6635Schristos
408805a1ce9Schristos fd = evbuffer_overlapped_get_fd_(bev->input);
409*657871a7Schristos if (fd != (evutil_socket_t)EVUTIL_INVALID_SOCKET &&
410805a1ce9Schristos (bev_p->options & BEV_OPT_CLOSE_ON_FREE)) {
4116ecf6635Schristos evutil_closesocket(fd);
412*657871a7Schristos evbuffer_overlapped_set_fd_(bev->input, EVUTIL_INVALID_SOCKET);
4136ecf6635Schristos }
4146ecf6635Schristos }
4156ecf6635Schristos
4166ecf6635Schristos /* GetQueuedCompletionStatus doesn't reliably yield WSA error codes, so
4176ecf6635Schristos * we use WSAGetOverlappedResult to translate. */
4186ecf6635Schristos static void
bev_async_set_wsa_error(struct bufferevent * bev,struct event_overlapped * eo)4196ecf6635Schristos bev_async_set_wsa_error(struct bufferevent *bev, struct event_overlapped *eo)
4206ecf6635Schristos {
4216ecf6635Schristos DWORD bytes, flags;
4226ecf6635Schristos evutil_socket_t fd;
4236ecf6635Schristos
424805a1ce9Schristos fd = evbuffer_overlapped_get_fd_(bev->input);
4256ecf6635Schristos WSAGetOverlappedResult(fd, &eo->overlapped, &bytes, FALSE, &flags);
4266ecf6635Schristos }
4276ecf6635Schristos
4286ecf6635Schristos static int
be_async_flush(struct bufferevent * bev,short what,enum bufferevent_flush_mode mode)4296ecf6635Schristos be_async_flush(struct bufferevent *bev, short what,
4306ecf6635Schristos enum bufferevent_flush_mode mode)
4316ecf6635Schristos {
4326ecf6635Schristos return 0;
4336ecf6635Schristos }
4346ecf6635Schristos
4356ecf6635Schristos static void
connect_complete(struct event_overlapped * eo,ev_uintptr_t key,ev_ssize_t nbytes,int ok)4366ecf6635Schristos connect_complete(struct event_overlapped *eo, ev_uintptr_t key,
4376ecf6635Schristos ev_ssize_t nbytes, int ok)
4386ecf6635Schristos {
4396ecf6635Schristos struct bufferevent_async *bev_a = upcast_connect(eo);
4406ecf6635Schristos struct bufferevent *bev = &bev_a->bev.bev;
4416ecf6635Schristos evutil_socket_t sock;
4426ecf6635Schristos
4436ecf6635Schristos BEV_LOCK(bev);
4446ecf6635Schristos
4456ecf6635Schristos EVUTIL_ASSERT(bev_a->bev.connecting);
4466ecf6635Schristos bev_a->bev.connecting = 0;
447805a1ce9Schristos sock = evbuffer_overlapped_get_fd_(bev_a->bev.bev.input);
4486ecf6635Schristos /* XXXX Handle error? */
4496ecf6635Schristos setsockopt(sock, SOL_SOCKET, SO_UPDATE_CONNECT_CONTEXT, NULL, 0);
4506ecf6635Schristos
4516ecf6635Schristos if (ok)
452805a1ce9Schristos bufferevent_async_set_connected_(bev);
4536ecf6635Schristos else
4546ecf6635Schristos bev_async_set_wsa_error(bev, eo);
4556ecf6635Schristos
456*657871a7Schristos be_async_run_eventcb(bev, ok ? BEV_EVENT_CONNECTED : BEV_EVENT_ERROR, 0);
4576ecf6635Schristos
458805a1ce9Schristos event_base_del_virtual_(bev->ev_base);
4596ecf6635Schristos
460805a1ce9Schristos bufferevent_decref_and_unlock_(bev);
4616ecf6635Schristos }
4626ecf6635Schristos
4636ecf6635Schristos static void
read_complete(struct event_overlapped * eo,ev_uintptr_t key,ev_ssize_t nbytes,int ok)4646ecf6635Schristos read_complete(struct event_overlapped *eo, ev_uintptr_t key,
4656ecf6635Schristos ev_ssize_t nbytes, int ok)
4666ecf6635Schristos {
4676ecf6635Schristos struct bufferevent_async *bev_a = upcast_read(eo);
4686ecf6635Schristos struct bufferevent *bev = &bev_a->bev.bev;
4696ecf6635Schristos short what = BEV_EVENT_READING;
4706ecf6635Schristos ev_ssize_t amount_unread;
4716ecf6635Schristos BEV_LOCK(bev);
4726ecf6635Schristos EVUTIL_ASSERT(bev_a->read_in_progress);
4736ecf6635Schristos
4746ecf6635Schristos amount_unread = bev_a->read_in_progress - nbytes;
475805a1ce9Schristos evbuffer_commit_read_(bev->input, nbytes);
4766ecf6635Schristos bev_a->read_in_progress = 0;
4776ecf6635Schristos if (amount_unread)
478805a1ce9Schristos bufferevent_decrement_read_buckets_(&bev_a->bev, -amount_unread);
4796ecf6635Schristos
4806ecf6635Schristos if (!ok)
4816ecf6635Schristos bev_async_set_wsa_error(bev, eo);
4826ecf6635Schristos
4836ecf6635Schristos if (bev_a->ok) {
4846ecf6635Schristos if (ok && nbytes) {
4856ecf6635Schristos BEV_RESET_GENERIC_READ_TIMEOUT(bev);
486*657871a7Schristos be_async_trigger_nolock(bev, EV_READ, 0);
4876ecf6635Schristos bev_async_consider_reading(bev_a);
4886ecf6635Schristos } else if (!ok) {
4896ecf6635Schristos what |= BEV_EVENT_ERROR;
4906ecf6635Schristos bev_a->ok = 0;
491*657871a7Schristos be_async_run_eventcb(bev, what, 0);
4926ecf6635Schristos } else if (!nbytes) {
4936ecf6635Schristos what |= BEV_EVENT_EOF;
4946ecf6635Schristos bev_a->ok = 0;
495*657871a7Schristos be_async_run_eventcb(bev, what, 0);
4966ecf6635Schristos }
4976ecf6635Schristos }
4986ecf6635Schristos
499805a1ce9Schristos bufferevent_decref_and_unlock_(bev);
5006ecf6635Schristos }
5016ecf6635Schristos
5026ecf6635Schristos static void
write_complete(struct event_overlapped * eo,ev_uintptr_t key,ev_ssize_t nbytes,int ok)5036ecf6635Schristos write_complete(struct event_overlapped *eo, ev_uintptr_t key,
5046ecf6635Schristos ev_ssize_t nbytes, int ok)
5056ecf6635Schristos {
5066ecf6635Schristos struct bufferevent_async *bev_a = upcast_write(eo);
5076ecf6635Schristos struct bufferevent *bev = &bev_a->bev.bev;
5086ecf6635Schristos short what = BEV_EVENT_WRITING;
5096ecf6635Schristos ev_ssize_t amount_unwritten;
5106ecf6635Schristos
5116ecf6635Schristos BEV_LOCK(bev);
5126ecf6635Schristos EVUTIL_ASSERT(bev_a->write_in_progress);
5136ecf6635Schristos
5146ecf6635Schristos amount_unwritten = bev_a->write_in_progress - nbytes;
515805a1ce9Schristos evbuffer_commit_write_(bev->output, nbytes);
5166ecf6635Schristos bev_a->write_in_progress = 0;
5176ecf6635Schristos
5186ecf6635Schristos if (amount_unwritten)
519805a1ce9Schristos bufferevent_decrement_write_buckets_(&bev_a->bev,
5206ecf6635Schristos -amount_unwritten);
5216ecf6635Schristos
5226ecf6635Schristos
5236ecf6635Schristos if (!ok)
5246ecf6635Schristos bev_async_set_wsa_error(bev, eo);
5256ecf6635Schristos
5266ecf6635Schristos if (bev_a->ok) {
5276ecf6635Schristos if (ok && nbytes) {
5286ecf6635Schristos BEV_RESET_GENERIC_WRITE_TIMEOUT(bev);
529*657871a7Schristos be_async_trigger_nolock(bev, EV_WRITE, 0);
5306ecf6635Schristos bev_async_consider_writing(bev_a);
5316ecf6635Schristos } else if (!ok) {
5326ecf6635Schristos what |= BEV_EVENT_ERROR;
5336ecf6635Schristos bev_a->ok = 0;
534*657871a7Schristos be_async_run_eventcb(bev, what, 0);
5356ecf6635Schristos } else if (!nbytes) {
5366ecf6635Schristos what |= BEV_EVENT_EOF;
5376ecf6635Schristos bev_a->ok = 0;
538*657871a7Schristos be_async_run_eventcb(bev, what, 0);
5396ecf6635Schristos }
5406ecf6635Schristos }
5416ecf6635Schristos
542805a1ce9Schristos bufferevent_decref_and_unlock_(bev);
5436ecf6635Schristos }
5446ecf6635Schristos
5456ecf6635Schristos struct bufferevent *
bufferevent_async_new_(struct event_base * base,evutil_socket_t fd,int options)546805a1ce9Schristos bufferevent_async_new_(struct event_base *base,
5476ecf6635Schristos evutil_socket_t fd, int options)
5486ecf6635Schristos {
5496ecf6635Schristos struct bufferevent_async *bev_a;
5506ecf6635Schristos struct bufferevent *bev;
5516ecf6635Schristos struct event_iocp_port *iocp;
5526ecf6635Schristos
5536ecf6635Schristos options |= BEV_OPT_THREADSAFE;
5546ecf6635Schristos
555805a1ce9Schristos if (!(iocp = event_base_get_iocp_(base)))
5566ecf6635Schristos return NULL;
5576ecf6635Schristos
558805a1ce9Schristos if (fd >= 0 && event_iocp_port_associate_(iocp, fd, 1)<0) {
559*657871a7Schristos if (fatal_error(GetLastError()))
5606ecf6635Schristos return NULL;
5616ecf6635Schristos }
5626ecf6635Schristos
5636ecf6635Schristos if (!(bev_a = mm_calloc(1, sizeof(struct bufferevent_async))))
5646ecf6635Schristos return NULL;
5656ecf6635Schristos
5666ecf6635Schristos bev = &bev_a->bev.bev;
567805a1ce9Schristos if (!(bev->input = evbuffer_overlapped_new_(fd))) {
5686ecf6635Schristos mm_free(bev_a);
5696ecf6635Schristos return NULL;
5706ecf6635Schristos }
571805a1ce9Schristos if (!(bev->output = evbuffer_overlapped_new_(fd))) {
5726ecf6635Schristos evbuffer_free(bev->input);
5736ecf6635Schristos mm_free(bev_a);
5746ecf6635Schristos return NULL;
5756ecf6635Schristos }
5766ecf6635Schristos
577805a1ce9Schristos if (bufferevent_init_common_(&bev_a->bev, base, &bufferevent_ops_async,
5786ecf6635Schristos options)<0)
5796ecf6635Schristos goto err;
5806ecf6635Schristos
5816ecf6635Schristos evbuffer_add_cb(bev->input, be_async_inbuf_callback, bev);
5826ecf6635Schristos evbuffer_add_cb(bev->output, be_async_outbuf_callback, bev);
5836ecf6635Schristos
584805a1ce9Schristos event_overlapped_init_(&bev_a->connect_overlapped, connect_complete);
585805a1ce9Schristos event_overlapped_init_(&bev_a->read_overlapped, read_complete);
586805a1ce9Schristos event_overlapped_init_(&bev_a->write_overlapped, write_complete);
587805a1ce9Schristos
588805a1ce9Schristos bufferevent_init_generic_timeout_cbs_(bev);
5896ecf6635Schristos
5906ecf6635Schristos bev_a->ok = fd >= 0;
5916ecf6635Schristos
5926ecf6635Schristos return bev;
5936ecf6635Schristos err:
5946ecf6635Schristos bufferevent_free(&bev_a->bev.bev);
5956ecf6635Schristos return NULL;
5966ecf6635Schristos }
5976ecf6635Schristos
5986ecf6635Schristos void
bufferevent_async_set_connected_(struct bufferevent * bev)599805a1ce9Schristos bufferevent_async_set_connected_(struct bufferevent *bev)
6006ecf6635Schristos {
6016ecf6635Schristos struct bufferevent_async *bev_async = upcast(bev);
6026ecf6635Schristos bev_async->ok = 1;
6036ecf6635Schristos /* Now's a good time to consider reading/writing */
6046ecf6635Schristos be_async_enable(bev, bev->enabled);
6056ecf6635Schristos }
6066ecf6635Schristos
6076ecf6635Schristos int
bufferevent_async_can_connect_(struct bufferevent * bev)608805a1ce9Schristos bufferevent_async_can_connect_(struct bufferevent *bev)
6096ecf6635Schristos {
6106ecf6635Schristos const struct win32_extension_fns *ext =
611805a1ce9Schristos event_get_win32_extension_fns_();
6126ecf6635Schristos
6136ecf6635Schristos if (BEV_IS_ASYNC(bev) &&
614805a1ce9Schristos event_base_get_iocp_(bev->ev_base) &&
6156ecf6635Schristos ext && ext->ConnectEx)
6166ecf6635Schristos return 1;
6176ecf6635Schristos
6186ecf6635Schristos return 0;
6196ecf6635Schristos }
6206ecf6635Schristos
6216ecf6635Schristos int
bufferevent_async_connect_(struct bufferevent * bev,evutil_socket_t fd,const struct sockaddr * sa,int socklen)622805a1ce9Schristos bufferevent_async_connect_(struct bufferevent *bev, evutil_socket_t fd,
6236ecf6635Schristos const struct sockaddr *sa, int socklen)
6246ecf6635Schristos {
6256ecf6635Schristos BOOL rc;
6266ecf6635Schristos struct bufferevent_async *bev_async = upcast(bev);
6276ecf6635Schristos struct sockaddr_storage ss;
6286ecf6635Schristos const struct win32_extension_fns *ext =
629805a1ce9Schristos event_get_win32_extension_fns_();
6306ecf6635Schristos
6316ecf6635Schristos EVUTIL_ASSERT(ext && ext->ConnectEx && fd >= 0 && sa != NULL);
6326ecf6635Schristos
6336ecf6635Schristos /* ConnectEx() requires that the socket be bound to an address
6346ecf6635Schristos * with bind() before using, otherwise it will fail. We attempt
6356ecf6635Schristos * to issue a bind() here, taking into account that the error
6366ecf6635Schristos * code is set to WSAEINVAL when the socket is already bound. */
6376ecf6635Schristos memset(&ss, 0, sizeof(ss));
6386ecf6635Schristos if (sa->sa_family == AF_INET) {
6396ecf6635Schristos struct sockaddr_in *sin = (struct sockaddr_in *)&ss;
6406ecf6635Schristos sin->sin_family = AF_INET;
6416ecf6635Schristos sin->sin_addr.s_addr = INADDR_ANY;
6426ecf6635Schristos } else if (sa->sa_family == AF_INET6) {
6436ecf6635Schristos struct sockaddr_in6 *sin6 = (struct sockaddr_in6 *)&ss;
6446ecf6635Schristos sin6->sin6_family = AF_INET6;
6456ecf6635Schristos sin6->sin6_addr = in6addr_any;
6466ecf6635Schristos } else {
6476ecf6635Schristos /* Well, the user will have to bind() */
6486ecf6635Schristos return -1;
6496ecf6635Schristos }
6506ecf6635Schristos if (bind(fd, (struct sockaddr *)&ss, sizeof(ss)) < 0 &&
6516ecf6635Schristos WSAGetLastError() != WSAEINVAL)
6526ecf6635Schristos return -1;
6536ecf6635Schristos
654805a1ce9Schristos event_base_add_virtual_(bev->ev_base);
655805a1ce9Schristos bufferevent_incref_(bev);
6566ecf6635Schristos rc = ext->ConnectEx(fd, sa, socklen, NULL, 0, NULL,
6576ecf6635Schristos &bev_async->connect_overlapped.overlapped);
6586ecf6635Schristos if (rc || WSAGetLastError() == ERROR_IO_PENDING)
6596ecf6635Schristos return 0;
6606ecf6635Schristos
661805a1ce9Schristos event_base_del_virtual_(bev->ev_base);
662805a1ce9Schristos bufferevent_decref_(bev);
6636ecf6635Schristos
6646ecf6635Schristos return -1;
6656ecf6635Schristos }
6666ecf6635Schristos
6676ecf6635Schristos static int
be_async_ctrl(struct bufferevent * bev,enum bufferevent_ctrl_op op,union bufferevent_ctrl_data * data)6686ecf6635Schristos be_async_ctrl(struct bufferevent *bev, enum bufferevent_ctrl_op op,
6696ecf6635Schristos union bufferevent_ctrl_data *data)
6706ecf6635Schristos {
6716ecf6635Schristos switch (op) {
6726ecf6635Schristos case BEV_CTRL_GET_FD:
673805a1ce9Schristos data->fd = evbuffer_overlapped_get_fd_(bev->input);
6746ecf6635Schristos return 0;
6756ecf6635Schristos case BEV_CTRL_SET_FD: {
676*657871a7Schristos struct bufferevent_async *bev_a = upcast(bev);
6776ecf6635Schristos struct event_iocp_port *iocp;
6786ecf6635Schristos
679805a1ce9Schristos if (data->fd == evbuffer_overlapped_get_fd_(bev->input))
6806ecf6635Schristos return 0;
681805a1ce9Schristos if (!(iocp = event_base_get_iocp_(bev->ev_base)))
6826ecf6635Schristos return -1;
683*657871a7Schristos if (event_iocp_port_associate_(iocp, data->fd, 1) < 0) {
684*657871a7Schristos if (fatal_error(GetLastError()))
6856ecf6635Schristos return -1;
686*657871a7Schristos }
687805a1ce9Schristos evbuffer_overlapped_set_fd_(bev->input, data->fd);
688805a1ce9Schristos evbuffer_overlapped_set_fd_(bev->output, data->fd);
689*657871a7Schristos bev_a->ok = data->fd >= 0;
6906ecf6635Schristos return 0;
6916ecf6635Schristos }
6926ecf6635Schristos case BEV_CTRL_CANCEL_ALL: {
6936ecf6635Schristos struct bufferevent_async *bev_a = upcast(bev);
694805a1ce9Schristos evutil_socket_t fd = evbuffer_overlapped_get_fd_(bev->input);
695*657871a7Schristos if (fd != (evutil_socket_t)EVUTIL_INVALID_SOCKET &&
6966ecf6635Schristos (bev_a->bev.options & BEV_OPT_CLOSE_ON_FREE)) {
6976ecf6635Schristos closesocket(fd);
698*657871a7Schristos evbuffer_overlapped_set_fd_(bev->input, EVUTIL_INVALID_SOCKET);
6996ecf6635Schristos }
7006ecf6635Schristos bev_a->ok = 0;
7016ecf6635Schristos return 0;
7026ecf6635Schristos }
7036ecf6635Schristos case BEV_CTRL_GET_UNDERLYING:
7046ecf6635Schristos default:
7056ecf6635Schristos return -1;
7066ecf6635Schristos }
7076ecf6635Schristos }
7086ecf6635Schristos
7096ecf6635Schristos
710