1 /* $NetBSD: regress_bufferevent.c,v 1.6 2020/05/25 20:47:34 christos Exp $ */ 2 3 /* 4 * Copyright (c) 2003-2007 Niels Provos <provos@citi.umich.edu> 5 * Copyright (c) 2007-2012 Niels Provos and Nick Mathewson 6 * 7 * Redistribution and use in source and binary forms, with or without 8 * modification, are permitted provided that the following conditions 9 * are met: 10 * 1. Redistributions of source code must retain the above copyright 11 * notice, this list of conditions and the following disclaimer. 12 * 2. Redistributions in binary form must reproduce the above copyright 13 * notice, this list of conditions and the following disclaimer in the 14 * documentation and/or other materials provided with the distribution. 15 * 3. The name of the author may not be used to endorse or promote products 16 * derived from this software without specific prior written permission. 17 * 18 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR 19 * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES 20 * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. 21 * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, 22 * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT 23 * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, 24 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY 25 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT 26 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF 27 * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 28 */ 29 #include "util-internal.h" 30 31 /* The old tests here need assertions to work. */ 32 #undef NDEBUG 33 34 #ifdef _WIN32 35 #include <winsock2.h> 36 #include <windows.h> 37 #endif 38 39 #include "event2/event-config.h" 40 41 #include <sys/types.h> 42 #include <sys/stat.h> 43 #ifdef EVENT__HAVE_SYS_TIME_H 44 #include <sys/time.h> 45 #endif 46 #include <sys/queue.h> 47 #ifndef _WIN32 48 #include <sys/socket.h> 49 #include <sys/wait.h> 50 #include <signal.h> 51 #include <unistd.h> 52 #include <netdb.h> 53 #include <netinet/in.h> 54 #endif 55 #include <fcntl.h> 56 #include <signal.h> 57 #include <stdlib.h> 58 #include <stdio.h> 59 #include <string.h> 60 #include <errno.h> 61 #include <assert.h> 62 63 #ifdef EVENT__HAVE_ARPA_INET_H 64 #include <arpa/inet.h> 65 #endif 66 67 #include "event2/event-config.h" 68 #include "event2/event.h" 69 #include "event2/event_struct.h" 70 #include "event2/event_compat.h" 71 #include "event2/tag.h" 72 #include "event2/buffer.h" 73 #include "event2/bufferevent.h" 74 #include "event2/bufferevent_compat.h" 75 #include "event2/bufferevent_struct.h" 76 #include "event2/listener.h" 77 #include "event2/util.h" 78 79 #include "bufferevent-internal.h" 80 #include "evthread-internal.h" 81 #include "util-internal.h" 82 #ifdef _WIN32 83 #include "iocp-internal.h" 84 #endif 85 86 #include "regress.h" 87 #include "regress_testutils.h" 88 89 /* 90 * simple bufferevent test 91 */ 92 93 static void 94 readcb(struct bufferevent *bev, void *arg) 95 { 96 if (evbuffer_get_length(bev->input) == 8333) { 97 struct evbuffer *evbuf = evbuffer_new(); 98 assert(evbuf != NULL); 99 100 /* gratuitous test of bufferevent_read_buffer */ 101 bufferevent_read_buffer(bev, evbuf); 102 103 bufferevent_disable(bev, EV_READ); 104 105 if (evbuffer_get_length(evbuf) == 8333) { 106 test_ok++; 107 } 108 109 evbuffer_free(evbuf); 110 } 111 } 112 113 static void 114 writecb(struct bufferevent *bev, void *arg) 115 { 116 if (evbuffer_get_length(bev->output) == 0) { 117 test_ok++; 118 } 119 } 120 121 static void 122 errorcb(struct bufferevent *bev, short what, void *arg) 123 { 124 test_ok = -2; 125 } 126 127 static void 128 test_bufferevent_impl(int use_pair) 129 { 130 struct bufferevent *bev1 = NULL, *bev2 = NULL; 131 char buffer[8333]; 132 int i; 133 134 if (use_pair) { 135 struct bufferevent *pair[2]; 136 tt_assert(0 == bufferevent_pair_new(NULL, 0, pair)); 137 bev1 = pair[0]; 138 bev2 = pair[1]; 139 bufferevent_setcb(bev1, readcb, writecb, errorcb, bev1); 140 bufferevent_setcb(bev2, readcb, writecb, errorcb, NULL); 141 tt_int_op(bufferevent_getfd(bev1), ==, -1); 142 tt_ptr_op(bufferevent_get_underlying(bev1), ==, NULL); 143 tt_ptr_op(bufferevent_pair_get_partner(bev1), ==, bev2); 144 tt_ptr_op(bufferevent_pair_get_partner(bev2), ==, bev1); 145 } else { 146 bev1 = bufferevent_new(pair[0], readcb, writecb, errorcb, NULL); 147 bev2 = bufferevent_new(pair[1], readcb, writecb, errorcb, NULL); 148 tt_int_op(bufferevent_getfd(bev1), ==, pair[0]); 149 tt_ptr_op(bufferevent_get_underlying(bev1), ==, NULL); 150 tt_ptr_op(bufferevent_pair_get_partner(bev1), ==, NULL); 151 tt_ptr_op(bufferevent_pair_get_partner(bev2), ==, NULL); 152 } 153 154 { 155 /* Test getcb. */ 156 bufferevent_data_cb r, w; 157 bufferevent_event_cb e; 158 void *a; 159 bufferevent_getcb(bev1, &r, &w, &e, &a); 160 tt_ptr_op(r, ==, readcb); 161 tt_ptr_op(w, ==, writecb); 162 tt_ptr_op(e, ==, errorcb); 163 tt_ptr_op(a, ==, use_pair ? bev1 : NULL); 164 } 165 166 bufferevent_disable(bev1, EV_READ); 167 bufferevent_enable(bev2, EV_READ); 168 169 tt_int_op(bufferevent_get_enabled(bev1), ==, EV_WRITE); 170 tt_int_op(bufferevent_get_enabled(bev2), ==, EV_WRITE|EV_READ); 171 172 for (i = 0; i < (int)sizeof(buffer); i++) 173 buffer[i] = i; 174 175 bufferevent_write(bev1, buffer, sizeof(buffer)); 176 177 event_dispatch(); 178 179 bufferevent_free(bev2); 180 tt_ptr_op(bufferevent_pair_get_partner(bev1), ==, NULL); 181 bufferevent_free(bev1); 182 183 if (test_ok != 2) 184 test_ok = 0; 185 end: 186 ; 187 } 188 189 static void 190 test_bufferevent(void) 191 { 192 test_bufferevent_impl(0); 193 } 194 195 static void 196 test_bufferevent_pair(void) 197 { 198 test_bufferevent_impl(1); 199 } 200 201 #if defined(EVTHREAD_USE_PTHREADS_IMPLEMENTED) 202 /** 203 * Trace lock/unlock/alloc/free for locks. 204 * (More heavier then evthread_debug*) 205 */ 206 typedef struct 207 { 208 void *lock; 209 enum { 210 ALLOC, FREE, 211 } status; 212 size_t locked /** allow recursive locking */; 213 } lock_wrapper; 214 struct lock_unlock_base 215 { 216 /* Original callbacks */ 217 struct evthread_lock_callbacks cbs; 218 /* Map of locks */ 219 lock_wrapper *locks; 220 size_t nr_locks; 221 } lu_base = { 222 .locks = NULL, 223 }; 224 225 static lock_wrapper *lu_find(void *lock_) 226 { 227 size_t i; 228 for (i = 0; i < lu_base.nr_locks; ++i) { 229 lock_wrapper *lock = &lu_base.locks[i]; 230 if (lock->lock == lock_) 231 return lock; 232 } 233 return NULL; 234 } 235 236 static void *trace_lock_alloc(unsigned locktype) 237 { 238 ++lu_base.nr_locks; 239 lu_base.locks = realloc(lu_base.locks, 240 sizeof(lock_wrapper) * lu_base.nr_locks); 241 void *lock = lu_base.cbs.alloc(locktype); 242 lu_base.locks[lu_base.nr_locks - 1] = (lock_wrapper){ lock, ALLOC, 0 }; 243 return lock; 244 } 245 static void trace_lock_free(void *lock_, unsigned locktype) 246 { 247 lock_wrapper *lock = lu_find(lock_); 248 if (!lock || lock->status == FREE || lock->locked) { 249 __asm__("int3"); 250 TT_FAIL(("lock: free error")); 251 } else { 252 lock->status = FREE; 253 lu_base.cbs.free(lock_, locktype); 254 } 255 } 256 static int trace_lock_lock(unsigned mode, void *lock_) 257 { 258 lock_wrapper *lock = lu_find(lock_); 259 if (!lock || lock->status == FREE) { 260 TT_FAIL(("lock: lock error")); 261 return -1; 262 } else { 263 ++lock->locked; 264 return lu_base.cbs.lock(mode, lock_); 265 } 266 } 267 static int trace_lock_unlock(unsigned mode, void *lock_) 268 { 269 lock_wrapper *lock = lu_find(lock_); 270 if (!lock || lock->status == FREE || !lock->locked) { 271 TT_FAIL(("lock: unlock error")); 272 return -1; 273 } else { 274 --lock->locked; 275 return lu_base.cbs.unlock(mode, lock_); 276 } 277 } 278 static void lock_unlock_free_thread_cbs() 279 { 280 event_base_free(NULL); 281 282 /** drop immutable flag */ 283 evthread_set_lock_callbacks(NULL); 284 /** avoid calling of event_global_setup_locks_() for new cbs */ 285 libevent_global_shutdown(); 286 /** drop immutable flag for non-debug ops (since called after shutdown) */ 287 evthread_set_lock_callbacks(NULL); 288 } 289 290 static int use_lock_unlock_profiler(void) 291 { 292 struct evthread_lock_callbacks cbs = { 293 EVTHREAD_LOCK_API_VERSION, 294 EVTHREAD_LOCKTYPE_RECURSIVE, 295 trace_lock_alloc, 296 trace_lock_free, 297 trace_lock_lock, 298 trace_lock_unlock, 299 }; 300 memcpy(&lu_base.cbs, evthread_get_lock_callbacks(), 301 sizeof(lu_base.cbs)); 302 { 303 lock_unlock_free_thread_cbs(); 304 305 evthread_set_lock_callbacks(&cbs); 306 /** re-create debug locks correctly */ 307 evthread_enable_lock_debugging(); 308 309 event_init(); 310 } 311 return 0; 312 } 313 static void free_lock_unlock_profiler(struct basic_test_data *data) 314 { 315 lock_unlock_free_thread_cbs(); 316 free(lu_base.locks); 317 data->base = NULL; 318 } 319 320 static void test_bufferevent_pair_release_lock(void *arg) 321 { 322 struct basic_test_data *data = arg; 323 use_lock_unlock_profiler(); 324 { 325 struct bufferevent *pair[2]; 326 if (!bufferevent_pair_new(NULL, BEV_OPT_THREADSAFE, pair)) { 327 bufferevent_free(pair[0]); 328 bufferevent_free(pair[1]); 329 } else 330 tt_abort_perror("bufferevent_pair_new"); 331 } 332 free_lock_unlock_profiler(data); 333 end: 334 ; 335 } 336 #endif 337 338 /* 339 * test watermarks and bufferevent 340 */ 341 342 static void 343 wm_readcb(struct bufferevent *bev, void *arg) 344 { 345 struct evbuffer *evbuf = evbuffer_new(); 346 int len = (int)evbuffer_get_length(bev->input); 347 static int nread; 348 349 assert(len >= 10 && len <= 20); 350 351 assert(evbuf != NULL); 352 353 /* gratuitous test of bufferevent_read_buffer */ 354 bufferevent_read_buffer(bev, evbuf); 355 356 nread += len; 357 if (nread == 65000) { 358 bufferevent_disable(bev, EV_READ); 359 test_ok++; 360 } 361 362 evbuffer_free(evbuf); 363 } 364 365 static void 366 wm_writecb(struct bufferevent *bev, void *arg) 367 { 368 assert(evbuffer_get_length(bev->output) <= 100); 369 if (evbuffer_get_length(bev->output) == 0) { 370 evbuffer_drain(bev->output, evbuffer_get_length(bev->output)); 371 test_ok++; 372 } 373 } 374 375 static void 376 wm_errorcb(struct bufferevent *bev, short what, void *arg) 377 { 378 test_ok = -2; 379 } 380 381 static void 382 test_bufferevent_watermarks_impl(int use_pair) 383 { 384 struct bufferevent *bev1 = NULL, *bev2 = NULL; 385 char buffer[65000]; 386 size_t low, high; 387 int i; 388 test_ok = 0; 389 390 if (use_pair) { 391 struct bufferevent *pair[2]; 392 tt_assert(0 == bufferevent_pair_new(NULL, 0, pair)); 393 bev1 = pair[0]; 394 bev2 = pair[1]; 395 bufferevent_setcb(bev1, NULL, wm_writecb, errorcb, NULL); 396 bufferevent_setcb(bev2, wm_readcb, NULL, errorcb, NULL); 397 } else { 398 bev1 = bufferevent_new(pair[0], NULL, wm_writecb, wm_errorcb, NULL); 399 bev2 = bufferevent_new(pair[1], wm_readcb, NULL, wm_errorcb, NULL); 400 } 401 tt_assert(bev1); 402 tt_assert(bev2); 403 bufferevent_disable(bev1, EV_READ); 404 bufferevent_enable(bev2, EV_READ); 405 406 /* By default, low watermarks are set to 0 */ 407 bufferevent_getwatermark(bev1, EV_READ, &low, NULL); 408 tt_int_op(low, ==, 0); 409 bufferevent_getwatermark(bev2, EV_WRITE, &low, NULL); 410 tt_int_op(low, ==, 0); 411 412 for (i = 0; i < (int)sizeof(buffer); i++) 413 buffer[i] = (char)i; 414 415 /* limit the reading on the receiving bufferevent */ 416 bufferevent_setwatermark(bev2, EV_READ, 10, 20); 417 418 bufferevent_getwatermark(bev2, EV_READ, &low, &high); 419 tt_int_op(low, ==, 10); 420 tt_int_op(high, ==, 20); 421 422 /* Tell the sending bufferevent not to notify us till it's down to 423 100 bytes. */ 424 bufferevent_setwatermark(bev1, EV_WRITE, 100, 2000); 425 426 bufferevent_getwatermark(bev1, EV_WRITE, &low, &high); 427 tt_int_op(low, ==, 100); 428 tt_int_op(high, ==, 2000); 429 430 { 431 int r = bufferevent_getwatermark(bev1, EV_WRITE | EV_READ, &low, &high); 432 tt_int_op(r, !=, 0); 433 } 434 435 bufferevent_write(bev1, buffer, sizeof(buffer)); 436 437 event_dispatch(); 438 439 tt_int_op(test_ok, ==, 2); 440 441 /* The write callback drained all the data from outbuf, so we 442 * should have removed the write event... */ 443 tt_assert(!event_pending(&bev2->ev_write, EV_WRITE, NULL)); 444 445 end: 446 if (bev1) 447 bufferevent_free(bev1); 448 if (bev2) 449 bufferevent_free(bev2); 450 } 451 452 static void 453 test_bufferevent_watermarks(void) 454 { 455 test_bufferevent_watermarks_impl(0); 456 } 457 458 static void 459 test_bufferevent_pair_watermarks(void) 460 { 461 test_bufferevent_watermarks_impl(1); 462 } 463 464 /* 465 * Test bufferevent filters 466 */ 467 468 /* strip an 'x' from each byte */ 469 470 static enum bufferevent_filter_result 471 bufferevent_input_filter(struct evbuffer *src, struct evbuffer *dst, 472 ev_ssize_t lim, enum bufferevent_flush_mode state, void *ctx) 473 { 474 const unsigned char *buffer; 475 unsigned i; 476 477 buffer = evbuffer_pullup(src, evbuffer_get_length(src)); 478 for (i = 0; i < evbuffer_get_length(src); i += 2) { 479 assert(buffer[i] == 'x'); 480 evbuffer_add(dst, buffer + i + 1, 1); 481 482 if (i + 2 > evbuffer_get_length(src)) 483 break; 484 } 485 486 evbuffer_drain(src, i); 487 return (BEV_OK); 488 } 489 490 /* add an 'x' before each byte */ 491 492 static enum bufferevent_filter_result 493 bufferevent_output_filter(struct evbuffer *src, struct evbuffer *dst, 494 ev_ssize_t lim, enum bufferevent_flush_mode state, void *ctx) 495 { 496 const unsigned char *buffer; 497 unsigned i; 498 499 buffer = evbuffer_pullup(src, evbuffer_get_length(src)); 500 for (i = 0; i < evbuffer_get_length(src); ++i) { 501 evbuffer_add(dst, "x", 1); 502 evbuffer_add(dst, buffer + i, 1); 503 } 504 505 evbuffer_drain(src, evbuffer_get_length(src)); 506 return (BEV_OK); 507 } 508 509 static void 510 test_bufferevent_filters_impl(int use_pair) 511 { 512 struct bufferevent *bev1 = NULL, *bev2 = NULL; 513 struct bufferevent *bev1_base = NULL, *bev2_base = NULL; 514 char buffer[8333]; 515 int i; 516 517 test_ok = 0; 518 519 if (use_pair) { 520 struct bufferevent *pair[2]; 521 tt_assert(0 == bufferevent_pair_new(NULL, 0, pair)); 522 bev1 = pair[0]; 523 bev2 = pair[1]; 524 } else { 525 bev1 = bufferevent_socket_new(NULL, pair[0], 0); 526 bev2 = bufferevent_socket_new(NULL, pair[1], 0); 527 } 528 bev1_base = bev1; 529 bev2_base = bev2; 530 531 for (i = 0; i < (int)sizeof(buffer); i++) 532 buffer[i] = i; 533 534 bev1 = bufferevent_filter_new(bev1, NULL, bufferevent_output_filter, 535 BEV_OPT_CLOSE_ON_FREE, NULL, NULL); 536 537 bev2 = bufferevent_filter_new(bev2, bufferevent_input_filter, 538 NULL, BEV_OPT_CLOSE_ON_FREE, NULL, NULL); 539 bufferevent_setcb(bev1, NULL, writecb, errorcb, NULL); 540 bufferevent_setcb(bev2, readcb, NULL, errorcb, NULL); 541 542 tt_ptr_op(bufferevent_get_underlying(bev1), ==, bev1_base); 543 tt_ptr_op(bufferevent_get_underlying(bev2), ==, bev2_base); 544 tt_int_op(bufferevent_getfd(bev1), ==, -1); 545 tt_int_op(bufferevent_getfd(bev2), ==, -1); 546 547 bufferevent_disable(bev1, EV_READ); 548 bufferevent_enable(bev2, EV_READ); 549 /* insert some filters */ 550 bufferevent_write(bev1, buffer, sizeof(buffer)); 551 552 event_dispatch(); 553 554 if (test_ok != 2) 555 test_ok = 0; 556 557 end: 558 if (bev1) 559 bufferevent_free(bev1); 560 if (bev2) 561 bufferevent_free(bev2); 562 563 } 564 565 static void 566 test_bufferevent_filters(void) 567 { 568 test_bufferevent_filters_impl(0); 569 } 570 571 static void 572 test_bufferevent_pair_filters(void) 573 { 574 test_bufferevent_filters_impl(1); 575 } 576 577 578 static void 579 sender_writecb(struct bufferevent *bev, void *ctx) 580 { 581 if (evbuffer_get_length(bufferevent_get_output(bev)) == 0) { 582 bufferevent_disable(bev,EV_READ|EV_WRITE); 583 TT_BLATHER(("Flushed %d: freeing it.", (int)bufferevent_getfd(bev))); 584 bufferevent_free(bev); 585 } 586 } 587 588 static void 589 sender_errorcb(struct bufferevent *bev, short what, void *ctx) 590 { 591 TT_FAIL(("Got sender error %d",(int)what)); 592 } 593 594 static int bufferevent_connect_test_flags = 0; 595 static int bufferevent_trigger_test_flags = 0; 596 static int n_strings_read = 0; 597 static int n_reads_invoked = 0; 598 599 #define TEST_STR "Now is the time for all good events to signal for " \ 600 "the good of their protocol" 601 static void 602 listen_cb(struct evconnlistener *listener, evutil_socket_t fd, 603 struct sockaddr *sa, int socklen, void *arg) 604 { 605 struct event_base *base = arg; 606 struct bufferevent *bev; 607 const char s[] = TEST_STR; 608 TT_BLATHER(("Got a request on socket %d", (int)fd )); 609 bev = bufferevent_socket_new(base, fd, bufferevent_connect_test_flags); 610 tt_assert(bev); 611 bufferevent_setcb(bev, NULL, sender_writecb, sender_errorcb, NULL); 612 bufferevent_write(bev, s, sizeof(s)); 613 end: 614 ; 615 } 616 617 static void 618 reader_eventcb(struct bufferevent *bev, short what, void *ctx) 619 { 620 struct event_base *base = ctx; 621 if (what & BEV_EVENT_ERROR) { 622 perror("foobar"); 623 TT_FAIL(("got connector error %d", (int)what)); 624 return; 625 } 626 if (what & BEV_EVENT_CONNECTED) { 627 TT_BLATHER(("connected on %d", (int)bufferevent_getfd(bev))); 628 bufferevent_enable(bev, EV_READ); 629 } 630 if (what & BEV_EVENT_EOF) { 631 char buf[512]; 632 size_t n; 633 n = bufferevent_read(bev, buf, sizeof(buf)-1); 634 tt_int_op(n, >=, 0); 635 buf[n] = '\0'; 636 tt_str_op(buf, ==, TEST_STR); 637 if (++n_strings_read == 2) 638 event_base_loopexit(base, NULL); 639 TT_BLATHER(("EOF on %d: %d strings read.", 640 (int)bufferevent_getfd(bev), n_strings_read)); 641 } 642 end: 643 ; 644 } 645 646 static void 647 reader_readcb(struct bufferevent *bev, void *ctx) 648 { 649 TT_BLATHER(("Read invoked on %d.", (int)bufferevent_getfd(bev))); 650 n_reads_invoked++; 651 } 652 653 static void 654 test_bufferevent_connect(void *arg) 655 { 656 struct basic_test_data *data = arg; 657 struct evconnlistener *lev=NULL; 658 struct bufferevent *bev1=NULL, *bev2=NULL; 659 struct sockaddr_in localhost; 660 struct sockaddr_storage ss; 661 struct sockaddr *sa; 662 ev_socklen_t slen; 663 664 int be_flags=BEV_OPT_CLOSE_ON_FREE; 665 666 if (strstr((char*)data->setup_data, "defer")) { 667 be_flags |= BEV_OPT_DEFER_CALLBACKS; 668 } 669 if (strstr((char*)data->setup_data, "unlocked")) { 670 be_flags |= BEV_OPT_UNLOCK_CALLBACKS; 671 } 672 if (strstr((char*)data->setup_data, "lock")) { 673 be_flags |= BEV_OPT_THREADSAFE; 674 } 675 bufferevent_connect_test_flags = be_flags; 676 #ifdef _WIN32 677 if (!strcmp((char*)data->setup_data, "unset_connectex")) { 678 struct win32_extension_fns *ext = 679 (struct win32_extension_fns *) 680 event_get_win32_extension_fns_(); 681 ext->ConnectEx = NULL; 682 } 683 #endif 684 685 memset(&localhost, 0, sizeof(localhost)); 686 687 localhost.sin_port = 0; /* pick-a-port */ 688 localhost.sin_addr.s_addr = htonl(0x7f000001L); 689 localhost.sin_family = AF_INET; 690 sa = (struct sockaddr *)&localhost; 691 lev = evconnlistener_new_bind(data->base, listen_cb, data->base, 692 LEV_OPT_CLOSE_ON_FREE|LEV_OPT_REUSEABLE, 693 16, sa, sizeof(localhost)); 694 tt_assert(lev); 695 696 sa = (struct sockaddr *)&ss; 697 slen = sizeof(ss); 698 if (regress_get_listener_addr(lev, sa, &slen) < 0) { 699 tt_abort_perror("getsockname"); 700 } 701 702 tt_assert(!evconnlistener_enable(lev)); 703 bev1 = bufferevent_socket_new(data->base, -1, be_flags); 704 bev2 = bufferevent_socket_new(data->base, -1, be_flags); 705 tt_assert(bev1); 706 tt_assert(bev2); 707 bufferevent_setcb(bev1, reader_readcb,NULL, reader_eventcb, data->base); 708 bufferevent_setcb(bev2, reader_readcb,NULL, reader_eventcb, data->base); 709 710 bufferevent_enable(bev1, EV_READ); 711 bufferevent_enable(bev2, EV_READ); 712 713 tt_want(!bufferevent_socket_connect(bev1, sa, sizeof(localhost))); 714 tt_want(!bufferevent_socket_connect(bev2, sa, sizeof(localhost))); 715 716 event_base_dispatch(data->base); 717 718 tt_int_op(n_strings_read, ==, 2); 719 tt_int_op(n_reads_invoked, >=, 2); 720 end: 721 if (lev) 722 evconnlistener_free(lev); 723 724 if (bev1) 725 bufferevent_free(bev1); 726 727 if (bev2) 728 bufferevent_free(bev2); 729 } 730 731 static void 732 want_fail_eventcb(struct bufferevent *bev, short what, void *ctx) 733 { 734 struct event_base *base = ctx; 735 const char *err; 736 evutil_socket_t s; 737 738 if (what & BEV_EVENT_ERROR) { 739 s = bufferevent_getfd(bev); 740 err = evutil_socket_error_to_string(evutil_socket_geterror(s)); 741 TT_BLATHER(("connection failure on "EV_SOCK_FMT": %s", 742 EV_SOCK_ARG(s), err)); 743 test_ok = 1; 744 } else { 745 TT_FAIL(("didn't fail? what %hd", what)); 746 } 747 748 event_base_loopexit(base, NULL); 749 } 750 751 static void 752 close_socket_cb(evutil_socket_t fd, short what, void *arg) 753 { 754 evutil_socket_t *fdp = arg; 755 if (*fdp >= 0) { 756 evutil_closesocket(*fdp); 757 *fdp = -1; 758 } 759 } 760 761 static void 762 test_bufferevent_connect_fail(void *arg) 763 { 764 struct basic_test_data *data = (struct basic_test_data *)arg; 765 struct bufferevent *bev=NULL; 766 struct sockaddr_in localhost; 767 struct sockaddr *sa = (struct sockaddr*)&localhost; 768 evutil_socket_t fake_listener = -1; 769 ev_socklen_t slen = sizeof(localhost); 770 struct event close_listener_event; 771 int close_listener_event_added = 0; 772 struct timeval one_second = { 1, 0 }; 773 int r; 774 775 test_ok = 0; 776 777 memset(&localhost, 0, sizeof(localhost)); 778 localhost.sin_port = 0; /* have the kernel pick a port */ 779 localhost.sin_addr.s_addr = htonl(0x7f000001L); 780 localhost.sin_family = AF_INET; 781 782 /* bind, but don't listen or accept. should trigger 783 "Connection refused" reliably on most platforms. */ 784 fake_listener = socket(localhost.sin_family, SOCK_STREAM, 0); 785 tt_assert(fake_listener >= 0); 786 tt_assert(bind(fake_listener, sa, slen) == 0); 787 tt_assert(getsockname(fake_listener, sa, &slen) == 0); 788 bev = bufferevent_socket_new(data->base, -1, 789 BEV_OPT_CLOSE_ON_FREE | BEV_OPT_DEFER_CALLBACKS); 790 tt_assert(bev); 791 bufferevent_setcb(bev, NULL, NULL, want_fail_eventcb, data->base); 792 793 r = bufferevent_socket_connect(bev, sa, slen); 794 /* XXXX we'd like to test the '0' case everywhere, but FreeBSD tells 795 * detects the error immediately, which is not really wrong of it. */ 796 tt_want(r == 0 || r == -1); 797 798 /* Close the listener socket after a second. This should trigger 799 "connection refused" on some other platforms, including OSX. */ 800 evtimer_assign(&close_listener_event, data->base, close_socket_cb, 801 &fake_listener); 802 event_add(&close_listener_event, &one_second); 803 close_listener_event_added = 1; 804 805 event_base_dispatch(data->base); 806 807 tt_int_op(test_ok, ==, 1); 808 809 end: 810 if (fake_listener >= 0) 811 evutil_closesocket(fake_listener); 812 813 if (bev) 814 bufferevent_free(bev); 815 816 if (close_listener_event_added) 817 event_del(&close_listener_event); 818 } 819 820 struct timeout_cb_result { 821 struct timeval read_timeout_at; 822 struct timeval write_timeout_at; 823 struct timeval last_wrote_at; 824 int n_read_timeouts; 825 int n_write_timeouts; 826 int total_calls; 827 }; 828 829 static void 830 bev_timeout_write_cb(struct bufferevent *bev, void *arg) 831 { 832 struct timeout_cb_result *res = arg; 833 evutil_gettimeofday(&res->last_wrote_at, NULL); 834 } 835 836 static void 837 bev_timeout_event_cb(struct bufferevent *bev, short what, void *arg) 838 { 839 struct timeout_cb_result *res = arg; 840 ++res->total_calls; 841 842 if ((what & (BEV_EVENT_READING|BEV_EVENT_TIMEOUT)) 843 == (BEV_EVENT_READING|BEV_EVENT_TIMEOUT)) { 844 evutil_gettimeofday(&res->read_timeout_at, NULL); 845 ++res->n_read_timeouts; 846 } 847 if ((what & (BEV_EVENT_WRITING|BEV_EVENT_TIMEOUT)) 848 == (BEV_EVENT_WRITING|BEV_EVENT_TIMEOUT)) { 849 evutil_gettimeofday(&res->write_timeout_at, NULL); 850 ++res->n_write_timeouts; 851 } 852 } 853 854 static void 855 test_bufferevent_timeouts(void *arg) 856 { 857 /* "arg" is a string containing "pair" and/or "filter". */ 858 struct bufferevent *bev1 = NULL, *bev2 = NULL; 859 struct basic_test_data *data = arg; 860 int use_pair = 0, use_filter = 0; 861 struct timeval tv_w, tv_r, started_at; 862 struct timeout_cb_result res1, res2; 863 char buf[1024]; 864 865 memset(&res1, 0, sizeof(res1)); 866 memset(&res2, 0, sizeof(res2)); 867 868 if (strstr((char*)data->setup_data, "pair")) 869 use_pair = 1; 870 if (strstr((char*)data->setup_data, "filter")) 871 use_filter = 1; 872 873 if (use_pair) { 874 struct bufferevent *p[2]; 875 tt_int_op(0, ==, bufferevent_pair_new(data->base, 0, p)); 876 bev1 = p[0]; 877 bev2 = p[1]; 878 } else { 879 bev1 = bufferevent_socket_new(data->base, data->pair[0], 0); 880 bev2 = bufferevent_socket_new(data->base, data->pair[1], 0); 881 } 882 883 tt_assert(bev1); 884 tt_assert(bev2); 885 886 if (use_filter) { 887 struct bufferevent *bevf1, *bevf2; 888 bevf1 = bufferevent_filter_new(bev1, NULL, NULL, 889 BEV_OPT_CLOSE_ON_FREE, NULL, NULL); 890 bevf2 = bufferevent_filter_new(bev2, NULL, NULL, 891 BEV_OPT_CLOSE_ON_FREE, NULL, NULL); 892 tt_assert(bevf1); 893 tt_assert(bevf2); 894 bev1 = bevf1; 895 bev2 = bevf2; 896 } 897 898 /* Do this nice and early. */ 899 bufferevent_disable(bev2, EV_READ); 900 901 /* bev1 will try to write and read. Both will time out. */ 902 evutil_gettimeofday(&started_at, NULL); 903 tv_w.tv_sec = tv_r.tv_sec = 0; 904 tv_w.tv_usec = 100*1000; 905 tv_r.tv_usec = 150*1000; 906 bufferevent_setcb(bev1, NULL, bev_timeout_write_cb, 907 bev_timeout_event_cb, &res1); 908 bufferevent_setwatermark(bev1, EV_WRITE, 1024*1024+10, 0); 909 bufferevent_set_timeouts(bev1, &tv_r, &tv_w); 910 if (use_pair) { 911 /* For a pair, the fact that the other side isn't reading 912 * makes the writer stall */ 913 bufferevent_write(bev1, "ABCDEFG", 7); 914 } else { 915 /* For a real socket, the kernel's TCP buffers can eat a 916 * fair number of bytes; make sure that at some point we 917 * have some bytes that will stall. */ 918 struct evbuffer *output = bufferevent_get_output(bev1); 919 int i; 920 memset(buf, 0xbb, sizeof(buf)); 921 for (i=0;i<1024;++i) { 922 evbuffer_add_reference(output, buf, sizeof(buf), 923 NULL, NULL); 924 } 925 } 926 bufferevent_enable(bev1, EV_READ|EV_WRITE); 927 928 /* bev2 has nothing to say, and isn't listening. */ 929 bufferevent_setcb(bev2, NULL, bev_timeout_write_cb, 930 bev_timeout_event_cb, &res2); 931 tv_w.tv_sec = tv_r.tv_sec = 0; 932 tv_w.tv_usec = 200*1000; 933 tv_r.tv_usec = 100*1000; 934 bufferevent_set_timeouts(bev2, &tv_r, &tv_w); 935 bufferevent_enable(bev2, EV_WRITE); 936 937 tv_r.tv_sec = 0; 938 tv_r.tv_usec = 350000; 939 940 event_base_loopexit(data->base, &tv_r); 941 event_base_dispatch(data->base); 942 943 /* XXXX Test that actually reading or writing a little resets the 944 * timeouts. */ 945 946 /* Each buf1 timeout happens, and happens only once. */ 947 tt_want(res1.n_read_timeouts); 948 tt_want(res1.n_write_timeouts); 949 tt_want(res1.n_read_timeouts == 1); 950 tt_want(res1.n_write_timeouts == 1); 951 952 test_timeval_diff_eq(&started_at, &res1.read_timeout_at, 150); 953 test_timeval_diff_eq(&started_at, &res1.write_timeout_at, 100); 954 955 end: 956 if (bev1) 957 bufferevent_free(bev1); 958 if (bev2) 959 bufferevent_free(bev2); 960 } 961 962 static void 963 trigger_failure_cb(evutil_socket_t fd, short what, void *ctx) 964 { 965 TT_FAIL(("The triggered callback did not fire or the machine is really slow (try increasing timeout).")); 966 } 967 968 static void 969 trigger_eventcb(struct bufferevent *bev, short what, void *ctx) 970 { 971 struct event_base *base = ctx; 972 if (what == ~0) { 973 TT_BLATHER(("Event successfully triggered.")); 974 event_base_loopexit(base, NULL); 975 return; 976 } 977 reader_eventcb(bev, what, ctx); 978 } 979 980 static void 981 trigger_readcb_triggered(struct bufferevent *bev, void *ctx) 982 { 983 TT_BLATHER(("Read successfully triggered.")); 984 n_reads_invoked++; 985 bufferevent_trigger_event(bev, ~0, bufferevent_trigger_test_flags); 986 } 987 988 static void 989 trigger_readcb(struct bufferevent *bev, void *ctx) 990 { 991 struct timeval timeout = { 30, 0 }; 992 struct event_base *base = ctx; 993 size_t low, high, len; 994 int expected_reads; 995 996 TT_BLATHER(("Read invoked on %d.", (int)bufferevent_getfd(bev))); 997 expected_reads = ++n_reads_invoked; 998 999 bufferevent_setcb(bev, trigger_readcb_triggered, NULL, trigger_eventcb, ctx); 1000 1001 bufferevent_getwatermark(bev, EV_READ, &low, &high); 1002 len = evbuffer_get_length(bufferevent_get_input(bev)); 1003 1004 bufferevent_setwatermark(bev, EV_READ, len + 1, 0); 1005 bufferevent_trigger(bev, EV_READ, bufferevent_trigger_test_flags); 1006 /* no callback expected */ 1007 tt_int_op(n_reads_invoked, ==, expected_reads); 1008 1009 if ((bufferevent_trigger_test_flags & BEV_TRIG_DEFER_CALLBACKS) || 1010 (bufferevent_connect_test_flags & BEV_OPT_DEFER_CALLBACKS)) { 1011 /* will be deferred */ 1012 } else { 1013 expected_reads++; 1014 } 1015 1016 event_base_once(base, -1, EV_TIMEOUT, trigger_failure_cb, NULL, &timeout); 1017 1018 bufferevent_trigger(bev, EV_READ, 1019 bufferevent_trigger_test_flags | BEV_TRIG_IGNORE_WATERMARKS); 1020 tt_int_op(n_reads_invoked, ==, expected_reads); 1021 1022 bufferevent_setwatermark(bev, EV_READ, low, high); 1023 end: 1024 ; 1025 } 1026 1027 static void 1028 test_bufferevent_trigger(void *arg) 1029 { 1030 struct basic_test_data *data = arg; 1031 struct evconnlistener *lev=NULL; 1032 struct bufferevent *bev=NULL; 1033 struct sockaddr_in localhost; 1034 struct sockaddr_storage ss; 1035 struct sockaddr *sa; 1036 ev_socklen_t slen; 1037 1038 int be_flags=BEV_OPT_CLOSE_ON_FREE; 1039 int trig_flags=0; 1040 1041 if (strstr((char*)data->setup_data, "defer")) { 1042 be_flags |= BEV_OPT_DEFER_CALLBACKS; 1043 } 1044 bufferevent_connect_test_flags = be_flags; 1045 1046 if (strstr((char*)data->setup_data, "postpone")) { 1047 trig_flags |= BEV_TRIG_DEFER_CALLBACKS; 1048 } 1049 bufferevent_trigger_test_flags = trig_flags; 1050 1051 memset(&localhost, 0, sizeof(localhost)); 1052 1053 localhost.sin_port = 0; /* pick-a-port */ 1054 localhost.sin_addr.s_addr = htonl(0x7f000001L); 1055 localhost.sin_family = AF_INET; 1056 sa = (struct sockaddr *)&localhost; 1057 lev = evconnlistener_new_bind(data->base, listen_cb, data->base, 1058 LEV_OPT_CLOSE_ON_FREE|LEV_OPT_REUSEABLE, 1059 16, sa, sizeof(localhost)); 1060 tt_assert(lev); 1061 1062 sa = (struct sockaddr *)&ss; 1063 slen = sizeof(ss); 1064 if (regress_get_listener_addr(lev, sa, &slen) < 0) { 1065 tt_abort_perror("getsockname"); 1066 } 1067 1068 tt_assert(!evconnlistener_enable(lev)); 1069 bev = bufferevent_socket_new(data->base, -1, be_flags); 1070 tt_assert(bev); 1071 bufferevent_setcb(bev, trigger_readcb, NULL, trigger_eventcb, data->base); 1072 1073 bufferevent_enable(bev, EV_READ); 1074 1075 tt_want(!bufferevent_socket_connect(bev, sa, sizeof(localhost))); 1076 1077 event_base_dispatch(data->base); 1078 1079 tt_int_op(n_reads_invoked, ==, 2); 1080 end: 1081 if (lev) 1082 evconnlistener_free(lev); 1083 1084 if (bev) 1085 bufferevent_free(bev); 1086 } 1087 1088 struct testcase_t bufferevent_testcases[] = { 1089 1090 LEGACY(bufferevent, TT_ISOLATED), 1091 LEGACY(bufferevent_pair, TT_ISOLATED), 1092 #if defined(EVTHREAD_USE_PTHREADS_IMPLEMENTED) 1093 { "bufferevent_pair_release_lock", test_bufferevent_pair_release_lock, 1094 TT_FORK|TT_ISOLATED|TT_NEED_THREADS|TT_NEED_BASE|TT_LEGACY, 1095 &basic_setup, NULL }, 1096 #endif 1097 LEGACY(bufferevent_watermarks, TT_ISOLATED), 1098 LEGACY(bufferevent_pair_watermarks, TT_ISOLATED), 1099 LEGACY(bufferevent_filters, TT_ISOLATED), 1100 LEGACY(bufferevent_pair_filters, TT_ISOLATED), 1101 { "bufferevent_connect", test_bufferevent_connect, TT_FORK|TT_NEED_BASE, 1102 &basic_setup, (void*)"" }, 1103 { "bufferevent_connect_defer", test_bufferevent_connect, 1104 TT_FORK|TT_NEED_BASE, &basic_setup, (void*)"defer" }, 1105 { "bufferevent_connect_lock", test_bufferevent_connect, 1106 TT_FORK|TT_NEED_BASE|TT_NEED_THREADS, &basic_setup, (void*)"lock" }, 1107 { "bufferevent_connect_lock_defer", test_bufferevent_connect, 1108 TT_FORK|TT_NEED_BASE|TT_NEED_THREADS, &basic_setup, 1109 (void*)"defer lock" }, 1110 { "bufferevent_connect_unlocked_cbs", test_bufferevent_connect, 1111 TT_FORK|TT_NEED_BASE|TT_NEED_THREADS, &basic_setup, 1112 (void*)"lock defer unlocked" }, 1113 { "bufferevent_connect_fail", test_bufferevent_connect_fail, 1114 TT_FORK|TT_NEED_BASE, &basic_setup, NULL }, 1115 { "bufferevent_timeout", test_bufferevent_timeouts, 1116 TT_FORK|TT_NEED_BASE|TT_NEED_SOCKETPAIR, &basic_setup, (void*)"" }, 1117 { "bufferevent_timeout_pair", test_bufferevent_timeouts, 1118 TT_FORK|TT_NEED_BASE, &basic_setup, (void*)"pair" }, 1119 { "bufferevent_timeout_filter", test_bufferevent_timeouts, 1120 TT_FORK|TT_NEED_BASE, &basic_setup, (void*)"filter" }, 1121 { "bufferevent_timeout_filter_pair", test_bufferevent_timeouts, 1122 TT_FORK|TT_NEED_BASE, &basic_setup, (void*)"filter pair" }, 1123 { "bufferevent_trigger", test_bufferevent_trigger, TT_FORK|TT_NEED_BASE, 1124 &basic_setup, (void*)"" }, 1125 { "bufferevent_trigger_defer", test_bufferevent_trigger, 1126 TT_FORK|TT_NEED_BASE, &basic_setup, (void*)"defer" }, 1127 { "bufferevent_trigger_postpone", test_bufferevent_trigger, 1128 TT_FORK|TT_NEED_BASE|TT_NEED_THREADS, &basic_setup, 1129 (void*)"postpone" }, 1130 { "bufferevent_trigger_defer_postpone", test_bufferevent_trigger, 1131 TT_FORK|TT_NEED_BASE|TT_NEED_THREADS, &basic_setup, 1132 (void*)"defer postpone" }, 1133 #ifdef EVENT__HAVE_LIBZ 1134 LEGACY(bufferevent_zlib, TT_ISOLATED), 1135 #else 1136 { "bufferevent_zlib", NULL, TT_SKIP, NULL, NULL }, 1137 #endif 1138 1139 END_OF_TESTCASES, 1140 }; 1141 1142 struct testcase_t bufferevent_iocp_testcases[] = { 1143 1144 LEGACY(bufferevent, TT_ISOLATED|TT_ENABLE_IOCP), 1145 LEGACY(bufferevent_watermarks, TT_ISOLATED|TT_ENABLE_IOCP), 1146 LEGACY(bufferevent_filters, TT_ISOLATED|TT_ENABLE_IOCP), 1147 { "bufferevent_connect", test_bufferevent_connect, 1148 TT_FORK|TT_NEED_BASE|TT_ENABLE_IOCP, &basic_setup, (void*)"" }, 1149 { "bufferevent_connect_defer", test_bufferevent_connect, 1150 TT_FORK|TT_NEED_BASE|TT_ENABLE_IOCP, &basic_setup, (void*)"defer" }, 1151 { "bufferevent_connect_lock", test_bufferevent_connect, 1152 TT_FORK|TT_NEED_BASE|TT_NEED_THREADS|TT_ENABLE_IOCP, &basic_setup, 1153 (void*)"lock" }, 1154 { "bufferevent_connect_lock_defer", test_bufferevent_connect, 1155 TT_FORK|TT_NEED_BASE|TT_NEED_THREADS|TT_ENABLE_IOCP, &basic_setup, 1156 (void*)"defer lock" }, 1157 { "bufferevent_connect_fail", test_bufferevent_connect_fail, 1158 TT_FORK|TT_NEED_BASE|TT_ENABLE_IOCP, &basic_setup, NULL }, 1159 { "bufferevent_connect_nonblocking", test_bufferevent_connect, 1160 TT_FORK|TT_NEED_BASE|TT_ENABLE_IOCP, &basic_setup, 1161 (void*)"unset_connectex" }, 1162 1163 END_OF_TESTCASES, 1164 }; 1165