1 /* $OpenBSD: tcpthread.c,v 1.3 2025/01/13 12:55:13 bluhm Exp $ */ 2 3 /* 4 * Copyright (c) 2025 Alexander Bluhm <bluhm@openbsd.org> 5 * 6 * Permission to use, copy, modify, and distribute this software for any 7 * purpose with or without fee is hereby granted, provided that the above 8 * copyright notice and this permission notice appear in all copies. 9 * 10 * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES 11 * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF 12 * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR 13 * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES 14 * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN 15 * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF 16 * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. 17 */ 18 19 #include <sys/types.h> 20 #include <sys/atomic.h> 21 #include <sys/queue.h> 22 #include <sys/socket.h> 23 #include <sys/sysctl.h> 24 25 #include <netinet/in.h> 26 #include <netinet/tcp.h> 27 #include <netinet/tcp_timer.h> 28 #include <netinet/tcp_var.h> 29 30 #include <err.h> 31 #include <errno.h> 32 #include <pthread.h> 33 #include <signal.h> 34 #include <stdio.h> 35 #include <stdlib.h> 36 #include <string.h> 37 #include <unistd.h> 38 39 static const struct timespec time_1000ns = { 0, 1000 }; 40 static const struct timeval time_1us = { 0, 1 }; 41 42 union sockaddr_union { 43 struct sockaddr su_sa; 44 struct sockaddr_in su_sin; 45 struct sockaddr_in6 su_sin6; 46 }; 47 48 unsigned int run_time = 10; 49 unsigned int sock_num = 1; 50 unsigned int connect_num = 1, accept_num = 1, send_num = 1, recv_num = 1, 51 close_num = 1, splice_num = 0, unsplice_num = 0, drop_num = 0; 52 int max_percent = 0, idle_percent = 0; 53 volatile unsigned long max_count = 0, idle_count = 0; 54 int *listen_socks, *splice_listen_socks; 55 volatile int *connect_socks, *accept_socks, 56 *splice_accept_socks, *splice_connect_socks; 57 union sockaddr_union *listen_addrs, *splice_listen_addrs; 58 struct tcp_ident_mapping *accept_tims, *splice_accept_tims; 59 struct sockaddr_in sin_loopback; 60 struct sockaddr_in6 sin6_loopback; 61 62 static void __dead 63 usage(void) 64 { 65 fprintf(stderr, 66 "tcpthread [-a accept] [-c connect] [-D drop] [-I idle] [-M max] " 67 "[-n num] [-o close] [-r recv] [-S splice] [-s send] [-t time] " 68 "[-U unsplice]\n" 69 " -a accept threads accepting sockets, default %u\n" 70 " -c connect threads connecting sockets, default %u\n" 71 " -D drop threads dropping TCP connections, default %u\n" 72 " -I idle percent with splice idle time, default %u\n" 73 " -M max percent with splice max lenght, default %d\n" 74 " -n num number of file descriptors, default %d\n" 75 " -o close threads closing sockets, default %u\n" 76 " -r recv threads receiving data, default %u\n" 77 " -S splice threads splicing sockets, default %u\n" 78 " -s send threads sending data, default %u\n" 79 " -t time run time in seconds, default %u\n" 80 " -U unsplice threads running unsplice, default %u\n", 81 accept_num, connect_num, drop_num, idle_percent, max_percent, 82 sock_num, close_num, recv_num, splice_num, send_num, run_time, 83 unsplice_num); 84 exit(2); 85 } 86 87 static volatile int * 88 random_socket(unsigned int *seed) 89 { 90 static volatile int **sockets[] = { 91 &connect_socks, 92 &accept_socks, 93 &splice_accept_socks, 94 &splice_connect_socks, 95 }; 96 volatile int **socksp; 97 98 socksp = sockets[(rand_r(seed) % ((splice_num > 0) ? 4 : 2))]; 99 100 return &(*socksp)[rand_r(seed) % sock_num]; 101 } 102 103 static int 104 connect_socket(volatile int *connectp, struct sockaddr *addr) 105 { 106 int sock; 107 108 if (*connectp != -1) { 109 /* still connected, not closed */ 110 return 0; 111 } 112 /* connect to random listen socket */ 113 sock = socket(addr->sa_family, SOCK_STREAM | SOCK_NONBLOCK, 114 IPPROTO_TCP); 115 if (sock < 0) 116 err(1, "%s: socket", __func__); 117 if (connect(sock, addr, addr->sa_len) < 0 && 118 errno != EINPROGRESS) { 119 err(1, "%s: connect %d", __func__, sock); 120 } 121 if ((int)atomic_cas_uint(connectp, -1, sock) != -1) { 122 /* another thread has connect slot n */ 123 if (close(sock) < 0) 124 err(1, "%s: close %d", __func__, sock); 125 return 0; 126 } 127 return 1; 128 } 129 130 static void * 131 connect_routine(void *arg) 132 { 133 volatile int *run = arg; 134 unsigned long count = 0; 135 struct sockaddr *addr; 136 unsigned int seed; 137 unsigned int n; 138 int connected; 139 140 seed = arc4random(); 141 142 while (*run) { 143 connected = 0; 144 for (n = 0; n < sock_num; n++) { 145 addr = &((splice_num > 0) ? 146 splice_listen_addrs : listen_addrs) 147 [rand_r(&seed) % sock_num].su_sa; 148 if (!connect_socket(&connect_socks[n], addr)) 149 continue; 150 connected = 1; 151 count++; 152 } 153 if (!connected) { 154 /* all sockets were connected, wait a bit */ 155 if (nanosleep(&time_1000ns, NULL) < 0) 156 err(1, "%s: nanosleep", __func__); 157 } 158 } 159 160 return (void *)count; 161 } 162 163 static int 164 accept_socket(volatile int *acceptp, int *listens, 165 struct tcp_ident_mapping *tim, union sockaddr_union *addrs) 166 { 167 unsigned int i; 168 int sock; 169 struct sockaddr *sa; 170 socklen_t len; 171 172 if (*acceptp != -1) { 173 /* still accepted, not closed */ 174 return 0; 175 } 176 sock = -1; 177 for (i = 0; i < sock_num; i++) { 178 sa = (struct sockaddr *)&tim->faddr; 179 len = sizeof(tim->faddr); 180 sock = accept4(listens[i], sa, &len, SOCK_NONBLOCK); 181 if (sock < 0) { 182 if (errno == EWOULDBLOCK) { 183 /* no connection to accept */ 184 continue; 185 } 186 if (errno == ECONNABORTED) { 187 /* accepted socket was disconnected */ 188 continue; 189 } 190 err(1, "%s: accept %d", __func__, listens[i]); 191 } 192 sa = &addrs[i].su_sa; 193 memcpy(&tim->laddr, sa, sa->sa_len); 194 break; 195 } 196 if (sock == -1) { 197 /* all listen sockets block, wait a bit */ 198 if (nanosleep(&time_1000ns, NULL) < 0) 199 err(1, "%s: nanosleep", __func__); 200 return 0; 201 } 202 membar_producer(); 203 if ((int)atomic_cas_uint(acceptp, -1, sock) != -1) { 204 /* another thread has accepted slot n */ 205 if (close(sock) < 0) 206 err(1, "%s: close %d", __func__, sock); 207 return 0; 208 } 209 return 1; 210 } 211 212 static void * 213 accept_routine(void *arg) 214 { 215 volatile int *run = arg; 216 unsigned long count = 0; 217 unsigned int n; 218 int accepted; 219 220 while (*run) { 221 accepted = 0; 222 for (n = 0; n < sock_num; n++) { 223 if (!accept_socket(&accept_socks[n], listen_socks, 224 &accept_tims[n], listen_addrs)) 225 continue; 226 accepted = 1; 227 count++; 228 } 229 if (!accepted) { 230 /* all sockets were accepted, wait a bit */ 231 if (nanosleep(&time_1000ns, NULL) < 0) 232 err(1, "%s: nanosleep", __func__); 233 } 234 } 235 236 return (void *)count; 237 } 238 239 static void * 240 send_routine(void *arg) 241 { 242 volatile int *run = arg; 243 unsigned long count = 0; 244 unsigned int seed; 245 char buf[1024]; /* 1 KB */ 246 volatile int *sockp; 247 int sock; 248 249 seed = arc4random(); 250 251 while (*run) { 252 sockp = random_socket(&seed); 253 sock = *sockp; 254 if (sock == -1) 255 continue; 256 if (send(sock, buf, sizeof(buf), 0) < 0) { 257 if (errno == EWOULDBLOCK) 258 continue; 259 if (errno == EFBIG) 260 atomic_inc_long(&max_count); 261 if (errno == ETIMEDOUT) 262 atomic_inc_long(&idle_count); 263 if ((int)atomic_cas_uint(sockp, sock, -1) != sock) { 264 /* another thread has closed sockp */ 265 continue; 266 } 267 if (close(sock) < 0) 268 err(1, "%s: close %d", __func__, sock); 269 } 270 count++; 271 } 272 273 return (void *)count; 274 } 275 276 static void * 277 recv_routine(void *arg) 278 { 279 volatile int *run = arg; 280 unsigned long count = 0; 281 unsigned int seed; 282 char buf[10*1024]; /* 10 KB */ 283 volatile int *sockp; 284 int sock; 285 286 seed = arc4random(); 287 288 while (*run) { 289 sockp = &((rand_r(&seed) % 2) ? connect_socks : accept_socks) 290 [rand_r(&seed) % sock_num]; 291 sock = *sockp; 292 if (sock == -1) 293 continue; 294 errno = 0; 295 if (recv(sock, buf, sizeof(buf), 0) <= 0) { 296 if (errno == EWOULDBLOCK) 297 continue; 298 if (errno == EFBIG) 299 atomic_inc_long(&max_count); 300 if (errno == ETIMEDOUT) 301 atomic_inc_long(&idle_count); 302 if ((int)atomic_cas_uint(sockp, sock, -1) != sock) { 303 /* another thread has closed sockp */ 304 continue; 305 } 306 if (close(sock) < 0) 307 err(1, "%s: close %d", __func__, sock); 308 } 309 count++; 310 } 311 312 return (void *)count; 313 } 314 315 static void * 316 close_routine(void *arg) 317 { 318 volatile int *run = arg; 319 unsigned long count = 0; 320 unsigned int seed; 321 volatile int *sockp; 322 int sock; 323 324 seed = arc4random(); 325 326 while (*run) { 327 sockp = random_socket(&seed); 328 if (*sockp == -1) 329 continue; 330 sock = atomic_swap_uint(sockp, -1); 331 if (sock == -1) { 332 /* another thead has closed the socket, wait a bit */ 333 if (nanosleep(&time_1000ns, NULL) < 0) 334 err(1, "%s: nanosleep", __func__); 335 continue; 336 } 337 if (close(sock) < 0) 338 err(1, "%s: close %d", __func__, sock); 339 count++; 340 } 341 342 return (void *)count; 343 } 344 345 static void * 346 splice_routine(void *arg) 347 { 348 volatile int *run = arg; 349 unsigned long count = 0; 350 struct sockaddr *addr; 351 unsigned int seed; 352 unsigned int n; 353 int sock; 354 struct splice accept_splice, connect_splice; 355 int spliced; 356 357 seed = arc4random(); 358 359 while (*run) { 360 spliced = 0; 361 for (n = 0; n < sock_num; n++) { 362 if (!accept_socket(&splice_accept_socks[n], 363 splice_listen_socks, 364 &splice_accept_tims[n], splice_listen_addrs)) 365 continue; 366 /* free the matching connect slot */ 367 sock = atomic_swap_uint(&splice_connect_socks[n], -1); 368 if (sock != -1) { 369 if (close(sock) < 0) 370 err(1, "%s: close %d", __func__, sock); 371 } 372 addr = &listen_addrs[rand_r(&seed) % sock_num].su_sa; 373 if (!connect_socket(&splice_connect_socks[n], addr)) { 374 /* close the accepted socket */ 375 sock = atomic_swap_uint( 376 &splice_accept_socks[n], -1); 377 if (sock != -1) { 378 if (close(sock) < 0) { 379 err(1, "%s: close %d", 380 __func__, sock); 381 } 382 } 383 continue; 384 } 385 memset(&accept_splice, 0, sizeof(accept_splice)); 386 memset(&connect_splice, 0, sizeof(connect_splice)); 387 accept_splice.sp_fd = splice_accept_socks[n]; 388 connect_splice.sp_fd = splice_connect_socks[n]; 389 if ((rand_r(&seed) % 100) < max_percent) { 390 accept_splice.sp_max = 1; 391 connect_splice.sp_max = 1; 392 } 393 if ((rand_r(&seed) % 100) < idle_percent) { 394 accept_splice.sp_idle = time_1us; 395 connect_splice.sp_idle = time_1us; 396 } 397 if (accept_splice.sp_fd == -1 || 398 connect_splice.sp_fd == -1 || 399 setsockopt(accept_splice.sp_fd, 400 SOL_SOCKET, SO_SPLICE, 401 &connect_splice, sizeof(connect_splice)) < 0 || 402 setsockopt(connect_splice.sp_fd, 403 SOL_SOCKET, SO_SPLICE, 404 &accept_splice, sizeof(accept_splice)) < 0) { 405 /* close the accepted and connected socket */ 406 sock = atomic_swap_uint( 407 &splice_accept_socks[n], -1); 408 if (sock != -1) { 409 if (close(sock) < 0) { 410 err(1, "%s: close %d", 411 __func__, sock); 412 } 413 } 414 sock = atomic_swap_uint( 415 &splice_connect_socks[n], -1); 416 if (sock != -1) { 417 if (close(sock) < 0) { 418 err(1, "%s: close %d", 419 __func__, sock); 420 } 421 } 422 continue; 423 } 424 spliced = 1; 425 count++; 426 } 427 if (!spliced) { 428 /* splicing for all sockets failed, wait a bit */ 429 if (nanosleep(&time_1000ns, NULL) < 0) 430 err(1, "%s: nanosleep", __func__); 431 } 432 } 433 434 return (void *)count; 435 } 436 437 static void * 438 unsplice_routine(void *arg) 439 { 440 volatile int *run = arg; 441 unsigned long count = 0; 442 unsigned int seed; 443 volatile int *sockp; 444 int sock; 445 446 seed = arc4random(); 447 448 while (*run) { 449 sockp = &((rand_r(&seed) % 2) ? 450 splice_accept_socks : splice_connect_socks) 451 [rand_r(&seed) % sock_num]; 452 sock = *sockp; 453 if (sock == -1) 454 continue; 455 if (setsockopt(sock, SOL_SOCKET, SO_SPLICE, NULL, 0) < 0) { 456 if ((int)atomic_cas_uint(sockp, sock, -1) != sock) { 457 /* another thread has closed sockp */ 458 continue; 459 } 460 if (close(sock) < 0) 461 err(1, "%s: close %d", __func__, sock); 462 } 463 count++; 464 } 465 466 return (void *)count; 467 } 468 469 static void * 470 drop_routine(void *arg) 471 { 472 static const int mib[] = { CTL_NET, PF_INET, IPPROTO_TCP, TCPCTL_DROP }; 473 volatile int *run = arg; 474 unsigned long count = 0; 475 unsigned int seed, n; 476 volatile int *socks; 477 struct tcp_ident_mapping *tims; 478 479 seed = arc4random(); 480 481 while (*run) { 482 if (splice_num > 0 && (rand_r(&seed) % 2)) { 483 socks = splice_accept_socks; 484 tims = splice_accept_tims; 485 } else { 486 socks = accept_socks; 487 tims = accept_tims; 488 } 489 n = rand_r(&seed) % sock_num; 490 if (socks[n] == -1) 491 continue; 492 membar_consumer(); 493 /* accept_tims is not MP safe, but only ESRCH may happen */ 494 if (sysctl(mib, sizeof(mib) / sizeof(mib[0]), NULL, NULL, 495 &tims[n], sizeof(tims[0])) < 0) { 496 if (errno == ESRCH) 497 continue; 498 err(1, "sysctl TCPCTL_DROP"); 499 } 500 count++; 501 } 502 503 return (void *)count; 504 } 505 506 int 507 main(int argc, char *argv[]) 508 { 509 pthread_t *connect_thread, *accept_thread, *send_thread, *recv_thread, 510 *close_thread, *splice_thread, *unsplice_thread, *drop_thread; 511 struct sockaddr *sa; 512 const char *errstr; 513 unsigned int seed; 514 int ch, run; 515 unsigned int n; 516 unsigned long connect_count, accept_count, send_count, recv_count, 517 close_count, splice_count, unsplice_count, drop_count; 518 socklen_t len; 519 520 seed = arc4random(); 521 522 while ((ch = getopt(argc, argv, "a:c:D:I:M:n:o:r:S:s:t:U:")) != -1) { 523 switch (ch) { 524 case 'a': 525 accept_num = strtonum(optarg, 0, UINT_MAX, &errstr); 526 if (errstr != NULL) 527 errx(1, "accept is %s: %s", errstr, optarg); 528 break; 529 case 'c': 530 connect_num = strtonum(optarg, 0, UINT_MAX, &errstr); 531 if (errstr != NULL) 532 errx(1, "connect is %s: %s", errstr, optarg); 533 break; 534 case 'D': 535 drop_num = strtonum(optarg, 0, UINT_MAX, &errstr); 536 if (errstr != NULL) 537 errx(1, "drop is %s: %s", errstr, optarg); 538 break; 539 case 'I': 540 idle_percent = strtonum(optarg, 0, 100, &errstr); 541 if (errstr != NULL) 542 errx(1, "idle is %s: %s", errstr, optarg); 543 break; 544 case 'M': 545 max_percent = strtonum(optarg, 0, 100, &errstr); 546 if (errstr != NULL) 547 errx(1, "max is %s: %s", errstr, optarg); 548 break; 549 case 'n': 550 sock_num = strtonum(optarg, 1, INT_MAX, &errstr); 551 if (errstr != NULL) 552 errx(1, "num is %s: %s", errstr, optarg); 553 break; 554 case 'o': 555 close_num = strtonum(optarg, 0, UINT_MAX, &errstr); 556 if (errstr != NULL) 557 errx(1, "close is %s: %s", errstr, optarg); 558 break; 559 case 'r': 560 recv_num = strtonum(optarg, 0, UINT_MAX, &errstr); 561 if (errstr != NULL) 562 errx(1, "recv is %s: %s", errstr, optarg); 563 break; 564 case 'S': 565 splice_num = strtonum(optarg, 0, UINT_MAX, &errstr); 566 if (errstr != NULL) 567 errx(1, "splice is %s: %s", errstr, optarg); 568 break; 569 case 's': 570 send_num = strtonum(optarg, 0, UINT_MAX, &errstr); 571 if (errstr != NULL) 572 errx(1, "send is %s: %s", errstr, optarg); 573 break; 574 case 't': 575 run_time = strtonum(optarg, 0, UINT_MAX, &errstr); 576 if (errstr != NULL) 577 errx(1, "time is %s: %s", errstr, optarg); 578 break; 579 case 'U': 580 unsplice_num = strtonum(optarg, 0, UINT_MAX, &errstr); 581 if (errstr != NULL) 582 errx(1, "unsplice is %s: %s", errstr, optarg); 583 break; 584 default: 585 usage(); 586 } 587 } 588 argc -= optind; 589 argv += optind; 590 if (argc > 0) 591 usage(); 592 593 if (signal(SIGPIPE, SIG_IGN) == SIG_ERR) 594 err(1, "signal"); 595 596 sin_loopback.sin_family = AF_INET; 597 sin_loopback.sin_len = sizeof(sin_loopback); 598 sin_loopback.sin_addr.s_addr = htonl(INADDR_LOOPBACK); 599 600 sin6_loopback.sin6_family = AF_INET6; 601 sin6_loopback.sin6_len = sizeof(sin6_loopback); 602 sin6_loopback.sin6_addr = in6addr_loopback; 603 604 listen_socks = reallocarray(NULL, sock_num, sizeof(int)); 605 if (listen_socks == NULL) 606 err(1, "listen_socks"); 607 connect_socks = reallocarray(NULL, sock_num, sizeof(int)); 608 if (connect_socks == NULL) 609 err(1, "connect_socks"); 610 accept_socks = reallocarray(NULL, sock_num, sizeof(int)); 611 if (accept_socks == NULL) 612 err(1, "accept_socks"); 613 for (n = 0; n < sock_num; n++) 614 listen_socks[n] = connect_socks[n] = accept_socks[n] = -1; 615 listen_addrs = calloc(sock_num, sizeof(listen_addrs[0])); 616 if (listen_addrs == NULL) 617 err(1, "listen_addrs"); 618 accept_tims = calloc(sock_num, sizeof(accept_tims[0])); 619 if (accept_tims == NULL) 620 err(1, "accept_tims"); 621 if (splice_num > 0) { 622 splice_listen_socks = reallocarray(NULL, sock_num, sizeof(int)); 623 if (splice_listen_socks == NULL) 624 err(1, "splice_listen_socks"); 625 splice_accept_socks = reallocarray(NULL, sock_num, sizeof(int)); 626 if (splice_accept_socks == NULL) 627 err(1, "splice_accept_socks"); 628 splice_connect_socks = 629 reallocarray(NULL, sock_num, sizeof(int)); 630 if (splice_connect_socks == NULL) 631 err(1, "splice_connect_socks"); 632 for (n = 0; n < sock_num; n++) { 633 splice_listen_socks[n] = splice_accept_socks[n] = 634 splice_connect_socks[n] = -1; 635 } 636 splice_listen_addrs = calloc(sock_num, 637 sizeof(splice_listen_addrs[0])); 638 if (splice_listen_addrs == NULL) 639 err(1, "splice_listen_addrs"); 640 splice_accept_tims = calloc(sock_num, 641 sizeof(splice_accept_tims[0])); 642 if (splice_accept_tims == NULL) 643 err(1, "splice_accept_tims"); 644 } 645 646 for (n = 0; n < sock_num; n++) { 647 int af; 648 649 af = (rand_r(&seed) % 2) ? AF_INET : AF_INET6; 650 listen_socks[n] = socket(af, SOCK_STREAM | SOCK_NONBLOCK, 651 IPPROTO_TCP); 652 if (listen_socks[n] < 0) 653 err(1, "socket"); 654 if (af == AF_INET) 655 sa = (struct sockaddr *)&sin_loopback; 656 if (af == AF_INET6) 657 sa = (struct sockaddr *)&sin6_loopback; 658 if (bind(listen_socks[n], sa, sa->sa_len) < 0) 659 err(1, "bind"); 660 len = sizeof(listen_addrs[n]); 661 if (getsockname(listen_socks[n], &listen_addrs[n].su_sa, &len) 662 < 0) 663 err(1, "getsockname"); 664 if (listen(listen_socks[n], 128) < 0) 665 err(1, "listen"); 666 667 if (splice_num > 0) { 668 af = (rand_r(&seed) % 2) ? AF_INET : AF_INET6; 669 splice_listen_socks[n] = socket(af, 670 SOCK_STREAM | SOCK_NONBLOCK, IPPROTO_TCP); 671 if (splice_listen_socks[n] < 0) 672 err(1, "socket"); 673 if (af == AF_INET) 674 sa = (struct sockaddr *)&sin_loopback; 675 if (af == AF_INET6) 676 sa = (struct sockaddr *)&sin6_loopback; 677 if (bind(splice_listen_socks[n], sa, sa->sa_len) < 0) 678 err(1, "bind"); 679 len = sizeof(splice_listen_addrs[n]); 680 if (getsockname(splice_listen_socks[n], 681 &splice_listen_addrs[n].su_sa, &len) < 0) 682 err(1, "getsockname"); 683 if (listen(splice_listen_socks[n], 128) < 0) 684 err(1, "listen"); 685 } 686 } 687 688 run = 1; 689 690 connect_thread = calloc(connect_num, sizeof(pthread_t)); 691 if (connect_thread == NULL) 692 err(1, "connect_thread"); 693 for (n = 0; n < connect_num; n++) { 694 errno = pthread_create(&connect_thread[n], NULL, 695 connect_routine, &run); 696 if (errno) 697 err(1, "pthread_create connect %u", n); 698 } 699 700 accept_thread = calloc(accept_num, sizeof(pthread_t)); 701 if (accept_thread == NULL) 702 err(1, "accept_thread"); 703 for (n = 0; n < accept_num; n++) { 704 errno = pthread_create(&accept_thread[n], NULL, 705 accept_routine, &run); 706 if (errno) 707 err(1, "pthread_create accept %u", n); 708 } 709 710 send_thread = calloc(send_num, sizeof(pthread_t)); 711 if (send_thread == NULL) 712 err(1, "send_thread"); 713 for (n = 0; n < send_num; n++) { 714 errno = pthread_create(&send_thread[n], NULL, 715 send_routine, &run); 716 if (errno) 717 err(1, "pthread_create send %u", n); 718 } 719 720 recv_thread = calloc(recv_num, sizeof(pthread_t)); 721 if (recv_thread == NULL) 722 err(1, "recv_thread"); 723 for (n = 0; n < recv_num; n++) { 724 errno = pthread_create(&recv_thread[n], NULL, 725 recv_routine, &run); 726 if (errno) 727 err(1, "pthread_create recv %u", n); 728 } 729 730 close_thread = calloc(close_num, sizeof(pthread_t)); 731 if (close_thread == NULL) 732 err(1, "close_thread"); 733 for (n = 0; n < close_num; n++) { 734 errno = pthread_create(&close_thread[n], NULL, 735 close_routine, &run); 736 if (errno) 737 err(1, "pthread_create close %u", n); 738 } 739 740 if (splice_num > 0) { 741 splice_thread = calloc(splice_num, sizeof(pthread_t)); 742 if (splice_thread == NULL) 743 err(1, "splice_thread"); 744 for (n = 0; n < splice_num; n++) { 745 errno = pthread_create(&splice_thread[n], NULL, 746 splice_routine, &run); 747 if (errno) 748 err(1, "pthread_create splice %u", n); 749 } 750 751 unsplice_thread = calloc(unsplice_num, sizeof(pthread_t)); 752 if (unsplice_thread == NULL) 753 err(1, "unsplice_thread"); 754 for (n = 0; n < unsplice_num; n++) { 755 errno = pthread_create(&unsplice_thread[n], NULL, 756 unsplice_routine, &run); 757 if (errno) 758 err(1, "pthread_create unsplice %u", n); 759 } 760 } 761 drop_thread = calloc(drop_num, sizeof(pthread_t)); 762 if (drop_thread == NULL) 763 err(1, "drop_thread"); 764 for (n = 0; n < drop_num; n++) { 765 errno = pthread_create(&drop_thread[n], NULL, 766 drop_routine, &run); 767 if (errno) 768 err(1, "pthread_create drop %u", n); 769 } 770 771 if (run_time > 0) { 772 if (sleep(run_time) < 0) 773 err(1, "sleep %u", run_time); 774 } 775 run = 0; 776 777 connect_count = 0; 778 for (n = 0; n < connect_num; n++) { 779 unsigned long count; 780 781 errno = pthread_join(connect_thread[n], (void **)&count); 782 if (errno) 783 err(1, "pthread_join connect %u", n); 784 connect_count += count; 785 } 786 free(connect_thread); 787 788 accept_count = 0; 789 for (n = 0; n < accept_num; n++) { 790 unsigned long count; 791 792 errno = pthread_join(accept_thread[n], (void **)&count); 793 if (errno) 794 err(1, "pthread_join accept %u", n); 795 accept_count += count; 796 } 797 free(accept_thread); 798 799 send_count = 0; 800 for (n = 0; n < send_num; n++) { 801 unsigned long count; 802 803 errno = pthread_join(send_thread[n], (void **)&count); 804 if (errno) 805 err(1, "pthread_join send %u", n); 806 send_count += count; 807 } 808 free(send_thread); 809 810 recv_count = 0; 811 for (n = 0; n < recv_num; n++) { 812 unsigned long count; 813 814 errno = pthread_join(recv_thread[n], (void **)&count); 815 if (errno) 816 err(1, "pthread_join recv %u", n); 817 recv_count += count; 818 } 819 free(recv_thread); 820 821 close_count = 0; 822 for (n = 0; n < close_num; n++) { 823 unsigned long count; 824 825 errno = pthread_join(close_thread[n], (void **)&count); 826 if (errno) 827 err(1, "pthread_join close %u", n); 828 close_count += count; 829 } 830 free(close_thread); 831 832 if (splice_num > 0) { 833 splice_count = 0; 834 for (n = 0; n < splice_num; n++) { 835 unsigned long count; 836 837 errno = pthread_join(splice_thread[n], (void **)&count); 838 if (errno) 839 err(1, "pthread_join splice %u", n); 840 splice_count += count; 841 } 842 free(splice_thread); 843 844 unsplice_count = 0; 845 for (n = 0; n < unsplice_num; n++) { 846 unsigned long count; 847 848 errno = pthread_join(unsplice_thread[n], 849 (void **)&count); 850 if (errno) 851 err(1, "pthread_join unsplice %u", n); 852 unsplice_count += count; 853 } 854 free(unsplice_thread); 855 } 856 drop_count = 0; 857 for (n = 0; n < drop_num; n++) { 858 unsigned long count; 859 860 errno = pthread_join(drop_thread[n], (void **)&count); 861 if (errno) 862 err(1, "pthread_join drop %u", n); 863 drop_count += count; 864 } 865 free(drop_thread); 866 867 free((int *)listen_socks); 868 free((int *)connect_socks); 869 free((int *)accept_socks); 870 free(listen_addrs); 871 free(accept_tims); 872 if (splice_num > 0) { 873 free((int *)splice_listen_socks); 874 free((int *)splice_accept_socks); 875 free((int *)splice_connect_socks); 876 free(splice_listen_addrs); 877 free(splice_accept_tims); 878 } 879 880 printf("count: connect %lu, ", connect_count); 881 if (splice_num > 0) { 882 printf("splice %lu, unsplice %lu, max %lu, idle %lu, ", 883 splice_count, unsplice_count, max_count, idle_count); 884 } 885 printf("accept %lu, send %lu, recv %lu, close %lu, drop %lu\n", 886 accept_count, send_count, recv_count, close_count, drop_count); 887 888 return 0; 889 } 890