/* $NetBSD: bufferevent_async.c,v 1.1.1.3 2021/04/07 02:43:13 christos Exp $ */
/*
 * Copyright (c) 2009-2012 Niels Provos and Nick Mathewson
 *
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 * 1. Redistributions of source code must retain the above copyright
 *    notice, this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 * 3. The name of the author may not be used to endorse or promote products
 *    derived from this software without specific prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
 * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
 * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
 * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,
 * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
 * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
 * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */

#include "event2/event-config.h"
#include <sys/cdefs.h>
__RCSID("$NetBSD: bufferevent_async.c,v 1.1.1.3 2021/04/07 02:43:13 christos Exp $");
#include "evconfig-private.h"

#ifdef EVENT__HAVE_SYS_TIME_H
#include <sys/time.h>
#endif

#include <errno.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#ifdef EVENT__HAVE_STDARG_H
#include <stdarg.h>
#endif
#ifdef EVENT__HAVE_UNISTD_H
#include <unistd.h>
#endif

#ifdef _WIN32
#include <winsock2.h>
#include <winerror.h>
#include <ws2tcpip.h>
#endif

#include <sys/queue.h>

#include "event2/util.h"
#include "event2/bufferevent.h"
#include "event2/buffer.h"
#include "event2/bufferevent_struct.h"
#include "event2/event.h"
#include "event2/util.h"
#include "event-internal.h"
#include "log-internal.h"
#include "mm-internal.h"
#include "bufferevent-internal.h"
#include "util-internal.h"
#include "iocp-internal.h"

#ifndef SO_UPDATE_CONNECT_CONTEXT
/* Mingw is sometimes missing this */
#define SO_UPDATE_CONNECT_CONTEXT 0x7010
#endif

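/* This file implements bufferevents backed by Windows overlapped I/O and an
 * I/O completion port (IOCP).  Instead of waiting for readability or
 * writability, we launch overlapped reads and writes directly on the
 * underlying evbuffers and finish them in the completion callbacks below. */
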
/* prototypes */
static int be_async_enable(struct bufferevent *, short);
static int be_async_disable(struct bufferevent *, short);
static void be_async_destruct(struct bufferevent *);
static int be_async_flush(struct bufferevent *, short, enum bufferevent_flush_mode);
static int be_async_ctrl(struct bufferevent *, enum bufferevent_ctrl_op, union bufferevent_ctrl_data *);

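/* Private state for an IOCP-backed bufferevent.  Each of the three
 * event_overlapped members tracks at most one outstanding overlapped
 * operation (connect, read, write); read_in_progress and write_in_progress
 * record how many bytes the pending operation asked for, and 'ok' is
 * cleared once the underlying socket can no longer be used. */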
struct bufferevent_async {
	struct bufferevent_private bev;
	struct event_overlapped connect_overlapped;
	struct event_overlapped read_overlapped;
	struct event_overlapped write_overlapped;
	size_t read_in_progress;
	size_t write_in_progress;
	unsigned ok : 1;
	unsigned read_added : 1;
	unsigned write_added : 1;
};

const struct bufferevent_ops bufferevent_ops_async = {
	"socket_async",
	evutil_offsetof(struct bufferevent_async, bev.bev),
	be_async_enable,
	be_async_disable,
	NULL, /* Unlink */
	be_async_destruct,
	bufferevent_generic_adj_timeouts_,
	be_async_flush,
	be_async_ctrl,
};

static inline void
be_async_run_eventcb(struct bufferevent *bev, short what, int options)
{ bufferevent_run_eventcb_(bev, what, options|BEV_TRIG_DEFER_CALLBACKS); }

static inline void
be_async_trigger_nolock(struct bufferevent *bev, short what, int options)
{ bufferevent_trigger_nolock_(bev, what, options|BEV_TRIG_DEFER_CALLBACKS); }

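/* Decide whether an error from associating a socket with the IOCP port is
 * fatal.  ERROR_INVALID_PARAMETER is tolerated on the assumption that the
 * socket was already associated with this port. */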
static inline int
fatal_error(int err)
{
	switch (err) {
	/* We may have already associated this fd with a port.
	 * Let's hope it's this port, and that the error code
	 * for doing this never changes. */
	case ERROR_INVALID_PARAMETER:
		return 0;
	}
	return 1;
}

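/* Upcast helpers: recover the enclosing bufferevent_async from a generic
 * bufferevent or from one of its embedded event_overlapped structures. */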
static inline struct bufferevent_async *
upcast(struct bufferevent *bev)
{
	struct bufferevent_async *bev_a;
	if (!BEV_IS_ASYNC(bev))
		return NULL;
	bev_a = EVUTIL_UPCAST(bev, struct bufferevent_async, bev.bev);
	return bev_a;
}

static inline struct bufferevent_async *
upcast_connect(struct event_overlapped *eo)
{
	struct bufferevent_async *bev_a;
	bev_a = EVUTIL_UPCAST(eo, struct bufferevent_async, connect_overlapped);
	EVUTIL_ASSERT(BEV_IS_ASYNC(&bev_a->bev.bev));
	return bev_a;
}

static inline struct bufferevent_async *
upcast_read(struct event_overlapped *eo)
{
	struct bufferevent_async *bev_a;
	bev_a = EVUTIL_UPCAST(eo, struct bufferevent_async, read_overlapped);
	EVUTIL_ASSERT(BEV_IS_ASYNC(&bev_a->bev.bev));
	return bev_a;
}

static inline struct bufferevent_async *
upcast_write(struct event_overlapped *eo)
{
	struct bufferevent_async *bev_a;
	bev_a = EVUTIL_UPCAST(eo, struct bufferevent_async, write_overlapped);
	EVUTIL_ASSERT(BEV_IS_ASYNC(&bev_a->bev.bev));
	return bev_a;
}

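/* Outstanding overlapped reads and writes are counted as "virtual" events so
 * that the event base keeps looping while they are pending.  These helpers
 * add and remove that per-direction bookkeeping. */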
static void
bev_async_del_write(struct bufferevent_async *beva)
{
	struct bufferevent *bev = &beva->bev.bev;

	if (beva->write_added) {
		beva->write_added = 0;
		event_base_del_virtual_(bev->ev_base);
	}
}

static void
bev_async_del_read(struct bufferevent_async *beva)
{
	struct bufferevent *bev = &beva->bev.bev;

	if (beva->read_added) {
		beva->read_added = 0;
		event_base_del_virtual_(bev->ev_base);
	}
}

static void
bev_async_add_write(struct bufferevent_async *beva)
{
	struct bufferevent *bev = &beva->bev.bev;

	if (!beva->write_added) {
		beva->write_added = 1;
		event_base_add_virtual_(bev->ev_base);
	}
}

static void
bev_async_add_read(struct bufferevent_async *beva)
{
	struct bufferevent *bev = &beva->bev.bev;

	if (!beva->read_added) {
		beva->read_added = 1;
		event_base_add_virtual_(bev->ev_base);
	}
}

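/* Launch an overlapped write for as much of the output buffer as the rate
 * limits allow, unless a write is already in progress, the bufferevent is
 * still connecting, or writing is disabled or suspended. */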
static void
bev_async_consider_writing(struct bufferevent_async *beva)
{
	size_t at_most;
	int limit;
	struct bufferevent *bev = &beva->bev.bev;

	/* Don't write if there's a write in progress, or we do not
	 * want to write, or when there's nothing left to write. */
	if (beva->write_in_progress || beva->bev.connecting)
		return;
	if (!beva->ok || !(bev->enabled&EV_WRITE) ||
	    !evbuffer_get_length(bev->output)) {
		bev_async_del_write(beva);
		return;
	}

	at_most = evbuffer_get_length(bev->output);

	/* This is safe so long as bufferevent_get_write_max never returns
	 * more than INT_MAX.  That's true for now. XXXX */
	limit = (int)bufferevent_get_write_max_(&beva->bev);
	if (at_most >= (size_t)limit && limit >= 0)
		at_most = limit;

	if (beva->bev.write_suspended) {
		bev_async_del_write(beva);
		return;
	}

	/* XXXX doesn't respect low-water mark very well. */
	bufferevent_incref_(bev);
	if (evbuffer_launch_write_(bev->output, at_most,
	    &beva->write_overlapped)) {
		bufferevent_decref_(bev);
		beva->ok = 0;
		be_async_run_eventcb(bev, BEV_EVENT_ERROR, 0);
	} else {
		beva->write_in_progress = at_most;
		bufferevent_decrement_write_buckets_(&beva->bev, at_most);
		bev_async_add_write(beva);
	}
}

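/* Launch an overlapped read for up to the space left below the read
 * high-water mark (or a fixed 16384 bytes when no mark is set), subject to
 * the rate limits, unless a read is already in progress, the bufferevent is
 * still connecting, or reading is disabled or suspended. */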
static void
bev_async_consider_reading(struct bufferevent_async *beva)
{
	size_t cur_size;
	size_t read_high;
	size_t at_most;
	int limit;
	struct bufferevent *bev = &beva->bev.bev;

	/* Don't read if there is a read in progress, or we do not
	 * want to read. */
	if (beva->read_in_progress || beva->bev.connecting)
		return;
	if (!beva->ok || !(bev->enabled&EV_READ)) {
		bev_async_del_read(beva);
		return;
	}

	/* Don't read if we're full */
	cur_size = evbuffer_get_length(bev->input);
	read_high = bev->wm_read.high;
	if (read_high) {
		if (cur_size >= read_high) {
			bev_async_del_read(beva);
			return;
		}
		at_most = read_high - cur_size;
	} else {
		at_most = 16384; /* FIXME totally magic. */
	}

	/* XXXX This over-commits. */
	/* XXXX see also the note above on the cast of bufferevent_get_write_max_() */
	limit = (int)bufferevent_get_read_max_(&beva->bev);
	if (at_most >= (size_t)limit && limit >= 0)
		at_most = limit;

	if (beva->bev.read_suspended) {
		bev_async_del_read(beva);
		return;
	}

	bufferevent_incref_(bev);
	if (evbuffer_launch_read_(bev->input, at_most, &beva->read_overlapped)) {
		beva->ok = 0;
		be_async_run_eventcb(bev, BEV_EVENT_ERROR, 0);
		bufferevent_decref_(bev);
	} else {
		beva->read_in_progress = at_most;
		bufferevent_decrement_read_buckets_(&beva->bev, at_most);
		bev_async_add_read(beva);
	}

	return;
}

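/* evbuffer callbacks: adding data to the output buffer, or draining data
 * from the input buffer, may make a new overlapped write or read possible. */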
static void
be_async_outbuf_callback(struct evbuffer *buf,
    const struct evbuffer_cb_info *cbinfo,
    void *arg)
{
	struct bufferevent *bev = arg;
	struct bufferevent_async *bev_async = upcast(bev);

	/* If we added data to the outbuf and were not writing before,
	 * we may want to write now. */

	bufferevent_incref_and_lock_(bev);

	if (cbinfo->n_added)
		bev_async_consider_writing(bev_async);

	bufferevent_decref_and_unlock_(bev);
}

static void
be_async_inbuf_callback(struct evbuffer *buf,
    const struct evbuffer_cb_info *cbinfo,
    void *arg)
{
	struct bufferevent *bev = arg;
	struct bufferevent_async *bev_async = upcast(bev);

	/* If we drained data from the inbuf and were not reading before,
	 * we may want to read now */

	bufferevent_incref_and_lock_(bev);

	if (cbinfo->n_deleted)
		bev_async_consider_reading(bev_async);

	bufferevent_decref_and_unlock_(bev);
}

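/* Enable or disable reading and writing.  Enabling resets the generic
 * timeouts and may launch a new overlapped operation; disabling only stops
 * new operations from being launched, it does not cancel one that is
 * already in flight. */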
static int
be_async_enable(struct bufferevent *buf, short what)
{
	struct bufferevent_async *bev_async = upcast(buf);

	if (!bev_async->ok)
		return -1;

	if (bev_async->bev.connecting) {
		/* Don't launch anything during connection attempts. */
		return 0;
	}

	if (what & EV_READ)
		BEV_RESET_GENERIC_READ_TIMEOUT(buf);
	if (what & EV_WRITE)
		BEV_RESET_GENERIC_WRITE_TIMEOUT(buf);

	/* If we newly enable reading or writing, and we aren't reading or
	   writing already, consider launching a new read or write. */

	if (what & EV_READ)
		bev_async_consider_reading(bev_async);
	if (what & EV_WRITE)
		bev_async_consider_writing(bev_async);
	return 0;
}

static int
be_async_disable(struct bufferevent *bev, short what)
{
	struct bufferevent_async *bev_async = upcast(bev);
	/* XXXX If we disable reading or writing, we may want to consider
	 * canceling any in-progress read or write operation, though it might
	 * not work. */

	if (what & EV_READ) {
		BEV_DEL_GENERIC_READ_TIMEOUT(bev);
		bev_async_del_read(bev_async);
	}
	if (what & EV_WRITE) {
		BEV_DEL_GENERIC_WRITE_TIMEOUT(bev);
		bev_async_del_write(bev_async);
	}

	return 0;
}

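/* Tear down the async-specific state.  No overlapped I/O may still be in
 * flight at this point; the socket itself is closed only when
 * BEV_OPT_CLOSE_ON_FREE was requested. */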
static void
be_async_destruct(struct bufferevent *bev)
{
	struct bufferevent_async *bev_async = upcast(bev);
	struct bufferevent_private *bev_p = BEV_UPCAST(bev);
	evutil_socket_t fd;

	EVUTIL_ASSERT(!upcast(bev)->write_in_progress &&
	    !upcast(bev)->read_in_progress);

	bev_async_del_read(bev_async);
	bev_async_del_write(bev_async);

	fd = evbuffer_overlapped_get_fd_(bev->input);
	if (fd != (evutil_socket_t)EVUTIL_INVALID_SOCKET &&
	    (bev_p->options & BEV_OPT_CLOSE_ON_FREE)) {
		evutil_closesocket(fd);
		evbuffer_overlapped_set_fd_(bev->input, EVUTIL_INVALID_SOCKET);
	}
}

/* GetQueuedCompletionStatus doesn't reliably yield WSA error codes, so
 * we use WSAGetOverlappedResult to translate. */
static void
bev_async_set_wsa_error(struct bufferevent *bev, struct event_overlapped *eo)
{
	DWORD bytes, flags;
	evutil_socket_t fd;

	fd = evbuffer_overlapped_get_fd_(bev->input);
	WSAGetOverlappedResult(fd, &eo->overlapped, &bytes, FALSE, &flags);
}

static int
be_async_flush(struct bufferevent *bev, short what,
    enum bufferevent_flush_mode mode)
{
	return 0;
}

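/* Completion callbacks, run from the IOCP loop when an overlapped connect,
 * read, or write finishes.  'ok' says whether the operation succeeded and
 * nbytes is the number of bytes actually transferred; a successful transfer
 * of zero bytes is reported as EOF. */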
static void
connect_complete(struct event_overlapped *eo, ev_uintptr_t key,
    ev_ssize_t nbytes, int ok)
{
	struct bufferevent_async *bev_a = upcast_connect(eo);
	struct bufferevent *bev = &bev_a->bev.bev;
	evutil_socket_t sock;

	BEV_LOCK(bev);

	EVUTIL_ASSERT(bev_a->bev.connecting);
	bev_a->bev.connecting = 0;
	sock = evbuffer_overlapped_get_fd_(bev_a->bev.bev.input);
	/* XXXX Handle error? */
	setsockopt(sock, SOL_SOCKET, SO_UPDATE_CONNECT_CONTEXT, NULL, 0);

	if (ok)
		bufferevent_async_set_connected_(bev);
	else
		bev_async_set_wsa_error(bev, eo);

	be_async_run_eventcb(bev, ok ? BEV_EVENT_CONNECTED : BEV_EVENT_ERROR, 0);

	event_base_del_virtual_(bev->ev_base);

	bufferevent_decref_and_unlock_(bev);
}

static void
read_complete(struct event_overlapped *eo, ev_uintptr_t key,
    ev_ssize_t nbytes, int ok)
{
	struct bufferevent_async *bev_a = upcast_read(eo);
	struct bufferevent *bev = &bev_a->bev.bev;
	short what = BEV_EVENT_READING;
	ev_ssize_t amount_unread;
	BEV_LOCK(bev);
	EVUTIL_ASSERT(bev_a->read_in_progress);

	amount_unread = bev_a->read_in_progress - nbytes;
	evbuffer_commit_read_(bev->input, nbytes);
	bev_a->read_in_progress = 0;
	if (amount_unread)
		bufferevent_decrement_read_buckets_(&bev_a->bev, -amount_unread);

	if (!ok)
		bev_async_set_wsa_error(bev, eo);

	if (bev_a->ok) {
		if (ok && nbytes) {
			BEV_RESET_GENERIC_READ_TIMEOUT(bev);
			be_async_trigger_nolock(bev, EV_READ, 0);
			bev_async_consider_reading(bev_a);
		} else if (!ok) {
			what |= BEV_EVENT_ERROR;
			bev_a->ok = 0;
			be_async_run_eventcb(bev, what, 0);
		} else if (!nbytes) {
			what |= BEV_EVENT_EOF;
			bev_a->ok = 0;
			be_async_run_eventcb(bev, what, 0);
		}
	}

	bufferevent_decref_and_unlock_(bev);
}

static void
write_complete(struct event_overlapped *eo, ev_uintptr_t key,
    ev_ssize_t nbytes, int ok)
{
	struct bufferevent_async *bev_a = upcast_write(eo);
	struct bufferevent *bev = &bev_a->bev.bev;
	short what = BEV_EVENT_WRITING;
	ev_ssize_t amount_unwritten;

	BEV_LOCK(bev);
	EVUTIL_ASSERT(bev_a->write_in_progress);

	amount_unwritten = bev_a->write_in_progress - nbytes;
	evbuffer_commit_write_(bev->output, nbytes);
	bev_a->write_in_progress = 0;

	if (amount_unwritten)
		bufferevent_decrement_write_buckets_(&bev_a->bev,
		    -amount_unwritten);

	if (!ok)
		bev_async_set_wsa_error(bev, eo);

	if (bev_a->ok) {
		if (ok && nbytes) {
			BEV_RESET_GENERIC_WRITE_TIMEOUT(bev);
			be_async_trigger_nolock(bev, EV_WRITE, 0);
			bev_async_consider_writing(bev_a);
		} else if (!ok) {
			what |= BEV_EVENT_ERROR;
			bev_a->ok = 0;
			be_async_run_eventcb(bev, what, 0);
		} else if (!nbytes) {
			what |= BEV_EVENT_EOF;
			bev_a->ok = 0;
			be_async_run_eventcb(bev, what, 0);
		}
	}

	bufferevent_decref_and_unlock_(bev);
}

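/* Construct a new IOCP-backed bufferevent on 'fd' and associate the socket
 * with the event base's completion port.  The result always behaves as if
 * BEV_OPT_THREADSAFE had been requested. */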
struct bufferevent *
bufferevent_async_new_(struct event_base *base,
    evutil_socket_t fd, int options)
{
	struct bufferevent_async *bev_a;
	struct bufferevent *bev;
	struct event_iocp_port *iocp;

	options |= BEV_OPT_THREADSAFE;

	if (!(iocp = event_base_get_iocp_(base)))
		return NULL;

	if (fd >= 0 && event_iocp_port_associate_(iocp, fd, 1)<0) {
		if (fatal_error(GetLastError()))
			return NULL;
	}

	if (!(bev_a = mm_calloc(1, sizeof(struct bufferevent_async))))
		return NULL;

	bev = &bev_a->bev.bev;
	if (!(bev->input = evbuffer_overlapped_new_(fd))) {
		mm_free(bev_a);
		return NULL;
	}
	if (!(bev->output = evbuffer_overlapped_new_(fd))) {
		evbuffer_free(bev->input);
		mm_free(bev_a);
		return NULL;
	}

	if (bufferevent_init_common_(&bev_a->bev, base, &bufferevent_ops_async,
	    options)<0)
		goto err;

	evbuffer_add_cb(bev->input, be_async_inbuf_callback, bev);
	evbuffer_add_cb(bev->output, be_async_outbuf_callback, bev);

	event_overlapped_init_(&bev_a->connect_overlapped, connect_complete);
	event_overlapped_init_(&bev_a->read_overlapped, read_complete);
	event_overlapped_init_(&bev_a->write_overlapped, write_complete);

	bufferevent_init_generic_timeout_cbs_(bev);

	bev_a->ok = fd >= 0;

	return bev;
err:
	bufferevent_free(&bev_a->bev.bev);
	return NULL;
}

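/* Helpers for the connect path: mark the bufferevent usable once a connect
 * has completed, and report whether IOCP plus the ConnectEx() extension are
 * available so that an overlapped connect can be attempted at all. */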
void
bufferevent_async_set_connected_(struct bufferevent *bev)
{
	struct bufferevent_async *bev_async = upcast(bev);
	bev_async->ok = 1;
	/* Now's a good time to consider reading/writing */
	be_async_enable(bev, bev->enabled);
}

int
bufferevent_async_can_connect_(struct bufferevent *bev)
{
	const struct win32_extension_fns *ext =
	    event_get_win32_extension_fns_();

	if (BEV_IS_ASYNC(bev) &&
	    event_base_get_iocp_(bev->ev_base) &&
	    ext && ext->ConnectEx)
		return 1;

	return 0;
}

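/* Start an overlapped connect with ConnectEx(); completion is delivered to
 * connect_complete() above. */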
int
bufferevent_async_connect_(struct bufferevent *bev, evutil_socket_t fd,
    const struct sockaddr *sa, int socklen)
{
	BOOL rc;
	struct bufferevent_async *bev_async = upcast(bev);
	struct sockaddr_storage ss;
	const struct win32_extension_fns *ext =
	    event_get_win32_extension_fns_();

	EVUTIL_ASSERT(ext && ext->ConnectEx && fd >= 0 && sa != NULL);

	/* ConnectEx() requires that the socket be bound to an address
	 * with bind() before using, otherwise it will fail. We attempt
	 * to issue a bind() here, taking into account that the error
	 * code is set to WSAEINVAL when the socket is already bound. */
	memset(&ss, 0, sizeof(ss));
	if (sa->sa_family == AF_INET) {
		struct sockaddr_in *sin = (struct sockaddr_in *)&ss;
		sin->sin_family = AF_INET;
		sin->sin_addr.s_addr = INADDR_ANY;
	} else if (sa->sa_family == AF_INET6) {
		struct sockaddr_in6 *sin6 = (struct sockaddr_in6 *)&ss;
		sin6->sin6_family = AF_INET6;
		sin6->sin6_addr = in6addr_any;
	} else {
		/* Well, the user will have to bind() */
		return -1;
	}
	if (bind(fd, (struct sockaddr *)&ss, sizeof(ss)) < 0 &&
	    WSAGetLastError() != WSAEINVAL)
		return -1;

	event_base_add_virtual_(bev->ev_base);
	bufferevent_incref_(bev);
	rc = ext->ConnectEx(fd, sa, socklen, NULL, 0, NULL,
	    &bev_async->connect_overlapped.overlapped);
	if (rc || WSAGetLastError() == ERROR_IO_PENDING)
		return 0;

	event_base_del_virtual_(bev->ev_base);
	bufferevent_decref_(bev);

	return -1;
}

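/* Miscellaneous control operations: fetch or replace the underlying socket
 * (a replacement socket is associated with the IOCP port first), and handle
 * BEV_CTRL_CANCEL_ALL by closing the socket when BEV_OPT_CLOSE_ON_FREE is
 * set and marking the bufferevent unusable. */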
static int
be_async_ctrl(struct bufferevent *bev, enum bufferevent_ctrl_op op,
    union bufferevent_ctrl_data *data)
{
	switch (op) {
	case BEV_CTRL_GET_FD:
		data->fd = evbuffer_overlapped_get_fd_(bev->input);
		return 0;
	case BEV_CTRL_SET_FD: {
		struct bufferevent_async *bev_a = upcast(bev);
		struct event_iocp_port *iocp;

		if (data->fd == evbuffer_overlapped_get_fd_(bev->input))
			return 0;
		if (!(iocp = event_base_get_iocp_(bev->ev_base)))
			return -1;
		if (event_iocp_port_associate_(iocp, data->fd, 1) < 0) {
			if (fatal_error(GetLastError()))
				return -1;
		}
		evbuffer_overlapped_set_fd_(bev->input, data->fd);
		evbuffer_overlapped_set_fd_(bev->output, data->fd);
		bev_a->ok = data->fd >= 0;
		return 0;
	}
	case BEV_CTRL_CANCEL_ALL: {
		struct bufferevent_async *bev_a = upcast(bev);
		evutil_socket_t fd = evbuffer_overlapped_get_fd_(bev->input);
		if (fd != (evutil_socket_t)EVUTIL_INVALID_SOCKET &&
		    (bev_a->bev.options & BEV_OPT_CLOSE_ON_FREE)) {
			closesocket(fd);
			evbuffer_overlapped_set_fd_(bev->input, EVUTIL_INVALID_SOCKET);
		}
		bev_a->ok = 0;
		return 0;
	}
	case BEV_CTRL_GET_UNDERLYING:
	default:
		return -1;
	}
}