1 /* $NetBSD: qmqp-sink.c,v 1.2 2017/02/14 01:16:48 christos Exp $ */
2
3 /*++
4 /* NAME
5 /* qmqp-sink 1
6 /* SUMMARY
7 /* parallelized QMQP test server
8 /* SYNOPSIS
9 /* .fi
10 /* \fBqmqp-sink\fR [\fB-46cv\fR] [\fB-x \fItime\fR]
11 /* [\fBinet:\fR][\fIhost\fR]:\fIport\fR \fIbacklog\fR
12 /*
13 /* \fBqmqp-sink\fR [\fB-46cv\fR] [\fB-x \fItime\fR]
14 /* \fBunix:\fR\fIpathname\fR \fIbacklog\fR
15 /* DESCRIPTION
16 /* \fBqmqp-sink\fR listens on the named host (or address) and port.
17 /* It receives messages from the network and throws them away.
18 /* The purpose is to measure QMQP client performance, not protocol
19 /* compliance.
20 /* Connections can be accepted on IPv4 or IPv6 endpoints, or on
21 /* UNIX-domain sockets.
22 /* IPv4 and IPv6 are the default.
23 /* This program is the complement of the \fBqmqp-source\fR(1) program.
24 /*
25 /* Note: this is an unsupported test program. No attempt is made
26 /* to maintain compatibility between successive versions.
27 /*
28 /* Arguments:
29 /* .IP \fB-4\fR
30 /* Support IPv4 only. This option has no effect when
31 /* Postfix is built without IPv6 support.
32 /* .IP \fB-6\fR
33 /* Support IPv6 only. This option is not available when
34 /* Postfix is built without IPv6 support.
35 /* .IP \fB-c\fR
36 /* Display a running counter that is updated whenever a delivery
37 /* is completed.
38 /* .IP \fB-v\fR
39 /* Increase verbosity. Specify \fB-v -v\fR to see some of the QMQP
40 /* conversation.
41 /* .IP "\fB-x \fItime\fR"
42 /* Terminate after \fItime\fR seconds. This is to facilitate memory
43 /* leak testing.
44 /* SEE ALSO
45 /* qmqp-source(1), QMQP message generator
46 /* LICENSE
47 /* .ad
48 /* .fi
49 /* The Secure Mailer license must be distributed with this software.
50 /* AUTHOR(S)
51 /* Wietse Venema
52 /* IBM T.J. Watson Research
53 /* P.O. Box 704
54 /* Yorktown Heights, NY 10598, USA
55 /*
56 /* Wietse Venema
57 /* Google, Inc.
58 /* 111 8th Avenue
59 /* New York, NY 10011, USA
60 /*--*/
61
62 /* System library. */
63
64 #include <sys_defs.h>
65 #include <sys/socket.h>
66 #include <sys/wait.h>
67 #include <unistd.h>
68 #include <string.h>
69 #include <stdlib.h>
70 #include <fcntl.h>
71 #include <signal.h>
72
73 /* Utility library. */
74
75 #include <msg.h>
76 #include <vstring.h>
77 #include <vstream.h>
78 #include <listen.h>
79 #include <events.h>
80 #include <mymalloc.h>
81 #include <iostuff.h>
82 #include <msg_vstream.h>
83 #include <netstring.h>
84 #include <inet_proto.h>
85
86 /* Global library. */
87
88 #include <qmqp_proto.h>
89 #include <mail_version.h>
90
91 /* Application-specific. */
92
93 typedef struct {
94 VSTREAM *stream; /* client connection */
95 int count; /* bytes to go */
96 } SINK_STATE;
97
98 static int var_tmout;
99 static VSTRING *buffer;
100 static void disconnect(SINK_STATE *);
101 static int count_deliveries;
102 static int counter;
103
104 /* send_reply - finish conversation */
105
send_reply(SINK_STATE * state)106 static void send_reply(SINK_STATE *state)
107 {
108 vstring_sprintf(buffer, "%cOk", QMQP_STAT_OK);
109 NETSTRING_PUT_BUF(state->stream, buffer);
110 netstring_fflush(state->stream);
111 if (count_deliveries) {
112 counter++;
113 vstream_printf("%d\r", counter);
114 vstream_fflush(VSTREAM_OUT);
115 }
116 disconnect(state);
117 }
118
119 /* read_data - read over-all netstring data */
120
read_data(int unused_event,void * context)121 static void read_data(int unused_event, void *context)
122 {
123 SINK_STATE *state = (SINK_STATE *) context;
124 int fd = vstream_fileno(state->stream);
125 int count;
126
127 /*
128 * Refill the VSTREAM buffer, if necessary.
129 */
130 if (VSTREAM_GETC(state->stream) == VSTREAM_EOF)
131 netstring_except(state->stream, vstream_ftimeout(state->stream) ?
132 NETSTRING_ERR_TIME : NETSTRING_ERR_EOF);
133 state->count--;
134
135 /*
136 * Flush the VSTREAM buffer. As documented, vstream_fseek() discards
137 * unread input.
138 */
139 if ((count = vstream_peek(state->stream)) > 0) {
140 state->count -= count;
141 if (state->count <= 0) {
142 send_reply(state);
143 return;
144 }
145 vstream_fpurge(state->stream, VSTREAM_PURGE_BOTH);
146 }
147
148 /*
149 * Do not block while waiting for the arrival of more data.
150 */
151 event_disable_readwrite(fd);
152 event_enable_read(fd, read_data, context);
153 }
154
155 /* read_length - read over-all netstring length */
156
read_length(int event,void * context)157 static void read_length(int event, void *context)
158 {
159 SINK_STATE *state = (SINK_STATE *) context;
160
161 switch (vstream_setjmp(state->stream)) {
162
163 default:
164 msg_panic("unknown error reading input");
165
166 case NETSTRING_ERR_TIME:
167 msg_panic("attempt to read non-readable socket");
168 /* NOTREACHED */
169
170 case NETSTRING_ERR_EOF:
171 msg_warn("lost connection");
172 disconnect(state);
173 return;
174
175 case NETSTRING_ERR_FORMAT:
176 msg_warn("netstring format error");
177 disconnect(state);
178 return;
179
180 case NETSTRING_ERR_SIZE:
181 msg_warn("netstring size error");
182 disconnect(state);
183 return;
184
185 /*
186 * Include the netstring terminator in the read byte count. This
187 * violates abstractions.
188 */
189 case 0:
190 state->count = netstring_get_length(state->stream) + 1;
191 read_data(event, context);
192 return;
193 }
194 }
195
196 /* disconnect - handle disconnection events */
197
disconnect(SINK_STATE * state)198 static void disconnect(SINK_STATE *state)
199 {
200 event_disable_readwrite(vstream_fileno(state->stream));
201 vstream_fclose(state->stream);
202 myfree((void *) state);
203 }
204
205 /* connect_event - handle connection events */
206
connect_event(int unused_event,void * context)207 static void connect_event(int unused_event, void *context)
208 {
209 int sock = CAST_ANY_PTR_TO_INT(context);
210 struct sockaddr_storage ss;
211 SOCKADDR_SIZE len = sizeof(ss);
212 struct sockaddr *sa = (struct sockaddr *) &ss;
213 SINK_STATE *state;
214 int fd;
215
216 if ((fd = accept(sock, sa, &len)) >= 0) {
217 if (msg_verbose)
218 msg_info("connect (%s)",
219 #ifdef AF_LOCAL
220 sa->sa_family == AF_LOCAL ? "AF_LOCAL" :
221 #else
222 sa->sa_family == AF_UNIX ? "AF_UNIX" :
223 #endif
224 sa->sa_family == AF_INET ? "AF_INET" :
225 #ifdef AF_INET6
226 sa->sa_family == AF_INET6 ? "AF_INET6" :
227 #endif
228 "unknown protocol family");
229 non_blocking(fd, NON_BLOCKING);
230 state = (SINK_STATE *) mymalloc(sizeof(*state));
231 state->stream = vstream_fdopen(fd, O_RDWR);
232 vstream_tweak_sock(state->stream);
233 netstring_setup(state->stream, var_tmout);
234 event_enable_read(fd, read_length, (void *) state);
235 }
236 }
237
238 /* terminate - voluntary exit */
239
terminate(int unused_event,void * unused_context)240 static void terminate(int unused_event, void *unused_context)
241 {
242 exit(0);
243 }
244
245 /* usage - explain */
246
usage(char * myname)247 static void usage(char *myname)
248 {
249 msg_fatal("usage: %s [-cv] [-x time] [host]:port backlog", myname);
250 }
251
252 MAIL_VERSION_STAMP_DECLARE;
253
main(int argc,char ** argv)254 int main(int argc, char **argv)
255 {
256 int sock;
257 int backlog;
258 int ch;
259 int ttl;
260 const char *protocols = INET_PROTO_NAME_ALL;
261
262 /*
263 * Fingerprint executables and core dumps.
264 */
265 MAIL_VERSION_STAMP_ALLOCATE;
266
267 /*
268 * Fix 20051207.
269 */
270 signal(SIGPIPE, SIG_IGN);
271
272 /*
273 * Initialize diagnostics.
274 */
275 msg_vstream_init(argv[0], VSTREAM_ERR);
276
277 /*
278 * Parse JCL.
279 */
280 while ((ch = GETOPT(argc, argv, "46cvx:")) > 0) {
281 switch (ch) {
282 case '4':
283 protocols = INET_PROTO_NAME_IPV4;
284 break;
285 case '6':
286 protocols = INET_PROTO_NAME_IPV6;
287 break;
288 case 'c':
289 count_deliveries++;
290 break;
291 case 'v':
292 msg_verbose++;
293 break;
294 case 'x':
295 if ((ttl = atoi(optarg)) <= 0)
296 usage(argv[0]);
297 event_request_timer(terminate, (void *) 0, ttl);
298 break;
299 default:
300 usage(argv[0]);
301 }
302 }
303 if (argc - optind != 2)
304 usage(argv[0]);
305 if ((backlog = atoi(argv[optind + 1])) <= 0)
306 usage(argv[0]);
307
308 /*
309 * Initialize.
310 */
311 (void) inet_proto_init("protocols", protocols);
312 buffer = vstring_alloc(1024);
313 if (strncmp(argv[optind], "unix:", 5) == 0) {
314 sock = unix_listen(argv[optind] + 5, backlog, BLOCKING);
315 } else {
316 if (strncmp(argv[optind], "inet:", 5) == 0)
317 argv[optind] += 5;
318 sock = inet_listen(argv[optind], backlog, BLOCKING);
319 }
320
321 /*
322 * Start the event handler.
323 */
324 event_enable_read(sock, connect_event, CAST_INT_TO_VOID_PTR(sock));
325 for (;;)
326 event_loop(-1);
327 }
328