1 /* $NetBSD: qmqp-sink.c,v 1.1.1.1 2009/06/23 10:08:56 tron Exp $ */ 2 3 /*++ 4 /* NAME 5 /* qmqp-sink 1 6 /* SUMMARY 7 /* multi-threaded QMQP test server 8 /* SYNOPSIS 9 /* .fi 10 /* \fBqmqp-sink\fR [\fB-46cv\fR] [\fB-x \fItime\fR] 11 /* [\fBinet:\fR][\fIhost\fR]:\fIport\fR \fIbacklog\fR 12 /* 13 /* \fBqmqp-sink\fR [\fB-46cv\fR] [\fB-x \fItime\fR] 14 /* \fBunix:\fR\fIpathname\fR \fIbacklog\fR 15 /* DESCRIPTION 16 /* \fBqmqp-sink\fR listens on the named host (or address) and port. 17 /* It receives messages from the network and throws them away. 18 /* The purpose is to measure QMQP client performance, not protocol 19 /* compliance. 20 /* Connections can be accepted on IPv4 or IPv6 endpoints, or on 21 /* UNIX-domain sockets. 22 /* IPv4 and IPv6 are the default. 23 /* This program is the complement of the \fBqmqp-source\fR(1) program. 24 /* 25 /* Note: this is an unsupported test program. No attempt is made 26 /* to maintain compatibility between successive versions. 27 /* 28 /* Arguments: 29 /* .IP \fB-4\fR 30 /* Support IPv4 only. This option has no effect when 31 /* Postfix is built without IPv6 support. 32 /* .IP \fB-6\fR 33 /* Support IPv6 only. This option is not available when 34 /* Postfix is built without IPv6 support. 35 /* .IP \fB-c\fR 36 /* Display a running counter that is updated whenever a delivery 37 /* is completed. 38 /* .IP \fB-v\fR 39 /* Increase verbosity. Specify \fB-v -v\fR to see some of the QMQP 40 /* conversation. 41 /* .IP "\fB-x \fItime\fR 42 /* Terminate after \fItime\fR seconds. This is to facilitate memory 43 /* leak testing. 44 /* SEE ALSO 45 /* qmqp-source(1), QMQP message generator 46 /* LICENSE 47 /* .ad 48 /* .fi 49 /* The Secure Mailer license must be distributed with this software. 50 /* AUTHOR(S) 51 /* Wietse Venema 52 /* IBM T.J. Watson Research 53 /* P.O. Box 704 54 /* Yorktown Heights, NY 10598, USA 55 /*--*/ 56 57 /* System library. */ 58 59 #include <sys_defs.h> 60 #include <sys/socket.h> 61 #include <sys/wait.h> 62 #include <unistd.h> 63 #include <string.h> 64 #include <stdlib.h> 65 #include <fcntl.h> 66 #include <signal.h> 67 68 /* Utility library. */ 69 70 #include <msg.h> 71 #include <vstring.h> 72 #include <vstream.h> 73 #include <listen.h> 74 #include <events.h> 75 #include <mymalloc.h> 76 #include <iostuff.h> 77 #include <msg_vstream.h> 78 #include <netstring.h> 79 #include <inet_proto.h> 80 81 /* Global library. */ 82 83 #include <qmqp_proto.h> 84 #include <mail_version.h> 85 86 /* Application-specific. */ 87 88 typedef struct { 89 VSTREAM *stream; /* client connection */ 90 int count; /* bytes to go */ 91 } SINK_STATE; 92 93 static int var_tmout; 94 static VSTRING *buffer; 95 static void disconnect(SINK_STATE *); 96 static int count_deliveries; 97 static int counter; 98 99 /* send_reply - finish conversation */ 100 101 static void send_reply(SINK_STATE *state) 102 { 103 vstring_sprintf(buffer, "%cOk", QMQP_STAT_OK); 104 NETSTRING_PUT_BUF(state->stream, buffer); 105 netstring_fflush(state->stream); 106 if (count_deliveries) { 107 counter++; 108 vstream_printf("%d\r", counter); 109 vstream_fflush(VSTREAM_OUT); 110 } 111 disconnect(state); 112 } 113 114 /* read_data - read over-all netstring data */ 115 116 static void read_data(int unused_event, char *context) 117 { 118 SINK_STATE *state = (SINK_STATE *) context; 119 int fd = vstream_fileno(state->stream); 120 int count; 121 122 /* 123 * Refill the VSTREAM buffer, if necessary. 124 */ 125 if (VSTREAM_GETC(state->stream) == VSTREAM_EOF) 126 netstring_except(state->stream, vstream_ftimeout(state->stream) ? 127 NETSTRING_ERR_TIME : NETSTRING_ERR_EOF); 128 state->count--; 129 130 /* 131 * Flush the VSTREAM buffer. As documented, vstream_fseek() discards 132 * unread input. 133 */ 134 if ((count = vstream_peek(state->stream)) > 0) { 135 state->count -= count; 136 if (state->count <= 0) { 137 send_reply(state); 138 return; 139 } 140 vstream_fseek(state->stream, 0L, 0); 141 } 142 143 /* 144 * Do not block while waiting for the arrival of more data. 145 */ 146 event_disable_readwrite(fd); 147 event_enable_read(fd, read_data, context); 148 } 149 150 /* read_length - read over-all netstring length */ 151 152 static void read_length(int event, char *context) 153 { 154 SINK_STATE *state = (SINK_STATE *) context; 155 156 switch (vstream_setjmp(state->stream)) { 157 158 default: 159 msg_panic("unknown error reading input"); 160 161 case NETSTRING_ERR_TIME: 162 msg_panic("attempt to read non-readable socket"); 163 /* NOTREACHED */ 164 165 case NETSTRING_ERR_EOF: 166 msg_warn("lost connection"); 167 disconnect(state); 168 return; 169 170 case NETSTRING_ERR_FORMAT: 171 msg_warn("netstring format error"); 172 disconnect(state); 173 return; 174 175 case NETSTRING_ERR_SIZE: 176 msg_warn("netstring size error"); 177 disconnect(state); 178 return; 179 180 /* 181 * Include the netstring terminator in the read byte count. This 182 * violates abstractions. 183 */ 184 case 0: 185 state->count = netstring_get_length(state->stream) + 1; 186 read_data(event, context); 187 return; 188 } 189 } 190 191 /* disconnect - handle disconnection events */ 192 193 static void disconnect(SINK_STATE *state) 194 { 195 event_disable_readwrite(vstream_fileno(state->stream)); 196 vstream_fclose(state->stream); 197 myfree((char *) state); 198 } 199 200 /* connect_event - handle connection events */ 201 202 static void connect_event(int unused_event, char *context) 203 { 204 int sock = CAST_CHAR_PTR_TO_INT(context); 205 struct sockaddr sa; 206 SOCKADDR_SIZE len = sizeof(sa); 207 SINK_STATE *state; 208 int fd; 209 210 if ((fd = accept(sock, &sa, &len)) >= 0) { 211 if (msg_verbose) 212 msg_info("connect (%s)", 213 #ifdef AF_LOCAL 214 sa.sa_family == AF_LOCAL ? "AF_LOCAL" : 215 #else 216 sa.sa_family == AF_UNIX ? "AF_UNIX" : 217 #endif 218 sa.sa_family == AF_INET ? "AF_INET" : 219 #ifdef AF_INET6 220 sa.sa_family == AF_INET6 ? "AF_INET6" : 221 #endif 222 "unknown protocol family"); 223 non_blocking(fd, NON_BLOCKING); 224 state = (SINK_STATE *) mymalloc(sizeof(*state)); 225 state->stream = vstream_fdopen(fd, O_RDWR); 226 netstring_setup(state->stream, var_tmout); 227 event_enable_read(fd, read_length, (char *) state); 228 } 229 } 230 231 /* terminate - voluntary exit */ 232 233 static void terminate(int unused_event, char *unused_context) 234 { 235 exit(0); 236 } 237 238 /* usage - explain */ 239 240 static void usage(char *myname) 241 { 242 msg_fatal("usage: %s [-cv] [-x time] [host]:port backlog", myname); 243 } 244 245 MAIL_VERSION_STAMP_DECLARE; 246 247 int main(int argc, char **argv) 248 { 249 int sock; 250 int backlog; 251 int ch; 252 int ttl; 253 const char *protocols = INET_PROTO_NAME_ALL; 254 INET_PROTO_INFO *proto_info; 255 256 /* 257 * Fingerprint executables and core dumps. 258 */ 259 MAIL_VERSION_STAMP_ALLOCATE; 260 261 /* 262 * Fix 20051207. 263 */ 264 signal(SIGPIPE, SIG_IGN); 265 266 /* 267 * Initialize diagnostics. 268 */ 269 msg_vstream_init(argv[0], VSTREAM_ERR); 270 271 /* 272 * Parse JCL. 273 */ 274 while ((ch = GETOPT(argc, argv, "46cvx:")) > 0) { 275 switch (ch) { 276 case '4': 277 protocols = INET_PROTO_NAME_IPV4; 278 break; 279 case '6': 280 protocols = INET_PROTO_NAME_IPV6; 281 break; 282 case 'c': 283 count_deliveries++; 284 break; 285 case 'v': 286 msg_verbose++; 287 break; 288 case 'x': 289 if ((ttl = atoi(optarg)) <= 0) 290 usage(argv[0]); 291 event_request_timer(terminate, (char *) 0, ttl); 292 break; 293 default: 294 usage(argv[0]); 295 } 296 } 297 if (argc - optind != 2) 298 usage(argv[0]); 299 if ((backlog = atoi(argv[optind + 1])) <= 0) 300 usage(argv[0]); 301 302 /* 303 * Initialize. 304 */ 305 proto_info = inet_proto_init("protocols", protocols); 306 buffer = vstring_alloc(1024); 307 if (strncmp(argv[optind], "unix:", 5) == 0) { 308 sock = unix_listen(argv[optind] + 5, backlog, BLOCKING); 309 } else { 310 if (strncmp(argv[optind], "inet:", 5) == 0) 311 argv[optind] += 5; 312 sock = inet_listen(argv[optind], backlog, BLOCKING); 313 } 314 315 /* 316 * Start the event handler. 317 */ 318 event_enable_read(sock, connect_event, CAST_INT_TO_CHAR_PTR(sock)); 319 for (;;) 320 event_loop(-1); 321 } 322