1 /* $NetBSD: bufferevent_async.c,v 1.1.1.3 2021/04/07 02:43:13 christos Exp $ */ 2 /* 3 * Copyright (c) 2009-2012 Niels Provos and Nick Mathewson 4 * 5 * All rights reserved. 6 * 7 * Redistribution and use in source and binary forms, with or without 8 * modification, are permitted provided that the following conditions 9 * are met: 10 * 1. Redistributions of source code must retain the above copyright 11 * notice, this list of conditions and the following disclaimer. 12 * 2. Redistributions in binary form must reproduce the above copyright 13 * notice, this list of conditions and the following disclaimer in the 14 * documentation and/or other materials provided with the distribution. 15 * 3. The name of the author may not be used to endorse or promote products 16 * derived from this software without specific prior written permission. 17 * 18 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR 19 * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES 20 * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. 21 * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, 22 * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT 23 * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, 24 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY 25 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT 26 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF 27 * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 28 */ 29 30 #include "event2/event-config.h" 31 #include <sys/cdefs.h> 32 __RCSID("$NetBSD: bufferevent_async.c,v 1.1.1.3 2021/04/07 02:43:13 christos Exp $"); 33 #include "evconfig-private.h" 34 35 #ifdef EVENT__HAVE_SYS_TIME_H 36 #include <sys/time.h> 37 #endif 38 39 #include <errno.h> 40 #include <stdio.h> 41 #include <stdlib.h> 42 #include <string.h> 43 #ifdef EVENT__HAVE_STDARG_H 44 #include <stdarg.h> 45 #endif 46 #ifdef EVENT__HAVE_UNISTD_H 47 #include <unistd.h> 48 #endif 49 50 #ifdef _WIN32 51 #include <winsock2.h> 52 #include <winerror.h> 53 #include <ws2tcpip.h> 54 #endif 55 56 #include <sys/queue.h> 57 58 #include "event2/util.h" 59 #include "event2/bufferevent.h" 60 #include "event2/buffer.h" 61 #include "event2/bufferevent_struct.h" 62 #include "event2/event.h" 63 #include "event2/util.h" 64 #include "event-internal.h" 65 #include "log-internal.h" 66 #include "mm-internal.h" 67 #include "bufferevent-internal.h" 68 #include "util-internal.h" 69 #include "iocp-internal.h" 70 71 #ifndef SO_UPDATE_CONNECT_CONTEXT 72 /* Mingw is sometimes missing this */ 73 #define SO_UPDATE_CONNECT_CONTEXT 0x7010 74 #endif 75 76 /* prototypes */ 77 static int be_async_enable(struct bufferevent *, short); 78 static int be_async_disable(struct bufferevent *, short); 79 static void be_async_destruct(struct bufferevent *); 80 static int be_async_flush(struct bufferevent *, short, enum bufferevent_flush_mode); 81 static int be_async_ctrl(struct bufferevent *, enum bufferevent_ctrl_op, union bufferevent_ctrl_data *); 82 83 struct bufferevent_async { 84 struct bufferevent_private bev; 85 struct event_overlapped connect_overlapped; 86 struct event_overlapped read_overlapped; 87 struct event_overlapped write_overlapped; 88 size_t read_in_progress; 89 size_t write_in_progress; 90 unsigned ok : 1; 91 unsigned read_added : 1; 92 unsigned write_added : 1; 93 }; 94 95 const struct bufferevent_ops bufferevent_ops_async = { 96 "socket_async", 97 evutil_offsetof(struct bufferevent_async, bev.bev), 98 be_async_enable, 99 be_async_disable, 100 NULL, /* Unlink */ 101 be_async_destruct, 102 bufferevent_generic_adj_timeouts_, 103 be_async_flush, 104 be_async_ctrl, 105 }; 106 107 static inline void 108 be_async_run_eventcb(struct bufferevent *bev, short what, int options) 109 { bufferevent_run_eventcb_(bev, what, options|BEV_TRIG_DEFER_CALLBACKS); } 110 111 static inline void 112 be_async_trigger_nolock(struct bufferevent *bev, short what, int options) 113 { bufferevent_trigger_nolock_(bev, what, options|BEV_TRIG_DEFER_CALLBACKS); } 114 115 static inline int 116 fatal_error(int err) 117 { 118 switch (err) { 119 /* We may have already associated this fd with a port. 120 * Let's hope it's this port, and that the error code 121 * for doing this neer changes. */ 122 case ERROR_INVALID_PARAMETER: 123 return 0; 124 } 125 return 1; 126 } 127 128 static inline struct bufferevent_async * 129 upcast(struct bufferevent *bev) 130 { 131 struct bufferevent_async *bev_a; 132 if (!BEV_IS_ASYNC(bev)) 133 return NULL; 134 bev_a = EVUTIL_UPCAST(bev, struct bufferevent_async, bev.bev); 135 return bev_a; 136 } 137 138 static inline struct bufferevent_async * 139 upcast_connect(struct event_overlapped *eo) 140 { 141 struct bufferevent_async *bev_a; 142 bev_a = EVUTIL_UPCAST(eo, struct bufferevent_async, connect_overlapped); 143 EVUTIL_ASSERT(BEV_IS_ASYNC(&bev_a->bev.bev)); 144 return bev_a; 145 } 146 147 static inline struct bufferevent_async * 148 upcast_read(struct event_overlapped *eo) 149 { 150 struct bufferevent_async *bev_a; 151 bev_a = EVUTIL_UPCAST(eo, struct bufferevent_async, read_overlapped); 152 EVUTIL_ASSERT(BEV_IS_ASYNC(&bev_a->bev.bev)); 153 return bev_a; 154 } 155 156 static inline struct bufferevent_async * 157 upcast_write(struct event_overlapped *eo) 158 { 159 struct bufferevent_async *bev_a; 160 bev_a = EVUTIL_UPCAST(eo, struct bufferevent_async, write_overlapped); 161 EVUTIL_ASSERT(BEV_IS_ASYNC(&bev_a->bev.bev)); 162 return bev_a; 163 } 164 165 static void 166 bev_async_del_write(struct bufferevent_async *beva) 167 { 168 struct bufferevent *bev = &beva->bev.bev; 169 170 if (beva->write_added) { 171 beva->write_added = 0; 172 event_base_del_virtual_(bev->ev_base); 173 } 174 } 175 176 static void 177 bev_async_del_read(struct bufferevent_async *beva) 178 { 179 struct bufferevent *bev = &beva->bev.bev; 180 181 if (beva->read_added) { 182 beva->read_added = 0; 183 event_base_del_virtual_(bev->ev_base); 184 } 185 } 186 187 static void 188 bev_async_add_write(struct bufferevent_async *beva) 189 { 190 struct bufferevent *bev = &beva->bev.bev; 191 192 if (!beva->write_added) { 193 beva->write_added = 1; 194 event_base_add_virtual_(bev->ev_base); 195 } 196 } 197 198 static void 199 bev_async_add_read(struct bufferevent_async *beva) 200 { 201 struct bufferevent *bev = &beva->bev.bev; 202 203 if (!beva->read_added) { 204 beva->read_added = 1; 205 event_base_add_virtual_(bev->ev_base); 206 } 207 } 208 209 static void 210 bev_async_consider_writing(struct bufferevent_async *beva) 211 { 212 size_t at_most; 213 int limit; 214 struct bufferevent *bev = &beva->bev.bev; 215 216 /* Don't write if there's a write in progress, or we do not 217 * want to write, or when there's nothing left to write. */ 218 if (beva->write_in_progress || beva->bev.connecting) 219 return; 220 if (!beva->ok || !(bev->enabled&EV_WRITE) || 221 !evbuffer_get_length(bev->output)) { 222 bev_async_del_write(beva); 223 return; 224 } 225 226 at_most = evbuffer_get_length(bev->output); 227 228 /* This is safe so long as bufferevent_get_write_max never returns 229 * more than INT_MAX. That's true for now. XXXX */ 230 limit = (int)bufferevent_get_write_max_(&beva->bev); 231 if (at_most >= (size_t)limit && limit >= 0) 232 at_most = limit; 233 234 if (beva->bev.write_suspended) { 235 bev_async_del_write(beva); 236 return; 237 } 238 239 /* XXXX doesn't respect low-water mark very well. */ 240 bufferevent_incref_(bev); 241 if (evbuffer_launch_write_(bev->output, at_most, 242 &beva->write_overlapped)) { 243 bufferevent_decref_(bev); 244 beva->ok = 0; 245 be_async_run_eventcb(bev, BEV_EVENT_ERROR, 0); 246 } else { 247 beva->write_in_progress = at_most; 248 bufferevent_decrement_write_buckets_(&beva->bev, at_most); 249 bev_async_add_write(beva); 250 } 251 } 252 253 static void 254 bev_async_consider_reading(struct bufferevent_async *beva) 255 { 256 size_t cur_size; 257 size_t read_high; 258 size_t at_most; 259 int limit; 260 struct bufferevent *bev = &beva->bev.bev; 261 262 /* Don't read if there is a read in progress, or we do not 263 * want to read. */ 264 if (beva->read_in_progress || beva->bev.connecting) 265 return; 266 if (!beva->ok || !(bev->enabled&EV_READ)) { 267 bev_async_del_read(beva); 268 return; 269 } 270 271 /* Don't read if we're full */ 272 cur_size = evbuffer_get_length(bev->input); 273 read_high = bev->wm_read.high; 274 if (read_high) { 275 if (cur_size >= read_high) { 276 bev_async_del_read(beva); 277 return; 278 } 279 at_most = read_high - cur_size; 280 } else { 281 at_most = 16384; /* FIXME totally magic. */ 282 } 283 284 /* XXXX This over-commits. */ 285 /* XXXX see also not above on cast on bufferevent_get_write_max_() */ 286 limit = (int)bufferevent_get_read_max_(&beva->bev); 287 if (at_most >= (size_t)limit && limit >= 0) 288 at_most = limit; 289 290 if (beva->bev.read_suspended) { 291 bev_async_del_read(beva); 292 return; 293 } 294 295 bufferevent_incref_(bev); 296 if (evbuffer_launch_read_(bev->input, at_most, &beva->read_overlapped)) { 297 beva->ok = 0; 298 be_async_run_eventcb(bev, BEV_EVENT_ERROR, 0); 299 bufferevent_decref_(bev); 300 } else { 301 beva->read_in_progress = at_most; 302 bufferevent_decrement_read_buckets_(&beva->bev, at_most); 303 bev_async_add_read(beva); 304 } 305 306 return; 307 } 308 309 static void 310 be_async_outbuf_callback(struct evbuffer *buf, 311 const struct evbuffer_cb_info *cbinfo, 312 void *arg) 313 { 314 struct bufferevent *bev = arg; 315 struct bufferevent_async *bev_async = upcast(bev); 316 317 /* If we added data to the outbuf and were not writing before, 318 * we may want to write now. */ 319 320 bufferevent_incref_and_lock_(bev); 321 322 if (cbinfo->n_added) 323 bev_async_consider_writing(bev_async); 324 325 bufferevent_decref_and_unlock_(bev); 326 } 327 328 static void 329 be_async_inbuf_callback(struct evbuffer *buf, 330 const struct evbuffer_cb_info *cbinfo, 331 void *arg) 332 { 333 struct bufferevent *bev = arg; 334 struct bufferevent_async *bev_async = upcast(bev); 335 336 /* If we drained data from the inbuf and were not reading before, 337 * we may want to read now */ 338 339 bufferevent_incref_and_lock_(bev); 340 341 if (cbinfo->n_deleted) 342 bev_async_consider_reading(bev_async); 343 344 bufferevent_decref_and_unlock_(bev); 345 } 346 347 static int 348 be_async_enable(struct bufferevent *buf, short what) 349 { 350 struct bufferevent_async *bev_async = upcast(buf); 351 352 if (!bev_async->ok) 353 return -1; 354 355 if (bev_async->bev.connecting) { 356 /* Don't launch anything during connection attempts. */ 357 return 0; 358 } 359 360 if (what & EV_READ) 361 BEV_RESET_GENERIC_READ_TIMEOUT(buf); 362 if (what & EV_WRITE) 363 BEV_RESET_GENERIC_WRITE_TIMEOUT(buf); 364 365 /* If we newly enable reading or writing, and we aren't reading or 366 writing already, consider launching a new read or write. */ 367 368 if (what & EV_READ) 369 bev_async_consider_reading(bev_async); 370 if (what & EV_WRITE) 371 bev_async_consider_writing(bev_async); 372 return 0; 373 } 374 375 static int 376 be_async_disable(struct bufferevent *bev, short what) 377 { 378 struct bufferevent_async *bev_async = upcast(bev); 379 /* XXXX If we disable reading or writing, we may want to consider 380 * canceling any in-progress read or write operation, though it might 381 * not work. */ 382 383 if (what & EV_READ) { 384 BEV_DEL_GENERIC_READ_TIMEOUT(bev); 385 bev_async_del_read(bev_async); 386 } 387 if (what & EV_WRITE) { 388 BEV_DEL_GENERIC_WRITE_TIMEOUT(bev); 389 bev_async_del_write(bev_async); 390 } 391 392 return 0; 393 } 394 395 static void 396 be_async_destruct(struct bufferevent *bev) 397 { 398 struct bufferevent_async *bev_async = upcast(bev); 399 struct bufferevent_private *bev_p = BEV_UPCAST(bev); 400 evutil_socket_t fd; 401 402 EVUTIL_ASSERT(!upcast(bev)->write_in_progress && 403 !upcast(bev)->read_in_progress); 404 405 bev_async_del_read(bev_async); 406 bev_async_del_write(bev_async); 407 408 fd = evbuffer_overlapped_get_fd_(bev->input); 409 if (fd != (evutil_socket_t)EVUTIL_INVALID_SOCKET && 410 (bev_p->options & BEV_OPT_CLOSE_ON_FREE)) { 411 evutil_closesocket(fd); 412 evbuffer_overlapped_set_fd_(bev->input, EVUTIL_INVALID_SOCKET); 413 } 414 } 415 416 /* GetQueuedCompletionStatus doesn't reliably yield WSA error codes, so 417 * we use WSAGetOverlappedResult to translate. */ 418 static void 419 bev_async_set_wsa_error(struct bufferevent *bev, struct event_overlapped *eo) 420 { 421 DWORD bytes, flags; 422 evutil_socket_t fd; 423 424 fd = evbuffer_overlapped_get_fd_(bev->input); 425 WSAGetOverlappedResult(fd, &eo->overlapped, &bytes, FALSE, &flags); 426 } 427 428 static int 429 be_async_flush(struct bufferevent *bev, short what, 430 enum bufferevent_flush_mode mode) 431 { 432 return 0; 433 } 434 435 static void 436 connect_complete(struct event_overlapped *eo, ev_uintptr_t key, 437 ev_ssize_t nbytes, int ok) 438 { 439 struct bufferevent_async *bev_a = upcast_connect(eo); 440 struct bufferevent *bev = &bev_a->bev.bev; 441 evutil_socket_t sock; 442 443 BEV_LOCK(bev); 444 445 EVUTIL_ASSERT(bev_a->bev.connecting); 446 bev_a->bev.connecting = 0; 447 sock = evbuffer_overlapped_get_fd_(bev_a->bev.bev.input); 448 /* XXXX Handle error? */ 449 setsockopt(sock, SOL_SOCKET, SO_UPDATE_CONNECT_CONTEXT, NULL, 0); 450 451 if (ok) 452 bufferevent_async_set_connected_(bev); 453 else 454 bev_async_set_wsa_error(bev, eo); 455 456 be_async_run_eventcb(bev, ok ? BEV_EVENT_CONNECTED : BEV_EVENT_ERROR, 0); 457 458 event_base_del_virtual_(bev->ev_base); 459 460 bufferevent_decref_and_unlock_(bev); 461 } 462 463 static void 464 read_complete(struct event_overlapped *eo, ev_uintptr_t key, 465 ev_ssize_t nbytes, int ok) 466 { 467 struct bufferevent_async *bev_a = upcast_read(eo); 468 struct bufferevent *bev = &bev_a->bev.bev; 469 short what = BEV_EVENT_READING; 470 ev_ssize_t amount_unread; 471 BEV_LOCK(bev); 472 EVUTIL_ASSERT(bev_a->read_in_progress); 473 474 amount_unread = bev_a->read_in_progress - nbytes; 475 evbuffer_commit_read_(bev->input, nbytes); 476 bev_a->read_in_progress = 0; 477 if (amount_unread) 478 bufferevent_decrement_read_buckets_(&bev_a->bev, -amount_unread); 479 480 if (!ok) 481 bev_async_set_wsa_error(bev, eo); 482 483 if (bev_a->ok) { 484 if (ok && nbytes) { 485 BEV_RESET_GENERIC_READ_TIMEOUT(bev); 486 be_async_trigger_nolock(bev, EV_READ, 0); 487 bev_async_consider_reading(bev_a); 488 } else if (!ok) { 489 what |= BEV_EVENT_ERROR; 490 bev_a->ok = 0; 491 be_async_run_eventcb(bev, what, 0); 492 } else if (!nbytes) { 493 what |= BEV_EVENT_EOF; 494 bev_a->ok = 0; 495 be_async_run_eventcb(bev, what, 0); 496 } 497 } 498 499 bufferevent_decref_and_unlock_(bev); 500 } 501 502 static void 503 write_complete(struct event_overlapped *eo, ev_uintptr_t key, 504 ev_ssize_t nbytes, int ok) 505 { 506 struct bufferevent_async *bev_a = upcast_write(eo); 507 struct bufferevent *bev = &bev_a->bev.bev; 508 short what = BEV_EVENT_WRITING; 509 ev_ssize_t amount_unwritten; 510 511 BEV_LOCK(bev); 512 EVUTIL_ASSERT(bev_a->write_in_progress); 513 514 amount_unwritten = bev_a->write_in_progress - nbytes; 515 evbuffer_commit_write_(bev->output, nbytes); 516 bev_a->write_in_progress = 0; 517 518 if (amount_unwritten) 519 bufferevent_decrement_write_buckets_(&bev_a->bev, 520 -amount_unwritten); 521 522 523 if (!ok) 524 bev_async_set_wsa_error(bev, eo); 525 526 if (bev_a->ok) { 527 if (ok && nbytes) { 528 BEV_RESET_GENERIC_WRITE_TIMEOUT(bev); 529 be_async_trigger_nolock(bev, EV_WRITE, 0); 530 bev_async_consider_writing(bev_a); 531 } else if (!ok) { 532 what |= BEV_EVENT_ERROR; 533 bev_a->ok = 0; 534 be_async_run_eventcb(bev, what, 0); 535 } else if (!nbytes) { 536 what |= BEV_EVENT_EOF; 537 bev_a->ok = 0; 538 be_async_run_eventcb(bev, what, 0); 539 } 540 } 541 542 bufferevent_decref_and_unlock_(bev); 543 } 544 545 struct bufferevent * 546 bufferevent_async_new_(struct event_base *base, 547 evutil_socket_t fd, int options) 548 { 549 struct bufferevent_async *bev_a; 550 struct bufferevent *bev; 551 struct event_iocp_port *iocp; 552 553 options |= BEV_OPT_THREADSAFE; 554 555 if (!(iocp = event_base_get_iocp_(base))) 556 return NULL; 557 558 if (fd >= 0 && event_iocp_port_associate_(iocp, fd, 1)<0) { 559 if (fatal_error(GetLastError())) 560 return NULL; 561 } 562 563 if (!(bev_a = mm_calloc(1, sizeof(struct bufferevent_async)))) 564 return NULL; 565 566 bev = &bev_a->bev.bev; 567 if (!(bev->input = evbuffer_overlapped_new_(fd))) { 568 mm_free(bev_a); 569 return NULL; 570 } 571 if (!(bev->output = evbuffer_overlapped_new_(fd))) { 572 evbuffer_free(bev->input); 573 mm_free(bev_a); 574 return NULL; 575 } 576 577 if (bufferevent_init_common_(&bev_a->bev, base, &bufferevent_ops_async, 578 options)<0) 579 goto err; 580 581 evbuffer_add_cb(bev->input, be_async_inbuf_callback, bev); 582 evbuffer_add_cb(bev->output, be_async_outbuf_callback, bev); 583 584 event_overlapped_init_(&bev_a->connect_overlapped, connect_complete); 585 event_overlapped_init_(&bev_a->read_overlapped, read_complete); 586 event_overlapped_init_(&bev_a->write_overlapped, write_complete); 587 588 bufferevent_init_generic_timeout_cbs_(bev); 589 590 bev_a->ok = fd >= 0; 591 592 return bev; 593 err: 594 bufferevent_free(&bev_a->bev.bev); 595 return NULL; 596 } 597 598 void 599 bufferevent_async_set_connected_(struct bufferevent *bev) 600 { 601 struct bufferevent_async *bev_async = upcast(bev); 602 bev_async->ok = 1; 603 /* Now's a good time to consider reading/writing */ 604 be_async_enable(bev, bev->enabled); 605 } 606 607 int 608 bufferevent_async_can_connect_(struct bufferevent *bev) 609 { 610 const struct win32_extension_fns *ext = 611 event_get_win32_extension_fns_(); 612 613 if (BEV_IS_ASYNC(bev) && 614 event_base_get_iocp_(bev->ev_base) && 615 ext && ext->ConnectEx) 616 return 1; 617 618 return 0; 619 } 620 621 int 622 bufferevent_async_connect_(struct bufferevent *bev, evutil_socket_t fd, 623 const struct sockaddr *sa, int socklen) 624 { 625 BOOL rc; 626 struct bufferevent_async *bev_async = upcast(bev); 627 struct sockaddr_storage ss; 628 const struct win32_extension_fns *ext = 629 event_get_win32_extension_fns_(); 630 631 EVUTIL_ASSERT(ext && ext->ConnectEx && fd >= 0 && sa != NULL); 632 633 /* ConnectEx() requires that the socket be bound to an address 634 * with bind() before using, otherwise it will fail. We attempt 635 * to issue a bind() here, taking into account that the error 636 * code is set to WSAEINVAL when the socket is already bound. */ 637 memset(&ss, 0, sizeof(ss)); 638 if (sa->sa_family == AF_INET) { 639 struct sockaddr_in *sin = (struct sockaddr_in *)&ss; 640 sin->sin_family = AF_INET; 641 sin->sin_addr.s_addr = INADDR_ANY; 642 } else if (sa->sa_family == AF_INET6) { 643 struct sockaddr_in6 *sin6 = (struct sockaddr_in6 *)&ss; 644 sin6->sin6_family = AF_INET6; 645 sin6->sin6_addr = in6addr_any; 646 } else { 647 /* Well, the user will have to bind() */ 648 return -1; 649 } 650 if (bind(fd, (struct sockaddr *)&ss, sizeof(ss)) < 0 && 651 WSAGetLastError() != WSAEINVAL) 652 return -1; 653 654 event_base_add_virtual_(bev->ev_base); 655 bufferevent_incref_(bev); 656 rc = ext->ConnectEx(fd, sa, socklen, NULL, 0, NULL, 657 &bev_async->connect_overlapped.overlapped); 658 if (rc || WSAGetLastError() == ERROR_IO_PENDING) 659 return 0; 660 661 event_base_del_virtual_(bev->ev_base); 662 bufferevent_decref_(bev); 663 664 return -1; 665 } 666 667 static int 668 be_async_ctrl(struct bufferevent *bev, enum bufferevent_ctrl_op op, 669 union bufferevent_ctrl_data *data) 670 { 671 switch (op) { 672 case BEV_CTRL_GET_FD: 673 data->fd = evbuffer_overlapped_get_fd_(bev->input); 674 return 0; 675 case BEV_CTRL_SET_FD: { 676 struct bufferevent_async *bev_a = upcast(bev); 677 struct event_iocp_port *iocp; 678 679 if (data->fd == evbuffer_overlapped_get_fd_(bev->input)) 680 return 0; 681 if (!(iocp = event_base_get_iocp_(bev->ev_base))) 682 return -1; 683 if (event_iocp_port_associate_(iocp, data->fd, 1) < 0) { 684 if (fatal_error(GetLastError())) 685 return -1; 686 } 687 evbuffer_overlapped_set_fd_(bev->input, data->fd); 688 evbuffer_overlapped_set_fd_(bev->output, data->fd); 689 bev_a->ok = data->fd >= 0; 690 return 0; 691 } 692 case BEV_CTRL_CANCEL_ALL: { 693 struct bufferevent_async *bev_a = upcast(bev); 694 evutil_socket_t fd = evbuffer_overlapped_get_fd_(bev->input); 695 if (fd != (evutil_socket_t)EVUTIL_INVALID_SOCKET && 696 (bev_a->bev.options & BEV_OPT_CLOSE_ON_FREE)) { 697 closesocket(fd); 698 evbuffer_overlapped_set_fd_(bev->input, EVUTIL_INVALID_SOCKET); 699 } 700 bev_a->ok = 0; 701 return 0; 702 } 703 case BEV_CTRL_GET_UNDERLYING: 704 default: 705 return -1; 706 } 707 } 708 709 710