1 /* $NetBSD: bufferevent.c,v 1.7 2024/08/18 20:47:20 christos Exp $ */ 2 3 /* 4 * Copyright (c) 2002-2007 Niels Provos <provos@citi.umich.edu> 5 * Copyright (c) 2007-2012 Niels Provos, Nick Mathewson 6 * 7 * Redistribution and use in source and binary forms, with or without 8 * modification, are permitted provided that the following conditions 9 * are met: 10 * 1. Redistributions of source code must retain the above copyright 11 * notice, this list of conditions and the following disclaimer. 12 * 2. Redistributions in binary form must reproduce the above copyright 13 * notice, this list of conditions and the following disclaimer in the 14 * documentation and/or other materials provided with the distribution. 15 * 3. The name of the author may not be used to endorse or promote products 16 * derived from this software without specific prior written permission. 17 * 18 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR 19 * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES 20 * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. 21 * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, 22 * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT 23 * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, 24 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY 25 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT 26 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF 27 * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 28 */ 29 30 #include "event2/event-config.h" 31 #include "evconfig-private.h" 32 33 #include <sys/types.h> 34 35 #ifdef EVENT__HAVE_SYS_TIME_H 36 #include <sys/time.h> 37 #endif 38 39 #include <errno.h> 40 #include <stdio.h> 41 #include <stdlib.h> 42 #include <string.h> 43 #ifdef EVENT__HAVE_STDARG_H 44 #include <stdarg.h> 45 #endif 46 47 #ifdef _WIN32 48 #include <winsock2.h> 49 #endif 50 51 #include "event2/util.h" 52 #include "event2/buffer.h" 53 #include "event2/buffer_compat.h" 54 #include "event2/bufferevent.h" 55 #include "event2/bufferevent_struct.h" 56 #include "event2/bufferevent_compat.h" 57 #include "event2/event.h" 58 #include "event-internal.h" 59 #include "log-internal.h" 60 #include "mm-internal.h" 61 #include "bufferevent-internal.h" 62 #include "evbuffer-internal.h" 63 #include "util-internal.h" 64 65 static void bufferevent_cancel_all_(struct bufferevent *bev); 66 static void bufferevent_finalize_cb_(struct event_callback *evcb, void *arg_); 67 68 void 69 bufferevent_suspend_read_(struct bufferevent *bufev, bufferevent_suspend_flags what) 70 { 71 struct bufferevent_private *bufev_private = BEV_UPCAST(bufev); 72 BEV_LOCK(bufev); 73 if (!bufev_private->read_suspended) 74 bufev->be_ops->disable(bufev, EV_READ); 75 bufev_private->read_suspended |= what; 76 BEV_UNLOCK(bufev); 77 } 78 79 void 80 bufferevent_unsuspend_read_(struct bufferevent *bufev, bufferevent_suspend_flags what) 81 { 82 struct bufferevent_private *bufev_private = BEV_UPCAST(bufev); 83 BEV_LOCK(bufev); 84 bufev_private->read_suspended &= ~what; 85 if (!bufev_private->read_suspended && (bufev->enabled & EV_READ)) 86 bufev->be_ops->enable(bufev, EV_READ); 87 BEV_UNLOCK(bufev); 88 } 89 90 void 91 bufferevent_suspend_write_(struct bufferevent *bufev, bufferevent_suspend_flags what) 92 { 93 struct bufferevent_private *bufev_private = BEV_UPCAST(bufev); 94 BEV_LOCK(bufev); 95 if (!bufev_private->write_suspended) 96 bufev->be_ops->disable(bufev, EV_WRITE); 97 bufev_private->write_suspended |= what; 98 BEV_UNLOCK(bufev); 99 } 100 101 void 102 bufferevent_unsuspend_write_(struct bufferevent *bufev, bufferevent_suspend_flags what) 103 { 104 struct bufferevent_private *bufev_private = BEV_UPCAST(bufev); 105 BEV_LOCK(bufev); 106 bufev_private->write_suspended &= ~what; 107 if (!bufev_private->write_suspended && (bufev->enabled & EV_WRITE)) 108 bufev->be_ops->enable(bufev, EV_WRITE); 109 BEV_UNLOCK(bufev); 110 } 111 112 /** 113 * Sometimes bufferevent's implementation can overrun high watermarks 114 * (one of examples is openssl) and in this case if the read callback 115 * will not handle enough data do over condition above the read 116 * callback will never be called again (due to suspend above). 117 * 118 * To avoid this we are scheduling read callback again here, but only 119 * from the user callback to avoid multiple scheduling: 120 * - when the data had been added to it 121 * - when the data had been drained from it (user specified read callback) 122 */ 123 static void bufferevent_inbuf_wm_check(struct bufferevent *bev) 124 { 125 if (!bev->wm_read.high) 126 return; 127 if (!(bev->enabled & EV_READ)) 128 return; 129 if (evbuffer_get_length(bev->input) < bev->wm_read.high) 130 return; 131 132 bufferevent_trigger(bev, EV_READ, BEV_OPT_DEFER_CALLBACKS); 133 } 134 135 /* Callback to implement watermarks on the input buffer. Only enabled 136 * if the watermark is set. */ 137 static void 138 bufferevent_inbuf_wm_cb(struct evbuffer *buf, 139 const struct evbuffer_cb_info *cbinfo, 140 void *arg) 141 { 142 struct bufferevent *bufev = arg; 143 size_t size; 144 145 size = evbuffer_get_length(buf); 146 147 if (size >= bufev->wm_read.high) 148 bufferevent_wm_suspend_read(bufev); 149 else 150 bufferevent_wm_unsuspend_read(bufev); 151 } 152 153 static void 154 bufferevent_run_deferred_callbacks_locked(struct event_callback *cb, void *arg) 155 { 156 struct bufferevent_private *bufev_private = arg; 157 struct bufferevent *bufev = &bufev_private->bev; 158 159 BEV_LOCK(bufev); 160 if ((bufev_private->eventcb_pending & BEV_EVENT_CONNECTED) && 161 bufev->errorcb) { 162 /* The "connected" happened before any reads or writes, so 163 send it first. */ 164 bufev_private->eventcb_pending &= ~BEV_EVENT_CONNECTED; 165 bufev->errorcb(bufev, BEV_EVENT_CONNECTED, bufev->cbarg); 166 } 167 if (bufev_private->readcb_pending && bufev->readcb) { 168 bufev_private->readcb_pending = 0; 169 bufev->readcb(bufev, bufev->cbarg); 170 bufferevent_inbuf_wm_check(bufev); 171 } 172 if (bufev_private->writecb_pending && bufev->writecb) { 173 bufev_private->writecb_pending = 0; 174 bufev->writecb(bufev, bufev->cbarg); 175 } 176 if (bufev_private->eventcb_pending && bufev->errorcb) { 177 short what = bufev_private->eventcb_pending; 178 int err = bufev_private->errno_pending; 179 bufev_private->eventcb_pending = 0; 180 bufev_private->errno_pending = 0; 181 EVUTIL_SET_SOCKET_ERROR(err); 182 bufev->errorcb(bufev, what, bufev->cbarg); 183 } 184 bufferevent_decref_and_unlock_(bufev); 185 } 186 187 static void 188 bufferevent_run_deferred_callbacks_unlocked(struct event_callback *cb, void *arg) 189 { 190 struct bufferevent_private *bufev_private = arg; 191 struct bufferevent *bufev = &bufev_private->bev; 192 193 BEV_LOCK(bufev); 194 #define UNLOCKED(stmt) \ 195 do { BEV_UNLOCK(bufev); stmt; BEV_LOCK(bufev); } while(0) 196 197 if ((bufev_private->eventcb_pending & BEV_EVENT_CONNECTED) && 198 bufev->errorcb) { 199 /* The "connected" happened before any reads or writes, so 200 send it first. */ 201 bufferevent_event_cb errorcb = bufev->errorcb; 202 void *cbarg = bufev->cbarg; 203 bufev_private->eventcb_pending &= ~BEV_EVENT_CONNECTED; 204 UNLOCKED(errorcb(bufev, BEV_EVENT_CONNECTED, cbarg)); 205 } 206 if (bufev_private->readcb_pending && bufev->readcb) { 207 bufferevent_data_cb readcb = bufev->readcb; 208 void *cbarg = bufev->cbarg; 209 bufev_private->readcb_pending = 0; 210 UNLOCKED(readcb(bufev, cbarg)); 211 bufferevent_inbuf_wm_check(bufev); 212 } 213 if (bufev_private->writecb_pending && bufev->writecb) { 214 bufferevent_data_cb writecb = bufev->writecb; 215 void *cbarg = bufev->cbarg; 216 bufev_private->writecb_pending = 0; 217 UNLOCKED(writecb(bufev, cbarg)); 218 } 219 if (bufev_private->eventcb_pending && bufev->errorcb) { 220 bufferevent_event_cb errorcb = bufev->errorcb; 221 void *cbarg = bufev->cbarg; 222 short what = bufev_private->eventcb_pending; 223 int err = bufev_private->errno_pending; 224 bufev_private->eventcb_pending = 0; 225 bufev_private->errno_pending = 0; 226 EVUTIL_SET_SOCKET_ERROR(err); 227 UNLOCKED(errorcb(bufev,what,cbarg)); 228 } 229 bufferevent_decref_and_unlock_(bufev); 230 #undef UNLOCKED 231 } 232 233 #define SCHEDULE_DEFERRED(bevp) \ 234 do { \ 235 if (event_deferred_cb_schedule_( \ 236 (bevp)->bev.ev_base, \ 237 &(bevp)->deferred)) \ 238 bufferevent_incref_(&(bevp)->bev); \ 239 } while (0) 240 241 242 void 243 bufferevent_run_readcb_(struct bufferevent *bufev, int options) 244 { 245 /* Requires that we hold the lock and a reference */ 246 struct bufferevent_private *p = BEV_UPCAST(bufev); 247 if (bufev->readcb == NULL) 248 return; 249 if ((p->options|options) & BEV_OPT_DEFER_CALLBACKS) { 250 p->readcb_pending = 1; 251 SCHEDULE_DEFERRED(p); 252 } else { 253 bufev->readcb(bufev, bufev->cbarg); 254 bufferevent_inbuf_wm_check(bufev); 255 } 256 } 257 258 void 259 bufferevent_run_writecb_(struct bufferevent *bufev, int options) 260 { 261 /* Requires that we hold the lock and a reference */ 262 struct bufferevent_private *p = BEV_UPCAST(bufev); 263 if (bufev->writecb == NULL) 264 return; 265 if ((p->options|options) & BEV_OPT_DEFER_CALLBACKS) { 266 p->writecb_pending = 1; 267 SCHEDULE_DEFERRED(p); 268 } else { 269 bufev->writecb(bufev, bufev->cbarg); 270 } 271 } 272 273 #define BEV_TRIG_ALL_OPTS ( \ 274 BEV_TRIG_IGNORE_WATERMARKS| \ 275 BEV_TRIG_DEFER_CALLBACKS \ 276 ) 277 278 void 279 bufferevent_trigger(struct bufferevent *bufev, short iotype, int options) 280 { 281 bufferevent_incref_and_lock_(bufev); 282 bufferevent_trigger_nolock_(bufev, iotype, options&BEV_TRIG_ALL_OPTS); 283 bufferevent_decref_and_unlock_(bufev); 284 } 285 286 void 287 bufferevent_run_eventcb_(struct bufferevent *bufev, short what, int options) 288 { 289 /* Requires that we hold the lock and a reference */ 290 struct bufferevent_private *p = BEV_UPCAST(bufev); 291 if (bufev->errorcb == NULL) 292 return; 293 if ((p->options|options) & BEV_OPT_DEFER_CALLBACKS) { 294 p->eventcb_pending |= what; 295 p->errno_pending = EVUTIL_SOCKET_ERROR(); 296 SCHEDULE_DEFERRED(p); 297 } else { 298 bufev->errorcb(bufev, what, bufev->cbarg); 299 } 300 } 301 302 void 303 bufferevent_trigger_event(struct bufferevent *bufev, short what, int options) 304 { 305 bufferevent_incref_and_lock_(bufev); 306 bufferevent_run_eventcb_(bufev, what, options&BEV_TRIG_ALL_OPTS); 307 bufferevent_decref_and_unlock_(bufev); 308 } 309 310 int 311 bufferevent_init_common_(struct bufferevent_private *bufev_private, 312 struct event_base *base, 313 const struct bufferevent_ops *ops, 314 enum bufferevent_options options) 315 { 316 struct bufferevent *bufev = &bufev_private->bev; 317 318 if (!bufev->input) { 319 if ((bufev->input = evbuffer_new()) == NULL) 320 goto err; 321 } 322 323 if (!bufev->output) { 324 if ((bufev->output = evbuffer_new()) == NULL) 325 goto err; 326 } 327 328 bufev_private->refcnt = 1; 329 bufev->ev_base = base; 330 331 /* Disable timeouts. */ 332 evutil_timerclear(&bufev->timeout_read); 333 evutil_timerclear(&bufev->timeout_write); 334 335 bufev->be_ops = ops; 336 337 if (bufferevent_ratelim_init_(bufev_private)) 338 goto err; 339 340 /* 341 * Set to EV_WRITE so that using bufferevent_write is going to 342 * trigger a callback. Reading needs to be explicitly enabled 343 * because otherwise no data will be available. 344 */ 345 bufev->enabled = EV_WRITE; 346 347 #ifndef EVENT__DISABLE_THREAD_SUPPORT 348 if (options & BEV_OPT_THREADSAFE) { 349 if (bufferevent_enable_locking_(bufev, NULL) < 0) 350 goto err; 351 } 352 #endif 353 if ((options & (BEV_OPT_DEFER_CALLBACKS|BEV_OPT_UNLOCK_CALLBACKS)) 354 == BEV_OPT_UNLOCK_CALLBACKS) { 355 event_warnx("UNLOCK_CALLBACKS requires DEFER_CALLBACKS"); 356 goto err; 357 } 358 if (options & BEV_OPT_UNLOCK_CALLBACKS) 359 event_deferred_cb_init_( 360 &bufev_private->deferred, 361 event_base_get_npriorities(base) / 2, 362 bufferevent_run_deferred_callbacks_unlocked, 363 bufev_private); 364 else 365 event_deferred_cb_init_( 366 &bufev_private->deferred, 367 event_base_get_npriorities(base) / 2, 368 bufferevent_run_deferred_callbacks_locked, 369 bufev_private); 370 371 bufev_private->options = options; 372 373 evbuffer_set_parent_(bufev->input, bufev); 374 evbuffer_set_parent_(bufev->output, bufev); 375 376 return 0; 377 378 err: 379 if (bufev->input) { 380 evbuffer_free(bufev->input); 381 bufev->input = NULL; 382 } 383 if (bufev->output) { 384 evbuffer_free(bufev->output); 385 bufev->output = NULL; 386 } 387 return -1; 388 } 389 390 void 391 bufferevent_setcb(struct bufferevent *bufev, 392 bufferevent_data_cb readcb, bufferevent_data_cb writecb, 393 bufferevent_event_cb eventcb, void *cbarg) 394 { 395 BEV_LOCK(bufev); 396 397 bufev->readcb = readcb; 398 bufev->writecb = writecb; 399 bufev->errorcb = eventcb; 400 401 bufev->cbarg = cbarg; 402 BEV_UNLOCK(bufev); 403 } 404 405 void 406 bufferevent_getcb(struct bufferevent *bufev, 407 bufferevent_data_cb *readcb_ptr, 408 bufferevent_data_cb *writecb_ptr, 409 bufferevent_event_cb *eventcb_ptr, 410 void **cbarg_ptr) 411 { 412 BEV_LOCK(bufev); 413 if (readcb_ptr) 414 *readcb_ptr = bufev->readcb; 415 if (writecb_ptr) 416 *writecb_ptr = bufev->writecb; 417 if (eventcb_ptr) 418 *eventcb_ptr = bufev->errorcb; 419 if (cbarg_ptr) 420 *cbarg_ptr = bufev->cbarg; 421 422 BEV_UNLOCK(bufev); 423 } 424 425 struct evbuffer * 426 bufferevent_get_input(struct bufferevent *bufev) 427 { 428 return bufev->input; 429 } 430 431 struct evbuffer * 432 bufferevent_get_output(struct bufferevent *bufev) 433 { 434 return bufev->output; 435 } 436 437 struct event_base * 438 bufferevent_get_base(struct bufferevent *bufev) 439 { 440 return bufev->ev_base; 441 } 442 443 int 444 bufferevent_get_priority(const struct bufferevent *bufev) 445 { 446 if (event_initialized(&bufev->ev_read)) { 447 return event_get_priority(&bufev->ev_read); 448 } else { 449 return event_base_get_npriorities(bufev->ev_base) / 2; 450 } 451 } 452 453 int 454 bufferevent_write(struct bufferevent *bufev, const void *data, size_t size) 455 { 456 if (evbuffer_add(bufev->output, data, size) == -1) 457 return (-1); 458 459 return 0; 460 } 461 462 int 463 bufferevent_write_buffer(struct bufferevent *bufev, struct evbuffer *buf) 464 { 465 if (evbuffer_add_buffer(bufev->output, buf) == -1) 466 return (-1); 467 468 return 0; 469 } 470 471 size_t 472 bufferevent_read(struct bufferevent *bufev, void *data, size_t size) 473 { 474 return (evbuffer_remove(bufev->input, data, size)); 475 } 476 477 int 478 bufferevent_read_buffer(struct bufferevent *bufev, struct evbuffer *buf) 479 { 480 return (evbuffer_add_buffer(buf, bufev->input)); 481 } 482 483 int 484 bufferevent_enable(struct bufferevent *bufev, short event) 485 { 486 struct bufferevent_private *bufev_private = BEV_UPCAST(bufev); 487 short impl_events = event; 488 int r = 0; 489 490 bufferevent_incref_and_lock_(bufev); 491 if (bufev_private->read_suspended) 492 impl_events &= ~EV_READ; 493 if (bufev_private->write_suspended) 494 impl_events &= ~EV_WRITE; 495 496 bufev->enabled |= event; 497 498 if (impl_events && bufev->be_ops->enable(bufev, impl_events) < 0) 499 r = -1; 500 if (r) 501 event_debug(("%s: cannot enable 0x%hx on %p", __func__, event, bufev)); 502 503 bufferevent_decref_and_unlock_(bufev); 504 return r; 505 } 506 507 int 508 bufferevent_set_timeouts(struct bufferevent *bufev, 509 const struct timeval *tv_read, 510 const struct timeval *tv_write) 511 { 512 int r = 0; 513 BEV_LOCK(bufev); 514 if (tv_read) { 515 bufev->timeout_read = *tv_read; 516 } else { 517 evutil_timerclear(&bufev->timeout_read); 518 } 519 if (tv_write) { 520 bufev->timeout_write = *tv_write; 521 } else { 522 evutil_timerclear(&bufev->timeout_write); 523 } 524 525 if (bufev->be_ops->adj_timeouts) 526 r = bufev->be_ops->adj_timeouts(bufev); 527 BEV_UNLOCK(bufev); 528 529 return r; 530 } 531 532 533 /* Obsolete; use bufferevent_set_timeouts */ 534 void 535 bufferevent_settimeout(struct bufferevent *bufev, 536 int timeout_read, int timeout_write) 537 { 538 struct timeval tv_read, tv_write; 539 struct timeval *ptv_read = NULL, *ptv_write = NULL; 540 541 memset(&tv_read, 0, sizeof(tv_read)); 542 memset(&tv_write, 0, sizeof(tv_write)); 543 544 if (timeout_read) { 545 tv_read.tv_sec = timeout_read; 546 ptv_read = &tv_read; 547 } 548 if (timeout_write) { 549 tv_write.tv_sec = timeout_write; 550 ptv_write = &tv_write; 551 } 552 553 bufferevent_set_timeouts(bufev, ptv_read, ptv_write); 554 } 555 556 557 int 558 bufferevent_disable_hard_(struct bufferevent *bufev, short event) 559 { 560 int r = 0; 561 struct bufferevent_private *bufev_private = BEV_UPCAST(bufev); 562 563 BEV_LOCK(bufev); 564 bufev->enabled &= ~event; 565 566 bufev_private->connecting = 0; 567 if (bufev->be_ops->disable(bufev, event) < 0) 568 r = -1; 569 570 BEV_UNLOCK(bufev); 571 return r; 572 } 573 574 int 575 bufferevent_disable(struct bufferevent *bufev, short event) 576 { 577 int r = 0; 578 579 BEV_LOCK(bufev); 580 bufev->enabled &= ~event; 581 582 if (bufev->be_ops->disable(bufev, event) < 0) 583 r = -1; 584 if (r) 585 event_debug(("%s: cannot disable 0x%hx on %p", __func__, event, bufev)); 586 587 BEV_UNLOCK(bufev); 588 return r; 589 } 590 591 /* 592 * Sets the water marks 593 */ 594 595 void 596 bufferevent_setwatermark(struct bufferevent *bufev, short events, 597 size_t lowmark, size_t highmark) 598 { 599 struct bufferevent_private *bufev_private = BEV_UPCAST(bufev); 600 601 BEV_LOCK(bufev); 602 if (events & EV_WRITE) { 603 bufev->wm_write.low = lowmark; 604 bufev->wm_write.high = highmark; 605 } 606 607 if (events & EV_READ) { 608 bufev->wm_read.low = lowmark; 609 bufev->wm_read.high = highmark; 610 611 if (highmark) { 612 /* There is now a new high-water mark for read. 613 enable the callback if needed, and see if we should 614 suspend/bufferevent_wm_unsuspend. */ 615 616 if (bufev_private->read_watermarks_cb == NULL) { 617 bufev_private->read_watermarks_cb = 618 evbuffer_add_cb(bufev->input, 619 bufferevent_inbuf_wm_cb, 620 bufev); 621 } 622 evbuffer_cb_set_flags(bufev->input, 623 bufev_private->read_watermarks_cb, 624 EVBUFFER_CB_ENABLED|EVBUFFER_CB_NODEFER); 625 626 if (evbuffer_get_length(bufev->input) >= highmark) 627 bufferevent_wm_suspend_read(bufev); 628 else if (evbuffer_get_length(bufev->input) < highmark) 629 bufferevent_wm_unsuspend_read(bufev); 630 } else { 631 /* There is now no high-water mark for read. */ 632 if (bufev_private->read_watermarks_cb) 633 evbuffer_cb_clear_flags(bufev->input, 634 bufev_private->read_watermarks_cb, 635 EVBUFFER_CB_ENABLED); 636 bufferevent_wm_unsuspend_read(bufev); 637 } 638 } 639 BEV_UNLOCK(bufev); 640 } 641 642 int 643 bufferevent_getwatermark(struct bufferevent *bufev, short events, 644 size_t *lowmark, size_t *highmark) 645 { 646 if (events == EV_WRITE) { 647 BEV_LOCK(bufev); 648 if (lowmark) 649 *lowmark = bufev->wm_write.low; 650 if (highmark) 651 *highmark = bufev->wm_write.high; 652 BEV_UNLOCK(bufev); 653 return 0; 654 } 655 656 if (events == EV_READ) { 657 BEV_LOCK(bufev); 658 if (lowmark) 659 *lowmark = bufev->wm_read.low; 660 if (highmark) 661 *highmark = bufev->wm_read.high; 662 BEV_UNLOCK(bufev); 663 return 0; 664 } 665 return -1; 666 } 667 668 int 669 bufferevent_flush(struct bufferevent *bufev, 670 short iotype, 671 enum bufferevent_flush_mode mode) 672 { 673 int r = -1; 674 BEV_LOCK(bufev); 675 if (bufev->be_ops->flush) 676 r = bufev->be_ops->flush(bufev, iotype, mode); 677 BEV_UNLOCK(bufev); 678 return r; 679 } 680 681 void 682 bufferevent_incref_and_lock_(struct bufferevent *bufev) 683 { 684 struct bufferevent_private *bufev_private = BEV_UPCAST(bufev); 685 BEV_LOCK(bufev); 686 ++bufev_private->refcnt; 687 } 688 689 #if 0 690 static void 691 bufferevent_transfer_lock_ownership_(struct bufferevent *donor, 692 struct bufferevent *recipient) 693 { 694 struct bufferevent_private *d = BEV_UPCAST(donor); 695 struct bufferevent_private *r = BEV_UPCAST(recipient); 696 if (d->lock != r->lock) 697 return; 698 if (r->own_lock) 699 return; 700 if (d->own_lock) { 701 d->own_lock = 0; 702 r->own_lock = 1; 703 } 704 } 705 #endif 706 707 int 708 bufferevent_decref_and_unlock_(struct bufferevent *bufev) 709 { 710 struct bufferevent_private *bufev_private = BEV_UPCAST(bufev); 711 int n_cbs = 0; 712 #define MAX_CBS 16 713 struct event_callback *cbs[MAX_CBS]; 714 715 EVUTIL_ASSERT(bufev_private->refcnt > 0); 716 717 if (--bufev_private->refcnt) { 718 BEV_UNLOCK(bufev); 719 return 0; 720 } 721 722 if (bufev->be_ops->unlink) 723 bufev->be_ops->unlink(bufev); 724 725 /* Okay, we're out of references. Let's finalize this once all the 726 * callbacks are done running. */ 727 cbs[0] = &bufev->ev_read.ev_evcallback; 728 cbs[1] = &bufev->ev_write.ev_evcallback; 729 cbs[2] = &bufev_private->deferred; 730 n_cbs = 3; 731 if (bufev_private->rate_limiting) { 732 struct event *e = &bufev_private->rate_limiting->refill_bucket_event; 733 if (event_initialized(e)) 734 cbs[n_cbs++] = &e->ev_evcallback; 735 } 736 n_cbs += evbuffer_get_callbacks_(bufev->input, cbs+n_cbs, MAX_CBS-n_cbs); 737 n_cbs += evbuffer_get_callbacks_(bufev->output, cbs+n_cbs, MAX_CBS-n_cbs); 738 739 event_callback_finalize_many_(bufev->ev_base, n_cbs, cbs, 740 bufferevent_finalize_cb_); 741 742 #undef MAX_CBS 743 BEV_UNLOCK(bufev); 744 745 return 1; 746 } 747 748 static void 749 bufferevent_finalize_cb_(struct event_callback *evcb, void *arg_) 750 { 751 struct bufferevent *bufev = arg_; 752 struct bufferevent *underlying; 753 struct bufferevent_private *bufev_private = BEV_UPCAST(bufev); 754 755 BEV_LOCK(bufev); 756 underlying = bufferevent_get_underlying(bufev); 757 758 /* Clean up the shared info */ 759 if (bufev->be_ops->destruct) 760 bufev->be_ops->destruct(bufev); 761 762 /* XXX what happens if refcnt for these buffers is > 1? 763 * The buffers can share a lock with this bufferevent object, 764 * but the lock might be destroyed below. */ 765 /* evbuffer will free the callbacks */ 766 evbuffer_free(bufev->input); 767 evbuffer_free(bufev->output); 768 769 if (bufev_private->rate_limiting) { 770 if (bufev_private->rate_limiting->group) 771 bufferevent_remove_from_rate_limit_group_internal_(bufev,0); 772 mm_free(bufev_private->rate_limiting); 773 bufev_private->rate_limiting = NULL; 774 } 775 776 777 BEV_UNLOCK(bufev); 778 779 if (bufev_private->own_lock) 780 EVTHREAD_FREE_LOCK(bufev_private->lock, 781 EVTHREAD_LOCKTYPE_RECURSIVE); 782 783 /* Free the actual allocated memory. */ 784 mm_free(((char*)bufev) - bufev->be_ops->mem_offset); 785 786 /* Release the reference to underlying now that we no longer need the 787 * reference to it. We wait this long mainly in case our lock is 788 * shared with underlying. 789 * 790 * The 'destruct' function will also drop a reference to underlying 791 * if BEV_OPT_CLOSE_ON_FREE is set. 792 * 793 * XXX Should we/can we just refcount evbuffer/bufferevent locks? 794 * It would probably save us some headaches. 795 */ 796 if (underlying) 797 bufferevent_decref_(underlying); 798 } 799 800 int 801 bufferevent_decref(struct bufferevent *bufev) 802 { 803 BEV_LOCK(bufev); 804 return bufferevent_decref_and_unlock_(bufev); 805 } 806 807 void 808 bufferevent_free(struct bufferevent *bufev) 809 { 810 BEV_LOCK(bufev); 811 bufferevent_setcb(bufev, NULL, NULL, NULL, NULL); 812 bufferevent_cancel_all_(bufev); 813 bufferevent_decref_and_unlock_(bufev); 814 } 815 816 void 817 bufferevent_incref(struct bufferevent *bufev) 818 { 819 struct bufferevent_private *bufev_private = BEV_UPCAST(bufev); 820 821 /* XXX: now that this function is public, we might want to 822 * - return the count from this function 823 * - create a new function to atomically grab the current refcount 824 */ 825 BEV_LOCK(bufev); 826 ++bufev_private->refcnt; 827 BEV_UNLOCK(bufev); 828 } 829 830 int 831 bufferevent_enable_locking_(struct bufferevent *bufev, void *lock) 832 { 833 #ifdef EVENT__DISABLE_THREAD_SUPPORT 834 return -1; 835 #else 836 struct bufferevent *underlying; 837 838 if (BEV_UPCAST(bufev)->lock) 839 return -1; 840 underlying = bufferevent_get_underlying(bufev); 841 842 if (!lock && underlying && BEV_UPCAST(underlying)->lock) { 843 lock = BEV_UPCAST(underlying)->lock; 844 BEV_UPCAST(bufev)->lock = lock; 845 BEV_UPCAST(bufev)->own_lock = 0; 846 } else if (!lock) { 847 EVTHREAD_ALLOC_LOCK(lock, EVTHREAD_LOCKTYPE_RECURSIVE); 848 if (!lock) 849 return -1; 850 BEV_UPCAST(bufev)->lock = lock; 851 BEV_UPCAST(bufev)->own_lock = 1; 852 } else { 853 BEV_UPCAST(bufev)->lock = lock; 854 BEV_UPCAST(bufev)->own_lock = 0; 855 } 856 evbuffer_enable_locking(bufev->input, lock); 857 evbuffer_enable_locking(bufev->output, lock); 858 859 if (underlying && !BEV_UPCAST(underlying)->lock) 860 bufferevent_enable_locking_(underlying, lock); 861 862 return 0; 863 #endif 864 } 865 866 int 867 bufferevent_setfd(struct bufferevent *bev, evutil_socket_t fd) 868 { 869 union bufferevent_ctrl_data d; 870 int res = -1; 871 d.fd = fd; 872 BEV_LOCK(bev); 873 if (bev->be_ops->ctrl) 874 res = bev->be_ops->ctrl(bev, BEV_CTRL_SET_FD, &d); 875 if (res) 876 event_debug(("%s: cannot set fd for %p to "EV_SOCK_FMT, __func__, bev, fd)); 877 BEV_UNLOCK(bev); 878 return res; 879 } 880 881 evutil_socket_t 882 bufferevent_getfd(struct bufferevent *bev) 883 { 884 union bufferevent_ctrl_data d; 885 int res = -1; 886 d.fd = -1; 887 BEV_LOCK(bev); 888 if (bev->be_ops->ctrl) 889 res = bev->be_ops->ctrl(bev, BEV_CTRL_GET_FD, &d); 890 if (res) 891 event_debug(("%s: cannot get fd for %p", __func__, bev)); 892 BEV_UNLOCK(bev); 893 return (res<0) ? -1 : d.fd; 894 } 895 896 enum bufferevent_options 897 bufferevent_get_options_(struct bufferevent *bev) 898 { 899 struct bufferevent_private *bev_p = BEV_UPCAST(bev); 900 enum bufferevent_options options; 901 902 BEV_LOCK(bev); 903 options = bev_p->options; 904 BEV_UNLOCK(bev); 905 return options; 906 } 907 908 909 static void 910 bufferevent_cancel_all_(struct bufferevent *bev) 911 { 912 union bufferevent_ctrl_data d; 913 memset(&d, 0, sizeof(d)); 914 BEV_LOCK(bev); 915 if (bev->be_ops->ctrl) 916 bev->be_ops->ctrl(bev, BEV_CTRL_CANCEL_ALL, &d); 917 BEV_UNLOCK(bev); 918 } 919 920 short 921 bufferevent_get_enabled(struct bufferevent *bufev) 922 { 923 short r; 924 BEV_LOCK(bufev); 925 r = bufev->enabled; 926 BEV_UNLOCK(bufev); 927 return r; 928 } 929 930 struct bufferevent * 931 bufferevent_get_underlying(struct bufferevent *bev) 932 { 933 union bufferevent_ctrl_data d; 934 int res = -1; 935 d.ptr = NULL; 936 BEV_LOCK(bev); 937 if (bev->be_ops->ctrl) 938 res = bev->be_ops->ctrl(bev, BEV_CTRL_GET_UNDERLYING, &d); 939 BEV_UNLOCK(bev); 940 return (res<0) ? NULL : d.ptr; 941 } 942 943 static void 944 bufferevent_generic_read_timeout_cb(evutil_socket_t fd, short event, void *ctx) 945 { 946 struct bufferevent *bev = ctx; 947 bufferevent_incref_and_lock_(bev); 948 bufferevent_disable(bev, EV_READ); 949 bufferevent_run_eventcb_(bev, BEV_EVENT_TIMEOUT|BEV_EVENT_READING, 0); 950 bufferevent_decref_and_unlock_(bev); 951 } 952 static void 953 bufferevent_generic_write_timeout_cb(evutil_socket_t fd, short event, void *ctx) 954 { 955 struct bufferevent *bev = ctx; 956 bufferevent_incref_and_lock_(bev); 957 bufferevent_disable(bev, EV_WRITE); 958 bufferevent_run_eventcb_(bev, BEV_EVENT_TIMEOUT|BEV_EVENT_WRITING, 0); 959 bufferevent_decref_and_unlock_(bev); 960 } 961 962 void 963 bufferevent_init_generic_timeout_cbs_(struct bufferevent *bev) 964 { 965 event_assign(&bev->ev_read, bev->ev_base, -1, EV_FINALIZE, 966 bufferevent_generic_read_timeout_cb, bev); 967 event_assign(&bev->ev_write, bev->ev_base, -1, EV_FINALIZE, 968 bufferevent_generic_write_timeout_cb, bev); 969 } 970 971 int 972 bufferevent_generic_adj_timeouts_(struct bufferevent *bev) 973 { 974 const short enabled = bev->enabled; 975 struct bufferevent_private *bev_p = BEV_UPCAST(bev); 976 int r1=0, r2=0; 977 if ((enabled & EV_READ) && !bev_p->read_suspended && 978 evutil_timerisset(&bev->timeout_read)) 979 r1 = event_add(&bev->ev_read, &bev->timeout_read); 980 else 981 r1 = event_del(&bev->ev_read); 982 983 if ((enabled & EV_WRITE) && !bev_p->write_suspended && 984 evutil_timerisset(&bev->timeout_write) && 985 evbuffer_get_length(bev->output)) 986 r2 = event_add(&bev->ev_write, &bev->timeout_write); 987 else 988 r2 = event_del(&bev->ev_write); 989 if (r1 < 0 || r2 < 0) 990 return -1; 991 return 0; 992 } 993 994 int 995 bufferevent_generic_adj_existing_timeouts_(struct bufferevent *bev) 996 { 997 int r = 0; 998 if (event_pending(&bev->ev_read, EV_READ, NULL)) { 999 if (evutil_timerisset(&bev->timeout_read)) { 1000 if (bufferevent_add_event_(&bev->ev_read, &bev->timeout_read) < 0) 1001 r = -1; 1002 } else { 1003 event_remove_timer(&bev->ev_read); 1004 } 1005 } 1006 if (event_pending(&bev->ev_write, EV_WRITE, NULL)) { 1007 if (evutil_timerisset(&bev->timeout_write)) { 1008 if (bufferevent_add_event_(&bev->ev_write, &bev->timeout_write) < 0) 1009 r = -1; 1010 } else { 1011 event_remove_timer(&bev->ev_write); 1012 } 1013 } 1014 return r; 1015 } 1016 1017 int 1018 bufferevent_add_event_(struct event *ev, const struct timeval *tv) 1019 { 1020 if (!evutil_timerisset(tv)) 1021 return event_add(ev, NULL); 1022 else 1023 return event_add(ev, tv); 1024 } 1025 1026 /* For use by user programs only; internally, we should be calling 1027 either bufferevent_incref_and_lock_(), or BEV_LOCK. */ 1028 void 1029 bufferevent_lock(struct bufferevent *bev) 1030 { 1031 bufferevent_incref_and_lock_(bev); 1032 } 1033 1034 void 1035 bufferevent_unlock(struct bufferevent *bev) 1036 { 1037 bufferevent_decref_and_unlock_(bev); 1038 } 1039