/* $NetBSD: bufferevent_async.c,v 1.7 2024/08/18 20:47:20 christos Exp $ */

/*
 * Copyright (c) 2009-2012 Niels Provos and Nick Mathewson
 *
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 * 1. Redistributions of source code must retain the above copyright
 *    notice, this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 * 3. The name of the author may not be used to endorse or promote products
 *    derived from this software without specific prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
 * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
 * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
 * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,
 * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
 * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
 * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */

#include "event2/event-config.h"
#include "evconfig-private.h"

#ifdef EVENT__HAVE_SYS_TIME_H
#include <sys/time.h>
#endif

#include <errno.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#ifdef EVENT__HAVE_STDARG_H
#include <stdarg.h>
#endif
#ifdef EVENT__HAVE_UNISTD_H
#include <unistd.h>
#endif

#ifdef _WIN32
#include <winsock2.h>
#include <winerror.h>
#include <ws2tcpip.h>
#endif

#include <sys/queue.h>

#include "event2/util.h"
#include "event2/bufferevent.h"
#include "event2/buffer.h"
#include "event2/bufferevent_struct.h"
#include "event2/event.h"
#include "event2/util.h"
#include "event-internal.h"
#include "log-internal.h"
#include "mm-internal.h"
#include "bufferevent-internal.h"
#include "util-internal.h"
#include "iocp-internal.h"

#ifndef SO_UPDATE_CONNECT_CONTEXT
/* Mingw is sometimes missing this */
#define SO_UPDATE_CONNECT_CONTEXT 0x7010
#endif

/* prototypes */
static int be_async_enable(struct bufferevent *, short);
static int be_async_disable(struct bufferevent *, short);
static void be_async_destruct(struct bufferevent *);
static int be_async_flush(struct bufferevent *, short, enum bufferevent_flush_mode);
static int be_async_ctrl(struct bufferevent *, enum bufferevent_ctrl_op, union bufferevent_ctrl_data *);

struct bufferevent_async {
	struct bufferevent_private bev;
	struct event_overlapped connect_overlapped;
	struct event_overlapped read_overlapped;
	struct event_overlapped write_overlapped;
	size_t read_in_progress;
	size_t write_in_progress;
	unsigned ok : 1;
	unsigned read_added : 1;
	unsigned write_added : 1;
};

const struct bufferevent_ops bufferevent_ops_async = {
	"socket_async",
	evutil_offsetof(struct bufferevent_async, bev.bev),
	be_async_enable,
	be_async_disable,
	NULL, /* Unlink */
	be_async_destruct,
	bufferevent_generic_adj_timeouts_,
	be_async_flush,
	be_async_ctrl,
};
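/*
 * This file implements the IOCP-backed ("async") bufferevent used on
 * Windows.  Callers do not normally construct it directly: as far as we can
 * tell, bufferevent_socket_new() selects this backend automatically when the
 * event_base was created with IOCP support.  A minimal, illustrative sketch
 * (not part of this file's build; "fd" is a socket supplied by the caller):
 *
 *	struct event_config *cfg = event_config_new();
 *	evthread_use_windows_threads();
 *	event_config_set_flag(cfg, EVENT_BASE_FLAG_STARTUP_IOCP);
 *	struct event_base *base = event_base_new_with_config(cfg);
 *	event_config_free(cfg);
 *	struct bufferevent *bev =
 *	    bufferevent_socket_new(base, fd, BEV_OPT_CLOSE_ON_FREE);
 */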

static inline void
be_async_run_eventcb(struct bufferevent *bev, short what, int options)
{ bufferevent_run_eventcb_(bev, what, options|BEV_TRIG_DEFER_CALLBACKS); }

static inline void
be_async_trigger_nolock(struct bufferevent *bev, short what, int options)
{ bufferevent_trigger_nolock_(bev, what, options|BEV_TRIG_DEFER_CALLBACKS); }

static inline int
fatal_error(int err)
{
	switch (err) {
		/* We may have already associated this fd with a port.
		 * Let's hope it's this port, and that the error code
		 * for doing this never changes. */
		case ERROR_INVALID_PARAMETER:
			return 0;
	}
	return 1;
}

static inline struct bufferevent_async *
upcast(struct bufferevent *bev)
{
	struct bufferevent_async *bev_a;
	if (!BEV_IS_ASYNC(bev))
		return NULL;
	bev_a = EVUTIL_UPCAST(bev, struct bufferevent_async, bev.bev);
	return bev_a;
}

static inline struct bufferevent_async *
upcast_connect(struct event_overlapped *eo)
{
	struct bufferevent_async *bev_a;
	bev_a = EVUTIL_UPCAST(eo, struct bufferevent_async, connect_overlapped);
	EVUTIL_ASSERT(BEV_IS_ASYNC(&bev_a->bev.bev));
	return bev_a;
}

static inline struct bufferevent_async *
upcast_read(struct event_overlapped *eo)
{
	struct bufferevent_async *bev_a;
	bev_a = EVUTIL_UPCAST(eo, struct bufferevent_async, read_overlapped);
	EVUTIL_ASSERT(BEV_IS_ASYNC(&bev_a->bev.bev));
	return bev_a;
}

static inline struct bufferevent_async *
upcast_write(struct event_overlapped *eo)
{
	struct bufferevent_async *bev_a;
	bev_a = EVUTIL_UPCAST(eo, struct bufferevent_async, write_overlapped);
	EVUTIL_ASSERT(BEV_IS_ASYNC(&bev_a->bev.bev));
	return bev_a;
}

static void
bev_async_del_write(struct bufferevent_async *beva)
{
	struct bufferevent *bev = &beva->bev.bev;

	if (beva->write_added) {
		beva->write_added = 0;
		event_base_del_virtual_(bev->ev_base);
	}
}

static void
bev_async_del_read(struct bufferevent_async *beva)
{
	struct bufferevent *bev = &beva->bev.bev;

	if (beva->read_added) {
		beva->read_added = 0;
		event_base_del_virtual_(bev->ev_base);
	}
}

static void
bev_async_add_write(struct bufferevent_async *beva)
{
	struct bufferevent *bev = &beva->bev.bev;

	if (!beva->write_added) {
		beva->write_added = 1;
		event_base_add_virtual_(bev->ev_base);
	}
}

static void
bev_async_add_read(struct bufferevent_async *beva)
{
	struct bufferevent *bev = &beva->bev.bev;

	if (!beva->read_added) {
		beva->read_added = 1;
		event_base_add_virtual_(bev->ev_base);
	}
}
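/*
 * The add/del helpers above do not schedule real events.  As far as we can
 * tell, the virtual-event count they adjust only keeps event_base_loop()
 * from concluding that no events remain (and exiting) while an overlapped
 * read, write, or connect is still outstanding on the IOCP port.
 */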

static void
bev_async_consider_writing(struct bufferevent_async *beva)
{
	size_t at_most;
	int limit;
	struct bufferevent *bev = &beva->bev.bev;

	/* Don't write if there's a write in progress, or we do not
	 * want to write, or when there's nothing left to write. */
	if (beva->write_in_progress || beva->bev.connecting)
		return;
	if (!beva->ok || !(bev->enabled&EV_WRITE) ||
	    !evbuffer_get_length(bev->output)) {
		bev_async_del_write(beva);
		return;
	}

	at_most = evbuffer_get_length(bev->output);

	/* This is safe so long as bufferevent_get_write_max never returns
	 * more than INT_MAX.  That's true for now. XXXX */
	limit = (int)bufferevent_get_write_max_(&beva->bev);
	if (at_most >= (size_t)limit && limit >= 0)
		at_most = limit;

	if (beva->bev.write_suspended) {
		bev_async_del_write(beva);
		return;
	}

	/* XXXX doesn't respect low-water mark very well. */
	bufferevent_incref_(bev);
	if (evbuffer_launch_write_(bev->output, at_most,
	    &beva->write_overlapped)) {
		bufferevent_decref_(bev);
		beva->ok = 0;
		be_async_run_eventcb(bev, BEV_EVENT_ERROR, 0);
	} else {
		beva->write_in_progress = at_most;
		bufferevent_decrement_write_buckets_(&beva->bev, at_most);
		bev_async_add_write(beva);
	}
}

static void
bev_async_consider_reading(struct bufferevent_async *beva)
{
	size_t cur_size;
	size_t read_high;
	size_t at_most;
	int limit;
	struct bufferevent *bev = &beva->bev.bev;

	/* Don't read if there is a read in progress, or we do not
	 * want to read. */
	if (beva->read_in_progress || beva->bev.connecting)
		return;
	if (!beva->ok || !(bev->enabled&EV_READ)) {
		bev_async_del_read(beva);
		return;
	}

	/* Don't read if we're full */
	cur_size = evbuffer_get_length(bev->input);
	read_high = bev->wm_read.high;
	if (read_high) {
		if (cur_size >= read_high) {
			bev_async_del_read(beva);
			return;
		}
		at_most = read_high - cur_size;
	} else {
		at_most = 16384; /* FIXME totally magic. */
	}

	/* XXXX This over-commits. */
	/* XXXX see also the note above on the cast of
	 * bufferevent_get_write_max_() */
	limit = (int)bufferevent_get_read_max_(&beva->bev);
	if (at_most >= (size_t)limit && limit >= 0)
		at_most = limit;

	if (beva->bev.read_suspended) {
		bev_async_del_read(beva);
		return;
	}

	bufferevent_incref_(bev);
	if (evbuffer_launch_read_(bev->input, at_most, &beva->read_overlapped)) {
		beva->ok = 0;
		be_async_run_eventcb(bev, BEV_EVENT_ERROR, 0);
		bufferevent_decref_(bev);
	} else {
		beva->read_in_progress = at_most;
		bufferevent_decrement_read_buckets_(&beva->bev, at_most);
		bev_async_add_read(beva);
	}

	return;
}

static void
be_async_outbuf_callback(struct evbuffer *buf,
    const struct evbuffer_cb_info *cbinfo,
    void *arg)
{
	struct bufferevent *bev = arg;
	struct bufferevent_async *bev_async = upcast(bev);

	/* If we added data to the outbuf and were not writing before,
	 * we may want to write now. */

	bufferevent_incref_and_lock_(bev);

	if (cbinfo->n_added)
		bev_async_consider_writing(bev_async);

	bufferevent_decref_and_unlock_(bev);
}

static void
be_async_inbuf_callback(struct evbuffer *buf,
    const struct evbuffer_cb_info *cbinfo,
    void *arg)
{
	struct bufferevent *bev = arg;
	struct bufferevent_async *bev_async = upcast(bev);

	/* If we drained data from the inbuf and were not reading before,
	 * we may want to read now */

	bufferevent_incref_and_lock_(bev);

	if (cbinfo->n_deleted)
		bev_async_consider_reading(bev_async);

	bufferevent_decref_and_unlock_(bev);
}
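/*
 * bev_async_consider_reading() above sizes each overlapped read from the
 * read high-watermark, falling back to the 16384-byte default when no
 * watermark is set.  A caller who wants to bound how much a single
 * outstanding read may commit could, as a sketch, set an explicit
 * high-watermark ("bev" being any bufferevent backed by this code):
 *
 *	bufferevent_setwatermark(bev, EV_READ, 0, 64*1024);
 */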

static int
be_async_enable(struct bufferevent *buf, short what)
{
	struct bufferevent_async *bev_async = upcast(buf);

	if (!bev_async->ok)
		return -1;

	if (bev_async->bev.connecting) {
		/* Don't launch anything during connection attempts. */
		return 0;
	}

	if (what & EV_READ)
		BEV_RESET_GENERIC_READ_TIMEOUT(buf);
	if (what & EV_WRITE)
		BEV_RESET_GENERIC_WRITE_TIMEOUT(buf);

	/* If we newly enable reading or writing, and we aren't reading or
	   writing already, consider launching a new read or write. */

	if (what & EV_READ)
		bev_async_consider_reading(bev_async);
	if (what & EV_WRITE)
		bev_async_consider_writing(bev_async);
	return 0;
}

static int
be_async_disable(struct bufferevent *bev, short what)
{
	struct bufferevent_async *bev_async = upcast(bev);
	/* XXXX If we disable reading or writing, we may want to consider
	 * canceling any in-progress read or write operation, though it might
	 * not work. */

	if (what & EV_READ) {
		BEV_DEL_GENERIC_READ_TIMEOUT(bev);
		bev_async_del_read(bev_async);
	}
	if (what & EV_WRITE) {
		BEV_DEL_GENERIC_WRITE_TIMEOUT(bev);
		bev_async_del_write(bev_async);
	}

	return 0;
}

static void
be_async_destruct(struct bufferevent *bev)
{
	struct bufferevent_async *bev_async = upcast(bev);
	struct bufferevent_private *bev_p = BEV_UPCAST(bev);
	evutil_socket_t fd;

	EVUTIL_ASSERT(!upcast(bev)->write_in_progress &&
	    !upcast(bev)->read_in_progress);

	bev_async_del_read(bev_async);
	bev_async_del_write(bev_async);

	fd = evbuffer_overlapped_get_fd_(bev->input);
	if (fd != (evutil_socket_t)EVUTIL_INVALID_SOCKET &&
	    (bev_p->options & BEV_OPT_CLOSE_ON_FREE)) {
		evutil_closesocket(fd);
		evbuffer_overlapped_set_fd_(bev->input, EVUTIL_INVALID_SOCKET);
	}
}

/* GetQueuedCompletionStatus doesn't reliably yield WSA error codes, so
 * we use WSAGetOverlappedResult to translate. */
static void
bev_async_set_wsa_error(struct bufferevent *bev, struct event_overlapped *eo)
{
	DWORD bytes, flags;
	evutil_socket_t fd;

	fd = evbuffer_overlapped_get_fd_(bev->input);
	WSAGetOverlappedResult(fd, &eo->overlapped, &bytes, FALSE, &flags);
}

static int
be_async_flush(struct bufferevent *bev, short what,
    enum bufferevent_flush_mode mode)
{
	return 0;
}
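/*
 * The completion routines below run when the IOCP loop dequeues a finished
 * overlapped operation; bufferevent_async_new_() registers them via
 * event_overlapped_init_().  Each receives the completion key, the number of
 * bytes transferred, and an "ok" flag, and takes the bufferevent lock itself
 * before touching the buffers.
 */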

static void
connect_complete(struct event_overlapped *eo, ev_uintptr_t key,
    ev_ssize_t nbytes, int ok)
{
	struct bufferevent_async *bev_a = upcast_connect(eo);
	struct bufferevent *bev = &bev_a->bev.bev;
	evutil_socket_t sock;

	BEV_LOCK(bev);

	EVUTIL_ASSERT(bev_a->bev.connecting);
	bev_a->bev.connecting = 0;
	sock = evbuffer_overlapped_get_fd_(bev_a->bev.bev.input);
	/* XXXX Handle error? */
	setsockopt(sock, SOL_SOCKET, SO_UPDATE_CONNECT_CONTEXT, NULL, 0);

	if (ok)
		bufferevent_async_set_connected_(bev);
	else
		bev_async_set_wsa_error(bev, eo);

	be_async_run_eventcb(bev, ok ?
	    BEV_EVENT_CONNECTED : BEV_EVENT_ERROR, 0);

	event_base_del_virtual_(bev->ev_base);

	bufferevent_decref_and_unlock_(bev);
}

static void
read_complete(struct event_overlapped *eo, ev_uintptr_t key,
    ev_ssize_t nbytes, int ok)
{
	struct bufferevent_async *bev_a = upcast_read(eo);
	struct bufferevent *bev = &bev_a->bev.bev;
	short what = BEV_EVENT_READING;
	ev_ssize_t amount_unread;
	BEV_LOCK(bev);
	EVUTIL_ASSERT(bev_a->read_in_progress);

	amount_unread = bev_a->read_in_progress - nbytes;
	evbuffer_commit_read_(bev->input, nbytes);
	bev_a->read_in_progress = 0;
	if (amount_unread)
		bufferevent_decrement_read_buckets_(&bev_a->bev, -amount_unread);

	if (!ok)
		bev_async_set_wsa_error(bev, eo);

	if (bev_a->ok) {
		if (ok && nbytes) {
			BEV_RESET_GENERIC_READ_TIMEOUT(bev);
			be_async_trigger_nolock(bev, EV_READ, 0);
			bev_async_consider_reading(bev_a);
		} else if (!ok) {
			what |= BEV_EVENT_ERROR;
			bev_a->ok = 0;
			be_async_run_eventcb(bev, what, 0);
		} else if (!nbytes) {
			what |= BEV_EVENT_EOF;
			bev_a->ok = 0;
			be_async_run_eventcb(bev, what, 0);
		}
	}

	bufferevent_decref_and_unlock_(bev);
}

static void
write_complete(struct event_overlapped *eo, ev_uintptr_t key,
    ev_ssize_t nbytes, int ok)
{
	struct bufferevent_async *bev_a = upcast_write(eo);
	struct bufferevent *bev = &bev_a->bev.bev;
	short what = BEV_EVENT_WRITING;
	ev_ssize_t amount_unwritten;

	BEV_LOCK(bev);
	EVUTIL_ASSERT(bev_a->write_in_progress);

	amount_unwritten = bev_a->write_in_progress - nbytes;
	evbuffer_commit_write_(bev->output, nbytes);
	bev_a->write_in_progress = 0;

	if (amount_unwritten)
		bufferevent_decrement_write_buckets_(&bev_a->bev,
		    -amount_unwritten);

	if (!ok)
		bev_async_set_wsa_error(bev, eo);

	if (bev_a->ok) {
		if (ok && nbytes) {
			BEV_RESET_GENERIC_WRITE_TIMEOUT(bev);
			be_async_trigger_nolock(bev, EV_WRITE, 0);
			bev_async_consider_writing(bev_a);
		} else if (!ok) {
			what |= BEV_EVENT_ERROR;
			bev_a->ok = 0;
			be_async_run_eventcb(bev, what, 0);
		} else if (!nbytes) {
			what |= BEV_EVENT_EOF;
			bev_a->ok = 0;
			be_async_run_eventcb(bev, what, 0);
		}
	}

	bufferevent_decref_and_unlock_(bev);
}
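/*
 * Note on the bucket arithmetic in read_complete()/write_complete() above:
 * launching an operation charged the rate-limit buckets for the full
 * requested amount, so on completion the unread/unwritten remainder is
 * handed back by decrementing again with a negative amount.
 */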

struct bufferevent *
bufferevent_async_new_(struct event_base *base,
    evutil_socket_t fd, int options)
{
	struct bufferevent_async *bev_a;
	struct bufferevent *bev;
	struct event_iocp_port *iocp;

	options |= BEV_OPT_THREADSAFE;

	if (!(iocp = event_base_get_iocp_(base)))
		return NULL;

	if (fd >= 0 && event_iocp_port_associate_(iocp, fd, 1) < 0) {
		if (fatal_error(GetLastError()))
			return NULL;
	}

	if (!(bev_a = mm_calloc(1, sizeof(struct bufferevent_async))))
		return NULL;

	bev = &bev_a->bev.bev;
	if (!(bev->input = evbuffer_overlapped_new_(fd))) {
		mm_free(bev_a);
		return NULL;
	}
	if (!(bev->output = evbuffer_overlapped_new_(fd))) {
		evbuffer_free(bev->input);
		mm_free(bev_a);
		return NULL;
	}

	if (bufferevent_init_common_(&bev_a->bev, base, &bufferevent_ops_async,
	    options) < 0)
		goto err;

	evbuffer_add_cb(bev->input, be_async_inbuf_callback, bev);
	evbuffer_add_cb(bev->output, be_async_outbuf_callback, bev);

	event_overlapped_init_(&bev_a->connect_overlapped, connect_complete);
	event_overlapped_init_(&bev_a->read_overlapped, read_complete);
	event_overlapped_init_(&bev_a->write_overlapped, write_complete);

	bufferevent_init_generic_timeout_cbs_(bev);

	bev_a->ok = fd >= 0;

	return bev;
err:
	bufferevent_free(&bev_a->bev.bev);
	return NULL;
}

void
bufferevent_async_set_connected_(struct bufferevent *bev)
{
	struct bufferevent_async *bev_async = upcast(bev);
	bev_async->ok = 1;
	/* Now's a good time to consider reading/writing */
	be_async_enable(bev, bev->enabled);
}

int
bufferevent_async_can_connect_(struct bufferevent *bev)
{
	const struct win32_extension_fns *ext =
	    event_get_win32_extension_fns_();

	if (BEV_IS_ASYNC(bev) &&
	    event_base_get_iocp_(bev->ev_base) &&
	    ext && ext->ConnectEx)
		return 1;

	return 0;
}

int
bufferevent_async_connect_(struct bufferevent *bev, evutil_socket_t fd,
    const struct sockaddr *sa, int socklen)
{
	BOOL rc;
	struct bufferevent_async *bev_async = upcast(bev);
	struct sockaddr_storage ss;
	const struct win32_extension_fns *ext =
	    event_get_win32_extension_fns_();

	EVUTIL_ASSERT(ext && ext->ConnectEx && fd >= 0 && sa != NULL);

	/* ConnectEx() requires that the socket be bound to an address
	 * with bind() before using, otherwise it will fail. We attempt
	 * to issue a bind() here, taking into account that the error
	 * code is set to WSAEINVAL when the socket is already bound. */
	memset(&ss, 0, sizeof(ss));
	if (sa->sa_family == AF_INET) {
		struct sockaddr_in *sin = (struct sockaddr_in *)&ss;
		sin->sin_family = AF_INET;
		sin->sin_addr.s_addr = INADDR_ANY;
	} else if (sa->sa_family == AF_INET6) {
		struct sockaddr_in6 *sin6 = (struct sockaddr_in6 *)&ss;
		sin6->sin6_family = AF_INET6;
		sin6->sin6_addr = in6addr_any;
	} else {
		/* Well, the user will have to bind() */
		return -1;
	}
	if (bind(fd, (struct sockaddr *)&ss, sizeof(ss)) < 0 &&
	    WSAGetLastError() != WSAEINVAL)
		return -1;

	event_base_add_virtual_(bev->ev_base);
	bufferevent_incref_(bev);
	rc = ext->ConnectEx(fd, sa, socklen, NULL, 0, NULL,
	    &bev_async->connect_overlapped.overlapped);
	if (rc || WSAGetLastError() == ERROR_IO_PENDING)
		return 0;

	event_base_del_virtual_(bev->ev_base);
	bufferevent_decref_(bev);

	return -1;
}
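/*
 * bufferevent_async_connect_() is normally reached through the generic
 * bufferevent_socket_connect() API, which (as far as we can tell) consults
 * bufferevent_async_can_connect_() and only uses ConnectEx when it is
 * available.  Illustrative fragment ("sin" is a caller-provided
 * struct sockaddr_in describing the peer):
 *
 *	if (bufferevent_socket_connect(bev, (struct sockaddr *)&sin,
 *	    sizeof(sin)) < 0) {
 *		// the connect attempt could not be launched
 *	}
 */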

static int
be_async_ctrl(struct bufferevent *bev, enum bufferevent_ctrl_op op,
    union bufferevent_ctrl_data *data)
{
	switch (op) {
	case BEV_CTRL_GET_FD:
		data->fd = evbuffer_overlapped_get_fd_(bev->input);
		return 0;
	case BEV_CTRL_SET_FD: {
		struct bufferevent_async *bev_a = upcast(bev);
		struct event_iocp_port *iocp;

		if (data->fd == evbuffer_overlapped_get_fd_(bev->input))
			return 0;
		if (!(iocp = event_base_get_iocp_(bev->ev_base)))
			return -1;
		if (event_iocp_port_associate_(iocp, data->fd, 1) < 0) {
			if (fatal_error(GetLastError()))
				return -1;
		}
		evbuffer_overlapped_set_fd_(bev->input, data->fd);
		evbuffer_overlapped_set_fd_(bev->output, data->fd);
		bev_a->ok = data->fd >= 0;
		return 0;
	}
	case BEV_CTRL_CANCEL_ALL: {
		struct bufferevent_async *bev_a = upcast(bev);
		evutil_socket_t fd = evbuffer_overlapped_get_fd_(bev->input);
		if (fd != (evutil_socket_t)EVUTIL_INVALID_SOCKET &&
		    (bev_a->bev.options & BEV_OPT_CLOSE_ON_FREE)) {
			closesocket(fd);
			evbuffer_overlapped_set_fd_(bev->input,
			    EVUTIL_INVALID_SOCKET);
		}
		bev_a->ok = 0;
		return 0;
	}
	case BEV_CTRL_GET_UNDERLYING:
	default:
		return -1;
	}
}
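/*
 * be_async_ctrl() above backs the generic accessors: bufferevent_getfd() and
 * bufferevent_setfd() dispatch to BEV_CTRL_GET_FD and BEV_CTRL_SET_FD, and
 * since BEV_CTRL_GET_UNDERLYING is not supported here,
 * bufferevent_get_underlying() should return NULL for this backend.
 */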