/*	$NetBSD: qmqp-source.c,v 1.1.1.1 2009/06/23 10:08:56 tron Exp $	*/

/*++
/* NAME
/*	qmqp-source 1
/* SUMMARY
/*	multi-threaded QMQP test generator
/* SYNOPSIS
/* .fi
/*	\fBqmqp-source\fR [\fIoptions\fR] [\fBinet:\fR]\fIhost\fR[:\fIport\fR]
/*
/*	\fBqmqp-source\fR [\fIoptions\fR] \fBunix:\fIpathname\fR
/* DESCRIPTION
/*	\fBqmqp-source\fR connects to the named host and TCP port (default 628)
/*	and sends one or more messages to it, either sequentially
/*	or in parallel. The program speaks the QMQP protocol.
/*	Connections can be made to UNIX-domain and IPv4 or IPv6 servers.
/*	IPv4 and IPv6 are the default.
/*
/*	Note: this is an unsupported test program. No attempt is made
/*	to maintain compatibility between successive versions.
/*
/*	Arguments:
/* .IP \fB-4\fR
/*	Connect to the server with IPv4. This option has no effect when
/*	Postfix is built without IPv6 support.
/* .IP \fB-6\fR
/*	Connect to the server with IPv6. This option is not available when
/*	Postfix is built without IPv6 support.
/* .IP \fB-c\fR
/*	Display a running counter that is incremented each time
/*	a delivery completes.
/* .IP "\fB-C \fIcount\fR"
/*	When a host sends RESET instead of SYN|ACK, try \fIcount\fR times
/*	before giving up. The default count is 1. Specify a larger count in
/*	order to work around a problem with TCP/IP stacks that send RESET
/*	when the listen queue is full.
/* .IP "\fB-f \fIfrom\fR"
/*	Use the specified sender address (default: <foo@myhostname>).
/* .IP "\fB-l \fIlength\fR"
/*	Send \fIlength\fR bytes as message payload. The length
/*	includes the message headers.
/* .IP "\fB-m \fImessage_count\fR"
/*	Send the specified number of messages (default: 1).
/* .IP "\fB-M \fImyhostname\fR"
/*	Use the specified hostname or [address] in the default
/*	sender and recipient addresses, instead of the machine
/*	hostname.
/* .IP "\fB-r \fIrecipient_count\fR"
/*	Send the specified number of recipients per transaction (default: 1).
/*	Recipient names are generated by prepending a number to the
/*	recipient address.
/* .IP "\fB-s \fIsession_count\fR"
/*	Run the specified number of QMQP sessions in parallel (default: 1).
/* .IP "\fB-t \fIto\fR"
/*	Use the specified recipient address (default: <foo@myhostname>).
/* .IP "\fB-R \fIinterval\fR"
/*	Wait for a random period of time 0 <= n <= interval between messages.
/*	Suspending one thread does not affect other delivery threads.
/* .IP \fB-v\fR
/*	Make the program more verbose, for debugging purposes.
/* .IP "\fB-w \fIinterval\fR"
/*	Wait a fixed time between messages.
/*	Suspending one thread does not affect other delivery threads.
/* SEE ALSO
/*	qmqp-sink(1), QMQP message dump
/* LICENSE
/* .ad
/* .fi
/*	The Secure Mailer license must be distributed with this software.
/* AUTHOR(S)
/*	Wietse Venema
/*	IBM T.J. Watson Research
/*	P.O. Box 704
/*	Yorktown Heights, NY 10598, USA
/*--*/

/* System library. */

#include <sys_defs.h>
#include <sys/socket.h>
#include <sys/wait.h>
#include <netinet/in.h>
#include <sys/un.h>
#include <stdlib.h>
#include <unistd.h>
#include <signal.h>
#include <errno.h>
#include <string.h>

/* Utility library. */

#include <msg.h>
#include <msg_vstream.h>
#include <vstring.h>
#include <vstream.h>
#include <get_hostname.h>
#include <split_at.h>
#include <connect.h>
#include <mymalloc.h>
#include <events.h>
#include <iostuff.h>
#include <netstring.h>
#include <sane_connect.h>
#include <host_port.h>
#include <myaddrinfo.h>
#include <inet_proto.h>
#include <valid_hostname.h>
#include <valid_mailhost_addr.h>

/* Global library. */

#include <mail_date.h>
#include <qmqp_proto.h>
#include <mail_version.h>

/* Application-specific. */

 /*
  * Per-session data structure with state.
  * 
  * This software can maintain multiple parallel connections to the same QMQP
  * server. However, it makes no more than one connection request at a time
  * to avoid overwhelming the server with SYN packets and having to back off.
  * Back-off would screw up the benchmark. Pending connection requests are
  * kept in a linear list.
  * 
  * NOTE(review): rcpt_done and rcpt_count are not referenced anywhere else in
  * the visible code, and xfer_count is only ever set to zero in main().
  */
typedef struct SESSION {
    int xfer_count; /* # of xfers in session */
    int rcpt_done; /* # of recipients done */
    int rcpt_count; /* # of recipients to go */
    VSTREAM *stream; /* open connection */
    int connect_count; /* # of connect()s to retry */
    struct SESSION *next; /* connect() queue linkage */
} SESSION;

static SESSION *last_session; /* connect() queue tail */

/* Scratch buffer: holds the server reply and is reused while building data. */
static VSTRING *buffer;

/* Size limit passed to netstring_get() when reading the server reply. */
static int var_line_limit = 10240;

/* Timeout passed to netstring_setup(); presumably seconds - TODO confirm. */
static int var_timeout = 300;

/* Hostname for default addresses and Message-Id: -M or get_hostname(). */
static const char *var_myhostname;

/* Number of SESSION objects that have been allocated and not yet freed. */
static int session_count;

/* Messages still to be started (-m); startup() decrements it per message. */
static int message_count = 1;

/* Storage for the resolved inet: server address. */
static struct sockaddr_storage ss;

/* Presumably some platforms predefine "sun" as a macro - TODO confirm. */
#undef sun
/* Storage for the unix: server address. */
static struct sockaddr_un sun;

/* Server address actually used for connect(): points at ss or at sun. */
static struct sockaddr *sa;
static int sa_length;

/* Number of recipients per transaction (-r). */
static int recipients = 1;

/* Default address "foo@<myhostname>", built when -f or -t is missing. */
static char *defaddr;

/* Envelope recipient (-t) and sender (-f) addresses. */
static char *recipient;
static char *sender;

/* Total message payload length including headers (-l). */
static int message_length = 1024;

/* Non-zero when the running delivery counter was requested (-c). */
static int count = 0;

/* Number of completed deliveries, displayed when count is set. */
static int counter = 0;

/* Initial per-session connect() attempt budget (-C). */
static int connect_count = 1;

/* Delay between messages: random upper bound (-R) or fixed value (-w). */
static int random_delay = 0;
static int fixed_delay = 0;

/* Values computed once in main() and used in the message headers. */
static const char *mydate;
static int mypid;

static void enqueue_connect(SESSION *);
static void start_connect(SESSION *);
static void connect_done(int, char *);

static void send_data(SESSION *);
static void receive_reply(int, char *);

/* Pre-built netstrings that send_data() transmits for every message. */
static VSTRING *message_buffer;
static VSTRING *sender_buffer;
static VSTRING *recipient_buffer;

/* Silly little macros. */ 176 177 #define STR(x) vstring_str(x) 178 #define LEN(x) VSTRING_LEN(x) 179 180 /* random_interval - generate a random value in 0 .. (small) interval */ 181 182 static int random_interval(int interval) 183 { 184 return (rand() % (interval + 1)); 185 } 186 187 /* socket_error - look up and reset the last socket error */ 188 189 static int socket_error(int sock) 190 { 191 int error; 192 SOCKOPT_SIZE error_len; 193 194 /* 195 * Some Solaris 2 versions have getsockopt() itself return the error, 196 * instead of returning it via the parameter list. 197 */ 198 error = 0; 199 error_len = sizeof(error); 200 if (getsockopt(sock, SOL_SOCKET, SO_ERROR, (char *) &error, &error_len) < 0) 201 return (-1); 202 if (error) { 203 errno = error; 204 return (-1); 205 } 206 207 /* 208 * No problems. 209 */ 210 return (0); 211 } 212 213 /* exception_text - translate exceptions from the netstring module */ 214 215 static char *exception_text(int except) 216 { 217 ; 218 219 switch (except) { 220 case NETSTRING_ERR_EOF: 221 return ("lost connection"); 222 case NETSTRING_ERR_TIME: 223 return ("timeout"); 224 case NETSTRING_ERR_FORMAT: 225 return ("netstring format error"); 226 case NETSTRING_ERR_SIZE: 227 return ("netstring size exceeds limit"); 228 default: 229 msg_panic("exception_text: unknown exception %d", except); 230 } 231 /* NOTREACHED */ 232 } 233 234 /* startup - connect to server but do not wait */ 235 236 static void startup(SESSION *session) 237 { 238 if (message_count-- <= 0) { 239 myfree((char *) session); 240 session_count--; 241 return; 242 } 243 enqueue_connect(session); 244 } 245 246 /* start_event - invoke startup from timer context */ 247 248 static void start_event(int unused_event, char *context) 249 { 250 SESSION *session = (SESSION *) context; 251 252 startup(session); 253 } 254 255 /* start_another - start another session */ 256 257 static void start_another(SESSION *session) 258 { 259 if (random_delay > 0) { 260 
event_request_timer(start_event, (char *) session, 261 random_interval(random_delay)); 262 } else if (fixed_delay > 0) { 263 event_request_timer(start_event, (char *) session, fixed_delay); 264 } else { 265 startup(session); 266 } 267 } 268 269 /* enqueue_connect - queue a connection request */ 270 271 static void enqueue_connect(SESSION *session) 272 { 273 session->next = 0; 274 if (last_session == 0) { 275 last_session = session; 276 start_connect(session); 277 } else { 278 last_session->next = session; 279 last_session = session; 280 } 281 } 282 283 /* dequeue_connect - connection request completed */ 284 285 static void dequeue_connect(SESSION *session) 286 { 287 if (session == last_session) { 288 if (session->next != 0) 289 msg_panic("dequeue_connect: queue ends after last"); 290 last_session = 0; 291 } else { 292 if (session->next == 0) 293 msg_panic("dequeue_connect: queue ends before last"); 294 start_connect(session->next); 295 } 296 } 297 298 /* fail_connect - handle failed startup */ 299 300 static void fail_connect(SESSION *session) 301 { 302 if (session->connect_count-- == 1) 303 msg_fatal("connect: %m"); 304 msg_warn("connect: %m"); 305 event_disable_readwrite(vstream_fileno(session->stream)); 306 vstream_fclose(session->stream); 307 session->stream = 0; 308 #ifdef MISSING_USLEEP 309 doze(10); 310 #else 311 usleep(10); 312 #endif 313 start_connect(session); 314 } 315 316 /* start_connect - start TCP handshake */ 317 318 static void start_connect(SESSION *session) 319 { 320 int fd; 321 struct linger linger; 322 323 /* 324 * Some systems don't set the socket error when connect() fails early 325 * (loopback) so we must deal with the error immediately, rather than 326 * retrieving it later with getsockopt(). We can't use MSG_PEEK to 327 * distinguish between server disconnect and connection refused. 
328 */ 329 if ((fd = socket(sa->sa_family, SOCK_STREAM, 0)) < 0) 330 msg_fatal("socket: %m"); 331 (void) non_blocking(fd, NON_BLOCKING); 332 linger.l_onoff = 1; 333 linger.l_linger = 0; 334 if (setsockopt(fd, SOL_SOCKET, SO_LINGER, (char *) &linger, 335 sizeof(linger)) < 0) 336 msg_warn("setsockopt SO_LINGER %d: %m", linger.l_linger); 337 session->stream = vstream_fdopen(fd, O_RDWR); 338 event_enable_write(fd, connect_done, (char *) session); 339 netstring_setup(session->stream, var_timeout); 340 if (sane_connect(fd, sa, sa_length) < 0 && errno != EINPROGRESS) 341 fail_connect(session); 342 } 343 344 /* connect_done - send message sender info */ 345 346 static void connect_done(int unused_event, char *context) 347 { 348 SESSION *session = (SESSION *) context; 349 int fd = vstream_fileno(session->stream); 350 351 /* 352 * Try again after some delay when the connection failed, in case they 353 * run a Mickey Mouse protocol stack. 354 */ 355 if (socket_error(fd) < 0) { 356 fail_connect(session); 357 } else { 358 dequeue_connect(session); 359 non_blocking(fd, BLOCKING); 360 event_disable_readwrite(fd); 361 /* Avoid poor performance when TCP MSS > VSTREAM_BUFSIZE. */ 362 if (sa->sa_family == AF_INET 363 #ifdef AF_INET6 364 || sa->sa_family == AF_INET6 365 #endif 366 ) 367 vstream_tweak_tcp(session->stream); 368 send_data(session); 369 } 370 } 371 372 /* send_data - send message+sender+recipients */ 373 374 static void send_data(SESSION *session) 375 { 376 int fd = vstream_fileno(session->stream); 377 int except; 378 379 /* 380 * Prepare for disaster. 381 */ 382 if ((except = vstream_setjmp(session->stream)) != 0) 383 msg_fatal("%s while sending message", exception_text(except)); 384 385 /* 386 * Send the message content, by wrapping three netstrings into an 387 * over-all netstring. 388 * 389 * XXX This should be done more carefully to avoid blocking when sending 390 * large messages over slow networks. 
391 */ 392 netstring_put_multi(session->stream, 393 STR(message_buffer), LEN(message_buffer), 394 STR(sender_buffer), LEN(sender_buffer), 395 STR(recipient_buffer), LEN(recipient_buffer), 396 (char *) 0); 397 netstring_fflush(session->stream); 398 399 /* 400 * Wake me up when the server replies or when something bad happens. 401 */ 402 event_enable_read(fd, receive_reply, (char *) session); 403 } 404 405 /* receive_reply - read server reply */ 406 407 static void receive_reply(int unused_event, char *context) 408 { 409 SESSION *session = (SESSION *) context; 410 int except; 411 412 /* 413 * Prepare for disaster. 414 */ 415 if ((except = vstream_setjmp(session->stream)) != 0) 416 msg_fatal("%s while receiving server reply", exception_text(except)); 417 418 /* 419 * Receive and process the server reply. 420 */ 421 netstring_get(session->stream, buffer, var_line_limit); 422 if (msg_verbose) 423 vstream_printf("<< %.*s\n", (int) LEN(buffer), STR(buffer)); 424 if (STR(buffer)[0] != QMQP_STAT_OK) 425 msg_fatal("%s error: %.*s", 426 STR(buffer)[0] == QMQP_STAT_RETRY ? "recoverable" : 427 STR(buffer)[0] == QMQP_STAT_HARD ? "unrecoverable" : 428 "unknown", (int) LEN(buffer) - 1, STR(buffer) + 1); 429 430 /* 431 * Update the optional running counter. 432 */ 433 if (count) { 434 counter++; 435 vstream_printf("%d\r", counter); 436 vstream_fflush(VSTREAM_OUT); 437 } 438 439 /* 440 * Finish this session. QMQP sends only one message per session. 
441 */ 442 event_disable_readwrite(vstream_fileno(session->stream)); 443 vstream_fclose(session->stream); 444 session->stream = 0; 445 start_another(session); 446 } 447 448 /* usage - explain */ 449 450 static void usage(char *myname) 451 { 452 msg_fatal("usage: %s -cv -s sess -l msglen -m msgs -C count -M myhostname -f from -t to -R delay -w delay host[:port]", myname); 453 } 454 455 MAIL_VERSION_STAMP_DECLARE; 456 457 /* main - parse JCL and start the machine */ 458 459 int main(int argc, char **argv) 460 { 461 SESSION *session; 462 char *host; 463 char *port; 464 char *path; 465 int path_len; 466 int sessions = 1; 467 int ch; 468 ssize_t len; 469 int n; 470 int i; 471 char *buf; 472 const char *parse_err; 473 struct addrinfo *res; 474 int aierr; 475 const char *protocols = INET_PROTO_NAME_ALL; 476 INET_PROTO_INFO *proto_info; 477 478 /* 479 * Fingerprint executables and core dumps. 480 */ 481 MAIL_VERSION_STAMP_ALLOCATE; 482 483 signal(SIGPIPE, SIG_IGN); 484 msg_vstream_init(argv[0], VSTREAM_ERR); 485 486 /* 487 * Parse JCL. 
488 */ 489 while ((ch = GETOPT(argc, argv, "46cC:f:l:m:M:r:R:s:t:vw:")) > 0) { 490 switch (ch) { 491 case '4': 492 protocols = INET_PROTO_NAME_IPV4; 493 break; 494 case '6': 495 protocols = INET_PROTO_NAME_IPV6; 496 break; 497 case 'c': 498 count++; 499 break; 500 case 'C': 501 if ((connect_count = atoi(optarg)) <= 0) 502 usage(argv[0]); 503 break; 504 case 'f': 505 sender = optarg; 506 break; 507 case 'l': 508 if ((message_length = atoi(optarg)) <= 0) 509 usage(argv[0]); 510 break; 511 case 'm': 512 if ((message_count = atoi(optarg)) <= 0) 513 usage(argv[0]); 514 break; 515 case 'M': 516 if (*optarg == '[') { 517 if (!valid_mailhost_literal(optarg, DO_GRIPE)) 518 msg_fatal("bad address literal: %s", optarg); 519 } else { 520 if (!valid_hostname(optarg, DO_GRIPE)) 521 msg_fatal("bad hostname: %s", optarg); 522 } 523 var_myhostname = optarg; 524 break; 525 case 'r': 526 if ((recipients = atoi(optarg)) <= 0) 527 usage(argv[0]); 528 break; 529 case 'R': 530 if (fixed_delay > 0 || (random_delay = atoi(optarg)) <= 0) 531 usage(argv[0]); 532 break; 533 case 's': 534 if ((sessions = atoi(optarg)) <= 0) 535 usage(argv[0]); 536 break; 537 case 't': 538 recipient = optarg; 539 break; 540 case 'v': 541 msg_verbose++; 542 break; 543 case 'w': 544 if (random_delay > 0 || (fixed_delay = atoi(optarg)) <= 0) 545 usage(argv[0]); 546 break; 547 default: 548 usage(argv[0]); 549 } 550 } 551 if (argc - optind != 1) 552 usage(argv[0]); 553 554 if (random_delay > 0) 555 srand(getpid()); 556 557 /* 558 * Translate endpoint address to internal form. 
559 */ 560 proto_info = inet_proto_init("protocols", protocols); 561 if (strncmp(argv[optind], "unix:", 5) == 0) { 562 path = argv[optind] + 5; 563 path_len = strlen(path); 564 if (path_len >= (int) sizeof(sun.sun_path)) 565 msg_fatal("unix-domain name too long: %s", path); 566 memset((char *) &sun, 0, sizeof(sun)); 567 sun.sun_family = AF_UNIX; 568 #ifdef HAS_SUN_LEN 569 sun.sun_len = path_len + 1; 570 #endif 571 memcpy(sun.sun_path, path, path_len); 572 sa = (struct sockaddr *) & sun; 573 sa_length = sizeof(sun); 574 } else { 575 if (strncmp(argv[optind], "inet:", 5) == 0) 576 argv[optind] += 5; 577 buf = mystrdup(argv[optind]); 578 if ((parse_err = host_port(buf, &host, (char *) 0, &port, "628")) != 0) 579 msg_fatal("%s: %s", argv[optind], parse_err); 580 if ((aierr = hostname_to_sockaddr(host, port, SOCK_STREAM, &res)) != 0) 581 msg_fatal("%s: %s", argv[optind], MAI_STRERROR(aierr)); 582 myfree(buf); 583 sa = (struct sockaddr *) & ss; 584 if (res->ai_addrlen > sizeof(ss)) 585 msg_fatal("address length %d > buffer length %d", 586 (int) res->ai_addrlen, (int) sizeof(ss)); 587 memcpy((char *) sa, res->ai_addr, res->ai_addrlen); 588 sa_length = res->ai_addrlen; 589 #ifdef HAS_SA_LEN 590 sa->sa_len = sa_length; 591 #endif 592 freeaddrinfo(res); 593 } 594 595 /* 596 * Allocate space for temporary buffer. 597 */ 598 buffer = vstring_alloc(100); 599 600 /* 601 * Make sure we have sender and recipient addresses. 602 */ 603 if (var_myhostname == 0) 604 var_myhostname = get_hostname(); 605 if (sender == 0 || recipient == 0) { 606 vstring_sprintf(buffer, "foo@%s", var_myhostname); 607 defaddr = mystrdup(vstring_str(buffer)); 608 if (sender == 0) 609 sender = defaddr; 610 if (recipient == 0) 611 recipient = defaddr; 612 } 613 614 /* 615 * Prepare some results that may be used multiple times: the message 616 * content netstring, the sender netstring, and the recipient netstrings. 
617 */ 618 mydate = mail_date(time((time_t *) 0)); 619 mypid = getpid(); 620 621 message_buffer = vstring_alloc(message_length + 200); 622 vstring_sprintf(buffer, 623 "From: <%s>\nTo: <%s>\nDate: %s\nMessage-Id: <%d@%s>\n\n", 624 sender, recipient, mydate, mypid, var_myhostname); 625 for (n = 1; LEN(buffer) < message_length; n++) { 626 for (i = 0; i < n && i < 79; i++) 627 VSTRING_ADDCH(buffer, 'X'); 628 VSTRING_ADDCH(buffer, '\n'); 629 } 630 STR(buffer)[message_length - 1] = '\n'; 631 netstring_memcpy(message_buffer, STR(buffer), message_length); 632 633 len = strlen(sender); 634 sender_buffer = vstring_alloc(len); 635 netstring_memcpy(sender_buffer, sender, len); 636 637 if (recipients == 1) { 638 len = strlen(recipient); 639 recipient_buffer = vstring_alloc(len); 640 netstring_memcpy(recipient_buffer, recipient, len); 641 } else { 642 recipient_buffer = vstring_alloc(100); 643 for (n = 0; n < recipients; n++) { 644 vstring_sprintf(buffer, "%d%s", n, recipient); 645 netstring_memcat(recipient_buffer, STR(buffer), LEN(buffer)); 646 } 647 } 648 649 /* 650 * Start sessions. 651 */ 652 while (sessions-- > 0) { 653 session = (SESSION *) mymalloc(sizeof(*session)); 654 session->stream = 0; 655 session->xfer_count = 0; 656 session->connect_count = connect_count; 657 session->next = 0; 658 session_count++; 659 startup(session); 660 } 661 for (;;) { 662 event_loop(-1); 663 if (session_count <= 0 && message_count <= 0) { 664 if (count) { 665 VSTREAM_PUTC('\n', VSTREAM_OUT); 666 vstream_fflush(VSTREAM_OUT); 667 } 668 exit(0); 669 } 670 } 671 } 672