1 /* $NetBSD: event.c,v 1.3 2015/01/29 07:26:02 spz Exp $ */ 2 /* 3 * Copyright (c) 2000-2007 Niels Provos <provos@citi.umich.edu> 4 * Copyright (c) 2007-2012 Niels Provos and Nick Mathewson 5 * 6 * Redistribution and use in source and binary forms, with or without 7 * modification, are permitted provided that the following conditions 8 * are met: 9 * 1. Redistributions of source code must retain the above copyright 10 * notice, this list of conditions and the following disclaimer. 11 * 2. Redistributions in binary form must reproduce the above copyright 12 * notice, this list of conditions and the following disclaimer in the 13 * documentation and/or other materials provided with the distribution. 14 * 3. The name of the author may not be used to endorse or promote products 15 * derived from this software without specific prior written permission. 16 * 17 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR 18 * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES 19 * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. 20 * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, 21 * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT 22 * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, 23 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY 24 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT 25 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF 26 * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 27 */ 28 #include "event2/event-config.h" 29 #include <sys/cdefs.h> 30 __RCSID("$NetBSD: event.c,v 1.3 2015/01/29 07:26:02 spz Exp $"); 31 32 #ifdef WIN32 33 #include <winsock2.h> 34 #define WIN32_LEAN_AND_MEAN 35 #include <windows.h> 36 #undef WIN32_LEAN_AND_MEAN 37 #endif 38 #include <sys/types.h> 39 #if !defined(WIN32) && defined(_EVENT_HAVE_SYS_TIME_H) 40 #include <sys/time.h> 41 #endif 42 #include <sys/queue.h> 43 #ifdef _EVENT_HAVE_SYS_SOCKET_H 44 #include <sys/socket.h> 45 #endif 46 #include <stdio.h> 47 #include <stdlib.h> 48 #ifdef _EVENT_HAVE_UNISTD_H 49 #include <unistd.h> 50 #endif 51 #ifdef _EVENT_HAVE_SYS_EVENTFD_H 52 #include <sys/eventfd.h> 53 #endif 54 #include <ctype.h> 55 #include <errno.h> 56 #include <signal.h> 57 #include <string.h> 58 #include <time.h> 59 60 #include "event2/event.h" 61 #include "event2/event_struct.h" 62 #include "event2/event_compat.h" 63 #include "event-internal.h" 64 #include "defer-internal.h" 65 #include "evthread-internal.h" 66 #include "event2/thread.h" 67 #include "event2/util.h" 68 #include "log-internal.h" 69 #include "evmap-internal.h" 70 #include "iocp-internal.h" 71 #include "changelist-internal.h" 72 #include "ht-internal.h" 73 #include "util-internal.h" 74 75 #ifdef _EVENT_HAVE_EVENT_PORTS 76 extern const struct eventop evportops; 77 #endif 78 #ifdef _EVENT_HAVE_SELECT 79 extern const struct eventop selectops; 80 #endif 81 #ifdef _EVENT_HAVE_POLL 82 extern const struct eventop pollops; 83 #endif 84 #ifdef _EVENT_HAVE_EPOLL 85 extern const struct eventop epollops; 86 #endif 87 #ifdef _EVENT_HAVE_WORKING_KQUEUE 88 extern const struct eventop kqops; 89 #endif 90 #ifdef _EVENT_HAVE_DEVPOLL 91 extern const struct eventop devpollops; 92 #endif 93 #ifdef WIN32 94 extern const struct eventop win32ops; 95 #endif 96 97 /* Array of backends in order of preference. */ 98 static const struct eventop *eventops[] = { 99 #ifdef _EVENT_HAVE_EVENT_PORTS 100 &evportops, 101 #endif 102 #ifdef _EVENT_HAVE_WORKING_KQUEUE 103 &kqops, 104 #endif 105 #ifdef _EVENT_HAVE_EPOLL 106 &epollops, 107 #endif 108 #ifdef _EVENT_HAVE_DEVPOLL 109 &devpollops, 110 #endif 111 #ifdef _EVENT_HAVE_POLL 112 &pollops, 113 #endif 114 #ifdef _EVENT_HAVE_SELECT 115 &selectops, 116 #endif 117 #ifdef WIN32 118 &win32ops, 119 #endif 120 NULL 121 }; 122 123 /* Global state; deprecated */ 124 struct event_base *event_global_current_base_ = NULL; 125 #define current_base event_global_current_base_ 126 127 /* Global state */ 128 129 static int use_monotonic; 130 131 /* Prototypes */ 132 static inline int event_add_internal(struct event *ev, 133 const struct timeval *tv, int tv_is_absolute); 134 static inline int event_del_internal(struct event *ev); 135 136 static void event_queue_insert(struct event_base *, struct event *, int); 137 static void event_queue_remove(struct event_base *, struct event *, int); 138 static int event_haveevents(struct event_base *); 139 140 static int event_process_active(struct event_base *); 141 142 static int timeout_next(struct event_base *, struct timeval **); 143 static void timeout_process(struct event_base *); 144 static void timeout_correct(struct event_base *, struct timeval *); 145 146 static inline void event_signal_closure(struct event_base *, struct event *ev); 147 static inline void event_persist_closure(struct event_base *, struct event *ev); 148 149 static int evthread_notify_base(struct event_base *base); 150 151 #ifndef _EVENT_DISABLE_DEBUG_MODE 152 /* These functions implement a hashtable of which 'struct event *' structures 153 * have been setup or added. We don't want to trust the content of the struct 154 * event itself, since we're trying to work through cases where an event gets 155 * clobbered or freed. Instead, we keep a hashtable indexed by the pointer. 156 */ 157 158 struct event_debug_entry { 159 HT_ENTRY(event_debug_entry) node; 160 const struct event *ptr; 161 unsigned added : 1; 162 }; 163 164 static inline unsigned 165 hash_debug_entry(const struct event_debug_entry *e) 166 { 167 /* We need to do this silliness to convince compilers that we 168 * honestly mean to cast e->ptr to an integer, and discard any 169 * part of it that doesn't fit in an unsigned. 170 */ 171 unsigned u = (unsigned) ((ev_uintptr_t) e->ptr); 172 /* Our hashtable implementation is pretty sensitive to low bits, 173 * and every struct event is over 64 bytes in size, so we can 174 * just say >>6. */ 175 return (u >> 6); 176 } 177 178 static inline int 179 eq_debug_entry(const struct event_debug_entry *a, 180 const struct event_debug_entry *b) 181 { 182 return a->ptr == b->ptr; 183 } 184 185 int _event_debug_mode_on = 0; 186 /* Set if it's too late to enable event_debug_mode. */ 187 static int event_debug_mode_too_late = 0; 188 #ifndef _EVENT_DISABLE_THREAD_SUPPORT 189 static void *_event_debug_map_lock = NULL; 190 #endif 191 static HT_HEAD(event_debug_map, event_debug_entry) global_debug_map = 192 HT_INITIALIZER(); 193 194 HT_PROTOTYPE(event_debug_map, event_debug_entry, node, hash_debug_entry, 195 eq_debug_entry) 196 HT_GENERATE(event_debug_map, event_debug_entry, node, hash_debug_entry, 197 eq_debug_entry, 0.5, mm_malloc, mm_realloc, mm_free) 198 199 /* Macro: record that ev is now setup (that is, ready for an add) */ 200 #define _event_debug_note_setup(ev) do { \ 201 if (_event_debug_mode_on) { \ 202 struct event_debug_entry *dent,find; \ 203 find.ptr = (ev); \ 204 EVLOCK_LOCK(_event_debug_map_lock, 0); \ 205 dent = HT_FIND(event_debug_map, &global_debug_map, &find); \ 206 if (dent) { \ 207 dent->added = 0; \ 208 } else { \ 209 dent = mm_malloc(sizeof(*dent)); \ 210 if (!dent) \ 211 event_err(1, \ 212 "Out of memory in debugging code"); \ 213 dent->ptr = (ev); \ 214 dent->added = 0; \ 215 HT_INSERT(event_debug_map, &global_debug_map, dent); \ 216 } \ 217 EVLOCK_UNLOCK(_event_debug_map_lock, 0); \ 218 } \ 219 event_debug_mode_too_late = 1; \ 220 } while (/*CONSTCOND*/0) 221 /* Macro: record that ev is no longer setup */ 222 #define _event_debug_note_teardown(ev) do { \ 223 if (_event_debug_mode_on) { \ 224 struct event_debug_entry *dent,find; \ 225 find.ptr = (ev); \ 226 EVLOCK_LOCK(_event_debug_map_lock, 0); \ 227 dent = HT_REMOVE(event_debug_map, &global_debug_map, &find); \ 228 if (dent) \ 229 mm_free(dent); \ 230 EVLOCK_UNLOCK(_event_debug_map_lock, 0); \ 231 } \ 232 event_debug_mode_too_late = 1; \ 233 } while (/*CONSTCOND*/0) 234 /* Macro: record that ev is now added */ 235 #define _event_debug_note_add(ev) do { \ 236 if (_event_debug_mode_on) { \ 237 struct event_debug_entry *dent,find; \ 238 find.ptr = (ev); \ 239 EVLOCK_LOCK(_event_debug_map_lock, 0); \ 240 dent = HT_FIND(event_debug_map, &global_debug_map, &find); \ 241 if (dent) { \ 242 dent->added = 1; \ 243 } else { \ 244 event_errx(_EVENT_ERR_ABORT, \ 245 "%s: noting an add on a non-setup event %p" \ 246 " (events: 0x%x, fd: "EV_SOCK_FMT \ 247 ", flags: 0x%x)", \ 248 __func__, (ev), (ev)->ev_events, \ 249 EV_SOCK_ARG((ev)->ev_fd), (ev)->ev_flags); \ 250 } \ 251 EVLOCK_UNLOCK(_event_debug_map_lock, 0); \ 252 } \ 253 event_debug_mode_too_late = 1; \ 254 } while (/*CONSTCOND*/0) 255 /* Macro: record that ev is no longer added */ 256 #define _event_debug_note_del(ev) do { \ 257 if (_event_debug_mode_on) { \ 258 struct event_debug_entry *dent,find; \ 259 find.ptr = (ev); \ 260 EVLOCK_LOCK(_event_debug_map_lock, 0); \ 261 dent = HT_FIND(event_debug_map, &global_debug_map, &find); \ 262 if (dent) { \ 263 dent->added = 0; \ 264 } else { \ 265 event_errx(_EVENT_ERR_ABORT, \ 266 "%s: noting a del on a non-setup event %p" \ 267 " (events: 0x%x, fd: "EV_SOCK_FMT \ 268 ", flags: 0x%x)", \ 269 __func__, (ev), (ev)->ev_events, \ 270 EV_SOCK_ARG((ev)->ev_fd), (ev)->ev_flags); \ 271 } \ 272 EVLOCK_UNLOCK(_event_debug_map_lock, 0); \ 273 } \ 274 event_debug_mode_too_late = 1; \ 275 } while (/*CONSTCOND*/0) 276 /* Macro: assert that ev is setup (i.e., okay to add or inspect) */ 277 #define _event_debug_assert_is_setup(ev) do { \ 278 if (_event_debug_mode_on) { \ 279 struct event_debug_entry *dent,find; \ 280 find.ptr = (ev); \ 281 EVLOCK_LOCK(_event_debug_map_lock, 0); \ 282 dent = HT_FIND(event_debug_map, &global_debug_map, &find); \ 283 if (!dent) { \ 284 event_errx(_EVENT_ERR_ABORT, \ 285 "%s called on a non-initialized event %p" \ 286 " (events: 0x%x, fd: "EV_SOCK_FMT\ 287 ", flags: 0x%x)", \ 288 __func__, (ev), (ev)->ev_events, \ 289 EV_SOCK_ARG((ev)->ev_fd), (ev)->ev_flags); \ 290 } \ 291 EVLOCK_UNLOCK(_event_debug_map_lock, 0); \ 292 } \ 293 } while (/*CONSTCOND*/0) 294 /* Macro: assert that ev is not added (i.e., okay to tear down or set 295 * up again) */ 296 #define _event_debug_assert_not_added(ev) do { \ 297 if (_event_debug_mode_on) { \ 298 struct event_debug_entry *dent,find; \ 299 find.ptr = (ev); \ 300 EVLOCK_LOCK(_event_debug_map_lock, 0); \ 301 dent = HT_FIND(event_debug_map, &global_debug_map, &find); \ 302 if (dent && dent->added) { \ 303 event_errx(_EVENT_ERR_ABORT, \ 304 "%s called on an already added event %p" \ 305 " (events: 0x%x, fd: "EV_SOCK_FMT", " \ 306 "flags: 0x%x)", \ 307 __func__, (ev), (ev)->ev_events, \ 308 EV_SOCK_ARG((ev)->ev_fd), (ev)->ev_flags); \ 309 } \ 310 EVLOCK_UNLOCK(_event_debug_map_lock, 0); \ 311 } \ 312 } while (/*CONSTCOND*/0) 313 #else 314 #define _event_debug_note_setup(ev) \ 315 ((void)0) 316 #define _event_debug_note_teardown(ev) \ 317 ((void)0) 318 #define _event_debug_note_add(ev) \ 319 ((void)0) 320 #define _event_debug_note_del(ev) \ 321 ((void)0) 322 #define _event_debug_assert_is_setup(ev) \ 323 ((void)0) 324 #define _event_debug_assert_not_added(ev) \ 325 ((void)0) 326 #endif 327 328 #define EVENT_BASE_ASSERT_LOCKED(base) \ 329 EVLOCK_ASSERT_LOCKED((base)->th_base_lock) 330 331 /* The first time this function is called, it sets use_monotonic to 1 332 * if we have a clock function that supports monotonic time */ 333 static void 334 detect_monotonic(void) 335 { 336 #if defined(_EVENT_HAVE_CLOCK_GETTIME) && defined(CLOCK_MONOTONIC) 337 struct timespec ts; 338 static int use_monotonic_initialized = 0; 339 340 if (use_monotonic_initialized) 341 return; 342 343 if (clock_gettime(CLOCK_MONOTONIC, &ts) == 0) 344 use_monotonic = 1; 345 346 use_monotonic_initialized = 1; 347 #endif 348 } 349 350 /* How often (in seconds) do we check for changes in wall clock time relative 351 * to monotonic time? Set this to -1 for 'never.' */ 352 #define CLOCK_SYNC_INTERVAL -1 353 354 /** Set 'tp' to the current time according to 'base'. We must hold the lock 355 * on 'base'. If there is a cached time, return it. Otherwise, use 356 * clock_gettime or gettimeofday as appropriate to find out the right time. 357 * Return 0 on success, -1 on failure. 358 */ 359 static int 360 gettime(struct event_base *base, struct timeval *tp) 361 { 362 EVENT_BASE_ASSERT_LOCKED(base); 363 364 if (base->tv_cache.tv_sec) { 365 *tp = base->tv_cache; 366 return (0); 367 } 368 369 #if defined(_EVENT_HAVE_CLOCK_GETTIME) && defined(CLOCK_MONOTONIC) 370 if (use_monotonic) { 371 struct timespec ts; 372 373 if (clock_gettime(CLOCK_MONOTONIC, &ts) == -1) 374 return (-1); 375 376 tp->tv_sec = ts.tv_sec; 377 tp->tv_usec = ts.tv_nsec / 1000; 378 if (base->last_updated_clock_diff + CLOCK_SYNC_INTERVAL 379 < ts.tv_sec) { 380 struct timeval tv; 381 evutil_gettimeofday(&tv,NULL); 382 evutil_timersub(&tv, tp, &base->tv_clock_diff); 383 base->last_updated_clock_diff = ts.tv_sec; 384 } 385 386 return (0); 387 } 388 #endif 389 390 return (evutil_gettimeofday(tp, NULL)); 391 } 392 393 int 394 event_base_gettimeofday_cached(struct event_base *base, struct timeval *tv) 395 { 396 int r; 397 if (!base) { 398 base = current_base; 399 if (!current_base) 400 return evutil_gettimeofday(tv, NULL); 401 } 402 403 EVBASE_ACQUIRE_LOCK(base, th_base_lock); 404 if (base->tv_cache.tv_sec == 0) { 405 r = evutil_gettimeofday(tv, NULL); 406 } else { 407 #if defined(_EVENT_HAVE_CLOCK_GETTIME) && defined(CLOCK_MONOTONIC) 408 evutil_timeradd(&base->tv_cache, &base->tv_clock_diff, tv); 409 #else 410 *tv = base->tv_cache; 411 #endif 412 r = 0; 413 } 414 EVBASE_RELEASE_LOCK(base, th_base_lock); 415 return r; 416 } 417 418 /** Make 'base' have no current cached time. */ 419 static inline void 420 clear_time_cache(struct event_base *base) 421 { 422 base->tv_cache.tv_sec = 0; 423 } 424 425 /** Replace the cached time in 'base' with the current time. */ 426 static inline void 427 update_time_cache(struct event_base *base) 428 { 429 base->tv_cache.tv_sec = 0; 430 if (!(base->flags & EVENT_BASE_FLAG_NO_CACHE_TIME)) 431 gettime(base, &base->tv_cache); 432 } 433 434 struct event_base * 435 event_init(void) 436 { 437 struct event_base *base = event_base_new_with_config(NULL); 438 439 if (base == NULL) { 440 event_errx(1, "%s: Unable to construct event_base", __func__); 441 return NULL; 442 } 443 444 current_base = base; 445 446 return (base); 447 } 448 449 struct event_base * 450 event_base_new(void) 451 { 452 struct event_base *base = NULL; 453 struct event_config *cfg = event_config_new(); 454 if (cfg) { 455 base = event_base_new_with_config(cfg); 456 event_config_free(cfg); 457 } 458 return base; 459 } 460 461 /** Return true iff 'method' is the name of a method that 'cfg' tells us to 462 * avoid. */ 463 static int 464 event_config_is_avoided_method(const struct event_config *cfg, 465 const char *method) 466 { 467 struct event_config_entry *entry; 468 469 TAILQ_FOREACH(entry, &cfg->entries, next) { 470 if (entry->avoid_method != NULL && 471 strcmp(entry->avoid_method, method) == 0) 472 return (1); 473 } 474 475 return (0); 476 } 477 478 /** Return true iff 'method' is disabled according to the environment. */ 479 static int 480 event_is_method_disabled(const char *name) 481 { 482 char environment[64]; 483 int i; 484 485 evutil_snprintf(environment, sizeof(environment), "EVENT_NO%s", name); 486 for (i = 8; environment[i] != '\0'; ++i) 487 environment[i] = EVUTIL_TOUPPER(environment[i]); 488 /* Note that evutil_getenv() ignores the environment entirely if 489 * we're setuid */ 490 return (evutil_getenv(environment) != NULL); 491 } 492 493 int 494 event_base_get_features(const struct event_base *base) 495 { 496 return base->evsel->features; 497 } 498 499 void 500 event_deferred_cb_queue_init(struct deferred_cb_queue *cb) 501 { 502 memset(cb, 0, sizeof(struct deferred_cb_queue)); 503 TAILQ_INIT(&cb->deferred_cb_list); 504 } 505 506 /** Helper for the deferred_cb queue: wake up the event base. */ 507 static void 508 notify_base_cbq_callback(struct deferred_cb_queue *cb, void *baseptr) 509 { 510 struct event_base *base = baseptr; 511 if (EVBASE_NEED_NOTIFY(base)) 512 evthread_notify_base(base); 513 } 514 515 struct deferred_cb_queue * 516 event_base_get_deferred_cb_queue(struct event_base *base) 517 { 518 return base ? &base->defer_queue : NULL; 519 } 520 521 void 522 event_enable_debug_mode(void) 523 { 524 #ifndef _EVENT_DISABLE_DEBUG_MODE 525 if (_event_debug_mode_on) 526 event_errx(1, "%s was called twice!", __func__); 527 if (event_debug_mode_too_late) 528 event_errx(1, "%s must be called *before* creating any events " 529 "or event_bases",__func__); 530 531 _event_debug_mode_on = 1; 532 533 HT_INIT(event_debug_map, &global_debug_map); 534 #endif 535 } 536 537 #if 0 538 void 539 event_disable_debug_mode(void) 540 { 541 struct event_debug_entry **ent, *victim; 542 543 EVLOCK_LOCK(_event_debug_map_lock, 0); 544 for (ent = HT_START(event_debug_map, &global_debug_map); ent; ) { 545 victim = *ent; 546 ent = HT_NEXT_RMV(event_debug_map,&global_debug_map, ent); 547 mm_free(victim); 548 } 549 HT_CLEAR(event_debug_map, &global_debug_map); 550 EVLOCK_UNLOCK(_event_debug_map_lock , 0); 551 } 552 #endif 553 554 struct event_base * 555 event_base_new_with_config(const struct event_config *cfg) 556 { 557 int i; 558 struct event_base *base; 559 int should_check_environment; 560 561 #ifndef _EVENT_DISABLE_DEBUG_MODE 562 event_debug_mode_too_late = 1; 563 #endif 564 565 if ((base = mm_calloc(1, sizeof(struct event_base))) == NULL) { 566 event_warn("%s: calloc", __func__); 567 return NULL; 568 } 569 detect_monotonic(); 570 gettime(base, &base->event_tv); 571 572 min_heap_ctor(&base->timeheap); 573 TAILQ_INIT(&base->eventqueue); 574 base->sig.ev_signal_pair[0] = -1; 575 base->sig.ev_signal_pair[1] = -1; 576 base->th_notify_fd[0] = -1; 577 base->th_notify_fd[1] = -1; 578 579 event_deferred_cb_queue_init(&base->defer_queue); 580 base->defer_queue.notify_fn = notify_base_cbq_callback; 581 base->defer_queue.notify_arg = base; 582 if (cfg) 583 base->flags = cfg->flags; 584 585 evmap_io_initmap(&base->io); 586 evmap_signal_initmap(&base->sigmap); 587 event_changelist_init(&base->changelist); 588 589 base->evbase = NULL; 590 591 should_check_environment = 592 !(cfg && (cfg->flags & EVENT_BASE_FLAG_IGNORE_ENV)); 593 594 for (i = 0; eventops[i] && !base->evbase; i++) { 595 if (cfg != NULL) { 596 /* determine if this backend should be avoided */ 597 if (event_config_is_avoided_method(cfg, 598 eventops[i]->name)) 599 continue; 600 if ((eventops[i]->features & cfg->require_features) 601 != cfg->require_features) 602 continue; 603 } 604 605 /* also obey the environment variables */ 606 if (should_check_environment && 607 event_is_method_disabled(eventops[i]->name)) 608 continue; 609 610 base->evsel = eventops[i]; 611 612 base->evbase = base->evsel->init(base); 613 } 614 615 if (base->evbase == NULL) { 616 event_warnx("%s: no event mechanism available", 617 __func__); 618 base->evsel = NULL; 619 event_base_free(base); 620 return NULL; 621 } 622 623 if (evutil_getenv("EVENT_SHOW_METHOD")) 624 event_msgx("libevent using: %s", base->evsel->name); 625 626 /* allocate a single active event queue */ 627 if (event_base_priority_init(base, 1) < 0) { 628 event_base_free(base); 629 return NULL; 630 } 631 632 /* prepare for threading */ 633 634 #ifndef _EVENT_DISABLE_THREAD_SUPPORT 635 if (EVTHREAD_LOCKING_ENABLED() && 636 (!cfg || !(cfg->flags & EVENT_BASE_FLAG_NOLOCK))) { 637 int r; 638 EVTHREAD_ALLOC_LOCK(base->th_base_lock, 639 EVTHREAD_LOCKTYPE_RECURSIVE); 640 base->defer_queue.lock = base->th_base_lock; 641 EVTHREAD_ALLOC_COND(base->current_event_cond); 642 r = evthread_make_base_notifiable(base); 643 if (r<0) { 644 event_warnx("%s: Unable to make base notifiable.", __func__); 645 event_base_free(base); 646 return NULL; 647 } 648 } 649 #endif 650 651 #ifdef WIN32 652 if (cfg && (cfg->flags & EVENT_BASE_FLAG_STARTUP_IOCP)) 653 event_base_start_iocp(base, cfg->n_cpus_hint); 654 #endif 655 656 return (base); 657 } 658 659 int 660 event_base_start_iocp(struct event_base *base, int n_cpus) 661 { 662 #ifdef WIN32 663 if (base->iocp) 664 return 0; 665 base->iocp = event_iocp_port_launch(n_cpus); 666 if (!base->iocp) { 667 event_warnx("%s: Couldn't launch IOCP", __func__); 668 return -1; 669 } 670 return 0; 671 #else 672 return -1; 673 #endif 674 } 675 676 void 677 event_base_stop_iocp(struct event_base *base) 678 { 679 #ifdef WIN32 680 int rv; 681 682 if (!base->iocp) 683 return; 684 rv = event_iocp_shutdown(base->iocp, -1); 685 EVUTIL_ASSERT(rv >= 0); 686 base->iocp = NULL; 687 #endif 688 } 689 690 void 691 event_base_free(struct event_base *base) 692 { 693 int i, n_deleted=0; 694 struct event *ev; 695 /* XXXX grab the lock? If there is contention when one thread frees 696 * the base, then the contending thread will be very sad soon. */ 697 698 /* event_base_free(NULL) is how to free the current_base if we 699 * made it with event_init and forgot to hold a reference to it. */ 700 if (base == NULL && current_base) 701 base = current_base; 702 /* If we're freeing current_base, there won't be a current_base. */ 703 if (base == current_base) 704 current_base = NULL; 705 /* Don't actually free NULL. */ 706 if (base == NULL) { 707 event_warnx("%s: no base to free", __func__); 708 return; 709 } 710 /* XXX(niels) - check for internal events first */ 711 712 #ifdef WIN32 713 event_base_stop_iocp(base); 714 #endif 715 716 /* threading fds if we have them */ 717 if (base->th_notify_fd[0] != -1) { 718 event_del(&base->th_notify); 719 EVUTIL_CLOSESOCKET(base->th_notify_fd[0]); 720 if (base->th_notify_fd[1] != -1) 721 EVUTIL_CLOSESOCKET(base->th_notify_fd[1]); 722 base->th_notify_fd[0] = -1; 723 base->th_notify_fd[1] = -1; 724 event_debug_unassign(&base->th_notify); 725 } 726 727 /* Delete all non-internal events. */ 728 for (ev = TAILQ_FIRST(&base->eventqueue); ev; ) { 729 struct event *next = TAILQ_NEXT(ev, ev_next); 730 if (!(ev->ev_flags & EVLIST_INTERNAL)) { 731 event_del(ev); 732 ++n_deleted; 733 } 734 ev = next; 735 } 736 while ((ev = min_heap_top(&base->timeheap)) != NULL) { 737 event_del(ev); 738 ++n_deleted; 739 } 740 for (i = 0; i < base->n_common_timeouts; ++i) { 741 struct common_timeout_list *ctl = 742 base->common_timeout_queues[i]; 743 event_del(&ctl->timeout_event); /* Internal; doesn't count */ 744 event_debug_unassign(&ctl->timeout_event); 745 for (ev = TAILQ_FIRST(&ctl->events); ev; ) { 746 struct event *next = TAILQ_NEXT(ev, 747 ev_timeout_pos.ev_next_with_common_timeout); 748 if (!(ev->ev_flags & EVLIST_INTERNAL)) { 749 event_del(ev); 750 ++n_deleted; 751 } 752 ev = next; 753 } 754 mm_free(ctl); 755 } 756 if (base->common_timeout_queues) 757 mm_free(base->common_timeout_queues); 758 759 for (i = 0; i < base->nactivequeues; ++i) { 760 for (ev = TAILQ_FIRST(&base->activequeues[i]); ev; ) { 761 struct event *next = TAILQ_NEXT(ev, ev_active_next); 762 if (!(ev->ev_flags & EVLIST_INTERNAL)) { 763 event_del(ev); 764 ++n_deleted; 765 } 766 ev = next; 767 } 768 } 769 770 if (n_deleted) 771 event_debug(("%s: %d events were still set in base", 772 __func__, n_deleted)); 773 774 if (base->evsel != NULL && base->evsel->dealloc != NULL) 775 base->evsel->dealloc(base); 776 777 for (i = 0; i < base->nactivequeues; ++i) 778 EVUTIL_ASSERT(TAILQ_EMPTY(&base->activequeues[i])); 779 780 EVUTIL_ASSERT(min_heap_empty(&base->timeheap)); 781 min_heap_dtor(&base->timeheap); 782 783 mm_free(base->activequeues); 784 785 EVUTIL_ASSERT(TAILQ_EMPTY(&base->eventqueue)); 786 787 evmap_io_clear(&base->io); 788 evmap_signal_clear(&base->sigmap); 789 event_changelist_freemem(&base->changelist); 790 791 EVTHREAD_FREE_LOCK(base->th_base_lock, EVTHREAD_LOCKTYPE_RECURSIVE); 792 EVTHREAD_FREE_COND(base->current_event_cond); 793 794 mm_free(base); 795 } 796 797 /* reinitialize the event base after a fork */ 798 int 799 event_reinit(struct event_base *base) 800 { 801 const struct eventop *evsel; 802 int res = 0; 803 struct event *ev; 804 int was_notifiable = 0; 805 806 EVBASE_ACQUIRE_LOCK(base, th_base_lock); 807 808 evsel = base->evsel; 809 810 #if 0 811 /* Right now, reinit always takes effect, since even if the 812 backend doesn't require it, the signal socketpair code does. 813 814 XXX 815 */ 816 /* check if this event mechanism requires reinit */ 817 if (!evsel->need_reinit) 818 goto done; 819 #endif 820 821 /* prevent internal delete */ 822 if (base->sig.ev_signal_added) { 823 /* we cannot call event_del here because the base has 824 * not been reinitialized yet. */ 825 event_queue_remove(base, &base->sig.ev_signal, 826 EVLIST_INSERTED); 827 if (base->sig.ev_signal.ev_flags & EVLIST_ACTIVE) 828 event_queue_remove(base, &base->sig.ev_signal, 829 EVLIST_ACTIVE); 830 if (base->sig.ev_signal_pair[0] != -1) 831 EVUTIL_CLOSESOCKET(base->sig.ev_signal_pair[0]); 832 if (base->sig.ev_signal_pair[1] != -1) 833 EVUTIL_CLOSESOCKET(base->sig.ev_signal_pair[1]); 834 base->sig.ev_signal_added = 0; 835 } 836 if (base->th_notify_fd[0] != -1) { 837 /* we cannot call event_del here because the base has 838 * not been reinitialized yet. */ 839 was_notifiable = 1; 840 event_queue_remove(base, &base->th_notify, 841 EVLIST_INSERTED); 842 if (base->th_notify.ev_flags & EVLIST_ACTIVE) 843 event_queue_remove(base, &base->th_notify, 844 EVLIST_ACTIVE); 845 base->sig.ev_signal_added = 0; 846 EVUTIL_CLOSESOCKET(base->th_notify_fd[0]); 847 if (base->th_notify_fd[1] != -1) 848 EVUTIL_CLOSESOCKET(base->th_notify_fd[1]); 849 base->th_notify_fd[0] = -1; 850 base->th_notify_fd[1] = -1; 851 event_debug_unassign(&base->th_notify); 852 } 853 854 if (base->evsel->dealloc != NULL) 855 base->evsel->dealloc(base); 856 base->evbase = evsel->init(base); 857 if (base->evbase == NULL) { 858 event_errx(1, "%s: could not reinitialize event mechanism", 859 __func__); 860 res = -1; 861 goto done; 862 } 863 864 event_changelist_freemem(&base->changelist); /* XXX */ 865 evmap_io_clear(&base->io); 866 evmap_signal_clear(&base->sigmap); 867 868 TAILQ_FOREACH(ev, &base->eventqueue, ev_next) { 869 if (ev->ev_events & (EV_READ|EV_WRITE)) { 870 if (ev == &base->sig.ev_signal) { 871 /* If we run into the ev_signal event, it's only 872 * in eventqueue because some signal event was 873 * added, which made evsig_add re-add ev_signal. 874 * So don't double-add it. */ 875 continue; 876 } 877 if (evmap_io_add(base, ev->ev_fd, ev) == -1) 878 res = -1; 879 } else if (ev->ev_events & EV_SIGNAL) { 880 if (evmap_signal_add(base, (int)ev->ev_fd, ev) == -1) 881 res = -1; 882 } 883 } 884 885 if (was_notifiable && res == 0) 886 res = evthread_make_base_notifiable(base); 887 888 done: 889 EVBASE_RELEASE_LOCK(base, th_base_lock); 890 return (res); 891 } 892 893 const char ** 894 event_get_supported_methods(void) 895 { 896 static const char **methods = NULL; 897 const struct eventop **method; 898 const char **tmp; 899 int i = 0, k; 900 901 /* count all methods */ 902 for (method = &eventops[0]; *method != NULL; ++method) { 903 ++i; 904 } 905 906 /* allocate one more than we need for the NULL pointer */ 907 tmp = mm_calloc((i + 1), sizeof(char *)); 908 if (tmp == NULL) 909 return (NULL); 910 911 /* populate the array with the supported methods */ 912 for (k = 0, i = 0; eventops[k] != NULL; ++k) { 913 tmp[i++] = eventops[k]->name; 914 } 915 tmp[i] = NULL; 916 917 if (methods != NULL) 918 mm_free(__UNCONST(methods)); 919 920 methods = tmp; 921 922 return (methods); 923 } 924 925 struct event_config * 926 event_config_new(void) 927 { 928 struct event_config *cfg = mm_calloc(1, sizeof(*cfg)); 929 930 if (cfg == NULL) 931 return (NULL); 932 933 TAILQ_INIT(&cfg->entries); 934 935 return (cfg); 936 } 937 938 static void 939 event_config_entry_free(struct event_config_entry *entry) 940 { 941 if (entry->avoid_method != NULL) 942 mm_free(__UNCONST(entry->avoid_method)); 943 mm_free(entry); 944 } 945 946 void 947 event_config_free(struct event_config *cfg) 948 { 949 struct event_config_entry *entry; 950 951 while ((entry = TAILQ_FIRST(&cfg->entries)) != NULL) { 952 TAILQ_REMOVE(&cfg->entries, entry, next); 953 event_config_entry_free(entry); 954 } 955 mm_free(cfg); 956 } 957 958 int 959 event_config_set_flag(struct event_config *cfg, int flag) 960 { 961 if (!cfg) 962 return -1; 963 cfg->flags |= flag; 964 return 0; 965 } 966 967 int 968 event_config_avoid_method(struct event_config *cfg, const char *method) 969 { 970 struct event_config_entry *entry = mm_malloc(sizeof(*entry)); 971 if (entry == NULL) 972 return (-1); 973 974 if ((entry->avoid_method = mm_strdup(method)) == NULL) { 975 mm_free(entry); 976 return (-1); 977 } 978 979 TAILQ_INSERT_TAIL(&cfg->entries, entry, next); 980 981 return (0); 982 } 983 984 int 985 event_config_require_features(struct event_config *cfg, 986 int features) 987 { 988 if (!cfg) 989 return (-1); 990 cfg->require_features = features; 991 return (0); 992 } 993 994 int 995 event_config_set_num_cpus_hint(struct event_config *cfg, int cpus) 996 { 997 if (!cfg) 998 return (-1); 999 cfg->n_cpus_hint = cpus; 1000 return (0); 1001 } 1002 1003 int 1004 event_priority_init(int npriorities) 1005 { 1006 return event_base_priority_init(current_base, npriorities); 1007 } 1008 1009 int 1010 event_base_priority_init(struct event_base *base, int npriorities) 1011 { 1012 int i; 1013 1014 if (N_ACTIVE_CALLBACKS(base) || npriorities < 1 1015 || npriorities >= EVENT_MAX_PRIORITIES) 1016 return (-1); 1017 1018 if (npriorities == base->nactivequeues) 1019 return (0); 1020 1021 if (base->nactivequeues) { 1022 mm_free(base->activequeues); 1023 base->nactivequeues = 0; 1024 } 1025 1026 /* Allocate our priority queues */ 1027 base->activequeues = (struct event_list *) 1028 mm_calloc(npriorities, sizeof(struct event_list)); 1029 if (base->activequeues == NULL) { 1030 event_warn("%s: calloc", __func__); 1031 return (-1); 1032 } 1033 base->nactivequeues = npriorities; 1034 1035 for (i = 0; i < base->nactivequeues; ++i) { 1036 TAILQ_INIT(&base->activequeues[i]); 1037 } 1038 1039 return (0); 1040 } 1041 1042 /* Returns true iff we're currently watching any events. */ 1043 static int 1044 event_haveevents(struct event_base *base) 1045 { 1046 /* Caller must hold th_base_lock */ 1047 return (base->virtual_event_count > 0 || base->event_count > 0); 1048 } 1049 1050 /* "closure" function called when processing active signal events */ 1051 static inline void 1052 event_signal_closure(struct event_base *base, struct event *ev) 1053 { 1054 short ncalls; 1055 int should_break; 1056 1057 /* Allows deletes to work */ 1058 ncalls = ev->ev_ncalls; 1059 if (ncalls != 0) 1060 ev->ev_pncalls = &ncalls; 1061 EVBASE_RELEASE_LOCK(base, th_base_lock); 1062 while (ncalls) { 1063 ncalls--; 1064 ev->ev_ncalls = ncalls; 1065 if (ncalls == 0) 1066 ev->ev_pncalls = NULL; 1067 (*ev->ev_callback)(ev->ev_fd, ev->ev_res, ev->ev_arg); 1068 1069 EVBASE_ACQUIRE_LOCK(base, th_base_lock); 1070 should_break = base->event_break; 1071 EVBASE_RELEASE_LOCK(base, th_base_lock); 1072 1073 if (should_break) { 1074 if (ncalls != 0) 1075 ev->ev_pncalls = NULL; 1076 return; 1077 } 1078 } 1079 } 1080 1081 /* Common timeouts are special timeouts that are handled as queues rather than 1082 * in the minheap. This is more efficient than the minheap if we happen to 1083 * know that we're going to get several thousands of timeout events all with 1084 * the same timeout value. 1085 * 1086 * Since all our timeout handling code assumes timevals can be copied, 1087 * assigned, etc, we can't use "magic pointer" to encode these common 1088 * timeouts. Searching through a list to see if every timeout is common could 1089 * also get inefficient. Instead, we take advantage of the fact that tv_usec 1090 * is 32 bits long, but only uses 20 of those bits (since it can never be over 1091 * 999999.) We use the top bits to encode 4 bites of magic number, and 8 bits 1092 * of index into the event_base's aray of common timeouts. 1093 */ 1094 1095 #define MICROSECONDS_MASK COMMON_TIMEOUT_MICROSECONDS_MASK 1096 #define COMMON_TIMEOUT_IDX_MASK 0x0ff00000 1097 #define COMMON_TIMEOUT_IDX_SHIFT 20 1098 #define COMMON_TIMEOUT_MASK 0xf0000000 1099 #define COMMON_TIMEOUT_MAGIC 0x50000000 1100 1101 #define COMMON_TIMEOUT_IDX(tv) \ 1102 (((tv)->tv_usec & COMMON_TIMEOUT_IDX_MASK)>>COMMON_TIMEOUT_IDX_SHIFT) 1103 1104 /** Return true iff if 'tv' is a common timeout in 'base' */ 1105 static inline int 1106 is_common_timeout(const struct timeval *tv, 1107 const struct event_base *base) 1108 { 1109 int idx; 1110 if ((tv->tv_usec & COMMON_TIMEOUT_MASK) != COMMON_TIMEOUT_MAGIC) 1111 return 0; 1112 idx = COMMON_TIMEOUT_IDX(tv); 1113 return idx < base->n_common_timeouts; 1114 } 1115 1116 /* True iff tv1 and tv2 have the same common-timeout index, or if neither 1117 * one is a common timeout. */ 1118 static inline int 1119 is_same_common_timeout(const struct timeval *tv1, const struct timeval *tv2) 1120 { 1121 return (tv1->tv_usec & ~MICROSECONDS_MASK) == 1122 (tv2->tv_usec & ~MICROSECONDS_MASK); 1123 } 1124 1125 /** Requires that 'tv' is a common timeout. Return the corresponding 1126 * common_timeout_list. */ 1127 static inline struct common_timeout_list * 1128 get_common_timeout_list(struct event_base *base, const struct timeval *tv) 1129 { 1130 return base->common_timeout_queues[COMMON_TIMEOUT_IDX(tv)]; 1131 } 1132 1133 #if 0 1134 static inline int 1135 common_timeout_ok(const struct timeval *tv, 1136 struct event_base *base) 1137 { 1138 const struct timeval *expect = 1139 &get_common_timeout_list(base, tv)->duration; 1140 return tv->tv_sec == expect->tv_sec && 1141 tv->tv_usec == expect->tv_usec; 1142 } 1143 #endif 1144 1145 /* Add the timeout for the first event in given common timeout list to the 1146 * event_base's minheap. */ 1147 static void 1148 common_timeout_schedule(struct common_timeout_list *ctl, 1149 const struct timeval *now, struct event *head) 1150 { 1151 struct timeval timeout = head->ev_timeout; 1152 timeout.tv_usec &= MICROSECONDS_MASK; 1153 event_add_internal(&ctl->timeout_event, &timeout, 1); 1154 } 1155 1156 /* Callback: invoked when the timeout for a common timeout queue triggers. 1157 * This means that (at least) the first event in that queue should be run, 1158 * and the timeout should be rescheduled if there are more events. */ 1159 static void 1160 common_timeout_callback(evutil_socket_t fd, short what, void *arg) 1161 { 1162 struct timeval now; 1163 struct common_timeout_list *ctl = arg; 1164 struct event_base *base = ctl->base; 1165 struct event *ev = NULL; 1166 EVBASE_ACQUIRE_LOCK(base, th_base_lock); 1167 gettime(base, &now); 1168 while (1) { 1169 ev = TAILQ_FIRST(&ctl->events); 1170 if (!ev || ev->ev_timeout.tv_sec > now.tv_sec || 1171 (ev->ev_timeout.tv_sec == now.tv_sec && 1172 (ev->ev_timeout.tv_usec&MICROSECONDS_MASK) > now.tv_usec)) 1173 break; 1174 event_del_internal(ev); 1175 event_active_nolock(ev, EV_TIMEOUT, 1); 1176 } 1177 if (ev) 1178 common_timeout_schedule(ctl, &now, ev); 1179 EVBASE_RELEASE_LOCK(base, th_base_lock); 1180 } 1181 1182 #define MAX_COMMON_TIMEOUTS 256 1183 1184 const struct timeval * 1185 event_base_init_common_timeout(struct event_base *base, 1186 const struct timeval *duration) 1187 { 1188 int i; 1189 struct timeval tv; 1190 const struct timeval *result=NULL; 1191 struct common_timeout_list *new_ctl; 1192 1193 EVBASE_ACQUIRE_LOCK(base, th_base_lock); 1194 if (duration->tv_usec > 1000000) { 1195 memcpy(&tv, duration, sizeof(struct timeval)); 1196 if (is_common_timeout(duration, base)) 1197 tv.tv_usec &= MICROSECONDS_MASK; 1198 tv.tv_sec += tv.tv_usec / 1000000; 1199 tv.tv_usec %= 1000000; 1200 duration = &tv; 1201 } 1202 for (i = 0; i < base->n_common_timeouts; ++i) { 1203 const struct common_timeout_list *ctl = 1204 base->common_timeout_queues[i]; 1205 if (duration->tv_sec == ctl->duration.tv_sec && 1206 duration->tv_usec == 1207 (ctl->duration.tv_usec & MICROSECONDS_MASK)) { 1208 EVUTIL_ASSERT(is_common_timeout(&ctl->duration, base)); 1209 result = &ctl->duration; 1210 goto done; 1211 } 1212 } 1213 if (base->n_common_timeouts == MAX_COMMON_TIMEOUTS) { 1214 event_warnx("%s: Too many common timeouts already in use; " 1215 "we only support %d per event_base", __func__, 1216 MAX_COMMON_TIMEOUTS); 1217 goto done; 1218 } 1219 if (base->n_common_timeouts_allocated == base->n_common_timeouts) { 1220 int n = base->n_common_timeouts < 16 ? 16 : 1221 base->n_common_timeouts*2; 1222 struct common_timeout_list **newqueues = 1223 mm_realloc(base->common_timeout_queues, 1224 n*sizeof(struct common_timeout_queue *)); 1225 if (!newqueues) { 1226 event_warn("%s: realloc",__func__); 1227 goto done; 1228 } 1229 base->n_common_timeouts_allocated = n; 1230 base->common_timeout_queues = newqueues; 1231 } 1232 new_ctl = mm_calloc(1, sizeof(struct common_timeout_list)); 1233 if (!new_ctl) { 1234 event_warn("%s: calloc",__func__); 1235 goto done; 1236 } 1237 TAILQ_INIT(&new_ctl->events); 1238 new_ctl->duration.tv_sec = duration->tv_sec; 1239 new_ctl->duration.tv_usec = 1240 duration->tv_usec | COMMON_TIMEOUT_MAGIC | 1241 (base->n_common_timeouts << COMMON_TIMEOUT_IDX_SHIFT); 1242 evtimer_assign(&new_ctl->timeout_event, base, 1243 common_timeout_callback, new_ctl); 1244 new_ctl->timeout_event.ev_flags |= EVLIST_INTERNAL; 1245 event_priority_set(&new_ctl->timeout_event, 0); 1246 new_ctl->base = base; 1247 base->common_timeout_queues[base->n_common_timeouts++] = new_ctl; 1248 result = &new_ctl->duration; 1249 1250 done: 1251 if (result) 1252 EVUTIL_ASSERT(is_common_timeout(result, base)); 1253 1254 EVBASE_RELEASE_LOCK(base, th_base_lock); 1255 return result; 1256 } 1257 1258 /* Closure function invoked when we're activating a persistent event. */ 1259 static inline void 1260 event_persist_closure(struct event_base *base, struct event *ev) 1261 { 1262 // Define our callback, we use this to store our callback before it's executed 1263 void (*evcb_callback)(evutil_socket_t, short, void *); 1264 1265 // Other fields of *ev that must be stored before executing 1266 evutil_socket_t evcb_fd; 1267 short evcb_res; 1268 void *evcb_arg; 1269 1270 /* reschedule the persistent event if we have a timeout. */ 1271 if (ev->ev_io_timeout.tv_sec || ev->ev_io_timeout.tv_usec) { 1272 /* If there was a timeout, we want it to run at an interval of 1273 * ev_io_timeout after the last time it was _scheduled_ for, 1274 * not ev_io_timeout after _now_. If it fired for another 1275 * reason, though, the timeout ought to start ticking _now_. */ 1276 struct timeval run_at, relative_to, delay, now; 1277 ev_uint32_t usec_mask = 0; 1278 EVUTIL_ASSERT(is_same_common_timeout(&ev->ev_timeout, 1279 &ev->ev_io_timeout)); 1280 gettime(base, &now); 1281 if (is_common_timeout(&ev->ev_timeout, base)) { 1282 delay = ev->ev_io_timeout; 1283 usec_mask = delay.tv_usec & ~MICROSECONDS_MASK; 1284 delay.tv_usec &= MICROSECONDS_MASK; 1285 if (ev->ev_res & EV_TIMEOUT) { 1286 relative_to = ev->ev_timeout; 1287 relative_to.tv_usec &= MICROSECONDS_MASK; 1288 } else { 1289 relative_to = now; 1290 } 1291 } else { 1292 delay = ev->ev_io_timeout; 1293 if (ev->ev_res & EV_TIMEOUT) { 1294 relative_to = ev->ev_timeout; 1295 } else { 1296 relative_to = now; 1297 } 1298 } 1299 evutil_timeradd(&relative_to, &delay, &run_at); 1300 if (evutil_timercmp(&run_at, &now, <)) { 1301 /* Looks like we missed at least one invocation due to 1302 * a clock jump, not running the event loop for a 1303 * while, really slow callbacks, or 1304 * something. Reschedule relative to now. 1305 */ 1306 evutil_timeradd(&now, &delay, &run_at); 1307 } 1308 run_at.tv_usec |= usec_mask; 1309 event_add_internal(ev, &run_at, 1); 1310 } 1311 1312 // Save our callback before we release the lock 1313 evcb_callback = ev->ev_callback; 1314 evcb_fd = ev->ev_fd; 1315 evcb_res = ev->ev_res; 1316 evcb_arg = ev->ev_arg; 1317 1318 // Release the lock 1319 EVBASE_RELEASE_LOCK(base, th_base_lock); 1320 1321 // Execute the callback 1322 (evcb_callback)(evcb_fd, evcb_res, evcb_arg); 1323 } 1324 1325 /* 1326 Helper for event_process_active to process all the events in a single queue, 1327 releasing the lock as we go. This function requires that the lock be held 1328 when it's invoked. Returns -1 if we get a signal or an event_break that 1329 means we should stop processing any active events now. Otherwise returns 1330 the number of non-internal events that we processed. 1331 */ 1332 static int 1333 event_process_active_single_queue(struct event_base *base, 1334 struct event_list *activeq) 1335 { 1336 struct event *ev; 1337 int count = 0; 1338 1339 EVUTIL_ASSERT(activeq != NULL); 1340 1341 for (ev = TAILQ_FIRST(activeq); ev; ev = TAILQ_FIRST(activeq)) { 1342 if (ev->ev_events & EV_PERSIST) 1343 event_queue_remove(base, ev, EVLIST_ACTIVE); 1344 else 1345 event_del_internal(ev); 1346 if (!(ev->ev_flags & EVLIST_INTERNAL)) 1347 ++count; 1348 1349 event_debug(( 1350 "event_process_active: event: %p, %s%scall %p", 1351 ev, 1352 ev->ev_res & EV_READ ? "EV_READ " : " ", 1353 ev->ev_res & EV_WRITE ? "EV_WRITE " : " ", 1354 ev->ev_callback)); 1355 1356 #ifndef _EVENT_DISABLE_THREAD_SUPPORT 1357 base->current_event = ev; 1358 base->current_event_waiters = 0; 1359 #endif 1360 1361 switch (ev->ev_closure) { 1362 case EV_CLOSURE_SIGNAL: 1363 event_signal_closure(base, ev); 1364 break; 1365 case EV_CLOSURE_PERSIST: 1366 event_persist_closure(base, ev); 1367 break; 1368 default: 1369 case EV_CLOSURE_NONE: 1370 EVBASE_RELEASE_LOCK(base, th_base_lock); 1371 (*ev->ev_callback)( 1372 ev->ev_fd, ev->ev_res, ev->ev_arg); 1373 break; 1374 } 1375 1376 EVBASE_ACQUIRE_LOCK(base, th_base_lock); 1377 #ifndef _EVENT_DISABLE_THREAD_SUPPORT 1378 base->current_event = NULL; 1379 if (base->current_event_waiters) { 1380 base->current_event_waiters = 0; 1381 EVTHREAD_COND_BROADCAST(base->current_event_cond); 1382 } 1383 #endif 1384 1385 if (base->event_break) 1386 return -1; 1387 if (base->event_continue) 1388 break; 1389 } 1390 return count; 1391 } 1392 1393 /* 1394 Process up to MAX_DEFERRED of the defered_cb entries in 'queue'. If 1395 *breakptr becomes set to 1, stop. Requires that we start out holding 1396 the lock on 'queue'; releases the lock around 'queue' for each deferred_cb 1397 we process. 1398 */ 1399 static int 1400 event_process_deferred_callbacks(struct deferred_cb_queue *queue, int *breakptr) 1401 { 1402 int count = 0; 1403 struct deferred_cb *cb; 1404 1405 #define MAX_DEFERRED 16 1406 while ((cb = TAILQ_FIRST(&queue->deferred_cb_list))) { 1407 cb->queued = 0; 1408 TAILQ_REMOVE(&queue->deferred_cb_list, cb, cb_next); 1409 --queue->active_count; 1410 UNLOCK_DEFERRED_QUEUE(queue); 1411 1412 cb->cb(cb, cb->arg); 1413 1414 LOCK_DEFERRED_QUEUE(queue); 1415 if (*breakptr) 1416 return -1; 1417 if (++count == MAX_DEFERRED) 1418 break; 1419 } 1420 #undef MAX_DEFERRED 1421 return count; 1422 } 1423 1424 /* 1425 * Active events are stored in priority queues. Lower priorities are always 1426 * process before higher priorities. Low priority events can starve high 1427 * priority ones. 1428 */ 1429 1430 static int 1431 event_process_active(struct event_base *base) 1432 { 1433 /* Caller must hold th_base_lock */ 1434 struct event_list *activeq = NULL; 1435 int i, c = 0; 1436 1437 for (i = 0; i < base->nactivequeues; ++i) { 1438 if (TAILQ_FIRST(&base->activequeues[i]) != NULL) { 1439 base->event_running_priority = i; 1440 activeq = &base->activequeues[i]; 1441 c = event_process_active_single_queue(base, activeq); 1442 if (c < 0) { 1443 base->event_running_priority = -1; 1444 return -1; 1445 } else if (c > 0) 1446 break; /* Processed a real event; do not 1447 * consider lower-priority events */ 1448 /* If we get here, all of the events we processed 1449 * were internal. Continue. */ 1450 } 1451 } 1452 1453 event_process_deferred_callbacks(&base->defer_queue,&base->event_break); 1454 base->event_running_priority = -1; 1455 return c; 1456 } 1457 1458 /* 1459 * Wait continuously for events. We exit only if no events are left. 1460 */ 1461 1462 int 1463 event_dispatch(void) 1464 { 1465 return (event_loop(0)); 1466 } 1467 1468 int 1469 event_base_dispatch(struct event_base *event_base) 1470 { 1471 return (event_base_loop(event_base, 0)); 1472 } 1473 1474 const char * 1475 event_base_get_method(const struct event_base *base) 1476 { 1477 EVUTIL_ASSERT(base); 1478 return (base->evsel->name); 1479 } 1480 1481 /** Callback: used to implement event_base_loopexit by telling the event_base 1482 * that it's time to exit its loop. */ 1483 static void 1484 event_loopexit_cb(evutil_socket_t fd, short what, void *arg) 1485 { 1486 struct event_base *base = arg; 1487 base->event_gotterm = 1; 1488 } 1489 1490 int 1491 event_loopexit(const struct timeval *tv) 1492 { 1493 return (event_once(-1, EV_TIMEOUT, event_loopexit_cb, 1494 current_base, tv)); 1495 } 1496 1497 int 1498 event_base_loopexit(struct event_base *event_base, const struct timeval *tv) 1499 { 1500 return (event_base_once(event_base, -1, EV_TIMEOUT, event_loopexit_cb, 1501 event_base, tv)); 1502 } 1503 1504 int 1505 event_loopbreak(void) 1506 { 1507 return (event_base_loopbreak(current_base)); 1508 } 1509 1510 int 1511 event_base_loopbreak(struct event_base *event_base) 1512 { 1513 int r = 0; 1514 if (event_base == NULL) 1515 return (-1); 1516 1517 EVBASE_ACQUIRE_LOCK(event_base, th_base_lock); 1518 event_base->event_break = 1; 1519 1520 if (EVBASE_NEED_NOTIFY(event_base)) { 1521 r = evthread_notify_base(event_base); 1522 } else { 1523 r = (0); 1524 } 1525 EVBASE_RELEASE_LOCK(event_base, th_base_lock); 1526 return r; 1527 } 1528 1529 int 1530 event_base_got_break(struct event_base *event_base) 1531 { 1532 int res; 1533 EVBASE_ACQUIRE_LOCK(event_base, th_base_lock); 1534 res = event_base->event_break; 1535 EVBASE_RELEASE_LOCK(event_base, th_base_lock); 1536 return res; 1537 } 1538 1539 int 1540 event_base_got_exit(struct event_base *event_base) 1541 { 1542 int res; 1543 EVBASE_ACQUIRE_LOCK(event_base, th_base_lock); 1544 res = event_base->event_gotterm; 1545 EVBASE_RELEASE_LOCK(event_base, th_base_lock); 1546 return res; 1547 } 1548 1549 /* not thread safe */ 1550 1551 int 1552 event_loop(int flags) 1553 { 1554 return event_base_loop(current_base, flags); 1555 } 1556 1557 int 1558 event_base_loop(struct event_base *base, int flags) 1559 { 1560 const struct eventop *evsel = base->evsel; 1561 struct timeval tv; 1562 struct timeval *tv_p; 1563 int res, done, retval = 0; 1564 1565 /* Grab the lock. We will release it inside evsel.dispatch, and again 1566 * as we invoke user callbacks. */ 1567 EVBASE_ACQUIRE_LOCK(base, th_base_lock); 1568 1569 if (base->running_loop) { 1570 event_warnx("%s: reentrant invocation. Only one event_base_loop" 1571 " can run on each event_base at once.", __func__); 1572 EVBASE_RELEASE_LOCK(base, th_base_lock); 1573 return -1; 1574 } 1575 1576 base->running_loop = 1; 1577 1578 clear_time_cache(base); 1579 1580 if (base->sig.ev_signal_added && base->sig.ev_n_signals_added) 1581 evsig_set_base(base); 1582 1583 done = 0; 1584 1585 #ifndef _EVENT_DISABLE_THREAD_SUPPORT 1586 base->th_owner_id = EVTHREAD_GET_ID(); 1587 #endif 1588 1589 base->event_gotterm = base->event_break = 0; 1590 1591 while (!done) { 1592 base->event_continue = 0; 1593 1594 /* Terminate the loop if we have been asked to */ 1595 if (base->event_gotterm) { 1596 break; 1597 } 1598 1599 if (base->event_break) { 1600 break; 1601 } 1602 1603 timeout_correct(base, &tv); 1604 1605 tv_p = &tv; 1606 if (!N_ACTIVE_CALLBACKS(base) && !(flags & EVLOOP_NONBLOCK)) { 1607 timeout_next(base, &tv_p); 1608 } else { 1609 /* 1610 * if we have active events, we just poll new events 1611 * without waiting. 1612 */ 1613 evutil_timerclear(&tv); 1614 } 1615 1616 /* If we have no events, we just exit */ 1617 if (!event_haveevents(base) && !N_ACTIVE_CALLBACKS(base)) { 1618 event_debug(("%s: no events registered.", __func__)); 1619 retval = 1; 1620 goto done; 1621 } 1622 1623 /* update last old time */ 1624 gettime(base, &base->event_tv); 1625 1626 clear_time_cache(base); 1627 1628 res = evsel->dispatch(base, tv_p); 1629 1630 if (res == -1) { 1631 event_debug(("%s: dispatch returned unsuccessfully.", 1632 __func__)); 1633 retval = -1; 1634 goto done; 1635 } 1636 1637 update_time_cache(base); 1638 1639 timeout_process(base); 1640 1641 if (N_ACTIVE_CALLBACKS(base)) { 1642 int n = event_process_active(base); 1643 if ((flags & EVLOOP_ONCE) 1644 && N_ACTIVE_CALLBACKS(base) == 0 1645 && n != 0) 1646 done = 1; 1647 } else if (flags & EVLOOP_NONBLOCK) 1648 done = 1; 1649 } 1650 event_debug(("%s: asked to terminate loop.", __func__)); 1651 1652 done: 1653 clear_time_cache(base); 1654 base->running_loop = 0; 1655 1656 EVBASE_RELEASE_LOCK(base, th_base_lock); 1657 1658 return (retval); 1659 } 1660 1661 /* Sets up an event for processing once */ 1662 struct event_once { 1663 struct event ev; 1664 1665 void (*cb)(evutil_socket_t, short, void *); 1666 void *arg; 1667 }; 1668 1669 /* One-time callback to implement event_base_once: invokes the user callback, 1670 * then deletes the allocated storage */ 1671 static void 1672 event_once_cb(evutil_socket_t fd, short events, void *arg) 1673 { 1674 struct event_once *eonce = arg; 1675 1676 (*eonce->cb)(fd, events, eonce->arg); 1677 event_debug_unassign(&eonce->ev); 1678 mm_free(eonce); 1679 } 1680 1681 /* not threadsafe, event scheduled once. */ 1682 int 1683 event_once(evutil_socket_t fd, short events, 1684 void (*callback)(evutil_socket_t, short, void *), 1685 void *arg, const struct timeval *tv) 1686 { 1687 return event_base_once(current_base, fd, events, callback, arg, tv); 1688 } 1689 1690 /* Schedules an event once */ 1691 int 1692 event_base_once(struct event_base *base, evutil_socket_t fd, short events, 1693 void (*callback)(evutil_socket_t, short, void *), 1694 void *arg, const struct timeval *tv) 1695 { 1696 struct event_once *eonce; 1697 struct timeval etv; 1698 int res = 0; 1699 1700 /* We cannot support signals that just fire once, or persistent 1701 * events. */ 1702 if (events & (EV_SIGNAL|EV_PERSIST)) 1703 return (-1); 1704 1705 if ((eonce = mm_calloc(1, sizeof(struct event_once))) == NULL) 1706 return (-1); 1707 1708 eonce->cb = callback; 1709 eonce->arg = arg; 1710 1711 if (events == EV_TIMEOUT) { 1712 if (tv == NULL) { 1713 evutil_timerclear(&etv); 1714 tv = &etv; 1715 } 1716 1717 evtimer_assign(&eonce->ev, base, event_once_cb, eonce); 1718 } else if (events & (EV_READ|EV_WRITE)) { 1719 events &= EV_READ|EV_WRITE; 1720 1721 event_assign(&eonce->ev, base, fd, events, event_once_cb, eonce); 1722 } else { 1723 /* Bad event combination */ 1724 mm_free(eonce); 1725 return (-1); 1726 } 1727 1728 if (res == 0) 1729 res = event_add(&eonce->ev, tv); 1730 if (res != 0) { 1731 mm_free(eonce); 1732 return (res); 1733 } 1734 1735 return (0); 1736 } 1737 1738 int 1739 event_assign(struct event *ev, struct event_base *base, evutil_socket_t fd, short events, void (*callback)(evutil_socket_t, short, void *), void *arg) 1740 { 1741 if (!base) 1742 base = current_base; 1743 1744 _event_debug_assert_not_added(ev); 1745 1746 ev->ev_base = base; 1747 1748 ev->ev_callback = callback; 1749 ev->ev_arg = arg; 1750 ev->ev_fd = fd; 1751 ev->ev_events = events; 1752 ev->ev_res = 0; 1753 ev->ev_flags = EVLIST_INIT; 1754 ev->ev_ncalls = 0; 1755 ev->ev_pncalls = NULL; 1756 1757 if (events & EV_SIGNAL) { 1758 if ((events & (EV_READ|EV_WRITE)) != 0) { 1759 event_warnx("%s: EV_SIGNAL is not compatible with " 1760 "EV_READ or EV_WRITE", __func__); 1761 return -1; 1762 } 1763 ev->ev_closure = EV_CLOSURE_SIGNAL; 1764 } else { 1765 if (events & EV_PERSIST) { 1766 evutil_timerclear(&ev->ev_io_timeout); 1767 ev->ev_closure = EV_CLOSURE_PERSIST; 1768 } else { 1769 ev->ev_closure = EV_CLOSURE_NONE; 1770 } 1771 } 1772 1773 min_heap_elem_init(ev); 1774 1775 if (base != NULL) { 1776 /* by default, we put new events into the middle priority */ 1777 ev->ev_pri = base->nactivequeues / 2; 1778 } 1779 1780 _event_debug_note_setup(ev); 1781 1782 return 0; 1783 } 1784 1785 int 1786 event_base_set(struct event_base *base, struct event *ev) 1787 { 1788 /* Only innocent events may be assigned to a different base */ 1789 if (ev->ev_flags != EVLIST_INIT) 1790 return (-1); 1791 1792 _event_debug_assert_is_setup(ev); 1793 1794 ev->ev_base = base; 1795 ev->ev_pri = base->nactivequeues/2; 1796 1797 return (0); 1798 } 1799 1800 void 1801 event_set(struct event *ev, evutil_socket_t fd, short events, 1802 void (*callback)(evutil_socket_t, short, void *), void *arg) 1803 { 1804 int r; 1805 r = event_assign(ev, current_base, fd, events, callback, arg); 1806 EVUTIL_ASSERT(r == 0); 1807 } 1808 1809 struct event * 1810 event_new(struct event_base *base, evutil_socket_t fd, short events, void (*cb)(evutil_socket_t, short, void *), void *arg) 1811 { 1812 struct event *ev; 1813 ev = mm_malloc(sizeof(struct event)); 1814 if (ev == NULL) 1815 return (NULL); 1816 if (event_assign(ev, base, fd, events, cb, arg) < 0) { 1817 mm_free(ev); 1818 return (NULL); 1819 } 1820 1821 return (ev); 1822 } 1823 1824 void 1825 event_free(struct event *ev) 1826 { 1827 _event_debug_assert_is_setup(ev); 1828 1829 /* make sure that this event won't be coming back to haunt us. */ 1830 event_del(ev); 1831 _event_debug_note_teardown(ev); 1832 mm_free(ev); 1833 1834 } 1835 1836 void 1837 event_debug_unassign(struct event *ev) 1838 { 1839 _event_debug_assert_not_added(ev); 1840 _event_debug_note_teardown(ev); 1841 1842 ev->ev_flags &= ~EVLIST_INIT; 1843 } 1844 1845 /* 1846 * Set's the priority of an event - if an event is already scheduled 1847 * changing the priority is going to fail. 1848 */ 1849 1850 int 1851 event_priority_set(struct event *ev, int pri) 1852 { 1853 _event_debug_assert_is_setup(ev); 1854 1855 if (ev->ev_flags & EVLIST_ACTIVE) 1856 return (-1); 1857 if (pri < 0 || pri >= ev->ev_base->nactivequeues) 1858 return (-1); 1859 1860 ev->ev_pri = pri; 1861 1862 return (0); 1863 } 1864 1865 /* 1866 * Checks if a specific event is pending or scheduled. 1867 */ 1868 1869 int 1870 event_pending(const struct event *ev, short event, struct timeval *tv) 1871 { 1872 int flags = 0; 1873 1874 if (EVUTIL_FAILURE_CHECK(ev->ev_base == NULL)) { 1875 event_warnx("%s: event has no event_base set.", __func__); 1876 return 0; 1877 } 1878 1879 EVBASE_ACQUIRE_LOCK(ev->ev_base, th_base_lock); 1880 _event_debug_assert_is_setup(ev); 1881 1882 if (ev->ev_flags & EVLIST_INSERTED) 1883 flags |= (ev->ev_events & (EV_READ|EV_WRITE|EV_SIGNAL)); 1884 if (ev->ev_flags & EVLIST_ACTIVE) 1885 flags |= ev->ev_res; 1886 if (ev->ev_flags & EVLIST_TIMEOUT) 1887 flags |= EV_TIMEOUT; 1888 1889 event &= (EV_TIMEOUT|EV_READ|EV_WRITE|EV_SIGNAL); 1890 1891 /* See if there is a timeout that we should report */ 1892 if (tv != NULL && (flags & event & EV_TIMEOUT)) { 1893 struct timeval tmp = ev->ev_timeout; 1894 tmp.tv_usec &= MICROSECONDS_MASK; 1895 #if defined(_EVENT_HAVE_CLOCK_GETTIME) && defined(CLOCK_MONOTONIC) 1896 /* correctly remamp to real time */ 1897 evutil_timeradd(&ev->ev_base->tv_clock_diff, &tmp, tv); 1898 #else 1899 *tv = tmp; 1900 #endif 1901 } 1902 1903 EVBASE_RELEASE_LOCK(ev->ev_base, th_base_lock); 1904 1905 return (flags & event); 1906 } 1907 1908 int 1909 event_initialized(const struct event *ev) 1910 { 1911 if (!(ev->ev_flags & EVLIST_INIT)) 1912 return 0; 1913 1914 return 1; 1915 } 1916 1917 void 1918 event_get_assignment(const struct event *event, struct event_base **base_out, evutil_socket_t *fd_out, short *events_out, event_callback_fn *callback_out, void **arg_out) 1919 { 1920 _event_debug_assert_is_setup(event); 1921 1922 if (base_out) 1923 *base_out = event->ev_base; 1924 if (fd_out) 1925 *fd_out = event->ev_fd; 1926 if (events_out) 1927 *events_out = event->ev_events; 1928 if (callback_out) 1929 *callback_out = event->ev_callback; 1930 if (arg_out) 1931 *arg_out = event->ev_arg; 1932 } 1933 1934 size_t 1935 event_get_struct_event_size(void) 1936 { 1937 return sizeof(struct event); 1938 } 1939 1940 evutil_socket_t 1941 event_get_fd(const struct event *ev) 1942 { 1943 _event_debug_assert_is_setup(ev); 1944 return ev->ev_fd; 1945 } 1946 1947 struct event_base * 1948 event_get_base(const struct event *ev) 1949 { 1950 _event_debug_assert_is_setup(ev); 1951 return ev->ev_base; 1952 } 1953 1954 short 1955 event_get_events(const struct event *ev) 1956 { 1957 _event_debug_assert_is_setup(ev); 1958 return ev->ev_events; 1959 } 1960 1961 event_callback_fn 1962 event_get_callback(const struct event *ev) 1963 { 1964 _event_debug_assert_is_setup(ev); 1965 return ev->ev_callback; 1966 } 1967 1968 void * 1969 event_get_callback_arg(const struct event *ev) 1970 { 1971 _event_debug_assert_is_setup(ev); 1972 return ev->ev_arg; 1973 } 1974 1975 int 1976 event_add(struct event *ev, const struct timeval *tv) 1977 { 1978 int res; 1979 1980 if (EVUTIL_FAILURE_CHECK(!ev->ev_base)) { 1981 event_warnx("%s: event has no event_base set.", __func__); 1982 return -1; 1983 } 1984 1985 EVBASE_ACQUIRE_LOCK(ev->ev_base, th_base_lock); 1986 1987 res = event_add_internal(ev, tv, 0); 1988 1989 EVBASE_RELEASE_LOCK(ev->ev_base, th_base_lock); 1990 1991 return (res); 1992 } 1993 1994 /* Helper callback: wake an event_base from another thread. This version 1995 * works by writing a byte to one end of a socketpair, so that the event_base 1996 * listening on the other end will wake up as the corresponding event 1997 * triggers */ 1998 static int 1999 evthread_notify_base_default(struct event_base *base) 2000 { 2001 char buf[1]; 2002 int r; 2003 buf[0] = (char) 0; 2004 #ifdef WIN32 2005 r = send(base->th_notify_fd[1], buf, 1, 0); 2006 #else 2007 r = write(base->th_notify_fd[1], buf, 1); 2008 #endif 2009 return (r < 0 && errno != EAGAIN) ? -1 : 0; 2010 } 2011 2012 #if defined(_EVENT_HAVE_EVENTFD) && defined(_EVENT_HAVE_SYS_EVENTFD_H) 2013 /* Helper callback: wake an event_base from another thread. This version 2014 * assumes that you have a working eventfd() implementation. */ 2015 static int 2016 evthread_notify_base_eventfd(struct event_base *base) 2017 { 2018 ev_uint64_t msg = 1; 2019 int r; 2020 do { 2021 r = write(base->th_notify_fd[0], (void*) &msg, sizeof(msg)); 2022 } while (r < 0 && errno == EAGAIN); 2023 2024 return (r < 0) ? -1 : 0; 2025 } 2026 #endif 2027 2028 /** Tell the thread currently running the event_loop for base (if any) that it 2029 * needs to stop waiting in its dispatch function (if it is) and process all 2030 * active events and deferred callbacks (if there are any). */ 2031 static int 2032 evthread_notify_base(struct event_base *base) 2033 { 2034 EVENT_BASE_ASSERT_LOCKED(base); 2035 if (!base->th_notify_fn) 2036 return -1; 2037 if (base->is_notify_pending) 2038 return 0; 2039 base->is_notify_pending = 1; 2040 return base->th_notify_fn(base); 2041 } 2042 2043 /* Implementation function to add an event. Works just like event_add, 2044 * except: 1) it requires that we have the lock. 2) if tv_is_absolute is set, 2045 * we treat tv as an absolute time, not as an interval to add to the current 2046 * time */ 2047 static inline int 2048 event_add_internal(struct event *ev, const struct timeval *tv, 2049 int tv_is_absolute) 2050 { 2051 struct event_base *base = ev->ev_base; 2052 int res = 0; 2053 int notify = 0; 2054 2055 EVENT_BASE_ASSERT_LOCKED(base); 2056 _event_debug_assert_is_setup(ev); 2057 2058 event_debug(( 2059 "event_add: event: %p (fd "EV_SOCK_FMT"), %s%s%scall %p", 2060 ev, 2061 EV_SOCK_ARG(ev->ev_fd), 2062 ev->ev_events & EV_READ ? "EV_READ " : " ", 2063 ev->ev_events & EV_WRITE ? "EV_WRITE " : " ", 2064 tv ? "EV_TIMEOUT " : " ", 2065 ev->ev_callback)); 2066 2067 EVUTIL_ASSERT(!(ev->ev_flags & ~EVLIST_ALL)); 2068 2069 /* 2070 * prepare for timeout insertion further below, if we get a 2071 * failure on any step, we should not change any state. 2072 */ 2073 if (tv != NULL && !(ev->ev_flags & EVLIST_TIMEOUT)) { 2074 if (min_heap_reserve(&base->timeheap, 2075 1 + min_heap_size(&base->timeheap)) == -1) 2076 return (-1); /* ENOMEM == errno */ 2077 } 2078 2079 /* If the main thread is currently executing a signal event's 2080 * callback, and we are not the main thread, then we want to wait 2081 * until the callback is done before we mess with the event, or else 2082 * we can race on ev_ncalls and ev_pncalls below. */ 2083 #ifndef _EVENT_DISABLE_THREAD_SUPPORT 2084 if (base->current_event == ev && (ev->ev_events & EV_SIGNAL) 2085 && !EVBASE_IN_THREAD(base)) { 2086 ++base->current_event_waiters; 2087 EVTHREAD_COND_WAIT(base->current_event_cond, base->th_base_lock); 2088 } 2089 #endif 2090 2091 if ((ev->ev_events & (EV_READ|EV_WRITE|EV_SIGNAL)) && 2092 !(ev->ev_flags & (EVLIST_INSERTED|EVLIST_ACTIVE))) { 2093 if (ev->ev_events & (EV_READ|EV_WRITE)) 2094 res = evmap_io_add(base, ev->ev_fd, ev); 2095 else if (ev->ev_events & EV_SIGNAL) 2096 res = evmap_signal_add(base, (int)ev->ev_fd, ev); 2097 if (res != -1) 2098 event_queue_insert(base, ev, EVLIST_INSERTED); 2099 if (res == 1) { 2100 /* evmap says we need to notify the main thread. */ 2101 notify = 1; 2102 res = 0; 2103 } 2104 } 2105 2106 /* 2107 * we should change the timeout state only if the previous event 2108 * addition succeeded. 2109 */ 2110 if (res != -1 && tv != NULL) { 2111 struct timeval now; 2112 int common_timeout; 2113 2114 /* 2115 * for persistent timeout events, we remember the 2116 * timeout value and re-add the event. 2117 * 2118 * If tv_is_absolute, this was already set. 2119 */ 2120 if (ev->ev_closure == EV_CLOSURE_PERSIST && !tv_is_absolute) 2121 ev->ev_io_timeout = *tv; 2122 2123 /* 2124 * we already reserved memory above for the case where we 2125 * are not replacing an existing timeout. 2126 */ 2127 if (ev->ev_flags & EVLIST_TIMEOUT) { 2128 /* XXX I believe this is needless. */ 2129 if (min_heap_elt_is_top(ev)) 2130 notify = 1; 2131 event_queue_remove(base, ev, EVLIST_TIMEOUT); 2132 } 2133 2134 /* Check if it is active due to a timeout. Rescheduling 2135 * this timeout before the callback can be executed 2136 * removes it from the active list. */ 2137 if ((ev->ev_flags & EVLIST_ACTIVE) && 2138 (ev->ev_res & EV_TIMEOUT)) { 2139 if (ev->ev_events & EV_SIGNAL) { 2140 /* See if we are just active executing 2141 * this event in a loop 2142 */ 2143 if (ev->ev_ncalls && ev->ev_pncalls) { 2144 /* Abort loop */ 2145 *ev->ev_pncalls = 0; 2146 } 2147 } 2148 2149 event_queue_remove(base, ev, EVLIST_ACTIVE); 2150 } 2151 2152 gettime(base, &now); 2153 2154 common_timeout = is_common_timeout(tv, base); 2155 if (tv_is_absolute) { 2156 ev->ev_timeout = *tv; 2157 } else if (common_timeout) { 2158 struct timeval tmp = *tv; 2159 tmp.tv_usec &= MICROSECONDS_MASK; 2160 evutil_timeradd(&now, &tmp, &ev->ev_timeout); 2161 ev->ev_timeout.tv_usec |= 2162 (tv->tv_usec & ~MICROSECONDS_MASK); 2163 } else { 2164 evutil_timeradd(&now, tv, &ev->ev_timeout); 2165 } 2166 2167 event_debug(( 2168 "event_add: timeout in %d seconds, call %p", 2169 (int)tv->tv_sec, ev->ev_callback)); 2170 2171 event_queue_insert(base, ev, EVLIST_TIMEOUT); 2172 if (common_timeout) { 2173 struct common_timeout_list *ctl = 2174 get_common_timeout_list(base, &ev->ev_timeout); 2175 if (ev == TAILQ_FIRST(&ctl->events)) { 2176 common_timeout_schedule(ctl, &now, ev); 2177 } 2178 } else { 2179 /* See if the earliest timeout is now earlier than it 2180 * was before: if so, we will need to tell the main 2181 * thread to wake up earlier than it would 2182 * otherwise. */ 2183 if (min_heap_elt_is_top(ev)) 2184 notify = 1; 2185 } 2186 } 2187 2188 /* if we are not in the right thread, we need to wake up the loop */ 2189 if (res != -1 && notify && EVBASE_NEED_NOTIFY(base)) 2190 evthread_notify_base(base); 2191 2192 _event_debug_note_add(ev); 2193 2194 return (res); 2195 } 2196 2197 int 2198 event_del(struct event *ev) 2199 { 2200 int res; 2201 2202 if (EVUTIL_FAILURE_CHECK(!ev->ev_base)) { 2203 event_warnx("%s: event has no event_base set.", __func__); 2204 return -1; 2205 } 2206 2207 EVBASE_ACQUIRE_LOCK(ev->ev_base, th_base_lock); 2208 2209 res = event_del_internal(ev); 2210 2211 EVBASE_RELEASE_LOCK(ev->ev_base, th_base_lock); 2212 2213 return (res); 2214 } 2215 2216 /* Helper for event_del: always called with th_base_lock held. */ 2217 static inline int 2218 event_del_internal(struct event *ev) 2219 { 2220 struct event_base *base; 2221 int res = 0, notify = 0; 2222 2223 event_debug(("event_del: %p (fd "EV_SOCK_FMT"), callback %p", 2224 ev, EV_SOCK_ARG(ev->ev_fd), ev->ev_callback)); 2225 2226 /* An event without a base has not been added */ 2227 if (ev->ev_base == NULL) 2228 return (-1); 2229 2230 EVENT_BASE_ASSERT_LOCKED(ev->ev_base); 2231 2232 /* If the main thread is currently executing this event's callback, 2233 * and we are not the main thread, then we want to wait until the 2234 * callback is done before we start removing the event. That way, 2235 * when this function returns, it will be safe to free the 2236 * user-supplied argument. */ 2237 base = ev->ev_base; 2238 #ifndef _EVENT_DISABLE_THREAD_SUPPORT 2239 if (base->current_event == ev && !EVBASE_IN_THREAD(base)) { 2240 ++base->current_event_waiters; 2241 EVTHREAD_COND_WAIT(base->current_event_cond, base->th_base_lock); 2242 } 2243 #endif 2244 2245 EVUTIL_ASSERT(!(ev->ev_flags & ~EVLIST_ALL)); 2246 2247 /* See if we are just active executing this event in a loop */ 2248 if (ev->ev_events & EV_SIGNAL) { 2249 if (ev->ev_ncalls && ev->ev_pncalls) { 2250 /* Abort loop */ 2251 *ev->ev_pncalls = 0; 2252 } 2253 } 2254 2255 if (ev->ev_flags & EVLIST_TIMEOUT) { 2256 /* NOTE: We never need to notify the main thread because of a 2257 * deleted timeout event: all that could happen if we don't is 2258 * that the dispatch loop might wake up too early. But the 2259 * point of notifying the main thread _is_ to wake up the 2260 * dispatch loop early anyway, so we wouldn't gain anything by 2261 * doing it. 2262 */ 2263 event_queue_remove(base, ev, EVLIST_TIMEOUT); 2264 } 2265 2266 if (ev->ev_flags & EVLIST_ACTIVE) 2267 event_queue_remove(base, ev, EVLIST_ACTIVE); 2268 2269 if (ev->ev_flags & EVLIST_INSERTED) { 2270 event_queue_remove(base, ev, EVLIST_INSERTED); 2271 if (ev->ev_events & (EV_READ|EV_WRITE)) 2272 res = evmap_io_del(base, ev->ev_fd, ev); 2273 else 2274 res = evmap_signal_del(base, (int)ev->ev_fd, ev); 2275 if (res == 1) { 2276 /* evmap says we need to notify the main thread. */ 2277 notify = 1; 2278 res = 0; 2279 } 2280 } 2281 2282 /* if we are not in the right thread, we need to wake up the loop */ 2283 if (res != -1 && notify && EVBASE_NEED_NOTIFY(base)) 2284 evthread_notify_base(base); 2285 2286 _event_debug_note_del(ev); 2287 2288 return (res); 2289 } 2290 2291 void 2292 event_active(struct event *ev, int res, short ncalls) 2293 { 2294 if (EVUTIL_FAILURE_CHECK(!ev->ev_base)) { 2295 event_warnx("%s: event has no event_base set.", __func__); 2296 return; 2297 } 2298 2299 EVBASE_ACQUIRE_LOCK(ev->ev_base, th_base_lock); 2300 2301 _event_debug_assert_is_setup(ev); 2302 2303 event_active_nolock(ev, res, ncalls); 2304 2305 EVBASE_RELEASE_LOCK(ev->ev_base, th_base_lock); 2306 } 2307 2308 2309 void 2310 event_active_nolock(struct event *ev, int res, short ncalls) 2311 { 2312 struct event_base *base; 2313 2314 event_debug(("event_active: %p (fd "EV_SOCK_FMT"), res %d, callback %p", 2315 ev, EV_SOCK_ARG(ev->ev_fd), (int)res, ev->ev_callback)); 2316 2317 2318 /* We get different kinds of events, add them together */ 2319 if (ev->ev_flags & EVLIST_ACTIVE) { 2320 ev->ev_res |= res; 2321 return; 2322 } 2323 2324 base = ev->ev_base; 2325 2326 EVENT_BASE_ASSERT_LOCKED(base); 2327 2328 ev->ev_res = res; 2329 2330 if (ev->ev_pri < base->event_running_priority) 2331 base->event_continue = 1; 2332 2333 if (ev->ev_events & EV_SIGNAL) { 2334 #ifndef _EVENT_DISABLE_THREAD_SUPPORT 2335 if (base->current_event == ev && !EVBASE_IN_THREAD(base)) { 2336 ++base->current_event_waiters; 2337 EVTHREAD_COND_WAIT(base->current_event_cond, base->th_base_lock); 2338 } 2339 #endif 2340 ev->ev_ncalls = ncalls; 2341 ev->ev_pncalls = NULL; 2342 } 2343 2344 event_queue_insert(base, ev, EVLIST_ACTIVE); 2345 2346 if (EVBASE_NEED_NOTIFY(base)) 2347 evthread_notify_base(base); 2348 } 2349 2350 void 2351 event_deferred_cb_init(struct deferred_cb *cb, deferred_cb_fn fn, void *arg) 2352 { 2353 memset(cb, 0, sizeof(struct deferred_cb)); 2354 cb->cb = fn; 2355 cb->arg = arg; 2356 } 2357 2358 void 2359 event_deferred_cb_cancel(struct deferred_cb_queue *queue, 2360 struct deferred_cb *cb) 2361 { 2362 if (!queue) { 2363 if (current_base) 2364 queue = ¤t_base->defer_queue; 2365 else 2366 return; 2367 } 2368 2369 LOCK_DEFERRED_QUEUE(queue); 2370 if (cb->queued) { 2371 TAILQ_REMOVE(&queue->deferred_cb_list, cb, cb_next); 2372 --queue->active_count; 2373 cb->queued = 0; 2374 } 2375 UNLOCK_DEFERRED_QUEUE(queue); 2376 } 2377 2378 void 2379 event_deferred_cb_schedule(struct deferred_cb_queue *queue, 2380 struct deferred_cb *cb) 2381 { 2382 if (!queue) { 2383 if (current_base) 2384 queue = ¤t_base->defer_queue; 2385 else 2386 return; 2387 } 2388 2389 LOCK_DEFERRED_QUEUE(queue); 2390 if (!cb->queued) { 2391 cb->queued = 1; 2392 TAILQ_INSERT_TAIL(&queue->deferred_cb_list, cb, cb_next); 2393 ++queue->active_count; 2394 if (queue->notify_fn) 2395 queue->notify_fn(queue, queue->notify_arg); 2396 } 2397 UNLOCK_DEFERRED_QUEUE(queue); 2398 } 2399 2400 static int 2401 timeout_next(struct event_base *base, struct timeval **tv_p) 2402 { 2403 /* Caller must hold th_base_lock */ 2404 struct timeval now; 2405 struct event *ev; 2406 struct timeval *tv = *tv_p; 2407 int res = 0; 2408 2409 ev = min_heap_top(&base->timeheap); 2410 2411 if (ev == NULL) { 2412 /* if no time-based events are active wait for I/O */ 2413 *tv_p = NULL; 2414 goto out; 2415 } 2416 2417 if (gettime(base, &now) == -1) { 2418 res = -1; 2419 goto out; 2420 } 2421 2422 if (evutil_timercmp(&ev->ev_timeout, &now, <=)) { 2423 evutil_timerclear(tv); 2424 goto out; 2425 } 2426 2427 evutil_timersub(&ev->ev_timeout, &now, tv); 2428 2429 EVUTIL_ASSERT(tv->tv_sec >= 0); 2430 EVUTIL_ASSERT(tv->tv_usec >= 0); 2431 event_debug(("timeout_next: in %d seconds", (int)tv->tv_sec)); 2432 2433 out: 2434 return (res); 2435 } 2436 2437 /* 2438 * Determines if the time is running backwards by comparing the current time 2439 * against the last time we checked. Not needed when using clock monotonic. 2440 * If time is running backwards, we adjust the firing time of every event by 2441 * the amount that time seems to have jumped. 2442 */ 2443 static void 2444 timeout_correct(struct event_base *base, struct timeval *tv) 2445 { 2446 /* Caller must hold th_base_lock. */ 2447 struct event **pev; 2448 unsigned int size; 2449 struct timeval off; 2450 int i; 2451 2452 if (use_monotonic) 2453 return; 2454 2455 /* Check if time is running backwards */ 2456 gettime(base, tv); 2457 2458 if (evutil_timercmp(tv, &base->event_tv, >=)) { 2459 base->event_tv = *tv; 2460 return; 2461 } 2462 2463 event_debug(("%s: time is running backwards, corrected", 2464 __func__)); 2465 evutil_timersub(&base->event_tv, tv, &off); 2466 2467 /* 2468 * We can modify the key element of the node without destroying 2469 * the minheap property, because we change every element. 2470 */ 2471 pev = base->timeheap.p; 2472 size = base->timeheap.n; 2473 for (; size-- > 0; ++pev) { 2474 struct timeval *ev_tv = &(**pev).ev_timeout; 2475 evutil_timersub(ev_tv, &off, ev_tv); 2476 } 2477 for (i=0; i<base->n_common_timeouts; ++i) { 2478 struct event *ev; 2479 struct common_timeout_list *ctl = 2480 base->common_timeout_queues[i]; 2481 TAILQ_FOREACH(ev, &ctl->events, 2482 ev_timeout_pos.ev_next_with_common_timeout) { 2483 struct timeval *ev_tv = &ev->ev_timeout; 2484 ev_tv->tv_usec &= MICROSECONDS_MASK; 2485 evutil_timersub(ev_tv, &off, ev_tv); 2486 ev_tv->tv_usec |= COMMON_TIMEOUT_MAGIC | 2487 (i<<COMMON_TIMEOUT_IDX_SHIFT); 2488 } 2489 } 2490 2491 /* Now remember what the new time turned out to be. */ 2492 base->event_tv = *tv; 2493 } 2494 2495 /* Activate every event whose timeout has elapsed. */ 2496 static void 2497 timeout_process(struct event_base *base) 2498 { 2499 /* Caller must hold lock. */ 2500 struct timeval now; 2501 struct event *ev; 2502 2503 if (min_heap_empty(&base->timeheap)) { 2504 return; 2505 } 2506 2507 gettime(base, &now); 2508 2509 while ((ev = min_heap_top(&base->timeheap))) { 2510 if (evutil_timercmp(&ev->ev_timeout, &now, >)) 2511 break; 2512 2513 /* delete this event from the I/O queues */ 2514 event_del_internal(ev); 2515 2516 event_debug(("timeout_process: call %p", 2517 ev->ev_callback)); 2518 event_active_nolock(ev, EV_TIMEOUT, 1); 2519 } 2520 } 2521 2522 /* Remove 'ev' from 'queue' (EVLIST_...) in base. */ 2523 static void 2524 event_queue_remove(struct event_base *base, struct event *ev, int queue) 2525 { 2526 EVENT_BASE_ASSERT_LOCKED(base); 2527 2528 if (!(ev->ev_flags & queue)) { 2529 event_errx(1, "%s: %p(fd "EV_SOCK_FMT") not on queue %x", __func__, 2530 ev, EV_SOCK_ARG(ev->ev_fd), queue); 2531 return; 2532 } 2533 2534 if (~ev->ev_flags & EVLIST_INTERNAL) 2535 base->event_count--; 2536 2537 ev->ev_flags &= ~queue; 2538 switch (queue) { 2539 case EVLIST_INSERTED: 2540 TAILQ_REMOVE(&base->eventqueue, ev, ev_next); 2541 break; 2542 case EVLIST_ACTIVE: 2543 base->event_count_active--; 2544 TAILQ_REMOVE(&base->activequeues[ev->ev_pri], 2545 ev, ev_active_next); 2546 break; 2547 case EVLIST_TIMEOUT: 2548 if (is_common_timeout(&ev->ev_timeout, base)) { 2549 struct common_timeout_list *ctl = 2550 get_common_timeout_list(base, &ev->ev_timeout); 2551 TAILQ_REMOVE(&ctl->events, ev, 2552 ev_timeout_pos.ev_next_with_common_timeout); 2553 } else { 2554 min_heap_erase(&base->timeheap, ev); 2555 } 2556 break; 2557 default: 2558 event_errx(1, "%s: unknown queue %x", __func__, queue); 2559 } 2560 } 2561 2562 /* Add 'ev' to the common timeout list in 'ev'. */ 2563 static void 2564 insert_common_timeout_inorder(struct common_timeout_list *ctl, 2565 struct event *ev) 2566 { 2567 struct event *e; 2568 /* By all logic, we should just be able to append 'ev' to the end of 2569 * ctl->events, since the timeout on each 'ev' is set to {the common 2570 * timeout} + {the time when we add the event}, and so the events 2571 * should arrive in order of their timeeouts. But just in case 2572 * there's some wacky threading issue going on, we do a search from 2573 * the end of 'ev' to find the right insertion point. 2574 */ 2575 TAILQ_FOREACH_REVERSE(e, &ctl->events, 2576 event_list, ev_timeout_pos.ev_next_with_common_timeout) { 2577 /* This timercmp is a little sneaky, since both ev and e have 2578 * magic values in tv_usec. Fortunately, they ought to have 2579 * the _same_ magic values in tv_usec. Let's assert for that. 2580 */ 2581 EVUTIL_ASSERT( 2582 is_same_common_timeout(&e->ev_timeout, &ev->ev_timeout)); 2583 if (evutil_timercmp(&ev->ev_timeout, &e->ev_timeout, >=)) { 2584 TAILQ_INSERT_AFTER(&ctl->events, e, ev, 2585 ev_timeout_pos.ev_next_with_common_timeout); 2586 return; 2587 } 2588 } 2589 TAILQ_INSERT_HEAD(&ctl->events, ev, 2590 ev_timeout_pos.ev_next_with_common_timeout); 2591 } 2592 2593 static void 2594 event_queue_insert(struct event_base *base, struct event *ev, int queue) 2595 { 2596 EVENT_BASE_ASSERT_LOCKED(base); 2597 2598 if (ev->ev_flags & queue) { 2599 /* Double insertion is possible for active events */ 2600 if (queue & EVLIST_ACTIVE) 2601 return; 2602 2603 event_errx(1, "%s: %p(fd "EV_SOCK_FMT") already on queue %x", __func__, 2604 ev, EV_SOCK_ARG(ev->ev_fd), queue); 2605 return; 2606 } 2607 2608 if (~ev->ev_flags & EVLIST_INTERNAL) 2609 base->event_count++; 2610 2611 ev->ev_flags |= queue; 2612 switch (queue) { 2613 case EVLIST_INSERTED: 2614 TAILQ_INSERT_TAIL(&base->eventqueue, ev, ev_next); 2615 break; 2616 case EVLIST_ACTIVE: 2617 base->event_count_active++; 2618 TAILQ_INSERT_TAIL(&base->activequeues[ev->ev_pri], 2619 ev,ev_active_next); 2620 break; 2621 case EVLIST_TIMEOUT: { 2622 if (is_common_timeout(&ev->ev_timeout, base)) { 2623 struct common_timeout_list *ctl = 2624 get_common_timeout_list(base, &ev->ev_timeout); 2625 insert_common_timeout_inorder(ctl, ev); 2626 } else 2627 min_heap_push(&base->timeheap, ev); 2628 break; 2629 } 2630 default: 2631 event_errx(1, "%s: unknown queue %x", __func__, queue); 2632 } 2633 } 2634 2635 /* Functions for debugging */ 2636 2637 const char * 2638 event_get_version(void) 2639 { 2640 return (_EVENT_VERSION); 2641 } 2642 2643 ev_uint32_t 2644 event_get_version_number(void) 2645 { 2646 return (_EVENT_NUMERIC_VERSION); 2647 } 2648 2649 /* 2650 * No thread-safe interface needed - the information should be the same 2651 * for all threads. 2652 */ 2653 2654 const char * 2655 event_get_method(void) 2656 { 2657 return (current_base->evsel->name); 2658 } 2659 2660 #ifndef _EVENT_DISABLE_MM_REPLACEMENT 2661 static void *(*_mm_malloc_fn)(size_t sz) = NULL; 2662 static void *(*_mm_realloc_fn)(void *p, size_t sz) = NULL; 2663 static void (*_mm_free_fn)(void *p) = NULL; 2664 2665 void * 2666 event_mm_malloc_(size_t sz) 2667 { 2668 if (_mm_malloc_fn) 2669 return _mm_malloc_fn(sz); 2670 else 2671 return malloc(sz); 2672 } 2673 2674 void * 2675 event_mm_calloc_(size_t count, size_t size) 2676 { 2677 if (_mm_malloc_fn) { 2678 size_t sz = count * size; 2679 void *p = _mm_malloc_fn(sz); 2680 if (p) 2681 memset(p, 0, sz); 2682 return p; 2683 } else 2684 return calloc(count, size); 2685 } 2686 2687 char * 2688 event_mm_strdup_(const char *str) 2689 { 2690 if (_mm_malloc_fn) { 2691 size_t ln = strlen(str); 2692 void *p = _mm_malloc_fn(ln+1); 2693 if (p) 2694 memcpy(p, str, ln+1); 2695 return p; 2696 } else 2697 #ifdef WIN32 2698 return _strdup(str); 2699 #else 2700 return strdup(str); 2701 #endif 2702 } 2703 2704 void * 2705 event_mm_realloc_(void *ptr, size_t sz) 2706 { 2707 if (_mm_realloc_fn) 2708 return _mm_realloc_fn(ptr, sz); 2709 else 2710 return realloc(ptr, sz); 2711 } 2712 2713 void 2714 event_mm_free_(void *ptr) 2715 { 2716 if (_mm_free_fn) 2717 _mm_free_fn(ptr); 2718 else 2719 free(ptr); 2720 } 2721 2722 void 2723 event_set_mem_functions(void *(*malloc_fn)(size_t sz), 2724 void *(*realloc_fn)(void *ptr, size_t sz), 2725 void (*free_fn)(void *ptr)) 2726 { 2727 _mm_malloc_fn = malloc_fn; 2728 _mm_realloc_fn = realloc_fn; 2729 _mm_free_fn = free_fn; 2730 } 2731 #endif 2732 2733 #if defined(_EVENT_HAVE_EVENTFD) && defined(_EVENT_HAVE_SYS_EVENTFD_H) 2734 static void 2735 evthread_notify_drain_eventfd(evutil_socket_t fd, short what, void *arg) 2736 { 2737 ev_uint64_t msg; 2738 ev_ssize_t r; 2739 struct event_base *base = arg; 2740 2741 r = read(fd, (void*) &msg, sizeof(msg)); 2742 if (r<0 && errno != EAGAIN) { 2743 event_sock_warn(fd, "Error reading from eventfd"); 2744 } 2745 EVBASE_ACQUIRE_LOCK(base, th_base_lock); 2746 base->is_notify_pending = 0; 2747 EVBASE_RELEASE_LOCK(base, th_base_lock); 2748 } 2749 #endif 2750 2751 static void 2752 evthread_notify_drain_default(evutil_socket_t fd, short what, void *arg) 2753 { 2754 unsigned char buf[1024]; 2755 struct event_base *base = arg; 2756 #ifdef WIN32 2757 while (recv(fd, (char*)buf, sizeof(buf), 0) > 0) 2758 ; 2759 #else 2760 while (read(fd, (char*)buf, sizeof(buf)) > 0) 2761 ; 2762 #endif 2763 2764 EVBASE_ACQUIRE_LOCK(base, th_base_lock); 2765 base->is_notify_pending = 0; 2766 EVBASE_RELEASE_LOCK(base, th_base_lock); 2767 } 2768 2769 int 2770 evthread_make_base_notifiable(struct event_base *base) 2771 { 2772 void (*cb)(evutil_socket_t, short, void *) = evthread_notify_drain_default; 2773 int (*notify)(struct event_base *) = evthread_notify_base_default; 2774 2775 /* XXXX grab the lock here? */ 2776 if (!base) 2777 return -1; 2778 2779 if (base->th_notify_fd[0] >= 0) 2780 return 0; 2781 2782 #if defined(_EVENT_HAVE_EVENTFD) && defined(_EVENT_HAVE_SYS_EVENTFD_H) 2783 #ifndef EFD_CLOEXEC 2784 #define EFD_CLOEXEC 0 2785 #endif 2786 base->th_notify_fd[0] = eventfd(0, EFD_CLOEXEC); 2787 if (base->th_notify_fd[0] >= 0) { 2788 evutil_make_socket_closeonexec(base->th_notify_fd[0]); 2789 notify = evthread_notify_base_eventfd; 2790 cb = evthread_notify_drain_eventfd; 2791 } 2792 #endif 2793 #if defined(_EVENT_HAVE_PIPE) 2794 if (base->th_notify_fd[0] < 0) { 2795 if ((base->evsel->features & EV_FEATURE_FDS)) { 2796 if (pipe(base->th_notify_fd) < 0) { 2797 event_warn("%s: pipe", __func__); 2798 } else { 2799 evutil_make_socket_closeonexec(base->th_notify_fd[0]); 2800 evutil_make_socket_closeonexec(base->th_notify_fd[1]); 2801 } 2802 } 2803 } 2804 #endif 2805 2806 #ifdef WIN32 2807 #define LOCAL_SOCKETPAIR_AF AF_INET 2808 #else 2809 #define LOCAL_SOCKETPAIR_AF AF_UNIX 2810 #endif 2811 if (base->th_notify_fd[0] < 0) { 2812 if (evutil_socketpair(LOCAL_SOCKETPAIR_AF, SOCK_STREAM, 0, 2813 base->th_notify_fd) == -1) { 2814 event_sock_warn(-1, "%s: socketpair", __func__); 2815 return (-1); 2816 } else { 2817 evutil_make_socket_closeonexec(base->th_notify_fd[0]); 2818 evutil_make_socket_closeonexec(base->th_notify_fd[1]); 2819 } 2820 } 2821 2822 evutil_make_socket_nonblocking(base->th_notify_fd[0]); 2823 2824 base->th_notify_fn = notify; 2825 2826 /* 2827 Making the second socket nonblocking is a bit subtle, given that we 2828 ignore any EAGAIN returns when writing to it, and you don't usally 2829 do that for a nonblocking socket. But if the kernel gives us EAGAIN, 2830 then there's no need to add any more data to the buffer, since 2831 the main thread is already either about to wake up and drain it, 2832 or woken up and in the process of draining it. 2833 */ 2834 if (base->th_notify_fd[1] > 0) 2835 evutil_make_socket_nonblocking(base->th_notify_fd[1]); 2836 2837 /* prepare an event that we can use for wakeup */ 2838 event_assign(&base->th_notify, base, base->th_notify_fd[0], 2839 EV_READ|EV_PERSIST, cb, base); 2840 2841 /* we need to mark this as internal event */ 2842 base->th_notify.ev_flags |= EVLIST_INTERNAL; 2843 event_priority_set(&base->th_notify, 0); 2844 2845 return event_add(&base->th_notify, NULL); 2846 } 2847 2848 void 2849 event_base_dump_events(struct event_base *base, FILE *output) 2850 { 2851 struct event *e; 2852 int i; 2853 fprintf(output, "Inserted events:\n"); 2854 TAILQ_FOREACH(e, &base->eventqueue, ev_next) { 2855 fprintf(output, " %p [fd "EV_SOCK_FMT"]%s%s%s%s%s\n", 2856 (void*)e, EV_SOCK_ARG(e->ev_fd), 2857 (e->ev_events&EV_READ)?" Read":"", 2858 (e->ev_events&EV_WRITE)?" Write":"", 2859 (e->ev_events&EV_SIGNAL)?" Signal":"", 2860 (e->ev_events&EV_TIMEOUT)?" Timeout":"", 2861 (e->ev_events&EV_PERSIST)?" Persist":""); 2862 2863 } 2864 for (i = 0; i < base->nactivequeues; ++i) { 2865 if (TAILQ_EMPTY(&base->activequeues[i])) 2866 continue; 2867 fprintf(output, "Active events [priority %d]:\n", i); 2868 TAILQ_FOREACH(e, &base->eventqueue, ev_next) { 2869 fprintf(output, " %p [fd "EV_SOCK_FMT"]%s%s%s%s\n", 2870 (void*)e, EV_SOCK_ARG(e->ev_fd), 2871 (e->ev_res&EV_READ)?" Read active":"", 2872 (e->ev_res&EV_WRITE)?" Write active":"", 2873 (e->ev_res&EV_SIGNAL)?" Signal active":"", 2874 (e->ev_res&EV_TIMEOUT)?" Timeout active":""); 2875 } 2876 } 2877 } 2878 2879 void 2880 event_base_add_virtual(struct event_base *base) 2881 { 2882 EVBASE_ACQUIRE_LOCK(base, th_base_lock); 2883 base->virtual_event_count++; 2884 EVBASE_RELEASE_LOCK(base, th_base_lock); 2885 } 2886 2887 void 2888 event_base_del_virtual(struct event_base *base) 2889 { 2890 EVBASE_ACQUIRE_LOCK(base, th_base_lock); 2891 EVUTIL_ASSERT(base->virtual_event_count > 0); 2892 base->virtual_event_count--; 2893 if (base->virtual_event_count == 0 && EVBASE_NEED_NOTIFY(base)) 2894 evthread_notify_base(base); 2895 EVBASE_RELEASE_LOCK(base, th_base_lock); 2896 } 2897 2898 #ifndef _EVENT_DISABLE_THREAD_SUPPORT 2899 int 2900 event_global_setup_locks_(const int enable_locks) 2901 { 2902 #ifndef _EVENT_DISABLE_DEBUG_MODE 2903 EVTHREAD_SETUP_GLOBAL_LOCK(_event_debug_map_lock, 0); 2904 #endif 2905 if (evsig_global_setup_locks_(enable_locks) < 0) 2906 return -1; 2907 if (evutil_secure_rng_global_setup_locks_(enable_locks) < 0) 2908 return -1; 2909 return 0; 2910 } 2911 #endif 2912 2913 void 2914 event_base_assert_ok(struct event_base *base) 2915 { 2916 int i; 2917 EVBASE_ACQUIRE_LOCK(base, th_base_lock); 2918 evmap_check_integrity(base); 2919 2920 /* Check the heap property */ 2921 for (i = 1; i < (int)base->timeheap.n; ++i) { 2922 int parent = (i - 1) / 2; 2923 struct event *ev, *p_ev; 2924 ev = base->timeheap.p[i]; 2925 p_ev = base->timeheap.p[parent]; 2926 EVUTIL_ASSERT(ev->ev_flags & EV_TIMEOUT); 2927 EVUTIL_ASSERT(evutil_timercmp(&p_ev->ev_timeout, &ev->ev_timeout, <=)); 2928 EVUTIL_ASSERT(ev->ev_timeout_pos.min_heap_idx == i); 2929 } 2930 2931 /* Check that the common timeouts are fine */ 2932 for (i = 0; i < base->n_common_timeouts; ++i) { 2933 struct common_timeout_list *ctl = base->common_timeout_queues[i]; 2934 struct event *last=NULL, *ev; 2935 TAILQ_FOREACH(ev, &ctl->events, ev_timeout_pos.ev_next_with_common_timeout) { 2936 if (last) 2937 EVUTIL_ASSERT(evutil_timercmp(&last->ev_timeout, &ev->ev_timeout, <=)); 2938 EVUTIL_ASSERT(ev->ev_flags & EV_TIMEOUT); 2939 EVUTIL_ASSERT(is_common_timeout(&ev->ev_timeout,base)); 2940 EVUTIL_ASSERT(COMMON_TIMEOUT_IDX(&ev->ev_timeout) == i); 2941 last = ev; 2942 } 2943 } 2944 2945 EVBASE_RELEASE_LOCK(base, th_base_lock); 2946 } 2947