1 /* $NetBSD: bufferevent.c,v 1.6 2021/04/10 19:02:37 rillig Exp $ */ 2 3 /* 4 * Copyright (c) 2002-2007 Niels Provos <provos@citi.umich.edu> 5 * Copyright (c) 2007-2012 Niels Provos, Nick Mathewson 6 * 7 * Redistribution and use in source and binary forms, with or without 8 * modification, are permitted provided that the following conditions 9 * are met: 10 * 1. Redistributions of source code must retain the above copyright 11 * notice, this list of conditions and the following disclaimer. 12 * 2. Redistributions in binary form must reproduce the above copyright 13 * notice, this list of conditions and the following disclaimer in the 14 * documentation and/or other materials provided with the distribution. 15 * 3. The name of the author may not be used to endorse or promote products 16 * derived from this software without specific prior written permission. 17 * 18 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR 19 * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES 20 * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. 21 * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, 22 * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT 23 * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, 24 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY 25 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT 26 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF 27 * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 28 */ 29 30 #include "event2/event-config.h" 31 #include <sys/cdefs.h> 32 __RCSID("$NetBSD: bufferevent.c,v 1.6 2021/04/10 19:02:37 rillig Exp $"); 33 #include "evconfig-private.h" 34 35 #include <sys/types.h> 36 37 #ifdef EVENT__HAVE_SYS_TIME_H 38 #include <sys/time.h> 39 #endif 40 41 #include <errno.h> 42 #include <stdio.h> 43 #include <stdlib.h> 44 #include <string.h> 45 #ifdef EVENT__HAVE_STDARG_H 46 #include <stdarg.h> 47 #endif 48 49 #ifdef _WIN32 50 #include <winsock2.h> 51 #endif 52 53 #include "event2/util.h" 54 #include "event2/buffer.h" 55 #include "event2/buffer_compat.h" 56 #include "event2/bufferevent.h" 57 #include "event2/bufferevent_struct.h" 58 #include "event2/bufferevent_compat.h" 59 #include "event2/event.h" 60 #include "event-internal.h" 61 #include "log-internal.h" 62 #include "mm-internal.h" 63 #include "bufferevent-internal.h" 64 #include "evbuffer-internal.h" 65 #include "util-internal.h" 66 67 static void bufferevent_cancel_all_(struct bufferevent *bev); 68 static void bufferevent_finalize_cb_(struct event_callback *evcb, void *arg_); 69 70 void 71 bufferevent_suspend_read_(struct bufferevent *bufev, bufferevent_suspend_flags what) 72 { 73 struct bufferevent_private *bufev_private = BEV_UPCAST(bufev); 74 BEV_LOCK(bufev); 75 if (!bufev_private->read_suspended) 76 bufev->be_ops->disable(bufev, EV_READ); 77 bufev_private->read_suspended |= what; 78 BEV_UNLOCK(bufev); 79 } 80 81 void 82 bufferevent_unsuspend_read_(struct bufferevent *bufev, bufferevent_suspend_flags what) 83 { 84 struct bufferevent_private *bufev_private = BEV_UPCAST(bufev); 85 BEV_LOCK(bufev); 86 bufev_private->read_suspended &= ~what; 87 if (!bufev_private->read_suspended && (bufev->enabled & EV_READ)) 88 bufev->be_ops->enable(bufev, EV_READ); 89 BEV_UNLOCK(bufev); 90 } 91 92 void 93 bufferevent_suspend_write_(struct bufferevent *bufev, bufferevent_suspend_flags what) 94 { 95 struct bufferevent_private *bufev_private = BEV_UPCAST(bufev); 96 BEV_LOCK(bufev); 97 if (!bufev_private->write_suspended) 98 bufev->be_ops->disable(bufev, EV_WRITE); 99 bufev_private->write_suspended |= what; 100 BEV_UNLOCK(bufev); 101 } 102 103 void 104 bufferevent_unsuspend_write_(struct bufferevent *bufev, bufferevent_suspend_flags what) 105 { 106 struct bufferevent_private *bufev_private = BEV_UPCAST(bufev); 107 BEV_LOCK(bufev); 108 bufev_private->write_suspended &= ~what; 109 if (!bufev_private->write_suspended && (bufev->enabled & EV_WRITE)) 110 bufev->be_ops->enable(bufev, EV_WRITE); 111 BEV_UNLOCK(bufev); 112 } 113 114 /** 115 * Sometimes bufferevent's implementation can overrun high watermarks 116 * (one of examples is openssl) and in this case if the read callback 117 * will not handle enough data do over condition above the read 118 * callback will never be called again (due to suspend above). 119 * 120 * To avoid this we are scheduling read callback again here, but only 121 * from the user callback to avoid multiple scheduling: 122 * - when the data had been added to it 123 * - when the data had been drained from it (user specified read callback) 124 */ 125 static void bufferevent_inbuf_wm_check(struct bufferevent *bev) 126 { 127 if (!bev->wm_read.high) 128 return; 129 if (!(bev->enabled & EV_READ)) 130 return; 131 if (evbuffer_get_length(bev->input) < bev->wm_read.high) 132 return; 133 134 bufferevent_trigger(bev, EV_READ, BEV_OPT_DEFER_CALLBACKS); 135 } 136 137 /* Callback to implement watermarks on the input buffer. Only enabled 138 * if the watermark is set. */ 139 static void 140 bufferevent_inbuf_wm_cb(struct evbuffer *buf, 141 const struct evbuffer_cb_info *cbinfo, 142 void *arg) 143 { 144 struct bufferevent *bufev = arg; 145 size_t size; 146 147 size = evbuffer_get_length(buf); 148 149 if (size >= bufev->wm_read.high) 150 bufferevent_wm_suspend_read(bufev); 151 else 152 bufferevent_wm_unsuspend_read(bufev); 153 } 154 155 static void 156 bufferevent_run_deferred_callbacks_locked(struct event_callback *cb, void *arg) 157 { 158 struct bufferevent_private *bufev_private = arg; 159 struct bufferevent *bufev = &bufev_private->bev; 160 161 BEV_LOCK(bufev); 162 if ((bufev_private->eventcb_pending & BEV_EVENT_CONNECTED) && 163 bufev->errorcb) { 164 /* The "connected" happened before any reads or writes, so 165 send it first. */ 166 bufev_private->eventcb_pending &= ~BEV_EVENT_CONNECTED; 167 bufev->errorcb(bufev, BEV_EVENT_CONNECTED, bufev->cbarg); 168 } 169 if (bufev_private->readcb_pending && bufev->readcb) { 170 bufev_private->readcb_pending = 0; 171 bufev->readcb(bufev, bufev->cbarg); 172 bufferevent_inbuf_wm_check(bufev); 173 } 174 if (bufev_private->writecb_pending && bufev->writecb) { 175 bufev_private->writecb_pending = 0; 176 bufev->writecb(bufev, bufev->cbarg); 177 } 178 if (bufev_private->eventcb_pending && bufev->errorcb) { 179 short what = bufev_private->eventcb_pending; 180 int err = bufev_private->errno_pending; 181 bufev_private->eventcb_pending = 0; 182 bufev_private->errno_pending = 0; 183 EVUTIL_SET_SOCKET_ERROR(err); 184 bufev->errorcb(bufev, what, bufev->cbarg); 185 } 186 bufferevent_decref_and_unlock_(bufev); 187 } 188 189 static void 190 bufferevent_run_deferred_callbacks_unlocked(struct event_callback *cb, void *arg) 191 { 192 struct bufferevent_private *bufev_private = arg; 193 struct bufferevent *bufev = &bufev_private->bev; 194 195 BEV_LOCK(bufev); 196 #define UNLOCKED(stmt) \ 197 do { BEV_UNLOCK(bufev); stmt; BEV_LOCK(bufev); } while(0) 198 199 if ((bufev_private->eventcb_pending & BEV_EVENT_CONNECTED) && 200 bufev->errorcb) { 201 /* The "connected" happened before any reads or writes, so 202 send it first. */ 203 bufferevent_event_cb errorcb = bufev->errorcb; 204 void *cbarg = bufev->cbarg; 205 bufev_private->eventcb_pending &= ~BEV_EVENT_CONNECTED; 206 UNLOCKED(errorcb(bufev, BEV_EVENT_CONNECTED, cbarg)); 207 } 208 if (bufev_private->readcb_pending && bufev->readcb) { 209 bufferevent_data_cb readcb = bufev->readcb; 210 void *cbarg = bufev->cbarg; 211 bufev_private->readcb_pending = 0; 212 UNLOCKED(readcb(bufev, cbarg)); 213 bufferevent_inbuf_wm_check(bufev); 214 } 215 if (bufev_private->writecb_pending && bufev->writecb) { 216 bufferevent_data_cb writecb = bufev->writecb; 217 void *cbarg = bufev->cbarg; 218 bufev_private->writecb_pending = 0; 219 UNLOCKED(writecb(bufev, cbarg)); 220 } 221 if (bufev_private->eventcb_pending && bufev->errorcb) { 222 bufferevent_event_cb errorcb = bufev->errorcb; 223 void *cbarg = bufev->cbarg; 224 short what = bufev_private->eventcb_pending; 225 int err = bufev_private->errno_pending; 226 bufev_private->eventcb_pending = 0; 227 bufev_private->errno_pending = 0; 228 EVUTIL_SET_SOCKET_ERROR(err); 229 UNLOCKED(errorcb(bufev,what,cbarg)); 230 } 231 bufferevent_decref_and_unlock_(bufev); 232 #undef UNLOCKED 233 } 234 235 #define SCHEDULE_DEFERRED(bevp) \ 236 do { \ 237 if (event_deferred_cb_schedule_( \ 238 (bevp)->bev.ev_base, \ 239 &(bevp)->deferred)) \ 240 bufferevent_incref_(&(bevp)->bev); \ 241 } while (0) 242 243 244 void 245 bufferevent_run_readcb_(struct bufferevent *bufev, int options) 246 { 247 /* Requires that we hold the lock and a reference */ 248 struct bufferevent_private *p = BEV_UPCAST(bufev); 249 if (bufev->readcb == NULL) 250 return; 251 if ((p->options|options) & BEV_OPT_DEFER_CALLBACKS) { 252 p->readcb_pending = 1; 253 SCHEDULE_DEFERRED(p); 254 } else { 255 bufev->readcb(bufev, bufev->cbarg); 256 bufferevent_inbuf_wm_check(bufev); 257 } 258 } 259 260 void 261 bufferevent_run_writecb_(struct bufferevent *bufev, int options) 262 { 263 /* Requires that we hold the lock and a reference */ 264 struct bufferevent_private *p = BEV_UPCAST(bufev); 265 if (bufev->writecb == NULL) 266 return; 267 if ((p->options|options) & BEV_OPT_DEFER_CALLBACKS) { 268 p->writecb_pending = 1; 269 SCHEDULE_DEFERRED(p); 270 } else { 271 bufev->writecb(bufev, bufev->cbarg); 272 } 273 } 274 275 #define BEV_TRIG_ALL_OPTS ( \ 276 BEV_TRIG_IGNORE_WATERMARKS| \ 277 BEV_TRIG_DEFER_CALLBACKS \ 278 ) 279 280 void 281 bufferevent_trigger(struct bufferevent *bufev, short iotype, int options) 282 { 283 bufferevent_incref_and_lock_(bufev); 284 bufferevent_trigger_nolock_(bufev, iotype, options&BEV_TRIG_ALL_OPTS); 285 bufferevent_decref_and_unlock_(bufev); 286 } 287 288 void 289 bufferevent_run_eventcb_(struct bufferevent *bufev, short what, int options) 290 { 291 /* Requires that we hold the lock and a reference */ 292 struct bufferevent_private *p = BEV_UPCAST(bufev); 293 if (bufev->errorcb == NULL) 294 return; 295 if ((p->options|options) & BEV_OPT_DEFER_CALLBACKS) { 296 p->eventcb_pending |= what; 297 p->errno_pending = EVUTIL_SOCKET_ERROR(); 298 SCHEDULE_DEFERRED(p); 299 } else { 300 bufev->errorcb(bufev, what, bufev->cbarg); 301 } 302 } 303 304 void 305 bufferevent_trigger_event(struct bufferevent *bufev, short what, int options) 306 { 307 bufferevent_incref_and_lock_(bufev); 308 bufferevent_run_eventcb_(bufev, what, options&BEV_TRIG_ALL_OPTS); 309 bufferevent_decref_and_unlock_(bufev); 310 } 311 312 int 313 bufferevent_init_common_(struct bufferevent_private *bufev_private, 314 struct event_base *base, 315 const struct bufferevent_ops *ops, 316 enum bufferevent_options options) 317 { 318 struct bufferevent *bufev = &bufev_private->bev; 319 320 if (!bufev->input) { 321 if ((bufev->input = evbuffer_new()) == NULL) 322 goto err; 323 } 324 325 if (!bufev->output) { 326 if ((bufev->output = evbuffer_new()) == NULL) 327 goto err; 328 } 329 330 bufev_private->refcnt = 1; 331 bufev->ev_base = base; 332 333 /* Disable timeouts. */ 334 evutil_timerclear(&bufev->timeout_read); 335 evutil_timerclear(&bufev->timeout_write); 336 337 bufev->be_ops = ops; 338 339 if (bufferevent_ratelim_init_(bufev_private)) 340 goto err; 341 342 /* 343 * Set to EV_WRITE so that using bufferevent_write is going to 344 * trigger a callback. Reading needs to be explicitly enabled 345 * because otherwise no data will be available. 346 */ 347 bufev->enabled = EV_WRITE; 348 349 #ifndef EVENT__DISABLE_THREAD_SUPPORT 350 if (options & BEV_OPT_THREADSAFE) { 351 if (bufferevent_enable_locking_(bufev, NULL) < 0) 352 goto err; 353 } 354 #endif 355 if ((options & (BEV_OPT_DEFER_CALLBACKS|BEV_OPT_UNLOCK_CALLBACKS)) 356 == BEV_OPT_UNLOCK_CALLBACKS) { 357 event_warnx("UNLOCK_CALLBACKS requires DEFER_CALLBACKS"); 358 goto err; 359 } 360 if (options & BEV_OPT_UNLOCK_CALLBACKS) 361 event_deferred_cb_init_( 362 &bufev_private->deferred, 363 event_base_get_npriorities(base) / 2, 364 bufferevent_run_deferred_callbacks_unlocked, 365 bufev_private); 366 else 367 event_deferred_cb_init_( 368 &bufev_private->deferred, 369 event_base_get_npriorities(base) / 2, 370 bufferevent_run_deferred_callbacks_locked, 371 bufev_private); 372 373 bufev_private->options = options; 374 375 evbuffer_set_parent_(bufev->input, bufev); 376 evbuffer_set_parent_(bufev->output, bufev); 377 378 return 0; 379 380 err: 381 if (bufev->input) { 382 evbuffer_free(bufev->input); 383 bufev->input = NULL; 384 } 385 if (bufev->output) { 386 evbuffer_free(bufev->output); 387 bufev->output = NULL; 388 } 389 return -1; 390 } 391 392 void 393 bufferevent_setcb(struct bufferevent *bufev, 394 bufferevent_data_cb readcb, bufferevent_data_cb writecb, 395 bufferevent_event_cb eventcb, void *cbarg) 396 { 397 BEV_LOCK(bufev); 398 399 bufev->readcb = readcb; 400 bufev->writecb = writecb; 401 bufev->errorcb = eventcb; 402 403 bufev->cbarg = cbarg; 404 BEV_UNLOCK(bufev); 405 } 406 407 void 408 bufferevent_getcb(struct bufferevent *bufev, 409 bufferevent_data_cb *readcb_ptr, 410 bufferevent_data_cb *writecb_ptr, 411 bufferevent_event_cb *eventcb_ptr, 412 void **cbarg_ptr) 413 { 414 BEV_LOCK(bufev); 415 if (readcb_ptr) 416 *readcb_ptr = bufev->readcb; 417 if (writecb_ptr) 418 *writecb_ptr = bufev->writecb; 419 if (eventcb_ptr) 420 *eventcb_ptr = bufev->errorcb; 421 if (cbarg_ptr) 422 *cbarg_ptr = bufev->cbarg; 423 424 BEV_UNLOCK(bufev); 425 } 426 427 struct evbuffer * 428 bufferevent_get_input(struct bufferevent *bufev) 429 { 430 return bufev->input; 431 } 432 433 struct evbuffer * 434 bufferevent_get_output(struct bufferevent *bufev) 435 { 436 return bufev->output; 437 } 438 439 struct event_base * 440 bufferevent_get_base(struct bufferevent *bufev) 441 { 442 return bufev->ev_base; 443 } 444 445 int 446 bufferevent_get_priority(const struct bufferevent *bufev) 447 { 448 if (event_initialized(&bufev->ev_read)) { 449 return event_get_priority(&bufev->ev_read); 450 } else { 451 return event_base_get_npriorities(bufev->ev_base) / 2; 452 } 453 } 454 455 int 456 bufferevent_write(struct bufferevent *bufev, const void *data, size_t size) 457 { 458 if (evbuffer_add(bufev->output, data, size) == -1) 459 return (-1); 460 461 return 0; 462 } 463 464 int 465 bufferevent_write_buffer(struct bufferevent *bufev, struct evbuffer *buf) 466 { 467 if (evbuffer_add_buffer(bufev->output, buf) == -1) 468 return (-1); 469 470 return 0; 471 } 472 473 size_t 474 bufferevent_read(struct bufferevent *bufev, void *data, size_t size) 475 { 476 return (evbuffer_remove(bufev->input, data, size)); 477 } 478 479 int 480 bufferevent_read_buffer(struct bufferevent *bufev, struct evbuffer *buf) 481 { 482 return (evbuffer_add_buffer(buf, bufev->input)); 483 } 484 485 int 486 bufferevent_enable(struct bufferevent *bufev, short event) 487 { 488 struct bufferevent_private *bufev_private = BEV_UPCAST(bufev); 489 short impl_events = event; 490 int r = 0; 491 492 bufferevent_incref_and_lock_(bufev); 493 if (bufev_private->read_suspended) 494 impl_events &= ~EV_READ; 495 if (bufev_private->write_suspended) 496 impl_events &= ~EV_WRITE; 497 498 bufev->enabled |= event; 499 500 if (impl_events && bufev->be_ops->enable(bufev, impl_events) < 0) 501 r = -1; 502 if (r) 503 event_debug(("%s: cannot enable 0x%hx on %p", __func__, event, bufev)); 504 505 bufferevent_decref_and_unlock_(bufev); 506 return r; 507 } 508 509 int 510 bufferevent_set_timeouts(struct bufferevent *bufev, 511 const struct timeval *tv_read, 512 const struct timeval *tv_write) 513 { 514 int r = 0; 515 BEV_LOCK(bufev); 516 if (tv_read) { 517 bufev->timeout_read = *tv_read; 518 } else { 519 evutil_timerclear(&bufev->timeout_read); 520 } 521 if (tv_write) { 522 bufev->timeout_write = *tv_write; 523 } else { 524 evutil_timerclear(&bufev->timeout_write); 525 } 526 527 if (bufev->be_ops->adj_timeouts) 528 r = bufev->be_ops->adj_timeouts(bufev); 529 BEV_UNLOCK(bufev); 530 531 return r; 532 } 533 534 535 /* Obsolete; use bufferevent_set_timeouts */ 536 void 537 bufferevent_settimeout(struct bufferevent *bufev, 538 int timeout_read, int timeout_write) 539 { 540 struct timeval tv_read, tv_write; 541 struct timeval *ptv_read = NULL, *ptv_write = NULL; 542 543 memset(&tv_read, 0, sizeof(tv_read)); 544 memset(&tv_write, 0, sizeof(tv_write)); 545 546 if (timeout_read) { 547 tv_read.tv_sec = timeout_read; 548 ptv_read = &tv_read; 549 } 550 if (timeout_write) { 551 tv_write.tv_sec = timeout_write; 552 ptv_write = &tv_write; 553 } 554 555 bufferevent_set_timeouts(bufev, ptv_read, ptv_write); 556 } 557 558 559 int 560 bufferevent_disable_hard_(struct bufferevent *bufev, short event) 561 { 562 int r = 0; 563 struct bufferevent_private *bufev_private = BEV_UPCAST(bufev); 564 565 BEV_LOCK(bufev); 566 bufev->enabled &= ~event; 567 568 bufev_private->connecting = 0; 569 if (bufev->be_ops->disable(bufev, event) < 0) 570 r = -1; 571 572 BEV_UNLOCK(bufev); 573 return r; 574 } 575 576 int 577 bufferevent_disable(struct bufferevent *bufev, short event) 578 { 579 int r = 0; 580 581 BEV_LOCK(bufev); 582 bufev->enabled &= ~event; 583 584 if (bufev->be_ops->disable(bufev, event) < 0) 585 r = -1; 586 if (r) 587 event_debug(("%s: cannot disable 0x%hx on %p", __func__, event, bufev)); 588 589 BEV_UNLOCK(bufev); 590 return r; 591 } 592 593 /* 594 * Sets the water marks 595 */ 596 597 void 598 bufferevent_setwatermark(struct bufferevent *bufev, short events, 599 size_t lowmark, size_t highmark) 600 { 601 struct bufferevent_private *bufev_private = BEV_UPCAST(bufev); 602 603 BEV_LOCK(bufev); 604 if (events & EV_WRITE) { 605 bufev->wm_write.low = lowmark; 606 bufev->wm_write.high = highmark; 607 } 608 609 if (events & EV_READ) { 610 bufev->wm_read.low = lowmark; 611 bufev->wm_read.high = highmark; 612 613 if (highmark) { 614 /* There is now a new high-water mark for read. 615 enable the callback if needed, and see if we should 616 suspend/bufferevent_wm_unsuspend. */ 617 618 if (bufev_private->read_watermarks_cb == NULL) { 619 bufev_private->read_watermarks_cb = 620 evbuffer_add_cb(bufev->input, 621 bufferevent_inbuf_wm_cb, 622 bufev); 623 } 624 evbuffer_cb_set_flags(bufev->input, 625 bufev_private->read_watermarks_cb, 626 EVBUFFER_CB_ENABLED|EVBUFFER_CB_NODEFER); 627 628 if (evbuffer_get_length(bufev->input) >= highmark) 629 bufferevent_wm_suspend_read(bufev); 630 else if (evbuffer_get_length(bufev->input) < highmark) 631 bufferevent_wm_unsuspend_read(bufev); 632 } else { 633 /* There is now no high-water mark for read. */ 634 if (bufev_private->read_watermarks_cb) 635 evbuffer_cb_clear_flags(bufev->input, 636 bufev_private->read_watermarks_cb, 637 EVBUFFER_CB_ENABLED); 638 bufferevent_wm_unsuspend_read(bufev); 639 } 640 } 641 BEV_UNLOCK(bufev); 642 } 643 644 int 645 bufferevent_getwatermark(struct bufferevent *bufev, short events, 646 size_t *lowmark, size_t *highmark) 647 { 648 if (events == EV_WRITE) { 649 BEV_LOCK(bufev); 650 if (lowmark) 651 *lowmark = bufev->wm_write.low; 652 if (highmark) 653 *highmark = bufev->wm_write.high; 654 BEV_UNLOCK(bufev); 655 return 0; 656 } 657 658 if (events == EV_READ) { 659 BEV_LOCK(bufev); 660 if (lowmark) 661 *lowmark = bufev->wm_read.low; 662 if (highmark) 663 *highmark = bufev->wm_read.high; 664 BEV_UNLOCK(bufev); 665 return 0; 666 } 667 return -1; 668 } 669 670 int 671 bufferevent_flush(struct bufferevent *bufev, 672 short iotype, 673 enum bufferevent_flush_mode mode) 674 { 675 int r = -1; 676 BEV_LOCK(bufev); 677 if (bufev->be_ops->flush) 678 r = bufev->be_ops->flush(bufev, iotype, mode); 679 BEV_UNLOCK(bufev); 680 return r; 681 } 682 683 void 684 bufferevent_incref_and_lock_(struct bufferevent *bufev) 685 { 686 struct bufferevent_private *bufev_private = BEV_UPCAST(bufev); 687 BEV_LOCK(bufev); 688 ++bufev_private->refcnt; 689 } 690 691 #if 0 692 static void 693 bufferevent_transfer_lock_ownership_(struct bufferevent *donor, 694 struct bufferevent *recipient) 695 { 696 struct bufferevent_private *d = BEV_UPCAST(donor); 697 struct bufferevent_private *r = BEV_UPCAST(recipient); 698 if (d->lock != r->lock) 699 return; 700 if (r->own_lock) 701 return; 702 if (d->own_lock) { 703 d->own_lock = 0; 704 r->own_lock = 1; 705 } 706 } 707 #endif 708 709 int 710 bufferevent_decref_and_unlock_(struct bufferevent *bufev) 711 { 712 struct bufferevent_private *bufev_private = BEV_UPCAST(bufev); 713 int n_cbs = 0; 714 #define MAX_CBS 16 715 struct event_callback *cbs[MAX_CBS]; 716 717 EVUTIL_ASSERT(bufev_private->refcnt > 0); 718 719 if (--bufev_private->refcnt) { 720 BEV_UNLOCK(bufev); 721 return 0; 722 } 723 724 if (bufev->be_ops->unlink) 725 bufev->be_ops->unlink(bufev); 726 727 /* Okay, we're out of references. Let's finalize this once all the 728 * callbacks are done running. */ 729 cbs[0] = &bufev->ev_read.ev_evcallback; 730 cbs[1] = &bufev->ev_write.ev_evcallback; 731 cbs[2] = &bufev_private->deferred; 732 n_cbs = 3; 733 if (bufev_private->rate_limiting) { 734 struct event *e = &bufev_private->rate_limiting->refill_bucket_event; 735 if (event_initialized(e)) 736 cbs[n_cbs++] = &e->ev_evcallback; 737 } 738 n_cbs += evbuffer_get_callbacks_(bufev->input, cbs+n_cbs, MAX_CBS-n_cbs); 739 n_cbs += evbuffer_get_callbacks_(bufev->output, cbs+n_cbs, MAX_CBS-n_cbs); 740 741 event_callback_finalize_many_(bufev->ev_base, n_cbs, cbs, 742 bufferevent_finalize_cb_); 743 744 #undef MAX_CBS 745 BEV_UNLOCK(bufev); 746 747 return 1; 748 } 749 750 static void 751 bufferevent_finalize_cb_(struct event_callback *evcb, void *arg_) 752 { 753 struct bufferevent *bufev = arg_; 754 struct bufferevent *underlying; 755 struct bufferevent_private *bufev_private = BEV_UPCAST(bufev); 756 757 BEV_LOCK(bufev); 758 underlying = bufferevent_get_underlying(bufev); 759 760 /* Clean up the shared info */ 761 if (bufev->be_ops->destruct) 762 bufev->be_ops->destruct(bufev); 763 764 /* XXX what happens if refcnt for these buffers is > 1? 765 * The buffers can share a lock with this bufferevent object, 766 * but the lock might be destroyed below. */ 767 /* evbuffer will free the callbacks */ 768 evbuffer_free(bufev->input); 769 evbuffer_free(bufev->output); 770 771 if (bufev_private->rate_limiting) { 772 if (bufev_private->rate_limiting->group) 773 bufferevent_remove_from_rate_limit_group_internal_(bufev,0); 774 mm_free(bufev_private->rate_limiting); 775 bufev_private->rate_limiting = NULL; 776 } 777 778 779 BEV_UNLOCK(bufev); 780 781 if (bufev_private->own_lock) 782 EVTHREAD_FREE_LOCK(bufev_private->lock, 783 EVTHREAD_LOCKTYPE_RECURSIVE); 784 785 /* Free the actual allocated memory. */ 786 mm_free(((char*)bufev) - bufev->be_ops->mem_offset); 787 788 /* Release the reference to underlying now that we no longer need the 789 * reference to it. We wait this long mainly in case our lock is 790 * shared with underlying. 791 * 792 * The 'destruct' function will also drop a reference to underlying 793 * if BEV_OPT_CLOSE_ON_FREE is set. 794 * 795 * XXX Should we/can we just refcount evbuffer/bufferevent locks? 796 * It would probably save us some headaches. 797 */ 798 if (underlying) 799 bufferevent_decref_(underlying); 800 } 801 802 int 803 bufferevent_decref(struct bufferevent *bufev) 804 { 805 BEV_LOCK(bufev); 806 return bufferevent_decref_and_unlock_(bufev); 807 } 808 809 void 810 bufferevent_free(struct bufferevent *bufev) 811 { 812 BEV_LOCK(bufev); 813 bufferevent_setcb(bufev, NULL, NULL, NULL, NULL); 814 bufferevent_cancel_all_(bufev); 815 bufferevent_decref_and_unlock_(bufev); 816 } 817 818 void 819 bufferevent_incref(struct bufferevent *bufev) 820 { 821 struct bufferevent_private *bufev_private = BEV_UPCAST(bufev); 822 823 /* XXX: now that this function is public, we might want to 824 * - return the count from this function 825 * - create a new function to atomically grab the current refcount 826 */ 827 BEV_LOCK(bufev); 828 ++bufev_private->refcnt; 829 BEV_UNLOCK(bufev); 830 } 831 832 int 833 bufferevent_enable_locking_(struct bufferevent *bufev, void *lock) 834 { 835 #ifdef EVENT__DISABLE_THREAD_SUPPORT 836 return -1; 837 #else 838 struct bufferevent *underlying; 839 840 if (BEV_UPCAST(bufev)->lock) 841 return -1; 842 underlying = bufferevent_get_underlying(bufev); 843 844 if (!lock && underlying && BEV_UPCAST(underlying)->lock) { 845 lock = BEV_UPCAST(underlying)->lock; 846 BEV_UPCAST(bufev)->lock = lock; 847 BEV_UPCAST(bufev)->own_lock = 0; 848 } else if (!lock) { 849 EVTHREAD_ALLOC_LOCK(lock, EVTHREAD_LOCKTYPE_RECURSIVE); 850 if (!lock) 851 return -1; 852 BEV_UPCAST(bufev)->lock = lock; 853 BEV_UPCAST(bufev)->own_lock = 1; 854 } else { 855 BEV_UPCAST(bufev)->lock = lock; 856 BEV_UPCAST(bufev)->own_lock = 0; 857 } 858 evbuffer_enable_locking(bufev->input, lock); 859 evbuffer_enable_locking(bufev->output, lock); 860 861 if (underlying && !BEV_UPCAST(underlying)->lock) 862 bufferevent_enable_locking_(underlying, lock); 863 864 return 0; 865 #endif 866 } 867 868 int 869 bufferevent_setfd(struct bufferevent *bev, evutil_socket_t fd) 870 { 871 union bufferevent_ctrl_data d; 872 int res = -1; 873 d.fd = fd; 874 BEV_LOCK(bev); 875 if (bev->be_ops->ctrl) 876 res = bev->be_ops->ctrl(bev, BEV_CTRL_SET_FD, &d); 877 if (res) 878 event_debug(("%s: cannot set fd for %p to "EV_SOCK_FMT, __func__, bev, fd)); 879 BEV_UNLOCK(bev); 880 return res; 881 } 882 883 evutil_socket_t 884 bufferevent_getfd(struct bufferevent *bev) 885 { 886 union bufferevent_ctrl_data d; 887 int res = -1; 888 d.fd = -1; 889 BEV_LOCK(bev); 890 if (bev->be_ops->ctrl) 891 res = bev->be_ops->ctrl(bev, BEV_CTRL_GET_FD, &d); 892 if (res) 893 event_debug(("%s: cannot get fd for %p", __func__, bev)); 894 BEV_UNLOCK(bev); 895 return (res<0) ? -1 : d.fd; 896 } 897 898 enum bufferevent_options 899 bufferevent_get_options_(struct bufferevent *bev) 900 { 901 struct bufferevent_private *bev_p = BEV_UPCAST(bev); 902 enum bufferevent_options options; 903 904 BEV_LOCK(bev); 905 options = bev_p->options; 906 BEV_UNLOCK(bev); 907 return options; 908 } 909 910 911 static void 912 bufferevent_cancel_all_(struct bufferevent *bev) 913 { 914 union bufferevent_ctrl_data d; 915 memset(&d, 0, sizeof(d)); 916 BEV_LOCK(bev); 917 if (bev->be_ops->ctrl) 918 bev->be_ops->ctrl(bev, BEV_CTRL_CANCEL_ALL, &d); 919 BEV_UNLOCK(bev); 920 } 921 922 short 923 bufferevent_get_enabled(struct bufferevent *bufev) 924 { 925 short r; 926 BEV_LOCK(bufev); 927 r = bufev->enabled; 928 BEV_UNLOCK(bufev); 929 return r; 930 } 931 932 struct bufferevent * 933 bufferevent_get_underlying(struct bufferevent *bev) 934 { 935 union bufferevent_ctrl_data d; 936 int res = -1; 937 d.ptr = NULL; 938 BEV_LOCK(bev); 939 if (bev->be_ops->ctrl) 940 res = bev->be_ops->ctrl(bev, BEV_CTRL_GET_UNDERLYING, &d); 941 BEV_UNLOCK(bev); 942 return (res<0) ? NULL : d.ptr; 943 } 944 945 static void 946 bufferevent_generic_read_timeout_cb(evutil_socket_t fd, short event, void *ctx) 947 { 948 struct bufferevent *bev = ctx; 949 bufferevent_incref_and_lock_(bev); 950 bufferevent_disable(bev, EV_READ); 951 bufferevent_run_eventcb_(bev, BEV_EVENT_TIMEOUT|BEV_EVENT_READING, 0); 952 bufferevent_decref_and_unlock_(bev); 953 } 954 static void 955 bufferevent_generic_write_timeout_cb(evutil_socket_t fd, short event, void *ctx) 956 { 957 struct bufferevent *bev = ctx; 958 bufferevent_incref_and_lock_(bev); 959 bufferevent_disable(bev, EV_WRITE); 960 bufferevent_run_eventcb_(bev, BEV_EVENT_TIMEOUT|BEV_EVENT_WRITING, 0); 961 bufferevent_decref_and_unlock_(bev); 962 } 963 964 void 965 bufferevent_init_generic_timeout_cbs_(struct bufferevent *bev) 966 { 967 event_assign(&bev->ev_read, bev->ev_base, -1, EV_FINALIZE, 968 bufferevent_generic_read_timeout_cb, bev); 969 event_assign(&bev->ev_write, bev->ev_base, -1, EV_FINALIZE, 970 bufferevent_generic_write_timeout_cb, bev); 971 } 972 973 int 974 bufferevent_generic_adj_timeouts_(struct bufferevent *bev) 975 { 976 const short enabled = bev->enabled; 977 struct bufferevent_private *bev_p = BEV_UPCAST(bev); 978 int r1=0, r2=0; 979 if ((enabled & EV_READ) && !bev_p->read_suspended && 980 evutil_timerisset(&bev->timeout_read)) 981 r1 = event_add(&bev->ev_read, &bev->timeout_read); 982 else 983 r1 = event_del(&bev->ev_read); 984 985 if ((enabled & EV_WRITE) && !bev_p->write_suspended && 986 evutil_timerisset(&bev->timeout_write) && 987 evbuffer_get_length(bev->output)) 988 r2 = event_add(&bev->ev_write, &bev->timeout_write); 989 else 990 r2 = event_del(&bev->ev_write); 991 if (r1 < 0 || r2 < 0) 992 return -1; 993 return 0; 994 } 995 996 int 997 bufferevent_generic_adj_existing_timeouts_(struct bufferevent *bev) 998 { 999 int r = 0; 1000 if (event_pending(&bev->ev_read, EV_READ, NULL)) { 1001 if (evutil_timerisset(&bev->timeout_read)) { 1002 if (bufferevent_add_event_(&bev->ev_read, &bev->timeout_read) < 0) 1003 r = -1; 1004 } else { 1005 event_remove_timer(&bev->ev_read); 1006 } 1007 } 1008 if (event_pending(&bev->ev_write, EV_WRITE, NULL)) { 1009 if (evutil_timerisset(&bev->timeout_write)) { 1010 if (bufferevent_add_event_(&bev->ev_write, &bev->timeout_write) < 0) 1011 r = -1; 1012 } else { 1013 event_remove_timer(&bev->ev_write); 1014 } 1015 } 1016 return r; 1017 } 1018 1019 int 1020 bufferevent_add_event_(struct event *ev, const struct timeval *tv) 1021 { 1022 if (!evutil_timerisset(tv)) 1023 return event_add(ev, NULL); 1024 else 1025 return event_add(ev, tv); 1026 } 1027 1028 /* For use by user programs only; internally, we should be calling 1029 either bufferevent_incref_and_lock_(), or BEV_LOCK. */ 1030 void 1031 bufferevent_lock(struct bufferevent *bev) 1032 { 1033 bufferevent_incref_and_lock_(bev); 1034 } 1035 1036 void 1037 bufferevent_unlock(struct bufferevent *bev) 1038 { 1039 bufferevent_decref_and_unlock_(bev); 1040 } 1041