xref: /netbsd-src/external/bsd/libevent/dist/bufferevent_async.c (revision 657871a79c9a2060a6255a242fa1a1ef76b56ec6)
1 /*	$NetBSD: bufferevent_async.c,v 1.1.1.3 2021/04/07 02:43:13 christos Exp $	*/
2 /*
3  * Copyright (c) 2009-2012 Niels Provos and Nick Mathewson
4  *
5  * All rights reserved.
6  *
7  * Redistribution and use in source and binary forms, with or without
8  * modification, are permitted provided that the following conditions
9  * are met:
10  * 1. Redistributions of source code must retain the above copyright
11  *    notice, this list of conditions and the following disclaimer.
12  * 2. Redistributions in binary form must reproduce the above copyright
13  *    notice, this list of conditions and the following disclaimer in the
14  *    documentation and/or other materials provided with the distribution.
15  * 3. The name of the author may not be used to endorse or promote products
16  *    derived from this software without specific prior written permission.
17  *
18  * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
19  * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
20  * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
21  * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,
22  * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
23  * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
24  * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
25  * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
26  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
27  * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
28  */
29 
30 #include "event2/event-config.h"
31 #include <sys/cdefs.h>
32 __RCSID("$NetBSD: bufferevent_async.c,v 1.1.1.3 2021/04/07 02:43:13 christos Exp $");
33 #include "evconfig-private.h"
34 
35 #ifdef EVENT__HAVE_SYS_TIME_H
36 #include <sys/time.h>
37 #endif
38 
39 #include <errno.h>
40 #include <stdio.h>
41 #include <stdlib.h>
42 #include <string.h>
43 #ifdef EVENT__HAVE_STDARG_H
44 #include <stdarg.h>
45 #endif
46 #ifdef EVENT__HAVE_UNISTD_H
47 #include <unistd.h>
48 #endif
49 
50 #ifdef _WIN32
51 #include <winsock2.h>
52 #include <winerror.h>
53 #include <ws2tcpip.h>
54 #endif
55 
56 #include <sys/queue.h>
57 
58 #include "event2/util.h"
59 #include "event2/bufferevent.h"
60 #include "event2/buffer.h"
61 #include "event2/bufferevent_struct.h"
62 #include "event2/event.h"
63 #include "event2/util.h"
64 #include "event-internal.h"
65 #include "log-internal.h"
66 #include "mm-internal.h"
67 #include "bufferevent-internal.h"
68 #include "util-internal.h"
69 #include "iocp-internal.h"
70 
71 #ifndef SO_UPDATE_CONNECT_CONTEXT
72 /* Mingw is sometimes missing this */
73 #define SO_UPDATE_CONNECT_CONTEXT 0x7010
74 #endif
75 
76 /* prototypes */
77 static int be_async_enable(struct bufferevent *, short);
78 static int be_async_disable(struct bufferevent *, short);
79 static void be_async_destruct(struct bufferevent *);
80 static int be_async_flush(struct bufferevent *, short, enum bufferevent_flush_mode);
81 static int be_async_ctrl(struct bufferevent *, enum bufferevent_ctrl_op, union bufferevent_ctrl_data *);
82 
83 struct bufferevent_async {
84 	struct bufferevent_private bev;
85 	struct event_overlapped connect_overlapped;
86 	struct event_overlapped read_overlapped;
87 	struct event_overlapped write_overlapped;
88 	size_t read_in_progress;
89 	size_t write_in_progress;
90 	unsigned ok : 1;
91 	unsigned read_added : 1;
92 	unsigned write_added : 1;
93 };
94 
95 const struct bufferevent_ops bufferevent_ops_async = {
96 	"socket_async",
97 	evutil_offsetof(struct bufferevent_async, bev.bev),
98 	be_async_enable,
99 	be_async_disable,
100 	NULL, /* Unlink */
101 	be_async_destruct,
102 	bufferevent_generic_adj_timeouts_,
103 	be_async_flush,
104 	be_async_ctrl,
105 };
106 
107 static inline void
be_async_run_eventcb(struct bufferevent * bev,short what,int options)108 be_async_run_eventcb(struct bufferevent *bev, short what, int options)
109 { bufferevent_run_eventcb_(bev, what, options|BEV_TRIG_DEFER_CALLBACKS); }
110 
111 static inline void
be_async_trigger_nolock(struct bufferevent * bev,short what,int options)112 be_async_trigger_nolock(struct bufferevent *bev, short what, int options)
113 { bufferevent_trigger_nolock_(bev, what, options|BEV_TRIG_DEFER_CALLBACKS); }
114 
115 static inline int
fatal_error(int err)116 fatal_error(int err)
117 {
118 	switch (err) {
119 		/* We may have already associated this fd with a port.
120 		 * Let's hope it's this port, and that the error code
121 		 * for doing this neer changes. */
122 		case ERROR_INVALID_PARAMETER:
123 			return 0;
124 	}
125 	return 1;
126 }
127 
128 static inline struct bufferevent_async *
upcast(struct bufferevent * bev)129 upcast(struct bufferevent *bev)
130 {
131 	struct bufferevent_async *bev_a;
132 	if (!BEV_IS_ASYNC(bev))
133 		return NULL;
134 	bev_a = EVUTIL_UPCAST(bev, struct bufferevent_async, bev.bev);
135 	return bev_a;
136 }
137 
138 static inline struct bufferevent_async *
upcast_connect(struct event_overlapped * eo)139 upcast_connect(struct event_overlapped *eo)
140 {
141 	struct bufferevent_async *bev_a;
142 	bev_a = EVUTIL_UPCAST(eo, struct bufferevent_async, connect_overlapped);
143 	EVUTIL_ASSERT(BEV_IS_ASYNC(&bev_a->bev.bev));
144 	return bev_a;
145 }
146 
147 static inline struct bufferevent_async *
upcast_read(struct event_overlapped * eo)148 upcast_read(struct event_overlapped *eo)
149 {
150 	struct bufferevent_async *bev_a;
151 	bev_a = EVUTIL_UPCAST(eo, struct bufferevent_async, read_overlapped);
152 	EVUTIL_ASSERT(BEV_IS_ASYNC(&bev_a->bev.bev));
153 	return bev_a;
154 }
155 
156 static inline struct bufferevent_async *
upcast_write(struct event_overlapped * eo)157 upcast_write(struct event_overlapped *eo)
158 {
159 	struct bufferevent_async *bev_a;
160 	bev_a = EVUTIL_UPCAST(eo, struct bufferevent_async, write_overlapped);
161 	EVUTIL_ASSERT(BEV_IS_ASYNC(&bev_a->bev.bev));
162 	return bev_a;
163 }
164 
165 static void
bev_async_del_write(struct bufferevent_async * beva)166 bev_async_del_write(struct bufferevent_async *beva)
167 {
168 	struct bufferevent *bev = &beva->bev.bev;
169 
170 	if (beva->write_added) {
171 		beva->write_added = 0;
172 		event_base_del_virtual_(bev->ev_base);
173 	}
174 }
175 
176 static void
bev_async_del_read(struct bufferevent_async * beva)177 bev_async_del_read(struct bufferevent_async *beva)
178 {
179 	struct bufferevent *bev = &beva->bev.bev;
180 
181 	if (beva->read_added) {
182 		beva->read_added = 0;
183 		event_base_del_virtual_(bev->ev_base);
184 	}
185 }
186 
187 static void
bev_async_add_write(struct bufferevent_async * beva)188 bev_async_add_write(struct bufferevent_async *beva)
189 {
190 	struct bufferevent *bev = &beva->bev.bev;
191 
192 	if (!beva->write_added) {
193 		beva->write_added = 1;
194 		event_base_add_virtual_(bev->ev_base);
195 	}
196 }
197 
198 static void
bev_async_add_read(struct bufferevent_async * beva)199 bev_async_add_read(struct bufferevent_async *beva)
200 {
201 	struct bufferevent *bev = &beva->bev.bev;
202 
203 	if (!beva->read_added) {
204 		beva->read_added = 1;
205 		event_base_add_virtual_(bev->ev_base);
206 	}
207 }
208 
209 static void
bev_async_consider_writing(struct bufferevent_async * beva)210 bev_async_consider_writing(struct bufferevent_async *beva)
211 {
212 	size_t at_most;
213 	int limit;
214 	struct bufferevent *bev = &beva->bev.bev;
215 
216 	/* Don't write if there's a write in progress, or we do not
217 	 * want to write, or when there's nothing left to write. */
218 	if (beva->write_in_progress || beva->bev.connecting)
219 		return;
220 	if (!beva->ok || !(bev->enabled&EV_WRITE) ||
221 	    !evbuffer_get_length(bev->output)) {
222 		bev_async_del_write(beva);
223 		return;
224 	}
225 
226 	at_most = evbuffer_get_length(bev->output);
227 
228 	/* This is safe so long as bufferevent_get_write_max never returns
229 	 * more than INT_MAX.  That's true for now. XXXX */
230 	limit = (int)bufferevent_get_write_max_(&beva->bev);
231 	if (at_most >= (size_t)limit && limit >= 0)
232 		at_most = limit;
233 
234 	if (beva->bev.write_suspended) {
235 		bev_async_del_write(beva);
236 		return;
237 	}
238 
239 	/*  XXXX doesn't respect low-water mark very well. */
240 	bufferevent_incref_(bev);
241 	if (evbuffer_launch_write_(bev->output, at_most,
242 	    &beva->write_overlapped)) {
243 		bufferevent_decref_(bev);
244 		beva->ok = 0;
245 		be_async_run_eventcb(bev, BEV_EVENT_ERROR, 0);
246 	} else {
247 		beva->write_in_progress = at_most;
248 		bufferevent_decrement_write_buckets_(&beva->bev, at_most);
249 		bev_async_add_write(beva);
250 	}
251 }
252 
253 static void
bev_async_consider_reading(struct bufferevent_async * beva)254 bev_async_consider_reading(struct bufferevent_async *beva)
255 {
256 	size_t cur_size;
257 	size_t read_high;
258 	size_t at_most;
259 	int limit;
260 	struct bufferevent *bev = &beva->bev.bev;
261 
262 	/* Don't read if there is a read in progress, or we do not
263 	 * want to read. */
264 	if (beva->read_in_progress || beva->bev.connecting)
265 		return;
266 	if (!beva->ok || !(bev->enabled&EV_READ)) {
267 		bev_async_del_read(beva);
268 		return;
269 	}
270 
271 	/* Don't read if we're full */
272 	cur_size = evbuffer_get_length(bev->input);
273 	read_high = bev->wm_read.high;
274 	if (read_high) {
275 		if (cur_size >= read_high) {
276 			bev_async_del_read(beva);
277 			return;
278 		}
279 		at_most = read_high - cur_size;
280 	} else {
281 		at_most = 16384; /* FIXME totally magic. */
282 	}
283 
284 	/* XXXX This over-commits. */
285 	/* XXXX see also not above on cast on bufferevent_get_write_max_() */
286 	limit = (int)bufferevent_get_read_max_(&beva->bev);
287 	if (at_most >= (size_t)limit && limit >= 0)
288 		at_most = limit;
289 
290 	if (beva->bev.read_suspended) {
291 		bev_async_del_read(beva);
292 		return;
293 	}
294 
295 	bufferevent_incref_(bev);
296 	if (evbuffer_launch_read_(bev->input, at_most, &beva->read_overlapped)) {
297 		beva->ok = 0;
298 		be_async_run_eventcb(bev, BEV_EVENT_ERROR, 0);
299 		bufferevent_decref_(bev);
300 	} else {
301 		beva->read_in_progress = at_most;
302 		bufferevent_decrement_read_buckets_(&beva->bev, at_most);
303 		bev_async_add_read(beva);
304 	}
305 
306 	return;
307 }
308 
309 static void
be_async_outbuf_callback(struct evbuffer * buf,const struct evbuffer_cb_info * cbinfo,void * arg)310 be_async_outbuf_callback(struct evbuffer *buf,
311     const struct evbuffer_cb_info *cbinfo,
312     void *arg)
313 {
314 	struct bufferevent *bev = arg;
315 	struct bufferevent_async *bev_async = upcast(bev);
316 
317 	/* If we added data to the outbuf and were not writing before,
318 	 * we may want to write now. */
319 
320 	bufferevent_incref_and_lock_(bev);
321 
322 	if (cbinfo->n_added)
323 		bev_async_consider_writing(bev_async);
324 
325 	bufferevent_decref_and_unlock_(bev);
326 }
327 
328 static void
be_async_inbuf_callback(struct evbuffer * buf,const struct evbuffer_cb_info * cbinfo,void * arg)329 be_async_inbuf_callback(struct evbuffer *buf,
330     const struct evbuffer_cb_info *cbinfo,
331     void *arg)
332 {
333 	struct bufferevent *bev = arg;
334 	struct bufferevent_async *bev_async = upcast(bev);
335 
336 	/* If we drained data from the inbuf and were not reading before,
337 	 * we may want to read now */
338 
339 	bufferevent_incref_and_lock_(bev);
340 
341 	if (cbinfo->n_deleted)
342 		bev_async_consider_reading(bev_async);
343 
344 	bufferevent_decref_and_unlock_(bev);
345 }
346 
347 static int
be_async_enable(struct bufferevent * buf,short what)348 be_async_enable(struct bufferevent *buf, short what)
349 {
350 	struct bufferevent_async *bev_async = upcast(buf);
351 
352 	if (!bev_async->ok)
353 		return -1;
354 
355 	if (bev_async->bev.connecting) {
356 		/* Don't launch anything during connection attempts. */
357 		return 0;
358 	}
359 
360 	if (what & EV_READ)
361 		BEV_RESET_GENERIC_READ_TIMEOUT(buf);
362 	if (what & EV_WRITE)
363 		BEV_RESET_GENERIC_WRITE_TIMEOUT(buf);
364 
365 	/* If we newly enable reading or writing, and we aren't reading or
366 	   writing already, consider launching a new read or write. */
367 
368 	if (what & EV_READ)
369 		bev_async_consider_reading(bev_async);
370 	if (what & EV_WRITE)
371 		bev_async_consider_writing(bev_async);
372 	return 0;
373 }
374 
375 static int
be_async_disable(struct bufferevent * bev,short what)376 be_async_disable(struct bufferevent *bev, short what)
377 {
378 	struct bufferevent_async *bev_async = upcast(bev);
379 	/* XXXX If we disable reading or writing, we may want to consider
380 	 * canceling any in-progress read or write operation, though it might
381 	 * not work. */
382 
383 	if (what & EV_READ) {
384 		BEV_DEL_GENERIC_READ_TIMEOUT(bev);
385 		bev_async_del_read(bev_async);
386 	}
387 	if (what & EV_WRITE) {
388 		BEV_DEL_GENERIC_WRITE_TIMEOUT(bev);
389 		bev_async_del_write(bev_async);
390 	}
391 
392 	return 0;
393 }
394 
395 static void
be_async_destruct(struct bufferevent * bev)396 be_async_destruct(struct bufferevent *bev)
397 {
398 	struct bufferevent_async *bev_async = upcast(bev);
399 	struct bufferevent_private *bev_p = BEV_UPCAST(bev);
400 	evutil_socket_t fd;
401 
402 	EVUTIL_ASSERT(!upcast(bev)->write_in_progress &&
403 			!upcast(bev)->read_in_progress);
404 
405 	bev_async_del_read(bev_async);
406 	bev_async_del_write(bev_async);
407 
408 	fd = evbuffer_overlapped_get_fd_(bev->input);
409 	if (fd != (evutil_socket_t)EVUTIL_INVALID_SOCKET &&
410 		(bev_p->options & BEV_OPT_CLOSE_ON_FREE)) {
411 		evutil_closesocket(fd);
412 		evbuffer_overlapped_set_fd_(bev->input, EVUTIL_INVALID_SOCKET);
413 	}
414 }
415 
416 /* GetQueuedCompletionStatus doesn't reliably yield WSA error codes, so
417  * we use WSAGetOverlappedResult to translate. */
418 static void
bev_async_set_wsa_error(struct bufferevent * bev,struct event_overlapped * eo)419 bev_async_set_wsa_error(struct bufferevent *bev, struct event_overlapped *eo)
420 {
421 	DWORD bytes, flags;
422 	evutil_socket_t fd;
423 
424 	fd = evbuffer_overlapped_get_fd_(bev->input);
425 	WSAGetOverlappedResult(fd, &eo->overlapped, &bytes, FALSE, &flags);
426 }
427 
428 static int
be_async_flush(struct bufferevent * bev,short what,enum bufferevent_flush_mode mode)429 be_async_flush(struct bufferevent *bev, short what,
430     enum bufferevent_flush_mode mode)
431 {
432 	return 0;
433 }
434 
435 static void
connect_complete(struct event_overlapped * eo,ev_uintptr_t key,ev_ssize_t nbytes,int ok)436 connect_complete(struct event_overlapped *eo, ev_uintptr_t key,
437     ev_ssize_t nbytes, int ok)
438 {
439 	struct bufferevent_async *bev_a = upcast_connect(eo);
440 	struct bufferevent *bev = &bev_a->bev.bev;
441 	evutil_socket_t sock;
442 
443 	BEV_LOCK(bev);
444 
445 	EVUTIL_ASSERT(bev_a->bev.connecting);
446 	bev_a->bev.connecting = 0;
447 	sock = evbuffer_overlapped_get_fd_(bev_a->bev.bev.input);
448 	/* XXXX Handle error? */
449 	setsockopt(sock, SOL_SOCKET, SO_UPDATE_CONNECT_CONTEXT, NULL, 0);
450 
451 	if (ok)
452 		bufferevent_async_set_connected_(bev);
453 	else
454 		bev_async_set_wsa_error(bev, eo);
455 
456 	be_async_run_eventcb(bev, ok ? BEV_EVENT_CONNECTED : BEV_EVENT_ERROR, 0);
457 
458 	event_base_del_virtual_(bev->ev_base);
459 
460 	bufferevent_decref_and_unlock_(bev);
461 }
462 
463 static void
read_complete(struct event_overlapped * eo,ev_uintptr_t key,ev_ssize_t nbytes,int ok)464 read_complete(struct event_overlapped *eo, ev_uintptr_t key,
465     ev_ssize_t nbytes, int ok)
466 {
467 	struct bufferevent_async *bev_a = upcast_read(eo);
468 	struct bufferevent *bev = &bev_a->bev.bev;
469 	short what = BEV_EVENT_READING;
470 	ev_ssize_t amount_unread;
471 	BEV_LOCK(bev);
472 	EVUTIL_ASSERT(bev_a->read_in_progress);
473 
474 	amount_unread = bev_a->read_in_progress - nbytes;
475 	evbuffer_commit_read_(bev->input, nbytes);
476 	bev_a->read_in_progress = 0;
477 	if (amount_unread)
478 		bufferevent_decrement_read_buckets_(&bev_a->bev, -amount_unread);
479 
480 	if (!ok)
481 		bev_async_set_wsa_error(bev, eo);
482 
483 	if (bev_a->ok) {
484 		if (ok && nbytes) {
485 			BEV_RESET_GENERIC_READ_TIMEOUT(bev);
486 			be_async_trigger_nolock(bev, EV_READ, 0);
487 			bev_async_consider_reading(bev_a);
488 		} else if (!ok) {
489 			what |= BEV_EVENT_ERROR;
490 			bev_a->ok = 0;
491 			be_async_run_eventcb(bev, what, 0);
492 		} else if (!nbytes) {
493 			what |= BEV_EVENT_EOF;
494 			bev_a->ok = 0;
495 			be_async_run_eventcb(bev, what, 0);
496 		}
497 	}
498 
499 	bufferevent_decref_and_unlock_(bev);
500 }
501 
502 static void
write_complete(struct event_overlapped * eo,ev_uintptr_t key,ev_ssize_t nbytes,int ok)503 write_complete(struct event_overlapped *eo, ev_uintptr_t key,
504     ev_ssize_t nbytes, int ok)
505 {
506 	struct bufferevent_async *bev_a = upcast_write(eo);
507 	struct bufferevent *bev = &bev_a->bev.bev;
508 	short what = BEV_EVENT_WRITING;
509 	ev_ssize_t amount_unwritten;
510 
511 	BEV_LOCK(bev);
512 	EVUTIL_ASSERT(bev_a->write_in_progress);
513 
514 	amount_unwritten = bev_a->write_in_progress - nbytes;
515 	evbuffer_commit_write_(bev->output, nbytes);
516 	bev_a->write_in_progress = 0;
517 
518 	if (amount_unwritten)
519 		bufferevent_decrement_write_buckets_(&bev_a->bev,
520 		                                     -amount_unwritten);
521 
522 
523 	if (!ok)
524 		bev_async_set_wsa_error(bev, eo);
525 
526 	if (bev_a->ok) {
527 		if (ok && nbytes) {
528 			BEV_RESET_GENERIC_WRITE_TIMEOUT(bev);
529 			be_async_trigger_nolock(bev, EV_WRITE, 0);
530 			bev_async_consider_writing(bev_a);
531 		} else if (!ok) {
532 			what |= BEV_EVENT_ERROR;
533 			bev_a->ok = 0;
534 			be_async_run_eventcb(bev, what, 0);
535 		} else if (!nbytes) {
536 			what |= BEV_EVENT_EOF;
537 			bev_a->ok = 0;
538 			be_async_run_eventcb(bev, what, 0);
539 		}
540 	}
541 
542 	bufferevent_decref_and_unlock_(bev);
543 }
544 
545 struct bufferevent *
bufferevent_async_new_(struct event_base * base,evutil_socket_t fd,int options)546 bufferevent_async_new_(struct event_base *base,
547     evutil_socket_t fd, int options)
548 {
549 	struct bufferevent_async *bev_a;
550 	struct bufferevent *bev;
551 	struct event_iocp_port *iocp;
552 
553 	options |= BEV_OPT_THREADSAFE;
554 
555 	if (!(iocp = event_base_get_iocp_(base)))
556 		return NULL;
557 
558 	if (fd >= 0 && event_iocp_port_associate_(iocp, fd, 1)<0) {
559 		if (fatal_error(GetLastError()))
560 			return NULL;
561 	}
562 
563 	if (!(bev_a = mm_calloc(1, sizeof(struct bufferevent_async))))
564 		return NULL;
565 
566 	bev = &bev_a->bev.bev;
567 	if (!(bev->input = evbuffer_overlapped_new_(fd))) {
568 		mm_free(bev_a);
569 		return NULL;
570 	}
571 	if (!(bev->output = evbuffer_overlapped_new_(fd))) {
572 		evbuffer_free(bev->input);
573 		mm_free(bev_a);
574 		return NULL;
575 	}
576 
577 	if (bufferevent_init_common_(&bev_a->bev, base, &bufferevent_ops_async,
578 		options)<0)
579 		goto err;
580 
581 	evbuffer_add_cb(bev->input, be_async_inbuf_callback, bev);
582 	evbuffer_add_cb(bev->output, be_async_outbuf_callback, bev);
583 
584 	event_overlapped_init_(&bev_a->connect_overlapped, connect_complete);
585 	event_overlapped_init_(&bev_a->read_overlapped, read_complete);
586 	event_overlapped_init_(&bev_a->write_overlapped, write_complete);
587 
588 	bufferevent_init_generic_timeout_cbs_(bev);
589 
590 	bev_a->ok = fd >= 0;
591 
592 	return bev;
593 err:
594 	bufferevent_free(&bev_a->bev.bev);
595 	return NULL;
596 }
597 
598 void
bufferevent_async_set_connected_(struct bufferevent * bev)599 bufferevent_async_set_connected_(struct bufferevent *bev)
600 {
601 	struct bufferevent_async *bev_async = upcast(bev);
602 	bev_async->ok = 1;
603 	/* Now's a good time to consider reading/writing */
604 	be_async_enable(bev, bev->enabled);
605 }
606 
607 int
bufferevent_async_can_connect_(struct bufferevent * bev)608 bufferevent_async_can_connect_(struct bufferevent *bev)
609 {
610 	const struct win32_extension_fns *ext =
611 	    event_get_win32_extension_fns_();
612 
613 	if (BEV_IS_ASYNC(bev) &&
614 	    event_base_get_iocp_(bev->ev_base) &&
615 	    ext && ext->ConnectEx)
616 		return 1;
617 
618 	return 0;
619 }
620 
621 int
bufferevent_async_connect_(struct bufferevent * bev,evutil_socket_t fd,const struct sockaddr * sa,int socklen)622 bufferevent_async_connect_(struct bufferevent *bev, evutil_socket_t fd,
623 	const struct sockaddr *sa, int socklen)
624 {
625 	BOOL rc;
626 	struct bufferevent_async *bev_async = upcast(bev);
627 	struct sockaddr_storage ss;
628 	const struct win32_extension_fns *ext =
629 	    event_get_win32_extension_fns_();
630 
631 	EVUTIL_ASSERT(ext && ext->ConnectEx && fd >= 0 && sa != NULL);
632 
633 	/* ConnectEx() requires that the socket be bound to an address
634 	 * with bind() before using, otherwise it will fail. We attempt
635 	 * to issue a bind() here, taking into account that the error
636 	 * code is set to WSAEINVAL when the socket is already bound. */
637 	memset(&ss, 0, sizeof(ss));
638 	if (sa->sa_family == AF_INET) {
639 		struct sockaddr_in *sin = (struct sockaddr_in *)&ss;
640 		sin->sin_family = AF_INET;
641 		sin->sin_addr.s_addr = INADDR_ANY;
642 	} else if (sa->sa_family == AF_INET6) {
643 		struct sockaddr_in6 *sin6 = (struct sockaddr_in6 *)&ss;
644 		sin6->sin6_family = AF_INET6;
645 		sin6->sin6_addr = in6addr_any;
646 	} else {
647 		/* Well, the user will have to bind() */
648 		return -1;
649 	}
650 	if (bind(fd, (struct sockaddr *)&ss, sizeof(ss)) < 0 &&
651 	    WSAGetLastError() != WSAEINVAL)
652 		return -1;
653 
654 	event_base_add_virtual_(bev->ev_base);
655 	bufferevent_incref_(bev);
656 	rc = ext->ConnectEx(fd, sa, socklen, NULL, 0, NULL,
657 			    &bev_async->connect_overlapped.overlapped);
658 	if (rc || WSAGetLastError() == ERROR_IO_PENDING)
659 		return 0;
660 
661 	event_base_del_virtual_(bev->ev_base);
662 	bufferevent_decref_(bev);
663 
664 	return -1;
665 }
666 
667 static int
be_async_ctrl(struct bufferevent * bev,enum bufferevent_ctrl_op op,union bufferevent_ctrl_data * data)668 be_async_ctrl(struct bufferevent *bev, enum bufferevent_ctrl_op op,
669     union bufferevent_ctrl_data *data)
670 {
671 	switch (op) {
672 	case BEV_CTRL_GET_FD:
673 		data->fd = evbuffer_overlapped_get_fd_(bev->input);
674 		return 0;
675 	case BEV_CTRL_SET_FD: {
676 		struct bufferevent_async *bev_a = upcast(bev);
677 		struct event_iocp_port *iocp;
678 
679 		if (data->fd == evbuffer_overlapped_get_fd_(bev->input))
680 			return 0;
681 		if (!(iocp = event_base_get_iocp_(bev->ev_base)))
682 			return -1;
683 		if (event_iocp_port_associate_(iocp, data->fd, 1) < 0) {
684 			if (fatal_error(GetLastError()))
685 				return -1;
686 		}
687 		evbuffer_overlapped_set_fd_(bev->input, data->fd);
688 		evbuffer_overlapped_set_fd_(bev->output, data->fd);
689 		bev_a->ok = data->fd >= 0;
690 		return 0;
691 	}
692 	case BEV_CTRL_CANCEL_ALL: {
693 		struct bufferevent_async *bev_a = upcast(bev);
694 		evutil_socket_t fd = evbuffer_overlapped_get_fd_(bev->input);
695 		if (fd != (evutil_socket_t)EVUTIL_INVALID_SOCKET &&
696 		    (bev_a->bev.options & BEV_OPT_CLOSE_ON_FREE)) {
697 			closesocket(fd);
698 			evbuffer_overlapped_set_fd_(bev->input, EVUTIL_INVALID_SOCKET);
699 		}
700 		bev_a->ok = 0;
701 		return 0;
702 	}
703 	case BEV_CTRL_GET_UNDERLYING:
704 	default:
705 		return -1;
706 	}
707 }
708 
709 
710