/*	$NetBSD: bufferevent_async.c,v 1.1.1.2 2017/01/31 21:14:52 christos Exp $	*/
/*
 * Copyright (c) 2009-2012 Niels Provos and Nick Mathewson
 *
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 * 1. Redistributions of source code must retain the above copyright
 *    notice, this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 * 3. The name of the author may not be used to endorse or promote products
 *    derived from this software without specific prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
 * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
 * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
 * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,
 * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
 * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
 * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */

#include "event2/event-config.h"
#include <sys/cdefs.h>
__RCSID("$NetBSD: bufferevent_async.c,v 1.1.1.2 2017/01/31 21:14:52 christos Exp $");
#include "evconfig-private.h"

#ifdef EVENT__HAVE_SYS_TIME_H
#include <sys/time.h>
#endif

#include <errno.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#ifdef EVENT__HAVE_STDARG_H
#include <stdarg.h>
#endif
#ifdef EVENT__HAVE_UNISTD_H
#include <unistd.h>
#endif

#ifdef _WIN32
#include <winsock2.h>
#include <ws2tcpip.h>
#endif

#include <sys/queue.h>

#include "event2/util.h"
#include "event2/bufferevent.h"
#include "event2/buffer.h"
#include "event2/bufferevent_struct.h"
#include "event2/event.h"
#include "event2/util.h"
#include "event-internal.h"
#include "log-internal.h"
#include "mm-internal.h"
#include "bufferevent-internal.h"
#include "util-internal.h"
#include "iocp-internal.h"

#ifndef SO_UPDATE_CONNECT_CONTEXT
/* Mingw is sometimes missing this */
#define SO_UPDATE_CONNECT_CONTEXT 0x7010
#endif

/* prototypes */
static int be_async_enable(struct bufferevent *, short);
static int be_async_disable(struct bufferevent *, short);
static void be_async_destruct(struct bufferevent *);
static int be_async_flush(struct bufferevent *, short, enum bufferevent_flush_mode);
static int be_async_ctrl(struct bufferevent *, enum bufferevent_ctrl_op, union bufferevent_ctrl_data *);

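/*
 * IOCP-based ("async") bufferevent backend for Windows.
 *
 * Instead of polling a socket for readiness, this backend keeps at most
 * one overlapped read and one overlapped write outstanding at a time and
 * reacts to their completions through the event_overlapped callbacks
 * below.  The input and output evbuffers are overlapped-capable buffers
 * created with evbuffer_overlapped_new_().
 *
 * Applications do not create these directly: on Windows,
 * bufferevent_socket_new() returns an async bufferevent when the
 * event_base has an IOCP port.  A rough usage sketch (illustrative only;
 * error handling and address setup omitted):
 *
 *	struct event_config *cfg = event_config_new();
 *	evthread_use_windows_threads();
 *	event_config_set_flags(cfg, EVENT_BASE_FLAG_STARTUP_IOCP);
 *	base = event_base_new_with_config(cfg);
 *	bev = bufferevent_socket_new(base, -1,
 *	    BEV_OPT_CLOSE_ON_FREE | BEV_OPT_THREADSAFE);
 *	bufferevent_socket_connect(bev, sa, socklen);
 */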
struct bufferevent_async {
	struct bufferevent_private bev;
	struct event_overlapped connect_overlapped;
	struct event_overlapped read_overlapped;
	struct event_overlapped write_overlapped;
	size_t read_in_progress;
	size_t write_in_progress;
	unsigned ok : 1;
	unsigned read_added : 1;
	unsigned write_added : 1;
};

const struct bufferevent_ops bufferevent_ops_async = {
	"socket_async",
	evutil_offsetof(struct bufferevent_async, bev.bev),
	be_async_enable,
	be_async_disable,
	NULL, /* Unlink */
	be_async_destruct,
	bufferevent_generic_adj_timeouts_,
	be_async_flush,
	be_async_ctrl,
};

static inline struct bufferevent_async *
upcast(struct bufferevent *bev)
{
	struct bufferevent_async *bev_a;
	if (bev->be_ops != &bufferevent_ops_async)
		return NULL;
	bev_a = EVUTIL_UPCAST(bev, struct bufferevent_async, bev.bev);
	return bev_a;
}

static inline struct bufferevent_async *
upcast_connect(struct event_overlapped *eo)
{
	struct bufferevent_async *bev_a;
	bev_a = EVUTIL_UPCAST(eo, struct bufferevent_async, connect_overlapped);
	EVUTIL_ASSERT(BEV_IS_ASYNC(&bev_a->bev.bev));
	return bev_a;
}

static inline struct bufferevent_async *
upcast_read(struct event_overlapped *eo)
{
	struct bufferevent_async *bev_a;
	bev_a = EVUTIL_UPCAST(eo, struct bufferevent_async, read_overlapped);
	EVUTIL_ASSERT(BEV_IS_ASYNC(&bev_a->bev.bev));
	return bev_a;
}

static inline struct bufferevent_async *
upcast_write(struct event_overlapped *eo)
{
	struct bufferevent_async *bev_a;
	bev_a = EVUTIL_UPCAST(eo, struct bufferevent_async, write_overlapped);
	EVUTIL_ASSERT(BEV_IS_ASYNC(&bev_a->bev.bev));
	return bev_a;
}

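/* The "virtual event" helpers below keep the event_base's event count
 * nonzero while an overlapped operation is outstanding, so that the
 * event loop does not exit merely because no ordinary events are
 * pending. */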
static void
bev_async_del_write(struct bufferevent_async *beva)
{
	struct bufferevent *bev = &beva->bev.bev;

	if (beva->write_added) {
		beva->write_added = 0;
		event_base_del_virtual_(bev->ev_base);
	}
}

static void
bev_async_del_read(struct bufferevent_async *beva)
{
	struct bufferevent *bev = &beva->bev.bev;

	if (beva->read_added) {
		beva->read_added = 0;
		event_base_del_virtual_(bev->ev_base);
	}
}

static void
bev_async_add_write(struct bufferevent_async *beva)
{
	struct bufferevent *bev = &beva->bev.bev;

	if (!beva->write_added) {
		beva->write_added = 1;
		event_base_add_virtual_(bev->ev_base);
	}
}

static void
bev_async_add_read(struct bufferevent_async *beva)
{
	struct bufferevent *bev = &beva->bev.bev;

	if (!beva->read_added) {
		beva->read_added = 1;
		event_base_add_virtual_(bev->ev_base);
	}
}

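/* Launch an overlapped write if possible: we must be idle (no write in
 * progress, no connect pending), enabled and not suspended for writing,
 * and have data in the output buffer.  The amount written is capped by
 * the rate-limiting write budget.  A reference on the bufferevent is
 * held until write_complete() runs. */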
static void
bev_async_consider_writing(struct bufferevent_async *beva)
{
	size_t at_most;
	int limit;
	struct bufferevent *bev = &beva->bev.bev;

	/* Don't write if there's a write in progress, or we do not
	 * want to write, or when there's nothing left to write. */
	if (beva->write_in_progress || beva->bev.connecting)
		return;
	if (!beva->ok || !(bev->enabled&EV_WRITE) ||
	    !evbuffer_get_length(bev->output)) {
		bev_async_del_write(beva);
		return;
	}

	at_most = evbuffer_get_length(bev->output);

	/* This is safe so long as bufferevent_get_write_max never returns
	 * more than INT_MAX.  That's true for now. XXXX */
	limit = (int)bufferevent_get_write_max_(&beva->bev);
	if (at_most >= (size_t)limit && limit >= 0)
		at_most = limit;

	if (beva->bev.write_suspended) {
		bev_async_del_write(beva);
		return;
	}

	/*  XXXX doesn't respect low-water mark very well. */
	bufferevent_incref_(bev);
	if (evbuffer_launch_write_(bev->output, at_most,
	    &beva->write_overlapped)) {
		bufferevent_decref_(bev);
		beva->ok = 0;
		bufferevent_run_eventcb_(bev, BEV_EVENT_ERROR, 0);
	} else {
		beva->write_in_progress = at_most;
		bufferevent_decrement_write_buckets_(&beva->bev, at_most);
		bev_async_add_write(beva);
	}
}

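/* Launch an overlapped read if possible: we must be idle, enabled and
 * not suspended for reading, and below the read high-watermark.  With no
 * watermark set we ask for up to 16384 bytes; either way the request is
 * capped by the rate-limiting read budget. */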
static void
bev_async_consider_reading(struct bufferevent_async *beva)
{
	size_t cur_size;
	size_t read_high;
	size_t at_most;
	int limit;
	struct bufferevent *bev = &beva->bev.bev;

	/* Don't read if there is a read in progress, or we do not
	 * want to read. */
	if (beva->read_in_progress || beva->bev.connecting)
		return;
	if (!beva->ok || !(bev->enabled&EV_READ)) {
		bev_async_del_read(beva);
		return;
	}

	/* Don't read if we're full */
	cur_size = evbuffer_get_length(bev->input);
	read_high = bev->wm_read.high;
	if (read_high) {
		if (cur_size >= read_high) {
			bev_async_del_read(beva);
			return;
		}
		at_most = read_high - cur_size;
	} else {
		at_most = 16384; /* FIXME totally magic. */
	}

	/* XXXX This over-commits. */
	/* XXXX see also the note above on the cast of bufferevent_get_write_max_() */
	limit = (int)bufferevent_get_read_max_(&beva->bev);
	if (at_most >= (size_t)limit && limit >= 0)
		at_most = limit;

	if (beva->bev.read_suspended) {
		bev_async_del_read(beva);
		return;
	}

	bufferevent_incref_(bev);
	if (evbuffer_launch_read_(bev->input, at_most, &beva->read_overlapped)) {
		beva->ok = 0;
		bufferevent_run_eventcb_(bev, BEV_EVENT_ERROR, 0);
		bufferevent_decref_(bev);
	} else {
		beva->read_in_progress = at_most;
		bufferevent_decrement_read_buckets_(&beva->bev, at_most);
		bev_async_add_read(beva);
	}

	return;
}

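/* Evbuffer callbacks: adding data to the output buffer, or draining data
 * from the input buffer, may make it possible to start a new overlapped
 * write or read. */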
static void
be_async_outbuf_callback(struct evbuffer *buf,
    const struct evbuffer_cb_info *cbinfo,
    void *arg)
{
	struct bufferevent *bev = arg;
	struct bufferevent_async *bev_async = upcast(bev);

	/* If we added data to the outbuf and were not writing before,
	 * we may want to write now. */

	bufferevent_incref_and_lock_(bev);

	if (cbinfo->n_added)
		bev_async_consider_writing(bev_async);

	bufferevent_decref_and_unlock_(bev);
}

static void
be_async_inbuf_callback(struct evbuffer *buf,
    const struct evbuffer_cb_info *cbinfo,
    void *arg)
{
	struct bufferevent *bev = arg;
	struct bufferevent_async *bev_async = upcast(bev);

	/* If we drained data from the inbuf and were not reading before,
	 * we may want to read now */

	bufferevent_incref_and_lock_(bev);

	if (cbinfo->n_deleted)
		bev_async_consider_reading(bev_async);

	bufferevent_decref_and_unlock_(bev);
}

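/* Enable hook: reset the generic timeouts and, unless a connect attempt
 * is still pending, try to launch a read and/or write right away. */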
static int
be_async_enable(struct bufferevent *buf, short what)
{
	struct bufferevent_async *bev_async = upcast(buf);

	if (!bev_async->ok)
		return -1;

	if (bev_async->bev.connecting) {
		/* Don't launch anything during connection attempts. */
		return 0;
	}

	if (what & EV_READ)
		BEV_RESET_GENERIC_READ_TIMEOUT(buf);
	if (what & EV_WRITE)
		BEV_RESET_GENERIC_WRITE_TIMEOUT(buf);

	/* If we newly enable reading or writing, and we aren't reading or
	   writing already, consider launching a new read or write. */

	if (what & EV_READ)
		bev_async_consider_reading(bev_async);
	if (what & EV_WRITE)
		bev_async_consider_writing(bev_async);
	return 0;
}

static int
be_async_disable(struct bufferevent *bev, short what)
{
	struct bufferevent_async *bev_async = upcast(bev);
	/* XXXX If we disable reading or writing, we may want to consider
	 * canceling any in-progress read or write operation, though it might
	 * not work. */

	if (what & EV_READ) {
		BEV_DEL_GENERIC_READ_TIMEOUT(bev);
		bev_async_del_read(bev_async);
	}
	if (what & EV_WRITE) {
		BEV_DEL_GENERIC_WRITE_TIMEOUT(bev);
		bev_async_del_write(bev_async);
	}

	return 0;
}

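/* Destruct hook: by the time this runs there must be no overlapped I/O
 * in flight.  Drop any virtual events we still hold and, if
 * BEV_OPT_CLOSE_ON_FREE was set, close the underlying socket. */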
static void
be_async_destruct(struct bufferevent *bev)
{
	struct bufferevent_async *bev_async = upcast(bev);
	struct bufferevent_private *bev_p = BEV_UPCAST(bev);
	evutil_socket_t fd;

	EVUTIL_ASSERT(!upcast(bev)->write_in_progress &&
			!upcast(bev)->read_in_progress);

	bev_async_del_read(bev_async);
	bev_async_del_write(bev_async);

	fd = evbuffer_overlapped_get_fd_(bev->input);
	if (fd != (evutil_socket_t)INVALID_SOCKET &&
		(bev_p->options & BEV_OPT_CLOSE_ON_FREE)) {
		evutil_closesocket(fd);
		evbuffer_overlapped_set_fd_(bev->input, INVALID_SOCKET);
	}
}

/* GetQueuedCompletionStatus doesn't reliably yield WSA error codes, so
 * we use WSAGetOverlappedResult to translate. */
static void
bev_async_set_wsa_error(struct bufferevent *bev, struct event_overlapped *eo)
{
	DWORD bytes, flags;
	evutil_socket_t fd;

	fd = evbuffer_overlapped_get_fd_(bev->input);
	WSAGetOverlappedResult(fd, &eo->overlapped, &bytes, FALSE, &flags);
}

static int
be_async_flush(struct bufferevent *bev, short what,
    enum bufferevent_flush_mode mode)
{
	return 0;
}

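/* Completion callback for an overlapped ConnectEx().  Takes the
 * bufferevent lock, reports BEV_EVENT_CONNECTED or BEV_EVENT_ERROR, and
 * releases the reference and virtual event taken in
 * bufferevent_async_connect_(). */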
static void
connect_complete(struct event_overlapped *eo, ev_uintptr_t key,
    ev_ssize_t nbytes, int ok)
{
	struct bufferevent_async *bev_a = upcast_connect(eo);
	struct bufferevent *bev = &bev_a->bev.bev;
	evutil_socket_t sock;

	BEV_LOCK(bev);

	EVUTIL_ASSERT(bev_a->bev.connecting);
	bev_a->bev.connecting = 0;
	sock = evbuffer_overlapped_get_fd_(bev_a->bev.bev.input);
	/* XXXX Handle error? */
	setsockopt(sock, SOL_SOCKET, SO_UPDATE_CONNECT_CONTEXT, NULL, 0);

	if (ok)
		bufferevent_async_set_connected_(bev);
	else
		bev_async_set_wsa_error(bev, eo);

	bufferevent_run_eventcb_(bev,
			ok? BEV_EVENT_CONNECTED : BEV_EVENT_ERROR, 0);

	event_base_del_virtual_(bev->ev_base);

	bufferevent_decref_and_unlock_(bev);
}

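/* Completion callback for an overlapped read: commit the bytes that
 * actually arrived into the input buffer, refund any unused rate-limit
 * budget, and then either deliver the read event, report EOF (zero bytes
 * read), or report an error. */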
static void
read_complete(struct event_overlapped *eo, ev_uintptr_t key,
    ev_ssize_t nbytes, int ok)
{
	struct bufferevent_async *bev_a = upcast_read(eo);
	struct bufferevent *bev = &bev_a->bev.bev;
	short what = BEV_EVENT_READING;
	ev_ssize_t amount_unread;
	BEV_LOCK(bev);
	EVUTIL_ASSERT(bev_a->read_in_progress);

	amount_unread = bev_a->read_in_progress - nbytes;
	evbuffer_commit_read_(bev->input, nbytes);
	bev_a->read_in_progress = 0;
	if (amount_unread)
		bufferevent_decrement_read_buckets_(&bev_a->bev, -amount_unread);

	if (!ok)
		bev_async_set_wsa_error(bev, eo);

	if (bev_a->ok) {
		if (ok && nbytes) {
			BEV_RESET_GENERIC_READ_TIMEOUT(bev);
			bufferevent_trigger_nolock_(bev, EV_READ, 0);
			bev_async_consider_reading(bev_a);
		} else if (!ok) {
			what |= BEV_EVENT_ERROR;
			bev_a->ok = 0;
			bufferevent_run_eventcb_(bev, what, 0);
		} else if (!nbytes) {
			what |= BEV_EVENT_EOF;
			bev_a->ok = 0;
			bufferevent_run_eventcb_(bev, what, 0);
		}
	}

	bufferevent_decref_and_unlock_(bev);
}

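/* Completion callback for an overlapped write: commit the bytes actually
 * sent, refund any unused rate-limit budget, and either deliver the
 * write event, report EOF (zero bytes written), or report an error. */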
static void
write_complete(struct event_overlapped *eo, ev_uintptr_t key,
    ev_ssize_t nbytes, int ok)
{
	struct bufferevent_async *bev_a = upcast_write(eo);
	struct bufferevent *bev = &bev_a->bev.bev;
	short what = BEV_EVENT_WRITING;
	ev_ssize_t amount_unwritten;

	BEV_LOCK(bev);
	EVUTIL_ASSERT(bev_a->write_in_progress);

	amount_unwritten = bev_a->write_in_progress - nbytes;
	evbuffer_commit_write_(bev->output, nbytes);
	bev_a->write_in_progress = 0;

	if (amount_unwritten)
		bufferevent_decrement_write_buckets_(&bev_a->bev,
		                                     -amount_unwritten);


	if (!ok)
		bev_async_set_wsa_error(bev, eo);

	if (bev_a->ok) {
		if (ok && nbytes) {
			BEV_RESET_GENERIC_WRITE_TIMEOUT(bev);
			bufferevent_trigger_nolock_(bev, EV_WRITE, 0);
			bev_async_consider_writing(bev_a);
		} else if (!ok) {
			what |= BEV_EVENT_ERROR;
			bev_a->ok = 0;
			bufferevent_run_eventcb_(bev, what, 0);
		} else if (!nbytes) {
			what |= BEV_EVENT_EOF;
			bev_a->ok = 0;
			bufferevent_run_eventcb_(bev, what, 0);
		}
	}

	bufferevent_decref_and_unlock_(bev);
}

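/* Create an IOCP-backed bufferevent for fd: associate fd with the base's
 * IOCP port, attach overlapped input/output evbuffers, and set up the
 * completion callbacks above.  BEV_OPT_THREADSAFE is forced on, since
 * completions are delivered from the IOCP loop's threads. */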
struct bufferevent *
bufferevent_async_new_(struct event_base *base,
    evutil_socket_t fd, int options)
{
	struct bufferevent_async *bev_a;
	struct bufferevent *bev;
	struct event_iocp_port *iocp;

	options |= BEV_OPT_THREADSAFE;

	if (!(iocp = event_base_get_iocp_(base)))
		return NULL;

	if (fd >= 0 && event_iocp_port_associate_(iocp, fd, 1)<0) {
		int err = GetLastError();
		/* We may have already associated this fd with a port.
		 * Let's hope it's this port, and that the error code
		 * for doing this never changes. */
		if (err != ERROR_INVALID_PARAMETER)
			return NULL;
	}

	if (!(bev_a = mm_calloc(1, sizeof(struct bufferevent_async))))
		return NULL;

	bev = &bev_a->bev.bev;
	if (!(bev->input = evbuffer_overlapped_new_(fd))) {
		mm_free(bev_a);
		return NULL;
	}
	if (!(bev->output = evbuffer_overlapped_new_(fd))) {
		evbuffer_free(bev->input);
		mm_free(bev_a);
		return NULL;
	}

	if (bufferevent_init_common_(&bev_a->bev, base, &bufferevent_ops_async,
		options)<0)
		goto err;

	evbuffer_add_cb(bev->input, be_async_inbuf_callback, bev);
	evbuffer_add_cb(bev->output, be_async_outbuf_callback, bev);

	event_overlapped_init_(&bev_a->connect_overlapped, connect_complete);
	event_overlapped_init_(&bev_a->read_overlapped, read_complete);
	event_overlapped_init_(&bev_a->write_overlapped, write_complete);

	bufferevent_init_generic_timeout_cbs_(bev);

	bev_a->ok = fd >= 0;

	return bev;
err:
	bufferevent_free(&bev_a->bev.bev);
	return NULL;
}

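/* Called once a connect has finished: mark the bufferevent usable,
 * reinstall the generic timeout callbacks, and re-run the enable logic
 * so any pending reads or writes can start. */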
void
bufferevent_async_set_connected_(struct bufferevent *bev)
{
	struct bufferevent_async *bev_async = upcast(bev);
	bev_async->ok = 1;
	bufferevent_init_generic_timeout_cbs_(bev);
	/* Now's a good time to consider reading/writing */
	be_async_enable(bev, bev->enabled);
}

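/* Report whether an overlapped connect is possible here: we need an
 * async bufferevent, an IOCP-enabled base, and the ConnectEx() extension
 * function. */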
int
bufferevent_async_can_connect_(struct bufferevent *bev)
{
	const struct win32_extension_fns *ext =
	    event_get_win32_extension_fns_();

	if (BEV_IS_ASYNC(bev) &&
	    event_base_get_iocp_(bev->ev_base) &&
	    ext && ext->ConnectEx)
		return 1;

	return 0;
}

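/* Start an overlapped ConnectEx() on fd toward sa.  ConnectEx() insists
 * on a bound socket, so we bind to a wildcard address first (ignoring
 * WSAEINVAL, which means the socket was already bound).  On success the
 * result is delivered to connect_complete(). */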
int
bufferevent_async_connect_(struct bufferevent *bev, evutil_socket_t fd,
	const struct sockaddr *sa, int socklen)
{
	BOOL rc;
	struct bufferevent_async *bev_async = upcast(bev);
	struct sockaddr_storage ss;
	const struct win32_extension_fns *ext =
	    event_get_win32_extension_fns_();

	EVUTIL_ASSERT(ext && ext->ConnectEx && fd >= 0 && sa != NULL);

	/* ConnectEx() requires that the socket be bound to an address
	 * with bind() before using, otherwise it will fail. We attempt
	 * to issue a bind() here, taking into account that the error
	 * code is set to WSAEINVAL when the socket is already bound. */
	memset(&ss, 0, sizeof(ss));
	if (sa->sa_family == AF_INET) {
		struct sockaddr_in *sin = (struct sockaddr_in *)&ss;
		sin->sin_family = AF_INET;
		sin->sin_addr.s_addr = INADDR_ANY;
	} else if (sa->sa_family == AF_INET6) {
		struct sockaddr_in6 *sin6 = (struct sockaddr_in6 *)&ss;
		sin6->sin6_family = AF_INET6;
		sin6->sin6_addr = in6addr_any;
	} else {
		/* Well, the user will have to bind() */
		return -1;
	}
	if (bind(fd, (struct sockaddr *)&ss, sizeof(ss)) < 0 &&
	    WSAGetLastError() != WSAEINVAL)
		return -1;

	event_base_add_virtual_(bev->ev_base);
	bufferevent_incref_(bev);
	rc = ext->ConnectEx(fd, sa, socklen, NULL, 0, NULL,
			    &bev_async->connect_overlapped.overlapped);
	if (rc || WSAGetLastError() == ERROR_IO_PENDING)
		return 0;

	event_base_del_virtual_(bev->ev_base);
	bufferevent_decref_(bev);

	return -1;
}

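/* Miscellaneous control operations: get/set the underlying socket
 * (re-associating a new one with the IOCP port), and implement
 * BEV_CTRL_CANCEL_ALL by marking the bufferevent unusable (closing the
 * socket if BEV_OPT_CLOSE_ON_FREE is set). */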
static int
be_async_ctrl(struct bufferevent *bev, enum bufferevent_ctrl_op op,
    union bufferevent_ctrl_data *data)
{
	switch (op) {
	case BEV_CTRL_GET_FD:
		data->fd = evbuffer_overlapped_get_fd_(bev->input);
		return 0;
	case BEV_CTRL_SET_FD: {
		struct event_iocp_port *iocp;

		if (data->fd == evbuffer_overlapped_get_fd_(bev->input))
			return 0;
		if (!(iocp = event_base_get_iocp_(bev->ev_base)))
			return -1;
		if (event_iocp_port_associate_(iocp, data->fd, 1) < 0)
			return -1;
		evbuffer_overlapped_set_fd_(bev->input, data->fd);
		evbuffer_overlapped_set_fd_(bev->output, data->fd);
		return 0;
	}
	case BEV_CTRL_CANCEL_ALL: {
		struct bufferevent_async *bev_a = upcast(bev);
		evutil_socket_t fd = evbuffer_overlapped_get_fd_(bev->input);
		if (fd != (evutil_socket_t)INVALID_SOCKET &&
		    (bev_a->bev.options & BEV_OPT_CLOSE_ON_FREE)) {
			closesocket(fd);
			evbuffer_overlapped_set_fd_(bev->input, INVALID_SOCKET);
		}
		bev_a->ok = 0;
		return 0;
	}
	case BEV_CTRL_GET_UNDERLYING:
	default:
		return -1;
	}
}