1 /* $NetBSD: regress_bufferevent.c,v 1.7 2024/08/18 20:47:23 christos Exp $ */ 2 3 /* 4 * Copyright (c) 2003-2007 Niels Provos <provos@citi.umich.edu> 5 * Copyright (c) 2007-2012 Niels Provos and Nick Mathewson 6 * 7 * Redistribution and use in source and binary forms, with or without 8 * modification, are permitted provided that the following conditions 9 * are met: 10 * 1. Redistributions of source code must retain the above copyright 11 * notice, this list of conditions and the following disclaimer. 12 * 2. Redistributions in binary form must reproduce the above copyright 13 * notice, this list of conditions and the following disclaimer in the 14 * documentation and/or other materials provided with the distribution. 15 * 3. The name of the author may not be used to endorse or promote products 16 * derived from this software without specific prior written permission. 17 * 18 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR 19 * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES 20 * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. 21 * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, 22 * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT 23 * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, 24 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY 25 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT 26 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF 27 * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 28 */ 29 #include "util-internal.h" 30 31 /* The old tests here need assertions to work. */ 32 #undef NDEBUG 33 34 /** 35 * - clang supports __has_feature 36 * - gcc supports __SANITIZE_ADDRESS__ 37 * 38 * Let's set __SANITIZE_ADDRESS__ if __has_feature(address_sanitizer) 39 */ 40 #ifndef __has_feature 41 #define __has_feature(x) 0 42 #endif 43 #if !defined(__SANITIZE_ADDRESS__) && __has_feature(address_sanitizer) 44 #define __SANITIZE_ADDRESS__ 45 #endif 46 47 #ifdef _WIN32 48 #include <winsock2.h> 49 #include <windows.h> 50 #endif 51 52 #include "event2/event-config.h" 53 54 #include <sys/types.h> 55 #include <sys/stat.h> 56 #ifdef EVENT__HAVE_SYS_TIME_H 57 #include <sys/time.h> 58 #endif 59 #include <sys/queue.h> 60 #ifndef _WIN32 61 #include <sys/socket.h> 62 #include <sys/wait.h> 63 #include <signal.h> 64 #include <unistd.h> 65 #include <netdb.h> 66 #include <netinet/in.h> 67 #endif 68 #include <fcntl.h> 69 #include <signal.h> 70 #include <stdlib.h> 71 #include <stdio.h> 72 #include <string.h> 73 #include <errno.h> 74 #include <assert.h> 75 76 #ifdef EVENT__HAVE_ARPA_INET_H 77 #include <arpa/inet.h> 78 #endif 79 80 #include "event2/event-config.h" 81 #include "event2/event.h" 82 #include "event2/event_struct.h" 83 #include "event2/event_compat.h" 84 #include "event2/tag.h" 85 #include "event2/buffer.h" 86 #include "event2/bufferevent.h" 87 #include "event2/bufferevent_compat.h" 88 #include "event2/bufferevent_struct.h" 89 #include "event2/listener.h" 90 #include "event2/util.h" 91 92 #include "bufferevent-internal.h" 93 #include "evthread-internal.h" 94 #include "util-internal.h" 95 #ifdef _WIN32 96 #include "iocp-internal.h" 97 #endif 98 99 #include "regress.h" 100 #include "regress_testutils.h" 101 102 /* 103 * simple bufferevent test 104 */ 105 106 static void 107 readcb(struct bufferevent *bev, void *arg) 108 { 109 if (evbuffer_get_length(bev->input) == 8333) { 110 struct evbuffer *evbuf = evbuffer_new(); 111 assert(evbuf != NULL); 112 113 /* gratuitous test of bufferevent_read_buffer */ 114 bufferevent_read_buffer(bev, evbuf); 115 116 bufferevent_disable(bev, EV_READ); 117 118 if (evbuffer_get_length(evbuf) == 8333) { 119 test_ok++; 120 } 121 122 evbuffer_free(evbuf); 123 } 124 } 125 126 static void 127 writecb(struct bufferevent *bev, void *arg) 128 { 129 if (evbuffer_get_length(bev->output) == 0) { 130 test_ok++; 131 } 132 } 133 134 static void 135 errorcb(struct bufferevent *bev, short what, void *arg) 136 { 137 test_ok = -2; 138 } 139 140 static void 141 test_bufferevent_impl(int use_pair, int flush) 142 { 143 struct bufferevent *bev1 = NULL, *bev2 = NULL; 144 char buffer[8333]; 145 int i; 146 int expected = 2; 147 148 if (use_pair) { 149 struct bufferevent *pair[2]; 150 tt_assert(0 == bufferevent_pair_new(NULL, 0, pair)); 151 bev1 = pair[0]; 152 bev2 = pair[1]; 153 bufferevent_setcb(bev1, readcb, writecb, errorcb, bev1); 154 bufferevent_setcb(bev2, readcb, writecb, errorcb, NULL); 155 tt_fd_op(bufferevent_getfd(bev1), ==, EVUTIL_INVALID_SOCKET); 156 tt_ptr_op(bufferevent_get_underlying(bev1), ==, NULL); 157 tt_ptr_op(bufferevent_pair_get_partner(bev1), ==, bev2); 158 tt_ptr_op(bufferevent_pair_get_partner(bev2), ==, bev1); 159 } else { 160 bev1 = bufferevent_new(pair[0], readcb, writecb, errorcb, NULL); 161 bev2 = bufferevent_new(pair[1], readcb, writecb, errorcb, NULL); 162 tt_fd_op(bufferevent_getfd(bev1), ==, pair[0]); 163 tt_ptr_op(bufferevent_get_underlying(bev1), ==, NULL); 164 tt_ptr_op(bufferevent_pair_get_partner(bev1), ==, NULL); 165 tt_ptr_op(bufferevent_pair_get_partner(bev2), ==, NULL); 166 } 167 168 { 169 /* Test getcb. */ 170 bufferevent_data_cb r, w; 171 bufferevent_event_cb e; 172 void *a; 173 bufferevent_getcb(bev1, &r, &w, &e, &a); 174 tt_ptr_op(r, ==, readcb); 175 tt_ptr_op(w, ==, writecb); 176 tt_ptr_op(e, ==, errorcb); 177 tt_ptr_op(a, ==, use_pair ? bev1 : NULL); 178 } 179 180 bufferevent_disable(bev1, EV_READ); 181 bufferevent_enable(bev2, EV_READ); 182 183 tt_int_op(bufferevent_get_enabled(bev1), ==, EV_WRITE); 184 tt_int_op(bufferevent_get_enabled(bev2), ==, EV_WRITE|EV_READ); 185 186 for (i = 0; i < (int)sizeof(buffer); i++) 187 buffer[i] = i; 188 189 bufferevent_write(bev1, buffer, sizeof(buffer)); 190 if (flush >= 0) { 191 tt_int_op(bufferevent_flush(bev1, EV_WRITE, flush), >=, 0); 192 } 193 194 event_dispatch(); 195 196 bufferevent_free(bev2); 197 tt_ptr_op(bufferevent_pair_get_partner(bev1), ==, NULL); 198 bufferevent_free(bev1); 199 200 /** Only pair call errorcb for BEV_FINISHED */ 201 if (use_pair && flush == BEV_FINISHED) { 202 expected = -1; 203 } 204 if (test_ok != expected) 205 test_ok = 0; 206 end: 207 ; 208 } 209 210 static void test_bufferevent(void) { test_bufferevent_impl(0, -1); } 211 static void test_bufferevent_pair(void) { test_bufferevent_impl(1, -1); } 212 213 static void test_bufferevent_flush_normal(void) { test_bufferevent_impl(0, BEV_NORMAL); } 214 static void test_bufferevent_flush_flush(void) { test_bufferevent_impl(0, BEV_FLUSH); } 215 static void test_bufferevent_flush_finished(void) { test_bufferevent_impl(0, BEV_FINISHED); } 216 217 static void test_bufferevent_pair_flush_normal(void) { test_bufferevent_impl(1, BEV_NORMAL); } 218 static void test_bufferevent_pair_flush_flush(void) { test_bufferevent_impl(1, BEV_FLUSH); } 219 static void test_bufferevent_pair_flush_finished(void) { test_bufferevent_impl(1, BEV_FINISHED); } 220 221 #if defined(EVTHREAD_USE_PTHREADS_IMPLEMENTED) && !defined(__SANITIZE_ADDRESS__) 222 /** 223 * Trace lock/unlock/alloc/free for locks. 224 * (More heavier then evthread_debug*) 225 */ 226 typedef struct 227 { 228 void *lock; 229 enum { 230 ALLOC, FREE, 231 } status; 232 size_t locked /** allow recursive locking */; 233 } lock_wrapper; 234 struct lock_unlock_base 235 { 236 /* Original callbacks */ 237 struct evthread_lock_callbacks cbs; 238 /* Map of locks */ 239 lock_wrapper *locks; 240 size_t nr_locks; 241 } lu_base = { 242 .locks = NULL, 243 }; 244 245 static lock_wrapper *lu_find(void *lock_) 246 { 247 size_t i; 248 for (i = 0; i < lu_base.nr_locks; ++i) { 249 lock_wrapper *lock = &lu_base.locks[i]; 250 if (lock->lock == lock_) 251 return lock; 252 } 253 return NULL; 254 } 255 256 static void *trace_lock_alloc(unsigned locktype) 257 { 258 void *lock; 259 ++lu_base.nr_locks; 260 lu_base.locks = realloc(lu_base.locks, 261 sizeof(lock_wrapper) * lu_base.nr_locks); 262 lock = lu_base.cbs.alloc(locktype); 263 lu_base.locks[lu_base.nr_locks - 1] = (lock_wrapper){ lock, ALLOC, 0 }; 264 return lock; 265 } 266 static void trace_lock_free(void *lock_, unsigned locktype) 267 { 268 lock_wrapper *lock = lu_find(lock_); 269 if (!lock || lock->status == FREE || lock->locked) { 270 TT_FAIL(("lock: free error")); 271 } else { 272 lock->status = FREE; 273 lu_base.cbs.free(lock_, locktype); 274 } 275 } 276 static int trace_lock_lock(unsigned mode, void *lock_) 277 { 278 lock_wrapper *lock = lu_find(lock_); 279 if (!lock || lock->status == FREE) { 280 TT_FAIL(("lock: lock error")); 281 return -1; 282 } else { 283 ++lock->locked; 284 return lu_base.cbs.lock(mode, lock_); 285 } 286 } 287 static int trace_lock_unlock(unsigned mode, void *lock_) 288 { 289 lock_wrapper *lock = lu_find(lock_); 290 if (!lock || lock->status == FREE || !lock->locked) { 291 TT_FAIL(("lock: unlock error")); 292 return -1; 293 } else { 294 --lock->locked; 295 return lu_base.cbs.unlock(mode, lock_); 296 } 297 } 298 static void lock_unlock_free_thread_cbs(void) 299 { 300 event_base_free(NULL); 301 302 if (libevent_tests_running_in_debug_mode) 303 libevent_global_shutdown(); 304 305 /** drop immutable flag */ 306 evthread_set_lock_callbacks(NULL); 307 /** avoid calling of event_global_setup_locks_() for new cbs */ 308 libevent_global_shutdown(); 309 /** drop immutable flag for non-debug ops (since called after shutdown) */ 310 evthread_set_lock_callbacks(NULL); 311 } 312 313 static int use_lock_unlock_profiler(void) 314 { 315 struct evthread_lock_callbacks cbs = { 316 EVTHREAD_LOCK_API_VERSION, 317 EVTHREAD_LOCKTYPE_RECURSIVE, 318 trace_lock_alloc, 319 trace_lock_free, 320 trace_lock_lock, 321 trace_lock_unlock, 322 }; 323 memcpy(&lu_base.cbs, evthread_get_lock_callbacks(), 324 sizeof(lu_base.cbs)); 325 { 326 lock_unlock_free_thread_cbs(); 327 328 evthread_set_lock_callbacks(&cbs); 329 /** re-create debug locks correctly */ 330 evthread_enable_lock_debugging(); 331 332 event_init(); 333 } 334 return 0; 335 } 336 static void free_lock_unlock_profiler(struct basic_test_data *data) 337 { 338 /** fix "held_by" for kqueue */ 339 evthread_set_lock_callbacks(NULL); 340 341 lock_unlock_free_thread_cbs(); 342 free(lu_base.locks); 343 data->base = NULL; 344 } 345 346 static void test_bufferevent_pair_release_lock(void *arg) 347 { 348 struct basic_test_data *data = arg; 349 use_lock_unlock_profiler(); 350 { 351 struct bufferevent *pair[2]; 352 if (!bufferevent_pair_new(NULL, BEV_OPT_THREADSAFE, pair)) { 353 bufferevent_free(pair[0]); 354 bufferevent_free(pair[1]); 355 } else 356 tt_abort_perror("bufferevent_pair_new"); 357 } 358 free_lock_unlock_profiler(data); 359 end: 360 ; 361 } 362 #endif 363 364 /* 365 * test watermarks and bufferevent 366 */ 367 368 static void 369 wm_readcb(struct bufferevent *bev, void *arg) 370 { 371 struct evbuffer *evbuf = evbuffer_new(); 372 int len = (int)evbuffer_get_length(bev->input); 373 static int nread; 374 375 assert(len >= 10 && len <= 20); 376 377 assert(evbuf != NULL); 378 379 /* gratuitous test of bufferevent_read_buffer */ 380 bufferevent_read_buffer(bev, evbuf); 381 382 nread += len; 383 if (nread == 65000) { 384 bufferevent_disable(bev, EV_READ); 385 test_ok++; 386 } 387 388 evbuffer_free(evbuf); 389 } 390 391 static void 392 wm_writecb(struct bufferevent *bev, void *arg) 393 { 394 assert(evbuffer_get_length(bev->output) <= 100); 395 if (evbuffer_get_length(bev->output) == 0) { 396 evbuffer_drain(bev->output, evbuffer_get_length(bev->output)); 397 test_ok++; 398 } 399 } 400 401 static void 402 wm_errorcb(struct bufferevent *bev, short what, void *arg) 403 { 404 test_ok = -2; 405 } 406 407 static void 408 test_bufferevent_watermarks_impl(int use_pair) 409 { 410 struct bufferevent *bev1 = NULL, *bev2 = NULL; 411 char buffer[65000]; 412 size_t low, high; 413 int i; 414 test_ok = 0; 415 416 if (use_pair) { 417 struct bufferevent *pair[2]; 418 tt_assert(0 == bufferevent_pair_new(NULL, 0, pair)); 419 bev1 = pair[0]; 420 bev2 = pair[1]; 421 bufferevent_setcb(bev1, NULL, wm_writecb, errorcb, NULL); 422 bufferevent_setcb(bev2, wm_readcb, NULL, errorcb, NULL); 423 } else { 424 bev1 = bufferevent_new(pair[0], NULL, wm_writecb, wm_errorcb, NULL); 425 bev2 = bufferevent_new(pair[1], wm_readcb, NULL, wm_errorcb, NULL); 426 } 427 tt_assert(bev1); 428 tt_assert(bev2); 429 bufferevent_disable(bev1, EV_READ); 430 bufferevent_enable(bev2, EV_READ); 431 432 /* By default, low watermarks are set to 0 */ 433 bufferevent_getwatermark(bev1, EV_READ, &low, NULL); 434 tt_int_op(low, ==, 0); 435 bufferevent_getwatermark(bev2, EV_WRITE, &low, NULL); 436 tt_int_op(low, ==, 0); 437 438 for (i = 0; i < (int)sizeof(buffer); i++) 439 buffer[i] = (char)i; 440 441 /* limit the reading on the receiving bufferevent */ 442 bufferevent_setwatermark(bev2, EV_READ, 10, 20); 443 444 bufferevent_getwatermark(bev2, EV_READ, &low, &high); 445 tt_int_op(low, ==, 10); 446 tt_int_op(high, ==, 20); 447 448 /* Tell the sending bufferevent not to notify us till it's down to 449 100 bytes. */ 450 bufferevent_setwatermark(bev1, EV_WRITE, 100, 2000); 451 452 bufferevent_getwatermark(bev1, EV_WRITE, &low, &high); 453 tt_int_op(low, ==, 100); 454 tt_int_op(high, ==, 2000); 455 456 { 457 int r = bufferevent_getwatermark(bev1, EV_WRITE | EV_READ, &low, &high); 458 tt_int_op(r, !=, 0); 459 } 460 461 bufferevent_write(bev1, buffer, sizeof(buffer)); 462 463 event_dispatch(); 464 465 tt_int_op(test_ok, ==, 2); 466 467 /* The write callback drained all the data from outbuf, so we 468 * should have removed the write event... */ 469 tt_assert(!event_pending(&bev2->ev_write, EV_WRITE, NULL)); 470 471 end: 472 if (bev1) 473 bufferevent_free(bev1); 474 if (bev2) 475 bufferevent_free(bev2); 476 } 477 478 static void 479 test_bufferevent_watermarks(void) 480 { 481 test_bufferevent_watermarks_impl(0); 482 } 483 484 static void 485 test_bufferevent_pair_watermarks(void) 486 { 487 test_bufferevent_watermarks_impl(1); 488 } 489 490 /* 491 * Test bufferevent filters 492 */ 493 494 /* strip an 'x' from each byte */ 495 496 static enum bufferevent_filter_result 497 bufferevent_input_filter(struct evbuffer *src, struct evbuffer *dst, 498 ev_ssize_t lim, enum bufferevent_flush_mode state, void *ctx) 499 { 500 const unsigned char *buffer; 501 unsigned i; 502 503 buffer = evbuffer_pullup(src, evbuffer_get_length(src)); 504 for (i = 0; i < evbuffer_get_length(src); i += 2) { 505 if (buffer[i] == '-') 506 continue; 507 508 assert(buffer[i] == 'x'); 509 evbuffer_add(dst, buffer + i + 1, 1); 510 } 511 512 evbuffer_drain(src, i); 513 return (BEV_OK); 514 } 515 516 /* add an 'x' before each byte */ 517 518 static enum bufferevent_filter_result 519 bufferevent_output_filter(struct evbuffer *src, struct evbuffer *dst, 520 ev_ssize_t lim, enum bufferevent_flush_mode state, void *ctx) 521 { 522 const unsigned char *buffer; 523 unsigned i; 524 struct bufferevent **bevp = ctx; 525 526 ++test_ok; 527 528 if (test_ok == 1) { 529 buffer = evbuffer_pullup(src, evbuffer_get_length(src)); 530 for (i = 0; i < evbuffer_get_length(src); ++i) { 531 evbuffer_add(dst, "x", 1); 532 evbuffer_add(dst, buffer + i, 1); 533 } 534 evbuffer_drain(src, evbuffer_get_length(src)); 535 } else { 536 return BEV_ERROR; 537 } 538 539 if (bevp && test_ok == 1) { 540 int prev = ++test_ok; 541 bufferevent_write(*bevp, "-", 1); 542 /* check that during this bufferevent_write() 543 * bufferevent_output_filter() will not be called again */ 544 assert(test_ok == prev); 545 --test_ok; 546 } 547 548 return (BEV_OK); 549 } 550 551 static void 552 test_bufferevent_filters_impl(int use_pair, int disable) 553 { 554 struct bufferevent *bev1 = NULL, *bev2 = NULL; 555 struct bufferevent *bev1_base = NULL, *bev2_base = NULL; 556 char buffer[8333]; 557 int i; 558 559 test_ok = 0; 560 561 if (use_pair) { 562 struct bufferevent *pair[2]; 563 tt_assert(0 == bufferevent_pair_new(NULL, 0, pair)); 564 bev1 = pair[0]; 565 bev2 = pair[1]; 566 } else { 567 bev1 = bufferevent_socket_new(NULL, pair[0], 0); 568 bev2 = bufferevent_socket_new(NULL, pair[1], 0); 569 } 570 bev1_base = bev1; 571 bev2_base = bev2; 572 573 for (i = 0; i < (int)sizeof(buffer); i++) 574 buffer[i] = i; 575 576 bev1 = bufferevent_filter_new(bev1, NULL, bufferevent_output_filter, 577 BEV_OPT_CLOSE_ON_FREE, NULL, 578 disable ? &bev1 : NULL); 579 580 bev2 = bufferevent_filter_new(bev2, bufferevent_input_filter, 581 NULL, BEV_OPT_CLOSE_ON_FREE, NULL, NULL); 582 bufferevent_setcb(bev1, NULL, writecb, errorcb, NULL); 583 bufferevent_setcb(bev2, readcb, NULL, errorcb, NULL); 584 585 tt_ptr_op(bufferevent_get_underlying(bev1), ==, bev1_base); 586 tt_ptr_op(bufferevent_get_underlying(bev2), ==, bev2_base); 587 tt_fd_op(bufferevent_getfd(bev1), ==, bufferevent_getfd(bev1_base)); 588 tt_fd_op(bufferevent_getfd(bev2), ==, bufferevent_getfd(bev2_base)); 589 590 bufferevent_disable(bev1, EV_READ); 591 bufferevent_enable(bev2, EV_READ); 592 /* insert some filters */ 593 bufferevent_write(bev1, buffer, sizeof(buffer)); 594 595 event_dispatch(); 596 597 if (test_ok != 3 + !!disable) 598 test_ok = 0; 599 600 end: 601 if (bev1) 602 bufferevent_free(bev1); 603 if (bev2) 604 bufferevent_free(bev2); 605 606 } 607 608 static void test_bufferevent_filters(void) 609 { test_bufferevent_filters_impl(0, 0); } 610 static void test_bufferevent_pair_filters(void) 611 { test_bufferevent_filters_impl(1, 0); } 612 static void test_bufferevent_filters_disable(void) 613 { test_bufferevent_filters_impl(0, 1); } 614 static void test_bufferevent_pair_filters_disable(void) 615 { test_bufferevent_filters_impl(1, 1); } 616 617 618 static void 619 sender_writecb(struct bufferevent *bev, void *ctx) 620 { 621 if (evbuffer_get_length(bufferevent_get_output(bev)) == 0) { 622 bufferevent_disable(bev,EV_READ|EV_WRITE); 623 TT_BLATHER(("Flushed %d: freeing it.", (int)bufferevent_getfd(bev))); 624 bufferevent_free(bev); 625 } 626 } 627 628 static void 629 sender_errorcb(struct bufferevent *bev, short what, void *ctx) 630 { 631 TT_FAIL(("Got sender error %d",(int)what)); 632 } 633 634 static int bufferevent_connect_test_flags = 0; 635 static int bufferevent_trigger_test_flags = 0; 636 static int n_strings_read = 0; 637 static int n_reads_invoked = 0; 638 static int n_events_invoked = 0; 639 640 #define TEST_STR "Now is the time for all good events to signal for " \ 641 "the good of their protocol" 642 static void 643 listen_cb(struct evconnlistener *listener, evutil_socket_t fd, 644 struct sockaddr *sa, int socklen, void *arg) 645 { 646 struct event_base *base = arg; 647 struct bufferevent *bev; 648 const char s[] = TEST_STR; 649 TT_BLATHER(("Got a request on socket %d", (int)fd )); 650 bev = bufferevent_socket_new(base, fd, bufferevent_connect_test_flags); 651 tt_assert(bev); 652 bufferevent_setcb(bev, NULL, sender_writecb, sender_errorcb, NULL); 653 bufferevent_write(bev, s, sizeof(s)); 654 end: 655 ; 656 } 657 658 static evutil_socket_t 659 fake_listener_create(struct sockaddr_in *localhost) 660 { 661 struct sockaddr *sa = (struct sockaddr *)localhost; 662 evutil_socket_t fd = -1; 663 ev_socklen_t slen = sizeof(*localhost); 664 665 memset(localhost, 0, sizeof(*localhost)); 666 localhost->sin_port = 0; /* have the kernel pick a port */ 667 localhost->sin_addr.s_addr = htonl(0x7f000001L); 668 localhost->sin_family = AF_INET; 669 670 /* bind, but don't listen or accept. should trigger 671 "Connection refused" reliably on most platforms. */ 672 fd = socket(localhost->sin_family, SOCK_STREAM, 0); 673 tt_assert(fd >= 0); 674 tt_assert(bind(fd, sa, slen) == 0); 675 tt_assert(getsockname(fd, sa, &slen) == 0); 676 677 return fd; 678 679 end: 680 return -1; 681 } 682 683 static void 684 reader_eventcb(struct bufferevent *bev, short what, void *ctx) 685 { 686 struct event_base *base = ctx; 687 if (what & BEV_EVENT_ERROR) { 688 perror("foobar"); 689 TT_FAIL(("got connector error %d", (int)what)); 690 return; 691 } 692 if (what & BEV_EVENT_CONNECTED) { 693 TT_BLATHER(("connected on %d", (int)bufferevent_getfd(bev))); 694 bufferevent_enable(bev, EV_READ); 695 } 696 if (what & BEV_EVENT_EOF) { 697 char buf[512]; 698 size_t n; 699 n = bufferevent_read(bev, buf, sizeof(buf)-1); 700 tt_int_op(n, >=, 0); 701 buf[n] = '\0'; 702 tt_str_op(buf, ==, TEST_STR); 703 if (++n_strings_read == 2) 704 event_base_loopexit(base, NULL); 705 TT_BLATHER(("EOF on %d: %d strings read.", 706 (int)bufferevent_getfd(bev), n_strings_read)); 707 } 708 end: 709 ; 710 } 711 712 static void 713 reader_eventcb_simple(struct bufferevent *bev, short what, void *ctx) 714 { 715 TT_BLATHER(("Read eventcb simple invoked on %d.", 716 (int)bufferevent_getfd(bev))); 717 n_events_invoked++; 718 } 719 720 static void 721 reader_readcb(struct bufferevent *bev, void *ctx) 722 { 723 TT_BLATHER(("Read invoked on %d.", (int)bufferevent_getfd(bev))); 724 n_reads_invoked++; 725 } 726 727 static void 728 test_bufferevent_connect(void *arg) 729 { 730 struct basic_test_data *data = arg; 731 struct evconnlistener *lev=NULL; 732 struct bufferevent *bev1=NULL, *bev2=NULL; 733 struct sockaddr_in localhost; 734 struct sockaddr_storage ss; 735 struct sockaddr *sa; 736 ev_socklen_t slen; 737 738 int be_flags=BEV_OPT_CLOSE_ON_FREE; 739 740 if (strstr((char*)data->setup_data, "defer")) { 741 be_flags |= BEV_OPT_DEFER_CALLBACKS; 742 } 743 if (strstr((char*)data->setup_data, "unlocked")) { 744 be_flags |= BEV_OPT_UNLOCK_CALLBACKS; 745 } 746 if (strstr((char*)data->setup_data, "lock")) { 747 be_flags |= BEV_OPT_THREADSAFE; 748 } 749 bufferevent_connect_test_flags = be_flags; 750 #ifdef _WIN32 751 if (!strcmp((char*)data->setup_data, "unset_connectex")) { 752 struct win32_extension_fns *ext = 753 (struct win32_extension_fns *) 754 event_get_win32_extension_fns_(); 755 ext->ConnectEx = NULL; 756 } 757 #endif 758 759 memset(&localhost, 0, sizeof(localhost)); 760 761 localhost.sin_port = 0; /* pick-a-port */ 762 localhost.sin_addr.s_addr = htonl(0x7f000001L); 763 localhost.sin_family = AF_INET; 764 sa = (struct sockaddr *)&localhost; 765 lev = evconnlistener_new_bind(data->base, listen_cb, data->base, 766 LEV_OPT_CLOSE_ON_FREE|LEV_OPT_REUSEABLE, 767 16, sa, sizeof(localhost)); 768 tt_assert(lev); 769 770 sa = (struct sockaddr *)&ss; 771 slen = sizeof(ss); 772 if (regress_get_listener_addr(lev, sa, &slen) < 0) { 773 tt_abort_perror("getsockname"); 774 } 775 776 tt_assert(!evconnlistener_enable(lev)); 777 bev1 = bufferevent_socket_new(data->base, -1, be_flags); 778 bev2 = bufferevent_socket_new(data->base, -1, be_flags); 779 tt_assert(bev1); 780 tt_assert(bev2); 781 bufferevent_setcb(bev1, reader_readcb,NULL, reader_eventcb, data->base); 782 bufferevent_setcb(bev2, reader_readcb,NULL, reader_eventcb, data->base); 783 784 bufferevent_enable(bev1, EV_READ); 785 bufferevent_enable(bev2, EV_READ); 786 787 tt_want(!bufferevent_socket_connect(bev1, sa, sizeof(localhost))); 788 tt_want(!bufferevent_socket_connect(bev2, sa, sizeof(localhost))); 789 790 event_base_dispatch(data->base); 791 792 tt_int_op(n_strings_read, ==, 2); 793 tt_int_op(n_reads_invoked, >=, 2); 794 end: 795 if (lev) 796 evconnlistener_free(lev); 797 798 if (bev1) 799 bufferevent_free(bev1); 800 801 if (bev2) 802 bufferevent_free(bev2); 803 } 804 805 static void 806 close_socket_cb(evutil_socket_t fd, short what, void *arg) 807 { 808 evutil_socket_t *fdp = arg; 809 if (*fdp >= 0) { 810 evutil_closesocket(*fdp); 811 *fdp = -1; 812 } 813 } 814 815 static void 816 test_bufferevent_connect_fail_eventcb(void *arg) 817 { 818 struct basic_test_data *data = arg; 819 int flags = BEV_OPT_CLOSE_ON_FREE | (long)data->setup_data; 820 struct event close_listener_event; 821 struct bufferevent *bev = NULL; 822 struct evconnlistener *lev = NULL; 823 struct sockaddr_in localhost; 824 struct timeval close_timeout = { 0, 300000 }; 825 ev_socklen_t slen = sizeof(localhost); 826 evutil_socket_t fake_listener = -1; 827 int r; 828 829 fake_listener = fake_listener_create(&localhost); 830 831 tt_int_op(n_events_invoked, ==, 0); 832 833 bev = bufferevent_socket_new(data->base, -1, flags); 834 tt_assert(bev); 835 bufferevent_setcb(bev, reader_readcb, reader_readcb, 836 reader_eventcb_simple, data->base); 837 bufferevent_enable(bev, EV_READ|EV_WRITE); 838 tt_int_op(n_events_invoked, ==, 0); 839 tt_int_op(n_reads_invoked, ==, 0); 840 841 /** @see also test_bufferevent_connect_fail() */ 842 r = bufferevent_socket_connect(bev, (struct sockaddr *)&localhost, slen); 843 /* XXXX we'd like to test the '0' case everywhere, but FreeBSD tells 844 * detects the error immediately, which is not really wrong of it. */ 845 tt_want(r == 0 || r == -1); 846 847 tt_int_op(n_events_invoked, ==, 0); 848 tt_int_op(n_reads_invoked, ==, 0); 849 850 /* Close the listener socket after a delay. This should trigger 851 "connection refused" on some other platforms, including OSX. */ 852 evtimer_assign(&close_listener_event, data->base, close_socket_cb, 853 &fake_listener); 854 event_add(&close_listener_event, &close_timeout); 855 856 event_base_dispatch(data->base); 857 tt_int_op(n_events_invoked, ==, 1); 858 tt_int_op(n_reads_invoked, ==, 0); 859 860 end: 861 if (lev) 862 evconnlistener_free(lev); 863 if (bev) 864 bufferevent_free(bev); 865 if (fake_listener >= 0) 866 evutil_closesocket(fake_listener); 867 } 868 869 static void 870 want_fail_eventcb(struct bufferevent *bev, short what, void *ctx) 871 { 872 struct event_base *base = ctx; 873 const char *err; 874 evutil_socket_t s; 875 876 if (what & BEV_EVENT_ERROR) { 877 s = bufferevent_getfd(bev); 878 err = evutil_socket_error_to_string(evutil_socket_geterror(s)); 879 TT_BLATHER(("connection failure on "EV_SOCK_FMT": %s", 880 EV_SOCK_ARG(s), err)); 881 test_ok = 1; 882 } else { 883 TT_FAIL(("didn't fail? what %hd", what)); 884 } 885 886 event_base_loopexit(base, NULL); 887 } 888 889 static void 890 test_bufferevent_connect_fail(void *arg) 891 { 892 struct basic_test_data *data = (struct basic_test_data *)arg; 893 struct bufferevent *bev=NULL; 894 struct event close_listener_event; 895 int close_listener_event_added = 0; 896 struct timeval close_timeout = { 0, 300000 }; 897 struct sockaddr_in localhost; 898 ev_socklen_t slen = sizeof(localhost); 899 evutil_socket_t fake_listener = -1; 900 int r; 901 902 test_ok = 0; 903 904 fake_listener = fake_listener_create(&localhost); 905 bev = bufferevent_socket_new(data->base, -1, 906 BEV_OPT_CLOSE_ON_FREE | BEV_OPT_DEFER_CALLBACKS); 907 tt_assert(bev); 908 bufferevent_setcb(bev, NULL, NULL, want_fail_eventcb, data->base); 909 910 r = bufferevent_socket_connect(bev, (struct sockaddr *)&localhost, slen); 911 /* XXXX we'd like to test the '0' case everywhere, but FreeBSD tells 912 * detects the error immediately, which is not really wrong of it. */ 913 tt_want(r == 0 || r == -1); 914 915 /* Close the listener socket after a delay. This should trigger 916 "connection refused" on some other platforms, including OSX. */ 917 evtimer_assign(&close_listener_event, data->base, close_socket_cb, 918 &fake_listener); 919 event_add(&close_listener_event, &close_timeout); 920 close_listener_event_added = 1; 921 922 event_base_dispatch(data->base); 923 924 tt_int_op(test_ok, ==, 1); 925 926 end: 927 if (fake_listener >= 0) 928 evutil_closesocket(fake_listener); 929 930 if (bev) 931 bufferevent_free(bev); 932 933 if (close_listener_event_added) 934 event_del(&close_listener_event); 935 } 936 937 struct timeout_cb_result { 938 struct timeval read_timeout_at; 939 struct timeval write_timeout_at; 940 struct timeval last_wrote_at; 941 struct timeval last_read_at; 942 int n_read_timeouts; 943 int n_write_timeouts; 944 int total_calls; 945 }; 946 947 static void 948 bev_timeout_read_cb(struct bufferevent *bev, void *arg) 949 { 950 struct timeout_cb_result *res = arg; 951 evutil_gettimeofday(&res->last_read_at, NULL); 952 } 953 static void 954 bev_timeout_write_cb(struct bufferevent *bev, void *arg) 955 { 956 struct timeout_cb_result *res = arg; 957 evutil_gettimeofday(&res->last_wrote_at, NULL); 958 } 959 static void 960 bev_timeout_event_cb(struct bufferevent *bev, short what, void *arg) 961 { 962 struct timeout_cb_result *res = arg; 963 ++res->total_calls; 964 965 if ((what & (BEV_EVENT_READING|BEV_EVENT_TIMEOUT)) 966 == (BEV_EVENT_READING|BEV_EVENT_TIMEOUT)) { 967 evutil_gettimeofday(&res->read_timeout_at, NULL); 968 ++res->n_read_timeouts; 969 } 970 if ((what & (BEV_EVENT_WRITING|BEV_EVENT_TIMEOUT)) 971 == (BEV_EVENT_WRITING|BEV_EVENT_TIMEOUT)) { 972 evutil_gettimeofday(&res->write_timeout_at, NULL); 973 ++res->n_write_timeouts; 974 } 975 } 976 977 static void 978 test_bufferevent_timeouts(void *arg) 979 { 980 /* "arg" is a string containing "pair" and/or "filter". */ 981 struct bufferevent *bev1 = NULL, *bev2 = NULL; 982 struct basic_test_data *data = arg; 983 int use_pair = 0, use_filter = 0; 984 struct timeval tv_w, tv_r, started_at; 985 struct timeout_cb_result res1, res2; 986 987 memset(&res1, 0, sizeof(res1)); 988 memset(&res2, 0, sizeof(res2)); 989 990 if (strstr((char*)data->setup_data, "pair")) 991 use_pair = 1; 992 if (strstr((char*)data->setup_data, "filter")) 993 use_filter = 1; 994 995 if (use_pair) { 996 struct bufferevent *p[2]; 997 tt_int_op(0, ==, bufferevent_pair_new(data->base, 0, p)); 998 bev1 = p[0]; 999 bev2 = p[1]; 1000 } else { 1001 bev1 = bufferevent_socket_new(data->base, data->pair[0], 0); 1002 bev2 = bufferevent_socket_new(data->base, data->pair[1], 0); 1003 } 1004 tt_assert(bev1); 1005 tt_assert(bev2); 1006 1007 if (use_filter) { 1008 struct bufferevent *bevf1, *bevf2; 1009 bevf1 = bufferevent_filter_new(bev1, NULL, NULL, 1010 BEV_OPT_CLOSE_ON_FREE, NULL, NULL); 1011 bevf2 = bufferevent_filter_new(bev2, NULL, NULL, 1012 BEV_OPT_CLOSE_ON_FREE, NULL, NULL); 1013 tt_assert(bevf1); 1014 tt_assert(bevf2); 1015 bev1 = bevf1; 1016 bev2 = bevf2; 1017 } 1018 1019 /* Do this nice and early. */ 1020 bufferevent_disable(bev2, EV_READ); 1021 1022 /* bev1 will try to write and read. Both will time out. */ 1023 evutil_gettimeofday(&started_at, NULL); 1024 tv_w.tv_sec = tv_r.tv_sec = 0; 1025 tv_w.tv_usec = 100*1000; 1026 tv_r.tv_usec = 150*1000; 1027 bufferevent_setcb(bev1, bev_timeout_read_cb, bev_timeout_write_cb, 1028 bev_timeout_event_cb, &res1); 1029 bufferevent_set_timeouts(bev1, &tv_r, &tv_w); 1030 bufferevent_write(bev1, "ABCDEFG", 7); 1031 bufferevent_enable(bev1, EV_READ|EV_WRITE); 1032 1033 /* bev2 has nothing to say, and isn't listening. */ 1034 bufferevent_setcb(bev2, bev_timeout_read_cb, bev_timeout_write_cb, 1035 bev_timeout_event_cb, &res2); 1036 tv_w.tv_sec = tv_r.tv_sec = 0; 1037 tv_w.tv_usec = 200*1000; 1038 tv_r.tv_usec = 100*1000; 1039 bufferevent_set_timeouts(bev2, &tv_r, &tv_w); 1040 bufferevent_enable(bev2, EV_WRITE); 1041 1042 tv_r.tv_sec = 0; 1043 tv_r.tv_usec = 350000; 1044 1045 event_base_loopexit(data->base, &tv_r); 1046 event_base_dispatch(data->base); 1047 1048 /* XXXX Test that actually reading or writing a little resets the 1049 * timeouts. */ 1050 1051 tt_want(res1.total_calls == 2); 1052 tt_want(res1.n_read_timeouts == 1); 1053 tt_want(res1.n_write_timeouts == 1); 1054 tt_want(res2.total_calls == !(use_pair && !use_filter)); 1055 tt_want(res2.n_write_timeouts == !(use_pair && !use_filter)); 1056 tt_want(!res2.n_read_timeouts); 1057 1058 test_timeval_diff_eq(&started_at, &res1.read_timeout_at, 150); 1059 test_timeval_diff_eq(&started_at, &res1.write_timeout_at, 100); 1060 1061 #define tt_assert_timeval_empty(tv) do { \ 1062 tt_int_op((tv).tv_sec, ==, 0); \ 1063 tt_int_op((tv).tv_usec, ==, 0); \ 1064 } while(0) 1065 tt_assert_timeval_empty(res1.last_read_at); 1066 tt_assert_timeval_empty(res2.last_read_at); 1067 tt_assert_timeval_empty(res2.last_wrote_at); 1068 tt_assert_timeval_empty(res2.last_wrote_at); 1069 #undef tt_assert_timeval_empty 1070 1071 end: 1072 if (bev1) 1073 bufferevent_free(bev1); 1074 if (bev2) 1075 bufferevent_free(bev2); 1076 } 1077 1078 static void 1079 trigger_failure_cb(evutil_socket_t fd, short what, void *ctx) 1080 { 1081 TT_FAIL(("The triggered callback did not fire or the machine is really slow (try increasing timeout).")); 1082 } 1083 1084 static void 1085 trigger_eventcb(struct bufferevent *bev, short what, void *ctx) 1086 { 1087 struct event_base *base = ctx; 1088 if (what == ~0) { 1089 TT_BLATHER(("Event successfully triggered.")); 1090 event_base_loopexit(base, NULL); 1091 return; 1092 } 1093 reader_eventcb(bev, what, ctx); 1094 } 1095 1096 static void 1097 trigger_readcb_triggered(struct bufferevent *bev, void *ctx) 1098 { 1099 TT_BLATHER(("Read successfully triggered.")); 1100 n_reads_invoked++; 1101 bufferevent_trigger_event(bev, ~0, bufferevent_trigger_test_flags); 1102 } 1103 1104 static void 1105 trigger_readcb(struct bufferevent *bev, void *ctx) 1106 { 1107 struct timeval timeout = { 30, 0 }; 1108 struct event_base *base = ctx; 1109 size_t low, high, len; 1110 int expected_reads; 1111 1112 TT_BLATHER(("Read invoked on %d.", (int)bufferevent_getfd(bev))); 1113 expected_reads = ++n_reads_invoked; 1114 1115 bufferevent_setcb(bev, trigger_readcb_triggered, NULL, trigger_eventcb, ctx); 1116 1117 bufferevent_getwatermark(bev, EV_READ, &low, &high); 1118 len = evbuffer_get_length(bufferevent_get_input(bev)); 1119 1120 bufferevent_setwatermark(bev, EV_READ, len + 1, 0); 1121 bufferevent_trigger(bev, EV_READ, bufferevent_trigger_test_flags); 1122 /* no callback expected */ 1123 tt_int_op(n_reads_invoked, ==, expected_reads); 1124 1125 if ((bufferevent_trigger_test_flags & BEV_TRIG_DEFER_CALLBACKS) || 1126 (bufferevent_connect_test_flags & BEV_OPT_DEFER_CALLBACKS)) { 1127 /* will be deferred */ 1128 } else { 1129 expected_reads++; 1130 } 1131 1132 event_base_once(base, -1, EV_TIMEOUT, trigger_failure_cb, NULL, &timeout); 1133 1134 bufferevent_trigger(bev, EV_READ, 1135 bufferevent_trigger_test_flags | BEV_TRIG_IGNORE_WATERMARKS); 1136 tt_int_op(n_reads_invoked, ==, expected_reads); 1137 1138 bufferevent_setwatermark(bev, EV_READ, low, high); 1139 end: 1140 ; 1141 } 1142 1143 static void 1144 test_bufferevent_trigger(void *arg) 1145 { 1146 struct basic_test_data *data = arg; 1147 struct evconnlistener *lev=NULL; 1148 struct bufferevent *bev=NULL; 1149 struct sockaddr_in localhost; 1150 struct sockaddr_storage ss; 1151 struct sockaddr *sa; 1152 ev_socklen_t slen; 1153 1154 int be_flags=BEV_OPT_CLOSE_ON_FREE; 1155 int trig_flags=0; 1156 1157 if (strstr((char*)data->setup_data, "defer")) { 1158 be_flags |= BEV_OPT_DEFER_CALLBACKS; 1159 } 1160 bufferevent_connect_test_flags = be_flags; 1161 1162 if (strstr((char*)data->setup_data, "postpone")) { 1163 trig_flags |= BEV_TRIG_DEFER_CALLBACKS; 1164 } 1165 bufferevent_trigger_test_flags = trig_flags; 1166 1167 memset(&localhost, 0, sizeof(localhost)); 1168 1169 localhost.sin_port = 0; /* pick-a-port */ 1170 localhost.sin_addr.s_addr = htonl(0x7f000001L); 1171 localhost.sin_family = AF_INET; 1172 sa = (struct sockaddr *)&localhost; 1173 lev = evconnlistener_new_bind(data->base, listen_cb, data->base, 1174 LEV_OPT_CLOSE_ON_FREE|LEV_OPT_REUSEABLE, 1175 16, sa, sizeof(localhost)); 1176 tt_assert(lev); 1177 1178 sa = (struct sockaddr *)&ss; 1179 slen = sizeof(ss); 1180 if (regress_get_listener_addr(lev, sa, &slen) < 0) { 1181 tt_abort_perror("getsockname"); 1182 } 1183 1184 tt_assert(!evconnlistener_enable(lev)); 1185 bev = bufferevent_socket_new(data->base, -1, be_flags); 1186 tt_assert(bev); 1187 bufferevent_setcb(bev, trigger_readcb, NULL, trigger_eventcb, data->base); 1188 1189 bufferevent_enable(bev, EV_READ); 1190 1191 tt_want(!bufferevent_socket_connect(bev, sa, sizeof(localhost))); 1192 1193 event_base_dispatch(data->base); 1194 1195 tt_int_op(n_reads_invoked, ==, 2); 1196 end: 1197 if (lev) 1198 evconnlistener_free(lev); 1199 1200 if (bev) 1201 bufferevent_free(bev); 1202 } 1203 1204 static void 1205 test_bufferevent_socket_filter_inactive(void *arg) 1206 { 1207 struct basic_test_data *data = arg; 1208 struct bufferevent *bev = NULL, *bevf = NULL; 1209 1210 bev = bufferevent_socket_new(data->base, -1, 0); 1211 tt_assert(bev); 1212 bevf = bufferevent_filter_new(bev, NULL, NULL, 0, NULL, NULL); 1213 tt_assert(bevf); 1214 1215 end: 1216 if (bevf) 1217 bufferevent_free(bevf); 1218 if (bev) 1219 bufferevent_free(bev); 1220 } 1221 1222 static void 1223 pair_flush_eventcb(struct bufferevent *bev, short what, void *ctx) 1224 { 1225 int *callback_what = ctx; 1226 *callback_what = what; 1227 } 1228 1229 static void 1230 test_bufferevent_pair_flush(void *arg) 1231 { 1232 struct basic_test_data *data = arg; 1233 struct bufferevent *pair[2]; 1234 struct bufferevent *bev1 = NULL; 1235 struct bufferevent *bev2 = NULL; 1236 int callback_what = 0; 1237 1238 tt_assert(0 == bufferevent_pair_new(data->base, 0, pair)); 1239 bev1 = pair[0]; 1240 bev2 = pair[1]; 1241 tt_assert(0 == bufferevent_enable(bev1, EV_WRITE)); 1242 tt_assert(0 == bufferevent_enable(bev2, EV_READ)); 1243 1244 bufferevent_setcb(bev2, NULL, NULL, pair_flush_eventcb, &callback_what); 1245 1246 bufferevent_flush(bev1, EV_WRITE, BEV_FINISHED); 1247 1248 event_base_loop(data->base, EVLOOP_ONCE); 1249 1250 tt_assert(callback_what == (BEV_EVENT_READING | BEV_EVENT_EOF)); 1251 1252 end: 1253 if (bev1) 1254 bufferevent_free(bev1); 1255 if (bev2) 1256 bufferevent_free(bev2); 1257 } 1258 1259 struct bufferevent_filter_data_stuck { 1260 size_t header_size; 1261 size_t total_read; 1262 }; 1263 1264 static void 1265 bufferevent_filter_data_stuck_readcb(struct bufferevent *bev, void *arg) 1266 { 1267 struct bufferevent_filter_data_stuck *filter_data = arg; 1268 struct evbuffer *input = bufferevent_get_input(bev); 1269 size_t read_size = evbuffer_get_length(input); 1270 evbuffer_drain(input, read_size); 1271 filter_data->total_read += read_size; 1272 } 1273 1274 /** 1275 * This filter prepends header once before forwarding data. 1276 */ 1277 static enum bufferevent_filter_result 1278 bufferevent_filter_data_stuck_inputcb( 1279 struct evbuffer *src, struct evbuffer *dst, ev_ssize_t dst_limit, 1280 enum bufferevent_flush_mode mode, void *ctx) 1281 { 1282 struct bufferevent_filter_data_stuck *filter_data = ctx; 1283 static int header_inserted = 0; 1284 size_t payload_size; 1285 size_t header_size = 0; 1286 1287 if (!header_inserted) { 1288 char *header = calloc(filter_data->header_size, 1); 1289 evbuffer_add(dst, header, filter_data->header_size); 1290 free(header); 1291 header_size = filter_data->header_size; 1292 header_inserted = 1; 1293 } 1294 1295 payload_size = evbuffer_get_length(src); 1296 if (payload_size > dst_limit - header_size) { 1297 payload_size = dst_limit - header_size; 1298 } 1299 1300 tt_int_op(payload_size, ==, evbuffer_remove_buffer(src, dst, payload_size)); 1301 1302 end: 1303 return BEV_OK; 1304 } 1305 1306 static void 1307 test_bufferevent_filter_data_stuck(void *arg) 1308 { 1309 const size_t read_high_wm = 4096; 1310 struct bufferevent_filter_data_stuck filter_data; 1311 struct basic_test_data *data = arg; 1312 struct bufferevent *pair[2]; 1313 struct bufferevent *filter = NULL; 1314 1315 int options = BEV_OPT_CLOSE_ON_FREE | BEV_OPT_DEFER_CALLBACKS; 1316 1317 char payload[4096]; 1318 int payload_size = sizeof(payload); 1319 1320 memset(&filter_data, 0, sizeof(filter_data)); 1321 filter_data.header_size = 20; 1322 1323 tt_assert(bufferevent_pair_new(data->base, options, pair) == 0); 1324 1325 bufferevent_setwatermark(pair[0], EV_READ, 0, read_high_wm); 1326 bufferevent_setwatermark(pair[1], EV_READ, 0, read_high_wm); 1327 1328 tt_assert( 1329 filter = 1330 bufferevent_filter_new(pair[1], 1331 bufferevent_filter_data_stuck_inputcb, 1332 NULL, 1333 options, 1334 NULL, 1335 &filter_data)); 1336 1337 bufferevent_setcb(filter, 1338 bufferevent_filter_data_stuck_readcb, 1339 NULL, 1340 NULL, 1341 &filter_data); 1342 1343 tt_assert(bufferevent_enable(filter, EV_READ|EV_WRITE) == 0); 1344 1345 bufferevent_setwatermark(filter, EV_READ, 0, read_high_wm); 1346 1347 tt_assert(bufferevent_write(pair[0], payload, sizeof(payload)) == 0); 1348 1349 event_base_dispatch(data->base); 1350 1351 tt_int_op(filter_data.total_read, ==, payload_size + filter_data.header_size); 1352 end: 1353 if (pair[0]) 1354 bufferevent_free(pair[0]); 1355 if (filter) 1356 bufferevent_free(filter); 1357 } 1358 1359 struct testcase_t bufferevent_testcases[] = { 1360 1361 LEGACY(bufferevent, TT_ISOLATED), 1362 LEGACY(bufferevent_pair, TT_ISOLATED), 1363 LEGACY(bufferevent_flush_normal, TT_ISOLATED), 1364 LEGACY(bufferevent_flush_flush, TT_ISOLATED), 1365 LEGACY(bufferevent_flush_finished, TT_ISOLATED), 1366 LEGACY(bufferevent_pair_flush_normal, TT_ISOLATED), 1367 LEGACY(bufferevent_pair_flush_flush, TT_ISOLATED), 1368 LEGACY(bufferevent_pair_flush_finished, TT_ISOLATED), 1369 #if defined(EVTHREAD_USE_PTHREADS_IMPLEMENTED) && !defined(__SANITIZE_ADDRESS__) 1370 { "bufferevent_pair_release_lock", test_bufferevent_pair_release_lock, 1371 TT_FORK|TT_ISOLATED|TT_NEED_THREADS|TT_NEED_BASE|TT_LEGACY|TT_NO_LOGS, 1372 &basic_setup, NULL }, 1373 #endif 1374 LEGACY(bufferevent_watermarks, TT_ISOLATED), 1375 LEGACY(bufferevent_pair_watermarks, TT_ISOLATED), 1376 LEGACY(bufferevent_filters, TT_ISOLATED), 1377 LEGACY(bufferevent_pair_filters, TT_ISOLATED), 1378 LEGACY(bufferevent_filters_disable, TT_ISOLATED), 1379 LEGACY(bufferevent_pair_filters_disable, TT_ISOLATED), 1380 { "bufferevent_connect", test_bufferevent_connect, TT_FORK|TT_NEED_BASE, 1381 &basic_setup, (void*)"" }, 1382 { "bufferevent_connect_defer", test_bufferevent_connect, 1383 TT_FORK|TT_NEED_BASE, &basic_setup, (void*)"defer" }, 1384 { "bufferevent_connect_lock", test_bufferevent_connect, 1385 TT_FORK|TT_NEED_BASE|TT_NEED_THREADS, &basic_setup, (void*)"lock" }, 1386 { "bufferevent_connect_lock_defer", test_bufferevent_connect, 1387 TT_FORK|TT_NEED_BASE|TT_NEED_THREADS, &basic_setup, 1388 (void*)"defer lock" }, 1389 { "bufferevent_connect_unlocked_cbs", test_bufferevent_connect, 1390 TT_FORK|TT_NEED_BASE|TT_NEED_THREADS, &basic_setup, 1391 (void*)"lock defer unlocked" }, 1392 { "bufferevent_connect_fail", test_bufferevent_connect_fail, 1393 TT_FORK|TT_NEED_BASE, &basic_setup, NULL }, 1394 { "bufferevent_timeout", test_bufferevent_timeouts, 1395 TT_FORK|TT_NEED_BASE, &basic_setup, (void*)"" }, 1396 { "bufferevent_timeout_pair", test_bufferevent_timeouts, 1397 TT_FORK|TT_NEED_BASE, &basic_setup, (void*)"pair" }, 1398 { "bufferevent_timeout_filter", test_bufferevent_timeouts, 1399 TT_FORK|TT_NEED_BASE, &basic_setup, (void*)"filter" }, 1400 { "bufferevent_timeout_filter_pair", test_bufferevent_timeouts, 1401 TT_FORK|TT_NEED_BASE, &basic_setup, (void*)"filter pair" }, 1402 { "bufferevent_trigger", test_bufferevent_trigger, TT_FORK|TT_NEED_BASE, 1403 &basic_setup, (void*)"" }, 1404 { "bufferevent_trigger_defer", test_bufferevent_trigger, 1405 TT_FORK|TT_NEED_BASE, &basic_setup, (void*)"defer" }, 1406 { "bufferevent_trigger_postpone", test_bufferevent_trigger, 1407 TT_FORK|TT_NEED_BASE|TT_NEED_THREADS, &basic_setup, 1408 (void*)"postpone" }, 1409 { "bufferevent_trigger_defer_postpone", test_bufferevent_trigger, 1410 TT_FORK|TT_NEED_BASE|TT_NEED_THREADS, &basic_setup, 1411 (void*)"defer postpone" }, 1412 #ifdef EVENT__HAVE_LIBZ 1413 LEGACY(bufferevent_zlib, TT_ISOLATED), 1414 #else 1415 { "bufferevent_zlib", NULL, TT_SKIP, NULL, NULL }, 1416 #endif 1417 1418 { "bufferevent_connect_fail_eventcb_defer", 1419 test_bufferevent_connect_fail_eventcb, 1420 TT_FORK|TT_NEED_BASE, &basic_setup, (void*)BEV_OPT_DEFER_CALLBACKS }, 1421 { "bufferevent_connect_fail_eventcb", 1422 test_bufferevent_connect_fail_eventcb, 1423 TT_FORK|TT_NEED_BASE, &basic_setup, NULL }, 1424 1425 { "bufferevent_socket_filter_inactive", 1426 test_bufferevent_socket_filter_inactive, 1427 TT_FORK|TT_NEED_BASE, &basic_setup, NULL }, 1428 { "bufferevent_pair_flush", 1429 test_bufferevent_pair_flush, 1430 TT_FORK|TT_NEED_BASE, &basic_setup, NULL }, 1431 { "bufferevent_filter_data_stuck", 1432 test_bufferevent_filter_data_stuck, 1433 TT_FORK|TT_NEED_BASE, &basic_setup, NULL }, 1434 1435 END_OF_TESTCASES, 1436 }; 1437 1438 #define TT_IOCP (TT_FORK|TT_NEED_BASE|TT_ENABLE_IOCP) 1439 #define TT_IOCP_LEGACY (TT_ISOLATED|TT_ENABLE_IOCP) 1440 struct testcase_t bufferevent_iocp_testcases[] = { 1441 LEGACY(bufferevent, TT_IOCP_LEGACY), 1442 LEGACY(bufferevent_flush_normal, TT_ISOLATED), 1443 LEGACY(bufferevent_flush_flush, TT_ISOLATED), 1444 LEGACY(bufferevent_flush_finished, TT_ISOLATED), 1445 LEGACY(bufferevent_watermarks, TT_IOCP_LEGACY), 1446 LEGACY(bufferevent_filters, TT_IOCP_LEGACY), 1447 LEGACY(bufferevent_filters_disable, TT_IOCP_LEGACY), 1448 1449 { "bufferevent_connect", test_bufferevent_connect, 1450 TT_IOCP, &basic_setup, (void*)"" }, 1451 { "bufferevent_connect_defer", test_bufferevent_connect, 1452 TT_IOCP, &basic_setup, (void*)"defer" }, 1453 { "bufferevent_connect_lock", test_bufferevent_connect, 1454 TT_IOCP, &basic_setup, (void*)"lock" }, 1455 { "bufferevent_connect_lock_defer", test_bufferevent_connect, 1456 TT_IOCP, &basic_setup, (void*)"defer lock" }, 1457 { "bufferevent_connect_fail", test_bufferevent_connect_fail, 1458 TT_IOCP, &basic_setup, NULL }, 1459 { "bufferevent_connect_nonblocking", test_bufferevent_connect, 1460 TT_IOCP, &basic_setup, (void*)"unset_connectex" }, 1461 1462 { "bufferevent_connect_fail_eventcb_defer", 1463 test_bufferevent_connect_fail_eventcb, 1464 TT_IOCP, &basic_setup, (void*)BEV_OPT_DEFER_CALLBACKS }, 1465 { "bufferevent_connect_fail_eventcb", 1466 test_bufferevent_connect_fail_eventcb, TT_IOCP, &basic_setup, NULL }, 1467 1468 END_OF_TESTCASES, 1469 }; 1470