1 /* $NetBSD: qmqp-sink.c,v 1.2 2017/02/14 01:16:48 christos Exp $ */ 2 3 /*++ 4 /* NAME 5 /* qmqp-sink 1 6 /* SUMMARY 7 /* parallelized QMQP test server 8 /* SYNOPSIS 9 /* .fi 10 /* \fBqmqp-sink\fR [\fB-46cv\fR] [\fB-x \fItime\fR] 11 /* [\fBinet:\fR][\fIhost\fR]:\fIport\fR \fIbacklog\fR 12 /* 13 /* \fBqmqp-sink\fR [\fB-46cv\fR] [\fB-x \fItime\fR] 14 /* \fBunix:\fR\fIpathname\fR \fIbacklog\fR 15 /* DESCRIPTION 16 /* \fBqmqp-sink\fR listens on the named host (or address) and port. 17 /* It receives messages from the network and throws them away. 18 /* The purpose is to measure QMQP client performance, not protocol 19 /* compliance. 20 /* Connections can be accepted on IPv4 or IPv6 endpoints, or on 21 /* UNIX-domain sockets. 22 /* IPv4 and IPv6 are the default. 23 /* This program is the complement of the \fBqmqp-source\fR(1) program. 24 /* 25 /* Note: this is an unsupported test program. No attempt is made 26 /* to maintain compatibility between successive versions. 27 /* 28 /* Arguments: 29 /* .IP \fB-4\fR 30 /* Support IPv4 only. This option has no effect when 31 /* Postfix is built without IPv6 support. 32 /* .IP \fB-6\fR 33 /* Support IPv6 only. This option is not available when 34 /* Postfix is built without IPv6 support. 35 /* .IP \fB-c\fR 36 /* Display a running counter that is updated whenever a delivery 37 /* is completed. 38 /* .IP \fB-v\fR 39 /* Increase verbosity. Specify \fB-v -v\fR to see some of the QMQP 40 /* conversation. 41 /* .IP "\fB-x \fItime\fR" 42 /* Terminate after \fItime\fR seconds. This is to facilitate memory 43 /* leak testing. 44 /* SEE ALSO 45 /* qmqp-source(1), QMQP message generator 46 /* LICENSE 47 /* .ad 48 /* .fi 49 /* The Secure Mailer license must be distributed with this software. 50 /* AUTHOR(S) 51 /* Wietse Venema 52 /* IBM T.J. Watson Research 53 /* P.O. Box 704 54 /* Yorktown Heights, NY 10598, USA 55 /* 56 /* Wietse Venema 57 /* Google, Inc. 58 /* 111 8th Avenue 59 /* New York, NY 10011, USA 60 /*--*/ 61 62 /* System library. */ 63 64 #include <sys_defs.h> 65 #include <sys/socket.h> 66 #include <sys/wait.h> 67 #include <unistd.h> 68 #include <string.h> 69 #include <stdlib.h> 70 #include <fcntl.h> 71 #include <signal.h> 72 73 /* Utility library. */ 74 75 #include <msg.h> 76 #include <vstring.h> 77 #include <vstream.h> 78 #include <listen.h> 79 #include <events.h> 80 #include <mymalloc.h> 81 #include <iostuff.h> 82 #include <msg_vstream.h> 83 #include <netstring.h> 84 #include <inet_proto.h> 85 86 /* Global library. */ 87 88 #include <qmqp_proto.h> 89 #include <mail_version.h> 90 91 /* Application-specific. */ 92 93 typedef struct { 94 VSTREAM *stream; /* client connection */ 95 int count; /* bytes to go */ 96 } SINK_STATE; 97 98 static int var_tmout; 99 static VSTRING *buffer; 100 static void disconnect(SINK_STATE *); 101 static int count_deliveries; 102 static int counter; 103 104 /* send_reply - finish conversation */ 105 106 static void send_reply(SINK_STATE *state) 107 { 108 vstring_sprintf(buffer, "%cOk", QMQP_STAT_OK); 109 NETSTRING_PUT_BUF(state->stream, buffer); 110 netstring_fflush(state->stream); 111 if (count_deliveries) { 112 counter++; 113 vstream_printf("%d\r", counter); 114 vstream_fflush(VSTREAM_OUT); 115 } 116 disconnect(state); 117 } 118 119 /* read_data - read over-all netstring data */ 120 121 static void read_data(int unused_event, void *context) 122 { 123 SINK_STATE *state = (SINK_STATE *) context; 124 int fd = vstream_fileno(state->stream); 125 int count; 126 127 /* 128 * Refill the VSTREAM buffer, if necessary. 129 */ 130 if (VSTREAM_GETC(state->stream) == VSTREAM_EOF) 131 netstring_except(state->stream, vstream_ftimeout(state->stream) ? 132 NETSTRING_ERR_TIME : NETSTRING_ERR_EOF); 133 state->count--; 134 135 /* 136 * Flush the VSTREAM buffer. As documented, vstream_fseek() discards 137 * unread input. 138 */ 139 if ((count = vstream_peek(state->stream)) > 0) { 140 state->count -= count; 141 if (state->count <= 0) { 142 send_reply(state); 143 return; 144 } 145 vstream_fpurge(state->stream, VSTREAM_PURGE_BOTH); 146 } 147 148 /* 149 * Do not block while waiting for the arrival of more data. 150 */ 151 event_disable_readwrite(fd); 152 event_enable_read(fd, read_data, context); 153 } 154 155 /* read_length - read over-all netstring length */ 156 157 static void read_length(int event, void *context) 158 { 159 SINK_STATE *state = (SINK_STATE *) context; 160 161 switch (vstream_setjmp(state->stream)) { 162 163 default: 164 msg_panic("unknown error reading input"); 165 166 case NETSTRING_ERR_TIME: 167 msg_panic("attempt to read non-readable socket"); 168 /* NOTREACHED */ 169 170 case NETSTRING_ERR_EOF: 171 msg_warn("lost connection"); 172 disconnect(state); 173 return; 174 175 case NETSTRING_ERR_FORMAT: 176 msg_warn("netstring format error"); 177 disconnect(state); 178 return; 179 180 case NETSTRING_ERR_SIZE: 181 msg_warn("netstring size error"); 182 disconnect(state); 183 return; 184 185 /* 186 * Include the netstring terminator in the read byte count. This 187 * violates abstractions. 188 */ 189 case 0: 190 state->count = netstring_get_length(state->stream) + 1; 191 read_data(event, context); 192 return; 193 } 194 } 195 196 /* disconnect - handle disconnection events */ 197 198 static void disconnect(SINK_STATE *state) 199 { 200 event_disable_readwrite(vstream_fileno(state->stream)); 201 vstream_fclose(state->stream); 202 myfree((void *) state); 203 } 204 205 /* connect_event - handle connection events */ 206 207 static void connect_event(int unused_event, void *context) 208 { 209 int sock = CAST_ANY_PTR_TO_INT(context); 210 struct sockaddr_storage ss; 211 SOCKADDR_SIZE len = sizeof(ss); 212 struct sockaddr *sa = (struct sockaddr *) &ss; 213 SINK_STATE *state; 214 int fd; 215 216 if ((fd = accept(sock, sa, &len)) >= 0) { 217 if (msg_verbose) 218 msg_info("connect (%s)", 219 #ifdef AF_LOCAL 220 sa->sa_family == AF_LOCAL ? "AF_LOCAL" : 221 #else 222 sa->sa_family == AF_UNIX ? "AF_UNIX" : 223 #endif 224 sa->sa_family == AF_INET ? "AF_INET" : 225 #ifdef AF_INET6 226 sa->sa_family == AF_INET6 ? "AF_INET6" : 227 #endif 228 "unknown protocol family"); 229 non_blocking(fd, NON_BLOCKING); 230 state = (SINK_STATE *) mymalloc(sizeof(*state)); 231 state->stream = vstream_fdopen(fd, O_RDWR); 232 vstream_tweak_sock(state->stream); 233 netstring_setup(state->stream, var_tmout); 234 event_enable_read(fd, read_length, (void *) state); 235 } 236 } 237 238 /* terminate - voluntary exit */ 239 240 static void terminate(int unused_event, void *unused_context) 241 { 242 exit(0); 243 } 244 245 /* usage - explain */ 246 247 static void usage(char *myname) 248 { 249 msg_fatal("usage: %s [-cv] [-x time] [host]:port backlog", myname); 250 } 251 252 MAIL_VERSION_STAMP_DECLARE; 253 254 int main(int argc, char **argv) 255 { 256 int sock; 257 int backlog; 258 int ch; 259 int ttl; 260 const char *protocols = INET_PROTO_NAME_ALL; 261 262 /* 263 * Fingerprint executables and core dumps. 264 */ 265 MAIL_VERSION_STAMP_ALLOCATE; 266 267 /* 268 * Fix 20051207. 269 */ 270 signal(SIGPIPE, SIG_IGN); 271 272 /* 273 * Initialize diagnostics. 274 */ 275 msg_vstream_init(argv[0], VSTREAM_ERR); 276 277 /* 278 * Parse JCL. 279 */ 280 while ((ch = GETOPT(argc, argv, "46cvx:")) > 0) { 281 switch (ch) { 282 case '4': 283 protocols = INET_PROTO_NAME_IPV4; 284 break; 285 case '6': 286 protocols = INET_PROTO_NAME_IPV6; 287 break; 288 case 'c': 289 count_deliveries++; 290 break; 291 case 'v': 292 msg_verbose++; 293 break; 294 case 'x': 295 if ((ttl = atoi(optarg)) <= 0) 296 usage(argv[0]); 297 event_request_timer(terminate, (void *) 0, ttl); 298 break; 299 default: 300 usage(argv[0]); 301 } 302 } 303 if (argc - optind != 2) 304 usage(argv[0]); 305 if ((backlog = atoi(argv[optind + 1])) <= 0) 306 usage(argv[0]); 307 308 /* 309 * Initialize. 310 */ 311 (void) inet_proto_init("protocols", protocols); 312 buffer = vstring_alloc(1024); 313 if (strncmp(argv[optind], "unix:", 5) == 0) { 314 sock = unix_listen(argv[optind] + 5, backlog, BLOCKING); 315 } else { 316 if (strncmp(argv[optind], "inet:", 5) == 0) 317 argv[optind] += 5; 318 sock = inet_listen(argv[optind], backlog, BLOCKING); 319 } 320 321 /* 322 * Start the event handler. 323 */ 324 event_enable_read(sock, connect_event, CAST_INT_TO_VOID_PTR(sock)); 325 for (;;) 326 event_loop(-1); 327 } 328