xref: /netbsd-src/external/ibm-public/postfix/dist/src/smtpstone/qmqp-sink.c (revision c34236556bea94afcaca1782d7d228301edc3ea0)
1 /*	$NetBSD: qmqp-sink.c,v 1.1.1.2 2013/09/25 19:06:36 tron Exp $	*/
2 
3 /*++
4 /* NAME
5 /*	qmqp-sink 1
6 /* SUMMARY
7 /*	multi-threaded QMQP test server
8 /* SYNOPSIS
9 /* .fi
10 /*	\fBqmqp-sink\fR [\fB-46cv\fR] [\fB-x \fItime\fR]
11 /*	[\fBinet:\fR][\fIhost\fR]:\fIport\fR \fIbacklog\fR
12 /*
13 /*	\fBqmqp-sink\fR [\fB-46cv\fR] [\fB-x \fItime\fR]
14 /*	\fBunix:\fR\fIpathname\fR \fIbacklog\fR
15 /* DESCRIPTION
16 /*	\fBqmqp-sink\fR listens on the named host (or address) and port.
17 /*	It receives messages from the network and throws them away.
18 /*	The purpose is to measure QMQP client performance, not protocol
19 /*	compliance.
20 /*	Connections can be accepted on IPv4 or IPv6 endpoints, or on
21 /*	UNIX-domain sockets.
22 /*	IPv4 and IPv6 are the default.
23 /*	This program is the complement of the \fBqmqp-source\fR(1) program.
24 /*
25 /*	Note: this is an unsupported test program. No attempt is made
26 /*	to maintain compatibility between successive versions.
27 /*
28 /*	Arguments:
29 /* .IP \fB-4\fR
30 /*	Support IPv4 only. This option has no effect when
31 /*	Postfix is built without IPv6 support.
32 /* .IP \fB-6\fR
33 /*	Support IPv6 only. This option is not available when
34 /*	Postfix is built without IPv6 support.
35 /* .IP \fB-c\fR
36 /*	Display a running counter that is updated whenever a delivery
37 /*	is completed.
38 /* .IP \fB-v\fR
39 /*	Increase verbosity. Specify \fB-v -v\fR to see some of the QMQP
40 /*	conversation.
41 /* .IP "\fB-x \fItime\fR
42 /*	Terminate after \fItime\fR seconds. This is to facilitate memory
43 /*	leak testing.
44 /* SEE ALSO
45 /*	qmqp-source(1), QMQP message generator
46 /* LICENSE
47 /* .ad
48 /* .fi
49 /*	The Secure Mailer license must be distributed with this software.
50 /* AUTHOR(S)
51 /*	Wietse Venema
52 /*	IBM T.J. Watson Research
53 /*	P.O. Box 704
54 /*	Yorktown Heights, NY 10598, USA
55 /*--*/
56 
57 /* System library. */
58 
59 #include <sys_defs.h>
60 #include <sys/socket.h>
61 #include <sys/wait.h>
62 #include <unistd.h>
63 #include <string.h>
64 #include <stdlib.h>
65 #include <fcntl.h>
66 #include <signal.h>
67 
68 /* Utility library. */
69 
70 #include <msg.h>
71 #include <vstring.h>
72 #include <vstream.h>
73 #include <listen.h>
74 #include <events.h>
75 #include <mymalloc.h>
76 #include <iostuff.h>
77 #include <msg_vstream.h>
78 #include <netstring.h>
79 #include <inet_proto.h>
80 
81 /* Global library. */
82 
83 #include <qmqp_proto.h>
84 #include <mail_version.h>
85 
86 /* Application-specific. */
87 
88 typedef struct {
89     VSTREAM *stream;			/* client connection */
90     int     count;			/* bytes to go */
91 } SINK_STATE;
92 
93 static int var_tmout;
94 static VSTRING *buffer;
95 static void disconnect(SINK_STATE *);
96 static int count_deliveries;
97 static int counter;
98 
99 /* send_reply - finish conversation */
100 
101 static void send_reply(SINK_STATE *state)
102 {
103     vstring_sprintf(buffer, "%cOk", QMQP_STAT_OK);
104     NETSTRING_PUT_BUF(state->stream, buffer);
105     netstring_fflush(state->stream);
106     if (count_deliveries) {
107 	counter++;
108 	vstream_printf("%d\r", counter);
109 	vstream_fflush(VSTREAM_OUT);
110     }
111     disconnect(state);
112 }
113 
114 /* read_data - read over-all netstring data */
115 
116 static void read_data(int unused_event, char *context)
117 {
118     SINK_STATE *state = (SINK_STATE *) context;
119     int     fd = vstream_fileno(state->stream);
120     int     count;
121 
122     /*
123      * Refill the VSTREAM buffer, if necessary.
124      */
125     if (VSTREAM_GETC(state->stream) == VSTREAM_EOF)
126 	netstring_except(state->stream, vstream_ftimeout(state->stream) ?
127 			 NETSTRING_ERR_TIME : NETSTRING_ERR_EOF);
128     state->count--;
129 
130     /*
131      * Flush the VSTREAM buffer. As documented, vstream_fseek() discards
132      * unread input.
133      */
134     if ((count = vstream_peek(state->stream)) > 0) {
135 	state->count -= count;
136 	if (state->count <= 0) {
137 	    send_reply(state);
138 	    return;
139 	}
140 	vstream_fseek(state->stream, 0L, 0);
141     }
142 
143     /*
144      * Do not block while waiting for the arrival of more data.
145      */
146     event_disable_readwrite(fd);
147     event_enable_read(fd, read_data, context);
148 }
149 
150 /* read_length - read over-all netstring length */
151 
152 static void read_length(int event, char *context)
153 {
154     SINK_STATE *state = (SINK_STATE *) context;
155 
156     switch (vstream_setjmp(state->stream)) {
157 
158     default:
159 	msg_panic("unknown error reading input");
160 
161     case NETSTRING_ERR_TIME:
162 	msg_panic("attempt to read non-readable socket");
163 	/* NOTREACHED */
164 
165     case NETSTRING_ERR_EOF:
166 	msg_warn("lost connection");
167 	disconnect(state);
168 	return;
169 
170     case NETSTRING_ERR_FORMAT:
171 	msg_warn("netstring format error");
172 	disconnect(state);
173 	return;
174 
175     case NETSTRING_ERR_SIZE:
176 	msg_warn("netstring size error");
177 	disconnect(state);
178 	return;
179 
180 	/*
181 	 * Include the netstring terminator in the read byte count. This
182 	 * violates abstractions.
183 	 */
184     case 0:
185 	state->count = netstring_get_length(state->stream) + 1;
186 	read_data(event, context);
187 	return;
188     }
189 }
190 
191 /* disconnect - handle disconnection events */
192 
193 static void disconnect(SINK_STATE *state)
194 {
195     event_disable_readwrite(vstream_fileno(state->stream));
196     vstream_fclose(state->stream);
197     myfree((char *) state);
198 }
199 
200 /* connect_event - handle connection events */
201 
202 static void connect_event(int unused_event, char *context)
203 {
204     int     sock = CAST_CHAR_PTR_TO_INT(context);
205     struct sockaddr sa;
206     SOCKADDR_SIZE len = sizeof(sa);
207     SINK_STATE *state;
208     int     fd;
209 
210     if ((fd = accept(sock, &sa, &len)) >= 0) {
211 	if (msg_verbose)
212 	    msg_info("connect (%s)",
213 #ifdef AF_LOCAL
214 		     sa.sa_family == AF_LOCAL ? "AF_LOCAL" :
215 #else
216 		     sa.sa_family == AF_UNIX ? "AF_UNIX" :
217 #endif
218 		     sa.sa_family == AF_INET ? "AF_INET" :
219 #ifdef AF_INET6
220 		     sa.sa_family == AF_INET6 ? "AF_INET6" :
221 #endif
222 		     "unknown protocol family");
223 	non_blocking(fd, NON_BLOCKING);
224 	state = (SINK_STATE *) mymalloc(sizeof(*state));
225 	state->stream = vstream_fdopen(fd, O_RDWR);
226 	vstream_tweak_sock(state->stream);
227 	netstring_setup(state->stream, var_tmout);
228 	event_enable_read(fd, read_length, (char *) state);
229     }
230 }
231 
232 /* terminate - voluntary exit */
233 
234 static void terminate(int unused_event, char *unused_context)
235 {
236     exit(0);
237 }
238 
239 /* usage - explain */
240 
241 static void usage(char *myname)
242 {
243     msg_fatal("usage: %s [-cv] [-x time] [host]:port backlog", myname);
244 }
245 
246 MAIL_VERSION_STAMP_DECLARE;
247 
248 int     main(int argc, char **argv)
249 {
250     int     sock;
251     int     backlog;
252     int     ch;
253     int     ttl;
254     const char *protocols = INET_PROTO_NAME_ALL;
255     INET_PROTO_INFO *proto_info;
256 
257     /*
258      * Fingerprint executables and core dumps.
259      */
260     MAIL_VERSION_STAMP_ALLOCATE;
261 
262     /*
263      * Fix 20051207.
264      */
265     signal(SIGPIPE, SIG_IGN);
266 
267     /*
268      * Initialize diagnostics.
269      */
270     msg_vstream_init(argv[0], VSTREAM_ERR);
271 
272     /*
273      * Parse JCL.
274      */
275     while ((ch = GETOPT(argc, argv, "46cvx:")) > 0) {
276 	switch (ch) {
277 	case '4':
278 	    protocols = INET_PROTO_NAME_IPV4;
279 	    break;
280 	case '6':
281 	    protocols = INET_PROTO_NAME_IPV6;
282 	    break;
283 	case 'c':
284 	    count_deliveries++;
285 	    break;
286 	case 'v':
287 	    msg_verbose++;
288 	    break;
289 	case 'x':
290 	    if ((ttl = atoi(optarg)) <= 0)
291 		usage(argv[0]);
292 	    event_request_timer(terminate, (char *) 0, ttl);
293 	    break;
294 	default:
295 	    usage(argv[0]);
296 	}
297     }
298     if (argc - optind != 2)
299 	usage(argv[0]);
300     if ((backlog = atoi(argv[optind + 1])) <= 0)
301 	usage(argv[0]);
302 
303     /*
304      * Initialize.
305      */
306     proto_info = inet_proto_init("protocols", protocols);
307     buffer = vstring_alloc(1024);
308     if (strncmp(argv[optind], "unix:", 5) == 0) {
309 	sock = unix_listen(argv[optind] + 5, backlog, BLOCKING);
310     } else {
311 	if (strncmp(argv[optind], "inet:", 5) == 0)
312 	    argv[optind] += 5;
313 	sock = inet_listen(argv[optind], backlog, BLOCKING);
314     }
315 
316     /*
317      * Start the event handler.
318      */
319     event_enable_read(sock, connect_event, CAST_INT_TO_CHAR_PTR(sock));
320     for (;;)
321 	event_loop(-1);
322 }
323