xref: /netbsd-src/external/ibm-public/postfix/dist/src/smtpstone/qmqp-sink.c (revision e89934bbf778a6d6d6894877c4da59d0c7835b0f)
1 /*	$NetBSD: qmqp-sink.c,v 1.2 2017/02/14 01:16:48 christos Exp $	*/
2 
3 /*++
4 /* NAME
5 /*	qmqp-sink 1
6 /* SUMMARY
7 /*	parallelized QMQP test server
8 /* SYNOPSIS
9 /* .fi
10 /*	\fBqmqp-sink\fR [\fB-46cv\fR] [\fB-x \fItime\fR]
11 /*	[\fBinet:\fR][\fIhost\fR]:\fIport\fR \fIbacklog\fR
12 /*
13 /*	\fBqmqp-sink\fR [\fB-46cv\fR] [\fB-x \fItime\fR]
14 /*	\fBunix:\fR\fIpathname\fR \fIbacklog\fR
15 /* DESCRIPTION
16 /*	\fBqmqp-sink\fR listens on the named host (or address) and port.
17 /*	It receives messages from the network and throws them away.
18 /*	The purpose is to measure QMQP client performance, not protocol
19 /*	compliance.
20 /*	Connections can be accepted on IPv4 or IPv6 endpoints, or on
21 /*	UNIX-domain sockets.
22 /*	IPv4 and IPv6 are the default.
23 /*	This program is the complement of the \fBqmqp-source\fR(1) program.
24 /*
25 /*	Note: this is an unsupported test program. No attempt is made
26 /*	to maintain compatibility between successive versions.
27 /*
28 /*	Arguments:
29 /* .IP \fB-4\fR
30 /*	Support IPv4 only. This option has no effect when
31 /*	Postfix is built without IPv6 support.
32 /* .IP \fB-6\fR
33 /*	Support IPv6 only. This option is not available when
34 /*	Postfix is built without IPv6 support.
35 /* .IP \fB-c\fR
36 /*	Display a running counter that is updated whenever a delivery
37 /*	is completed.
38 /* .IP \fB-v\fR
39 /*	Increase verbosity. Specify \fB-v -v\fR to see some of the QMQP
40 /*	conversation.
41 /* .IP "\fB-x \fItime\fR"
42 /*	Terminate after \fItime\fR seconds. This is to facilitate memory
43 /*	leak testing.
44 /* SEE ALSO
45 /*	qmqp-source(1), QMQP message generator
46 /* LICENSE
47 /* .ad
48 /* .fi
49 /*	The Secure Mailer license must be distributed with this software.
50 /* AUTHOR(S)
51 /*	Wietse Venema
52 /*	IBM T.J. Watson Research
53 /*	P.O. Box 704
54 /*	Yorktown Heights, NY 10598, USA
55 /*
56 /*	Wietse Venema
57 /*	Google, Inc.
58 /*	111 8th Avenue
59 /*	New York, NY 10011, USA
60 /*--*/
61 
62 /* System library. */
63 
64 #include <sys_defs.h>
65 #include <sys/socket.h>
66 #include <sys/wait.h>
67 #include <unistd.h>
68 #include <string.h>
69 #include <stdlib.h>
70 #include <fcntl.h>
71 #include <signal.h>
72 
73 /* Utility library. */
74 
75 #include <msg.h>
76 #include <vstring.h>
77 #include <vstream.h>
78 #include <listen.h>
79 #include <events.h>
80 #include <mymalloc.h>
81 #include <iostuff.h>
82 #include <msg_vstream.h>
83 #include <netstring.h>
84 #include <inet_proto.h>
85 
86 /* Global library. */
87 
88 #include <qmqp_proto.h>
89 #include <mail_version.h>
90 
91 /* Application-specific. */
92 
93 typedef struct {
94     VSTREAM *stream;			/* client connection */
95     int     count;			/* bytes to go */
96 } SINK_STATE;
97 
98 static int var_tmout;
99 static VSTRING *buffer;
100 static void disconnect(SINK_STATE *);
101 static int count_deliveries;
102 static int counter;
103 
104 /* send_reply - finish conversation */
105 
send_reply(SINK_STATE * state)106 static void send_reply(SINK_STATE *state)
107 {
108     vstring_sprintf(buffer, "%cOk", QMQP_STAT_OK);
109     NETSTRING_PUT_BUF(state->stream, buffer);
110     netstring_fflush(state->stream);
111     if (count_deliveries) {
112 	counter++;
113 	vstream_printf("%d\r", counter);
114 	vstream_fflush(VSTREAM_OUT);
115     }
116     disconnect(state);
117 }
118 
119 /* read_data - read over-all netstring data */
120 
read_data(int unused_event,void * context)121 static void read_data(int unused_event, void *context)
122 {
123     SINK_STATE *state = (SINK_STATE *) context;
124     int     fd = vstream_fileno(state->stream);
125     int     count;
126 
127     /*
128      * Refill the VSTREAM buffer, if necessary.
129      */
130     if (VSTREAM_GETC(state->stream) == VSTREAM_EOF)
131 	netstring_except(state->stream, vstream_ftimeout(state->stream) ?
132 			 NETSTRING_ERR_TIME : NETSTRING_ERR_EOF);
133     state->count--;
134 
135     /*
136      * Flush the VSTREAM buffer. As documented, vstream_fseek() discards
137      * unread input.
138      */
139     if ((count = vstream_peek(state->stream)) > 0) {
140 	state->count -= count;
141 	if (state->count <= 0) {
142 	    send_reply(state);
143 	    return;
144 	}
145 	vstream_fpurge(state->stream, VSTREAM_PURGE_BOTH);
146     }
147 
148     /*
149      * Do not block while waiting for the arrival of more data.
150      */
151     event_disable_readwrite(fd);
152     event_enable_read(fd, read_data, context);
153 }
154 
155 /* read_length - read over-all netstring length */
156 
read_length(int event,void * context)157 static void read_length(int event, void *context)
158 {
159     SINK_STATE *state = (SINK_STATE *) context;
160 
161     switch (vstream_setjmp(state->stream)) {
162 
163     default:
164 	msg_panic("unknown error reading input");
165 
166     case NETSTRING_ERR_TIME:
167 	msg_panic("attempt to read non-readable socket");
168 	/* NOTREACHED */
169 
170     case NETSTRING_ERR_EOF:
171 	msg_warn("lost connection");
172 	disconnect(state);
173 	return;
174 
175     case NETSTRING_ERR_FORMAT:
176 	msg_warn("netstring format error");
177 	disconnect(state);
178 	return;
179 
180     case NETSTRING_ERR_SIZE:
181 	msg_warn("netstring size error");
182 	disconnect(state);
183 	return;
184 
185 	/*
186 	 * Include the netstring terminator in the read byte count. This
187 	 * violates abstractions.
188 	 */
189     case 0:
190 	state->count = netstring_get_length(state->stream) + 1;
191 	read_data(event, context);
192 	return;
193     }
194 }
195 
196 /* disconnect - handle disconnection events */
197 
disconnect(SINK_STATE * state)198 static void disconnect(SINK_STATE *state)
199 {
200     event_disable_readwrite(vstream_fileno(state->stream));
201     vstream_fclose(state->stream);
202     myfree((void *) state);
203 }
204 
205 /* connect_event - handle connection events */
206 
connect_event(int unused_event,void * context)207 static void connect_event(int unused_event, void *context)
208 {
209     int     sock = CAST_ANY_PTR_TO_INT(context);
210     struct sockaddr_storage ss;
211     SOCKADDR_SIZE len = sizeof(ss);
212     struct sockaddr *sa = (struct sockaddr *) &ss;
213     SINK_STATE *state;
214     int     fd;
215 
216     if ((fd = accept(sock, sa, &len)) >= 0) {
217 	if (msg_verbose)
218 	    msg_info("connect (%s)",
219 #ifdef AF_LOCAL
220 		     sa->sa_family == AF_LOCAL ? "AF_LOCAL" :
221 #else
222 		     sa->sa_family == AF_UNIX ? "AF_UNIX" :
223 #endif
224 		     sa->sa_family == AF_INET ? "AF_INET" :
225 #ifdef AF_INET6
226 		     sa->sa_family == AF_INET6 ? "AF_INET6" :
227 #endif
228 		     "unknown protocol family");
229 	non_blocking(fd, NON_BLOCKING);
230 	state = (SINK_STATE *) mymalloc(sizeof(*state));
231 	state->stream = vstream_fdopen(fd, O_RDWR);
232 	vstream_tweak_sock(state->stream);
233 	netstring_setup(state->stream, var_tmout);
234 	event_enable_read(fd, read_length, (void *) state);
235     }
236 }
237 
/* terminate - timer callback for the -x option: exit with success status */

static void terminate(int unused_event, void *unused_context)
{
    exit(EXIT_SUCCESS);
}
244 
/* usage - report the command-line syntax as a fatal error */

 /*
  * myname is the program name to show in the message. The text lists the
  * options that main() accepts ("46cvx:") and both endpoint forms that it
  * recognizes. main() relies on this function not returning; msg_fatal() is
  * expected to terminate the process.
  */
static void usage(char *myname)
{
    msg_fatal("usage: %s [-46cv] [-x time] {[inet:][host]:port|unix:pathname} backlog",
	      myname);
}
251 
252 MAIL_VERSION_STAMP_DECLARE;
253 
main(int argc,char ** argv)254 int     main(int argc, char **argv)
255 {
256     int     sock;
257     int     backlog;
258     int     ch;
259     int     ttl;
260     const char *protocols = INET_PROTO_NAME_ALL;
261 
262     /*
263      * Fingerprint executables and core dumps.
264      */
265     MAIL_VERSION_STAMP_ALLOCATE;
266 
267     /*
268      * Fix 20051207.
269      */
270     signal(SIGPIPE, SIG_IGN);
271 
272     /*
273      * Initialize diagnostics.
274      */
275     msg_vstream_init(argv[0], VSTREAM_ERR);
276 
277     /*
278      * Parse JCL.
279      */
280     while ((ch = GETOPT(argc, argv, "46cvx:")) > 0) {
281 	switch (ch) {
282 	case '4':
283 	    protocols = INET_PROTO_NAME_IPV4;
284 	    break;
285 	case '6':
286 	    protocols = INET_PROTO_NAME_IPV6;
287 	    break;
288 	case 'c':
289 	    count_deliveries++;
290 	    break;
291 	case 'v':
292 	    msg_verbose++;
293 	    break;
294 	case 'x':
295 	    if ((ttl = atoi(optarg)) <= 0)
296 		usage(argv[0]);
297 	    event_request_timer(terminate, (void *) 0, ttl);
298 	    break;
299 	default:
300 	    usage(argv[0]);
301 	}
302     }
303     if (argc - optind != 2)
304 	usage(argv[0]);
305     if ((backlog = atoi(argv[optind + 1])) <= 0)
306 	usage(argv[0]);
307 
308     /*
309      * Initialize.
310      */
311     (void) inet_proto_init("protocols", protocols);
312     buffer = vstring_alloc(1024);
313     if (strncmp(argv[optind], "unix:", 5) == 0) {
314 	sock = unix_listen(argv[optind] + 5, backlog, BLOCKING);
315     } else {
316 	if (strncmp(argv[optind], "inet:", 5) == 0)
317 	    argv[optind] += 5;
318 	sock = inet_listen(argv[optind], backlog, BLOCKING);
319     }
320 
321     /*
322      * Start the event handler.
323      */
324     event_enable_read(sock, connect_event, CAST_INT_TO_VOID_PTR(sock));
325     for (;;)
326 	event_loop(-1);
327 }
328