1 /* $NetBSD: qmqp-sink.c,v 1.1.1.2 2013/09/25 19:06:36 tron Exp $ */ 2 3 /*++ 4 /* NAME 5 /* qmqp-sink 1 6 /* SUMMARY 7 /* multi-threaded QMQP test server 8 /* SYNOPSIS 9 /* .fi 10 /* \fBqmqp-sink\fR [\fB-46cv\fR] [\fB-x \fItime\fR] 11 /* [\fBinet:\fR][\fIhost\fR]:\fIport\fR \fIbacklog\fR 12 /* 13 /* \fBqmqp-sink\fR [\fB-46cv\fR] [\fB-x \fItime\fR] 14 /* \fBunix:\fR\fIpathname\fR \fIbacklog\fR 15 /* DESCRIPTION 16 /* \fBqmqp-sink\fR listens on the named host (or address) and port. 17 /* It receives messages from the network and throws them away. 18 /* The purpose is to measure QMQP client performance, not protocol 19 /* compliance. 20 /* Connections can be accepted on IPv4 or IPv6 endpoints, or on 21 /* UNIX-domain sockets. 22 /* IPv4 and IPv6 are the default. 23 /* This program is the complement of the \fBqmqp-source\fR(1) program. 24 /* 25 /* Note: this is an unsupported test program. No attempt is made 26 /* to maintain compatibility between successive versions. 27 /* 28 /* Arguments: 29 /* .IP \fB-4\fR 30 /* Support IPv4 only. This option has no effect when 31 /* Postfix is built without IPv6 support. 32 /* .IP \fB-6\fR 33 /* Support IPv6 only. This option is not available when 34 /* Postfix is built without IPv6 support. 35 /* .IP \fB-c\fR 36 /* Display a running counter that is updated whenever a delivery 37 /* is completed. 38 /* .IP \fB-v\fR 39 /* Increase verbosity. Specify \fB-v -v\fR to see some of the QMQP 40 /* conversation. 41 /* .IP "\fB-x \fItime\fR 42 /* Terminate after \fItime\fR seconds. This is to facilitate memory 43 /* leak testing. 44 /* SEE ALSO 45 /* qmqp-source(1), QMQP message generator 46 /* LICENSE 47 /* .ad 48 /* .fi 49 /* The Secure Mailer license must be distributed with this software. 50 /* AUTHOR(S) 51 /* Wietse Venema 52 /* IBM T.J. Watson Research 53 /* P.O. Box 704 54 /* Yorktown Heights, NY 10598, USA 55 /*--*/ 56 57 /* System library. */ 58 59 #include <sys_defs.h> 60 #include <sys/socket.h> 61 #include <sys/wait.h> 62 #include <unistd.h> 63 #include <string.h> 64 #include <stdlib.h> 65 #include <fcntl.h> 66 #include <signal.h> 67 68 /* Utility library. */ 69 70 #include <msg.h> 71 #include <vstring.h> 72 #include <vstream.h> 73 #include <listen.h> 74 #include <events.h> 75 #include <mymalloc.h> 76 #include <iostuff.h> 77 #include <msg_vstream.h> 78 #include <netstring.h> 79 #include <inet_proto.h> 80 81 /* Global library. */ 82 83 #include <qmqp_proto.h> 84 #include <mail_version.h> 85 86 /* Application-specific. */ 87 88 typedef struct { 89 VSTREAM *stream; /* client connection */ 90 int count; /* bytes to go */ 91 } SINK_STATE; 92 93 static int var_tmout; 94 static VSTRING *buffer; 95 static void disconnect(SINK_STATE *); 96 static int count_deliveries; 97 static int counter; 98 99 /* send_reply - finish conversation */ 100 101 static void send_reply(SINK_STATE *state) 102 { 103 vstring_sprintf(buffer, "%cOk", QMQP_STAT_OK); 104 NETSTRING_PUT_BUF(state->stream, buffer); 105 netstring_fflush(state->stream); 106 if (count_deliveries) { 107 counter++; 108 vstream_printf("%d\r", counter); 109 vstream_fflush(VSTREAM_OUT); 110 } 111 disconnect(state); 112 } 113 114 /* read_data - read over-all netstring data */ 115 116 static void read_data(int unused_event, char *context) 117 { 118 SINK_STATE *state = (SINK_STATE *) context; 119 int fd = vstream_fileno(state->stream); 120 int count; 121 122 /* 123 * Refill the VSTREAM buffer, if necessary. 124 */ 125 if (VSTREAM_GETC(state->stream) == VSTREAM_EOF) 126 netstring_except(state->stream, vstream_ftimeout(state->stream) ? 127 NETSTRING_ERR_TIME : NETSTRING_ERR_EOF); 128 state->count--; 129 130 /* 131 * Flush the VSTREAM buffer. As documented, vstream_fseek() discards 132 * unread input. 133 */ 134 if ((count = vstream_peek(state->stream)) > 0) { 135 state->count -= count; 136 if (state->count <= 0) { 137 send_reply(state); 138 return; 139 } 140 vstream_fseek(state->stream, 0L, 0); 141 } 142 143 /* 144 * Do not block while waiting for the arrival of more data. 145 */ 146 event_disable_readwrite(fd); 147 event_enable_read(fd, read_data, context); 148 } 149 150 /* read_length - read over-all netstring length */ 151 152 static void read_length(int event, char *context) 153 { 154 SINK_STATE *state = (SINK_STATE *) context; 155 156 switch (vstream_setjmp(state->stream)) { 157 158 default: 159 msg_panic("unknown error reading input"); 160 161 case NETSTRING_ERR_TIME: 162 msg_panic("attempt to read non-readable socket"); 163 /* NOTREACHED */ 164 165 case NETSTRING_ERR_EOF: 166 msg_warn("lost connection"); 167 disconnect(state); 168 return; 169 170 case NETSTRING_ERR_FORMAT: 171 msg_warn("netstring format error"); 172 disconnect(state); 173 return; 174 175 case NETSTRING_ERR_SIZE: 176 msg_warn("netstring size error"); 177 disconnect(state); 178 return; 179 180 /* 181 * Include the netstring terminator in the read byte count. This 182 * violates abstractions. 183 */ 184 case 0: 185 state->count = netstring_get_length(state->stream) + 1; 186 read_data(event, context); 187 return; 188 } 189 } 190 191 /* disconnect - handle disconnection events */ 192 193 static void disconnect(SINK_STATE *state) 194 { 195 event_disable_readwrite(vstream_fileno(state->stream)); 196 vstream_fclose(state->stream); 197 myfree((char *) state); 198 } 199 200 /* connect_event - handle connection events */ 201 202 static void connect_event(int unused_event, char *context) 203 { 204 int sock = CAST_CHAR_PTR_TO_INT(context); 205 struct sockaddr sa; 206 SOCKADDR_SIZE len = sizeof(sa); 207 SINK_STATE *state; 208 int fd; 209 210 if ((fd = accept(sock, &sa, &len)) >= 0) { 211 if (msg_verbose) 212 msg_info("connect (%s)", 213 #ifdef AF_LOCAL 214 sa.sa_family == AF_LOCAL ? "AF_LOCAL" : 215 #else 216 sa.sa_family == AF_UNIX ? "AF_UNIX" : 217 #endif 218 sa.sa_family == AF_INET ? "AF_INET" : 219 #ifdef AF_INET6 220 sa.sa_family == AF_INET6 ? "AF_INET6" : 221 #endif 222 "unknown protocol family"); 223 non_blocking(fd, NON_BLOCKING); 224 state = (SINK_STATE *) mymalloc(sizeof(*state)); 225 state->stream = vstream_fdopen(fd, O_RDWR); 226 vstream_tweak_sock(state->stream); 227 netstring_setup(state->stream, var_tmout); 228 event_enable_read(fd, read_length, (char *) state); 229 } 230 } 231 232 /* terminate - voluntary exit */ 233 234 static void terminate(int unused_event, char *unused_context) 235 { 236 exit(0); 237 } 238 239 /* usage - explain */ 240 241 static void usage(char *myname) 242 { 243 msg_fatal("usage: %s [-cv] [-x time] [host]:port backlog", myname); 244 } 245 246 MAIL_VERSION_STAMP_DECLARE; 247 248 int main(int argc, char **argv) 249 { 250 int sock; 251 int backlog; 252 int ch; 253 int ttl; 254 const char *protocols = INET_PROTO_NAME_ALL; 255 INET_PROTO_INFO *proto_info; 256 257 /* 258 * Fingerprint executables and core dumps. 259 */ 260 MAIL_VERSION_STAMP_ALLOCATE; 261 262 /* 263 * Fix 20051207. 264 */ 265 signal(SIGPIPE, SIG_IGN); 266 267 /* 268 * Initialize diagnostics. 269 */ 270 msg_vstream_init(argv[0], VSTREAM_ERR); 271 272 /* 273 * Parse JCL. 274 */ 275 while ((ch = GETOPT(argc, argv, "46cvx:")) > 0) { 276 switch (ch) { 277 case '4': 278 protocols = INET_PROTO_NAME_IPV4; 279 break; 280 case '6': 281 protocols = INET_PROTO_NAME_IPV6; 282 break; 283 case 'c': 284 count_deliveries++; 285 break; 286 case 'v': 287 msg_verbose++; 288 break; 289 case 'x': 290 if ((ttl = atoi(optarg)) <= 0) 291 usage(argv[0]); 292 event_request_timer(terminate, (char *) 0, ttl); 293 break; 294 default: 295 usage(argv[0]); 296 } 297 } 298 if (argc - optind != 2) 299 usage(argv[0]); 300 if ((backlog = atoi(argv[optind + 1])) <= 0) 301 usage(argv[0]); 302 303 /* 304 * Initialize. 305 */ 306 proto_info = inet_proto_init("protocols", protocols); 307 buffer = vstring_alloc(1024); 308 if (strncmp(argv[optind], "unix:", 5) == 0) { 309 sock = unix_listen(argv[optind] + 5, backlog, BLOCKING); 310 } else { 311 if (strncmp(argv[optind], "inet:", 5) == 0) 312 argv[optind] += 5; 313 sock = inet_listen(argv[optind], backlog, BLOCKING); 314 } 315 316 /* 317 * Start the event handler. 318 */ 319 event_enable_read(sock, connect_event, CAST_INT_TO_CHAR_PTR(sock)); 320 for (;;) 321 event_loop(-1); 322 } 323