1 /* $NetBSD: qmqp-source.c,v 1.2 2017/02/14 01:16:48 christos Exp $ */ 2 3 /*++ 4 /* NAME 5 /* qmqp-source 1 6 /* SUMMARY 7 /* parallelized QMQP test generator 8 /* SYNOPSIS 9 /* .fi 10 /* \fBqmqp-source\fR [\fIoptions\fR] [\fBinet:\fR]\fIhost\fR[:\fIport\fR] 11 /* 12 /* \fBqmqp-source\fR [\fIoptions\fR] \fBunix:\fIpathname\fR 13 /* DESCRIPTION 14 /* \fBqmqp-source\fR connects to the named host and TCP port (default 628) 15 /* and sends one or more messages to it, either sequentially 16 /* or in parallel. The program speaks the QMQP protocol. 17 /* Connections can be made to UNIX-domain and IPv4 or IPv6 servers. 18 /* IPv4 and IPv6 are the default. 19 /* 20 /* Note: this is an unsupported test program. No attempt is made 21 /* to maintain compatibility between successive versions. 22 /* 23 /* Arguments: 24 /* .IP \fB-4\fR 25 /* Connect to the server with IPv4. This option has no effect when 26 /* Postfix is built without IPv6 support. 27 /* .IP \fB-6\fR 28 /* Connect to the server with IPv6. This option is not available when 29 /* Postfix is built without IPv6 support. 30 /* .IP \fB-c\fR 31 /* Display a running counter that is incremented each time 32 /* a delivery completes. 33 /* .IP "\fB-C \fIcount\fR" 34 /* When a host sends RESET instead of SYN|ACK, try \fIcount\fR times 35 /* before giving up. The default count is 1. Specify a larger count in 36 /* order to work around a problem with TCP/IP stacks that send RESET 37 /* when the listen queue is full. 38 /* .IP "\fB-f \fIfrom\fR" 39 /* Use the specified sender address (default: <foo@myhostname>). 40 /* .IP "\fB-l \fIlength\fR" 41 /* Send \fIlength\fR bytes as message payload. The length 42 /* includes the message headers. 43 /* .IP "\fB-m \fImessage_count\fR" 44 /* Send the specified number of messages (default: 1). 45 /* .IP "\fB-M \fImyhostname\fR" 46 /* Use the specified hostname or [address] in the default 47 /* sender and recipient addresses, instead of the machine 48 /* hostname. 49 /* .IP "\fB-r \fIrecipient_count\fR" 50 /* Send the specified number of recipients per transaction (default: 1). 51 /* Recipient names are generated by prepending a number to the 52 /* recipient address. 53 /* .IP "\fB-s \fIsession_count\fR" 54 /* Run the specified number of QMQP sessions in parallel (default: 1). 55 /* .IP "\fB-t \fIto\fR" 56 /* Use the specified recipient address (default: <foo@myhostname>). 57 /* .IP "\fB-R \fIinterval\fR" 58 /* Wait for a random period of time 0 <= n <= interval between messages. 59 /* Suspending one thread does not affect other delivery threads. 60 /* .IP \fB-v\fR 61 /* Make the program more verbose, for debugging purposes. 62 /* .IP "\fB-w \fIinterval\fR" 63 /* Wait a fixed time between messages. 64 /* Suspending one thread does not affect other delivery threads. 65 /* SEE ALSO 66 /* qmqp-sink(1), QMQP message dump 67 /* LICENSE 68 /* .ad 69 /* .fi 70 /* The Secure Mailer license must be distributed with this software. 71 /* AUTHOR(S) 72 /* Wietse Venema 73 /* IBM T.J. Watson Research 74 /* P.O. Box 704 75 /* Yorktown Heights, NY 10598, USA 76 /* 77 /* Wietse Venema 78 /* Google, Inc. 79 /* 111 8th Avenue 80 /* New York, NY 10011, USA 81 /*--*/ 82 83 /* System library. */ 84 85 #include <sys_defs.h> 86 #include <sys/socket.h> 87 #include <sys/wait.h> 88 #include <netinet/in.h> 89 #include <sys/un.h> 90 #include <stdlib.h> 91 #include <unistd.h> 92 #include <signal.h> 93 #include <errno.h> 94 #include <string.h> 95 96 /* Utility library. */ 97 98 #include <msg.h> 99 #include <msg_vstream.h> 100 #include <vstring.h> 101 #include <vstream.h> 102 #include <get_hostname.h> 103 #include <split_at.h> 104 #include <connect.h> 105 #include <mymalloc.h> 106 #include <events.h> 107 #include <iostuff.h> 108 #include <netstring.h> 109 #include <sane_connect.h> 110 #include <host_port.h> 111 #include <myaddrinfo.h> 112 #include <inet_proto.h> 113 #include <valid_hostname.h> 114 #include <valid_mailhost_addr.h> 115 116 /* Global library. */ 117 118 #include <mail_date.h> 119 #include <qmqp_proto.h> 120 #include <mail_version.h> 121 122 /* Application-specific. */ 123 124 /* 125 * Per-session data structure with state. 126 * 127 * This software can maintain multiple parallel connections to the same QMQP 128 * server. However, it makes no more than one connection request at a time 129 * to avoid overwhelming the server with SYN packets and having to back off. 130 * Back-off would screw up the benchmark. Pending connection requests are 131 * kept in a linear list. 132 */ 133 typedef struct SESSION { 134 int xfer_count; /* # of xfers in session */ 135 int rcpt_done; /* # of recipients done */ 136 int rcpt_count; /* # of recipients to go */ 137 VSTREAM *stream; /* open connection */ 138 int connect_count; /* # of connect()s to retry */ 139 struct SESSION *next; /* connect() queue linkage */ 140 } SESSION; 141 142 static SESSION *last_session; /* connect() queue tail */ 143 144 static VSTRING *buffer; 145 static int var_line_limit = 10240; 146 static int var_timeout = 300; 147 static const char *var_myhostname; 148 static int session_count; 149 static int message_count = 1; 150 static struct sockaddr_storage ss; 151 152 #undef sun 153 static struct sockaddr_un sun; 154 static struct sockaddr *sa; 155 static int sa_length; 156 static int recipients = 1; 157 static char *defaddr; 158 static char *recipient; 159 static char *sender; 160 static int message_length = 1024; 161 static int count = 0; 162 static int counter = 0; 163 static int connect_count = 1; 164 static int random_delay = 0; 165 static int fixed_delay = 0; 166 static const char *mydate; 167 static int mypid; 168 169 static void enqueue_connect(SESSION *); 170 static void start_connect(SESSION *); 171 static void connect_done(int, void *); 172 173 static void send_data(SESSION *); 174 static void receive_reply(int, void *); 175 176 static VSTRING *message_buffer; 177 static VSTRING *sender_buffer; 178 static VSTRING *recipient_buffer; 179 180 /* Silly little macros. */ 181 182 #define STR(x) vstring_str(x) 183 #define LEN(x) VSTRING_LEN(x) 184 185 /* random_interval - generate a random value in 0 .. (small) interval */ 186 187 static int random_interval(int interval) 188 { 189 return (rand() % (interval + 1)); 190 } 191 192 /* socket_error - look up and reset the last socket error */ 193 194 static int socket_error(int sock) 195 { 196 int error; 197 SOCKOPT_SIZE error_len; 198 199 /* 200 * Some Solaris 2 versions have getsockopt() itself return the error, 201 * instead of returning it via the parameter list. 202 */ 203 error = 0; 204 error_len = sizeof(error); 205 if (getsockopt(sock, SOL_SOCKET, SO_ERROR, (void *) &error, &error_len) < 0) 206 return (-1); 207 if (error) { 208 errno = error; 209 return (-1); 210 } 211 212 /* 213 * No problems. 214 */ 215 return (0); 216 } 217 218 /* exception_text - translate exceptions from the netstring module */ 219 220 static char *exception_text(int except) 221 { 222 ; 223 224 switch (except) { 225 case NETSTRING_ERR_EOF: 226 return ("lost connection"); 227 case NETSTRING_ERR_TIME: 228 return ("timeout"); 229 case NETSTRING_ERR_FORMAT: 230 return ("netstring format error"); 231 case NETSTRING_ERR_SIZE: 232 return ("netstring size exceeds limit"); 233 default: 234 msg_panic("exception_text: unknown exception %d", except); 235 } 236 /* NOTREACHED */ 237 } 238 239 /* startup - connect to server but do not wait */ 240 241 static void startup(SESSION *session) 242 { 243 if (message_count-- <= 0) { 244 myfree((void *) session); 245 session_count--; 246 return; 247 } 248 enqueue_connect(session); 249 } 250 251 /* start_event - invoke startup from timer context */ 252 253 static void start_event(int unused_event, void *context) 254 { 255 SESSION *session = (SESSION *) context; 256 257 startup(session); 258 } 259 260 /* start_another - start another session */ 261 262 static void start_another(SESSION *session) 263 { 264 if (random_delay > 0) { 265 event_request_timer(start_event, (void *) session, 266 random_interval(random_delay)); 267 } else if (fixed_delay > 0) { 268 event_request_timer(start_event, (void *) session, fixed_delay); 269 } else { 270 startup(session); 271 } 272 } 273 274 /* enqueue_connect - queue a connection request */ 275 276 static void enqueue_connect(SESSION *session) 277 { 278 session->next = 0; 279 if (last_session == 0) { 280 last_session = session; 281 start_connect(session); 282 } else { 283 last_session->next = session; 284 last_session = session; 285 } 286 } 287 288 /* dequeue_connect - connection request completed */ 289 290 static void dequeue_connect(SESSION *session) 291 { 292 if (session == last_session) { 293 if (session->next != 0) 294 msg_panic("dequeue_connect: queue ends after last"); 295 last_session = 0; 296 } else { 297 if (session->next == 0) 298 msg_panic("dequeue_connect: queue ends before last"); 299 start_connect(session->next); 300 } 301 } 302 303 /* fail_connect - handle failed startup */ 304 305 static void fail_connect(SESSION *session) 306 { 307 if (session->connect_count-- == 1) 308 msg_fatal("connect: %m"); 309 msg_warn("connect: %m"); 310 event_disable_readwrite(vstream_fileno(session->stream)); 311 vstream_fclose(session->stream); 312 session->stream = 0; 313 #ifdef MISSING_USLEEP 314 doze(10); 315 #else 316 usleep(10); 317 #endif 318 start_connect(session); 319 } 320 321 /* start_connect - start TCP handshake */ 322 323 static void start_connect(SESSION *session) 324 { 325 int fd; 326 struct linger linger; 327 328 /* 329 * Some systems don't set the socket error when connect() fails early 330 * (loopback) so we must deal with the error immediately, rather than 331 * retrieving it later with getsockopt(). We can't use MSG_PEEK to 332 * distinguish between server disconnect and connection refused. 333 */ 334 if ((fd = socket(sa->sa_family, SOCK_STREAM, 0)) < 0) 335 msg_fatal("socket: %m"); 336 (void) non_blocking(fd, NON_BLOCKING); 337 linger.l_onoff = 1; 338 linger.l_linger = 0; 339 if (setsockopt(fd, SOL_SOCKET, SO_LINGER, (void *) &linger, 340 sizeof(linger)) < 0) 341 msg_warn("setsockopt SO_LINGER %d: %m", linger.l_linger); 342 session->stream = vstream_fdopen(fd, O_RDWR); 343 event_enable_write(fd, connect_done, (void *) session); 344 netstring_setup(session->stream, var_timeout); 345 if (sane_connect(fd, sa, sa_length) < 0 && errno != EINPROGRESS) 346 fail_connect(session); 347 } 348 349 /* connect_done - send message sender info */ 350 351 static void connect_done(int unused_event, void *context) 352 { 353 SESSION *session = (SESSION *) context; 354 int fd = vstream_fileno(session->stream); 355 356 /* 357 * Try again after some delay when the connection failed, in case they 358 * run a Mickey Mouse protocol stack. 359 */ 360 if (socket_error(fd) < 0) { 361 fail_connect(session); 362 } else { 363 dequeue_connect(session); 364 non_blocking(fd, BLOCKING); 365 event_disable_readwrite(fd); 366 /* Avoid poor performance when TCP MSS > VSTREAM_BUFSIZE. */ 367 if (sa->sa_family == AF_INET 368 #ifdef AF_INET6 369 || sa->sa_family == AF_INET6 370 #endif 371 ) 372 vstream_tweak_tcp(session->stream); 373 send_data(session); 374 } 375 } 376 377 /* send_data - send message+sender+recipients */ 378 379 static void send_data(SESSION *session) 380 { 381 int fd = vstream_fileno(session->stream); 382 int except; 383 384 /* 385 * Prepare for disaster. 386 */ 387 if ((except = vstream_setjmp(session->stream)) != 0) 388 msg_fatal("%s while sending message", exception_text(except)); 389 390 /* 391 * Send the message content, by wrapping three netstrings into an 392 * over-all netstring. 393 * 394 * XXX This should be done more carefully to avoid blocking when sending 395 * large messages over slow networks. 396 */ 397 netstring_put_multi(session->stream, 398 STR(message_buffer), LEN(message_buffer), 399 STR(sender_buffer), LEN(sender_buffer), 400 STR(recipient_buffer), LEN(recipient_buffer), 401 (char *) 0); 402 netstring_fflush(session->stream); 403 404 /* 405 * Wake me up when the server replies or when something bad happens. 406 */ 407 event_enable_read(fd, receive_reply, (void *) session); 408 } 409 410 /* receive_reply - read server reply */ 411 412 static void receive_reply(int unused_event, void *context) 413 { 414 SESSION *session = (SESSION *) context; 415 int except; 416 417 /* 418 * Prepare for disaster. 419 */ 420 if ((except = vstream_setjmp(session->stream)) != 0) 421 msg_fatal("%s while receiving server reply", exception_text(except)); 422 423 /* 424 * Receive and process the server reply. 425 */ 426 netstring_get(session->stream, buffer, var_line_limit); 427 if (msg_verbose) 428 vstream_printf("<< %.*s\n", (int) LEN(buffer), STR(buffer)); 429 if (STR(buffer)[0] != QMQP_STAT_OK) 430 msg_fatal("%s error: %.*s", 431 STR(buffer)[0] == QMQP_STAT_RETRY ? "recoverable" : 432 STR(buffer)[0] == QMQP_STAT_HARD ? "unrecoverable" : 433 "unknown", (int) LEN(buffer) - 1, STR(buffer) + 1); 434 435 /* 436 * Update the optional running counter. 437 */ 438 if (count) { 439 counter++; 440 vstream_printf("%d\r", counter); 441 vstream_fflush(VSTREAM_OUT); 442 } 443 444 /* 445 * Finish this session. QMQP sends only one message per session. 446 */ 447 event_disable_readwrite(vstream_fileno(session->stream)); 448 vstream_fclose(session->stream); 449 session->stream = 0; 450 start_another(session); 451 } 452 453 /* usage - explain */ 454 455 static void usage(char *myname) 456 { 457 msg_fatal("usage: %s -cv -s sess -l msglen -m msgs -C count -M myhostname -f from -t to -R delay -w delay host[:port]", myname); 458 } 459 460 MAIL_VERSION_STAMP_DECLARE; 461 462 /* main - parse JCL and start the machine */ 463 464 int main(int argc, char **argv) 465 { 466 SESSION *session; 467 char *host; 468 char *port; 469 char *path; 470 int path_len; 471 int sessions = 1; 472 int ch; 473 ssize_t len; 474 int n; 475 int i; 476 char *buf; 477 const char *parse_err; 478 struct addrinfo *res; 479 int aierr; 480 const char *protocols = INET_PROTO_NAME_ALL; 481 482 /* 483 * Fingerprint executables and core dumps. 484 */ 485 MAIL_VERSION_STAMP_ALLOCATE; 486 487 signal(SIGPIPE, SIG_IGN); 488 msg_vstream_init(argv[0], VSTREAM_ERR); 489 490 /* 491 * Parse JCL. 492 */ 493 while ((ch = GETOPT(argc, argv, "46cC:f:l:m:M:r:R:s:t:vw:")) > 0) { 494 switch (ch) { 495 case '4': 496 protocols = INET_PROTO_NAME_IPV4; 497 break; 498 case '6': 499 protocols = INET_PROTO_NAME_IPV6; 500 break; 501 case 'c': 502 count++; 503 break; 504 case 'C': 505 if ((connect_count = atoi(optarg)) <= 0) 506 usage(argv[0]); 507 break; 508 case 'f': 509 sender = optarg; 510 break; 511 case 'l': 512 if ((message_length = atoi(optarg)) <= 0) 513 usage(argv[0]); 514 break; 515 case 'm': 516 if ((message_count = atoi(optarg)) <= 0) 517 usage(argv[0]); 518 break; 519 case 'M': 520 if (*optarg == '[') { 521 if (!valid_mailhost_literal(optarg, DO_GRIPE)) 522 msg_fatal("bad address literal: %s", optarg); 523 } else { 524 if (!valid_hostname(optarg, DO_GRIPE)) 525 msg_fatal("bad hostname: %s", optarg); 526 } 527 var_myhostname = optarg; 528 break; 529 case 'r': 530 if ((recipients = atoi(optarg)) <= 0) 531 usage(argv[0]); 532 break; 533 case 'R': 534 if (fixed_delay > 0 || (random_delay = atoi(optarg)) <= 0) 535 usage(argv[0]); 536 break; 537 case 's': 538 if ((sessions = atoi(optarg)) <= 0) 539 usage(argv[0]); 540 break; 541 case 't': 542 recipient = optarg; 543 break; 544 case 'v': 545 msg_verbose++; 546 break; 547 case 'w': 548 if (random_delay > 0 || (fixed_delay = atoi(optarg)) <= 0) 549 usage(argv[0]); 550 break; 551 default: 552 usage(argv[0]); 553 } 554 } 555 if (argc - optind != 1) 556 usage(argv[0]); 557 558 if (random_delay > 0) 559 srand(getpid()); 560 561 /* 562 * Translate endpoint address to internal form. 563 */ 564 (void) inet_proto_init("protocols", protocols); 565 if (strncmp(argv[optind], "unix:", 5) == 0) { 566 path = argv[optind] + 5; 567 path_len = strlen(path); 568 if (path_len >= (int) sizeof(sun.sun_path)) 569 msg_fatal("unix-domain name too long: %s", path); 570 memset((void *) &sun, 0, sizeof(sun)); 571 sun.sun_family = AF_UNIX; 572 #ifdef HAS_SUN_LEN 573 sun.sun_len = path_len + 1; 574 #endif 575 memcpy(sun.sun_path, path, path_len); 576 sa = (struct sockaddr *) &sun; 577 sa_length = sizeof(sun); 578 } else { 579 if (strncmp(argv[optind], "inet:", 5) == 0) 580 argv[optind] += 5; 581 buf = mystrdup(argv[optind]); 582 if ((parse_err = host_port(buf, &host, (char *) 0, &port, "628")) != 0) 583 msg_fatal("%s: %s", argv[optind], parse_err); 584 if ((aierr = hostname_to_sockaddr(host, port, SOCK_STREAM, &res)) != 0) 585 msg_fatal("%s: %s", argv[optind], MAI_STRERROR(aierr)); 586 myfree(buf); 587 sa = (struct sockaddr *) &ss; 588 if (res->ai_addrlen > sizeof(ss)) 589 msg_fatal("address length %d > buffer length %d", 590 (int) res->ai_addrlen, (int) sizeof(ss)); 591 memcpy((void *) sa, res->ai_addr, res->ai_addrlen); 592 sa_length = res->ai_addrlen; 593 #ifdef HAS_SA_LEN 594 sa->sa_len = sa_length; 595 #endif 596 freeaddrinfo(res); 597 } 598 599 /* 600 * Allocate space for temporary buffer. 601 */ 602 buffer = vstring_alloc(100); 603 604 /* 605 * Make sure we have sender and recipient addresses. 606 */ 607 if (var_myhostname == 0) 608 var_myhostname = get_hostname(); 609 if (sender == 0 || recipient == 0) { 610 vstring_sprintf(buffer, "foo@%s", var_myhostname); 611 defaddr = mystrdup(vstring_str(buffer)); 612 if (sender == 0) 613 sender = defaddr; 614 if (recipient == 0) 615 recipient = defaddr; 616 } 617 618 /* 619 * Prepare some results that may be used multiple times: the message 620 * content netstring, the sender netstring, and the recipient netstrings. 621 */ 622 mydate = mail_date(time((time_t *) 0)); 623 mypid = getpid(); 624 625 message_buffer = vstring_alloc(message_length + 200); 626 vstring_sprintf(buffer, 627 "From: <%s>\nTo: <%s>\nDate: %s\nMessage-Id: <%d@%s>\n\n", 628 sender, recipient, mydate, mypid, var_myhostname); 629 for (n = 1; LEN(buffer) < message_length; n++) { 630 for (i = 0; i < n && i < 79; i++) 631 VSTRING_ADDCH(buffer, 'X'); 632 VSTRING_ADDCH(buffer, '\n'); 633 } 634 STR(buffer)[message_length - 1] = '\n'; 635 netstring_memcpy(message_buffer, STR(buffer), message_length); 636 637 len = strlen(sender); 638 sender_buffer = vstring_alloc(len); 639 netstring_memcpy(sender_buffer, sender, len); 640 641 if (recipients == 1) { 642 len = strlen(recipient); 643 recipient_buffer = vstring_alloc(len); 644 netstring_memcpy(recipient_buffer, recipient, len); 645 } else { 646 recipient_buffer = vstring_alloc(100); 647 for (n = 0; n < recipients; n++) { 648 vstring_sprintf(buffer, "%d%s", n, recipient); 649 netstring_memcat(recipient_buffer, STR(buffer), LEN(buffer)); 650 } 651 } 652 653 /* 654 * Start sessions. 655 */ 656 while (sessions-- > 0) { 657 session = (SESSION *) mymalloc(sizeof(*session)); 658 session->stream = 0; 659 session->xfer_count = 0; 660 session->connect_count = connect_count; 661 session->next = 0; 662 session_count++; 663 startup(session); 664 } 665 for (;;) { 666 event_loop(-1); 667 if (session_count <= 0 && message_count <= 0) { 668 if (count) { 669 VSTREAM_PUTC('\n', VSTREAM_OUT); 670 vstream_fflush(VSTREAM_OUT); 671 } 672 exit(0); 673 } 674 } 675 } 676