xref: /openbsd-src/usr.sbin/smtpd/queue.c (revision 43003dfe3ad45d1698bed8a37f2b0f5b14f20d4f)
1 /*	$OpenBSD: queue.c,v 1.70 2009/09/15 16:50:06 jacekm Exp $	*/
2 
3 /*
4  * Copyright (c) 2008 Gilles Chehade <gilles@openbsd.org>
5  * Copyright (c) 2008 Pierre-Yves Ritschard <pyr@openbsd.org>
6  *
7  * Permission to use, copy, modify, and distribute this software for any
8  * purpose with or without fee is hereby granted, provided that the above
9  * copyright notice and this permission notice appear in all copies.
10  *
11  * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
12  * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
13  * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
14  * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
15  * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
16  * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
17  * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
18  */
19 
20 #include <sys/types.h>
21 #include <sys/queue.h>
22 #include <sys/tree.h>
23 #include <sys/param.h>
24 #include <sys/socket.h>
25 #include <sys/stat.h>
26 
27 #include <errno.h>
28 #include <event.h>
29 #include <fcntl.h>
30 #include <libgen.h>
31 #include <pwd.h>
32 #include <signal.h>
33 #include <stdio.h>
34 #include <stdlib.h>
35 #include <string.h>
36 #include <unistd.h>
37 
38 #include "smtpd.h"
39 
40 __dead void	queue_shutdown(void);
41 void		queue_sig_handler(int, short, void *);
42 void		queue_dispatch_control(int, short, void *);
43 void		queue_dispatch_smtp(int, short, void *);
44 void		queue_dispatch_mda(int, short, void *);
45 void		queue_dispatch_mta(int, short, void *);
46 void		queue_dispatch_lka(int, short, void *);
47 void		queue_dispatch_runner(int, short, void *);
48 void		queue_setup_events(struct smtpd *);
49 void		queue_disable_events(struct smtpd *);
50 void		queue_purge(char *);
51 
52 int		queue_create_layout_message(char *, char *);
53 void		queue_delete_layout_message(char *, char *);
54 int		queue_record_layout_envelope(char *, struct message *);
55 int		queue_remove_layout_envelope(char *, struct message *);
56 int		queue_commit_layout_message(char *, struct message *);
57 int		queue_open_layout_messagefile(char *, struct message *);
58 
/*
 * Signal callback of the queue process (installed with signal_set() in
 * queue() for SIGINT and SIGTERM).
 *
 * SIGINT and SIGTERM lead to queue_shutdown(); any other signal number is
 * reported through fatalx().  `event' and `p' are not used.
 */
void
queue_sig_handler(int sig, short event, void *p)
{
	if (sig == SIGINT || sig == SIGTERM) {
		queue_shutdown();
		return;
	}

	fatalx("queue_sig_handler: unexpected signal");
}
71 
72 void
73 queue_dispatch_control(int sig, short event, void *p)
74 {
75 	struct smtpd		*env = p;
76 	struct imsgev		*iev;
77 	struct imsgbuf		*ibuf;
78 	struct imsg		 imsg;
79 	ssize_t			 n;
80 
81 	iev = env->sc_ievs[PROC_CONTROL];
82 	ibuf = &iev->ibuf;
83 
84 	if (event & EV_READ) {
85 		if ((n = imsg_read(ibuf)) == -1)
86 			fatal("imsg_read_error");
87 		if (n == 0) {
88 			/* this pipe is dead, so remove the event handler */
89 			event_del(&iev->ev);
90 			event_loopexit(NULL);
91 			return;
92 		}
93 	}
94 
95 	if (event & EV_WRITE) {
96 		if (msgbuf_write(&ibuf->w) == -1)
97 			fatal("msgbuf_write");
98 	}
99 
100 	for (;;) {
101 		if ((n = imsg_get(ibuf, &imsg)) == -1)
102 			fatal("queue_dispatch_control: imsg_get error");
103 		if (n == 0)
104 			break;
105 
106 		switch (imsg.hdr.type) {
107 		default:
108 			log_warnx("queue_dispatch_control: got imsg %d",
109 			    imsg.hdr.type);
110 			fatalx("queue_dispatch_control: unexpected imsg");
111 		}
112 		imsg_free(&imsg);
113 	}
114 	imsg_event_add(iev);
115 }
116 
117 void
118 queue_dispatch_smtp(int sig, short event, void *p)
119 {
120 	struct smtpd		*env = p;
121 	struct imsgev		*iev;
122 	struct imsgbuf		*ibuf;
123 	struct imsg		 imsg;
124 	ssize_t			 n;
125 
126 	iev = env->sc_ievs[PROC_SMTP];
127 	ibuf = &iev->ibuf;
128 
129 	if (event & EV_READ) {
130 		if ((n = imsg_read(ibuf)) == -1)
131 			fatal("imsg_read_error");
132 		if (n == 0) {
133 			/* this pipe is dead, so remove the event handler */
134 			event_del(&iev->ev);
135 			event_loopexit(NULL);
136 			return;
137 		}
138 	}
139 
140 	if (event & EV_WRITE) {
141 		if (msgbuf_write(&ibuf->w) == -1)
142 			fatal("msgbuf_write");
143 	}
144 
145 	for (;;) {
146 		if ((n = imsg_get(ibuf, &imsg)) == -1)
147 			fatal("queue_dispatch_smtp: imsg_get error");
148 		if (n == 0)
149 			break;
150 
151 		switch (imsg.hdr.type) {
152 		case IMSG_QUEUE_CREATE_MESSAGE: {
153 			struct message		*messagep = imsg.data;
154 			struct submit_status	 ss;
155 			int			(*f)(char *);
156 
157 			log_debug("queue_dispatch_smtp: creating message file");
158 
159 			IMSG_SIZE_CHECK(messagep);
160 
161 			ss.id = messagep->session_id;
162 			ss.code = 250;
163 			bzero(ss.u.msgid, MAX_ID_SIZE);
164 
165 			if (messagep->flags & F_MESSAGE_ENQUEUED)
166 				f = enqueue_create_layout;
167 			else
168 				f = queue_create_incoming_layout;
169 
170 			if (! f(ss.u.msgid))
171 				ss.code = 421;
172 
173 			imsg_compose_event(iev, IMSG_QUEUE_CREATE_MESSAGE, 0, 0, -1,
174 			    &ss, sizeof(ss));
175 			break;
176 		}
177 		case IMSG_QUEUE_REMOVE_MESSAGE: {
178 			struct message		*messagep = imsg.data;
179 			void			(*f)(char *);
180 
181 			IMSG_SIZE_CHECK(messagep);
182 
183 			if (messagep->flags & F_MESSAGE_ENQUEUED)
184 				f = enqueue_delete_message;
185 			else
186 				f = queue_delete_incoming_message;
187 
188 			f(messagep->message_id);
189 
190 			break;
191 		}
192 		case IMSG_QUEUE_COMMIT_MESSAGE: {
193 			struct message		*messagep = imsg.data;
194 			struct submit_status	 ss;
195 			size_t			*counter;
196 			int			(*f)(struct message *);
197 
198 			IMSG_SIZE_CHECK(messagep);
199 
200 			ss.id = messagep->session_id;
201 
202 			if (messagep->flags & F_MESSAGE_ENQUEUED) {
203 				f = enqueue_commit_message;
204 				counter = &env->stats->queue.inserts_local;
205 			} else {
206 				f = queue_commit_incoming_message;
207 				counter = &env->stats->queue.inserts_remote;
208 			}
209 
210 			if (f(messagep))
211 				(*counter)++;
212 			else
213 				ss.code = 421;
214 
215 			imsg_compose_event(iev, IMSG_QUEUE_COMMIT_MESSAGE, 0, 0, -1,
216 			    &ss, sizeof(ss));
217 
218 			break;
219 		}
220 		case IMSG_QUEUE_MESSAGE_FILE: {
221 			struct message		*messagep = imsg.data;
222 			struct submit_status	 ss;
223 			int fd;
224 			int			(*f)(struct message *);
225 
226 			IMSG_SIZE_CHECK(messagep);
227 
228 			ss.id = messagep->session_id;
229 
230 			if (messagep->flags & F_MESSAGE_ENQUEUED)
231 				f = enqueue_open_messagefile;
232 			else
233 				f = queue_open_incoming_message_file;
234 
235 			fd = f(messagep);
236 			if (fd == -1)
237 				ss.code = 421;
238 
239 			imsg_compose_event(iev, IMSG_QUEUE_MESSAGE_FILE, 0, 0, fd,
240 			    &ss, sizeof(ss));
241 			break;
242 		}
243 		default:
244 			log_warnx("queue_dispatch_smtp: got imsg %d",
245 			    imsg.hdr.type);
246 			fatalx("queue_dispatch_smtp: unexpected imsg");
247 		}
248 		imsg_free(&imsg);
249 	}
250 	imsg_event_add(iev);
251 }
252 
253 void
254 queue_dispatch_mda(int sig, short event, void *p)
255 {
256 	struct smtpd		*env = p;
257 	struct imsgev		*iev;
258 	struct imsgbuf		*ibuf;
259 	struct imsg		 imsg;
260 	ssize_t			 n;
261 
262 	iev = env->sc_ievs[PROC_MDA];
263 	ibuf = &iev->ibuf;
264 
265 	if (event & EV_READ) {
266 		if ((n = imsg_read(ibuf)) == -1)
267 			fatal("imsg_read_error");
268 		if (n == 0) {
269 			/* this pipe is dead, so remove the event handler */
270 			event_del(&iev->ev);
271 			event_loopexit(NULL);
272 			return;
273 		}
274 	}
275 
276 	if (event & EV_WRITE) {
277 		if (msgbuf_write(&ibuf->w) == -1)
278 			fatal("msgbuf_write");
279 	}
280 
281 	for (;;) {
282 		if ((n = imsg_get(ibuf, &imsg)) == -1)
283 			fatal("queue_dispatch_mda: imsg_get error");
284 		if (n == 0)
285 			break;
286 
287 		switch (imsg.hdr.type) {
288 
289 		case IMSG_QUEUE_MESSAGE_UPDATE: {
290 			imsg_compose_event(env->sc_ievs[PROC_RUNNER], IMSG_RUNNER_UPDATE_ENVELOPE,
291 			    0, 0, -1, imsg.data, sizeof(struct message));
292 			break;
293 		}
294 
295 		default:
296 			log_warnx("got imsg %d", imsg.hdr.type);
297 			fatalx("queue_dispatch_mda: unexpected imsg");
298 		}
299 		imsg_free(&imsg);
300 	}
301 	imsg_event_add(iev);
302 }
303 
304 void
305 queue_dispatch_mta(int sig, short event, void *p)
306 {
307 	struct smtpd		*env = p;
308 	struct imsgev		*iev;
309 	struct imsgbuf		*ibuf;
310 	struct imsg		 imsg;
311 	ssize_t			 n;
312 
313 	iev = env->sc_ievs[PROC_MTA];
314 	ibuf = &iev->ibuf;
315 
316 	if (event & EV_READ) {
317 		if ((n = imsg_read(ibuf)) == -1)
318 			fatal("imsg_read_error");
319 		if (n == 0) {
320 			/* this pipe is dead, so remove the event handler */
321 			event_del(&iev->ev);
322 			event_loopexit(NULL);
323 			return;
324 		}
325 	}
326 
327 	if (event & EV_WRITE) {
328 		if (msgbuf_write(&ibuf->w) == -1)
329 			fatal("msgbuf_write");
330 	}
331 
332 	for (;;) {
333 		if ((n = imsg_get(ibuf, &imsg)) == -1)
334 			fatal("queue_dispatch_mta: imsg_get error");
335 		if (n == 0)
336 			break;
337 
338 		switch (imsg.hdr.type) {
339 
340 		case IMSG_QUEUE_MESSAGE_FD: {
341 			struct batch *batchp = imsg.data;
342 			int fd;
343 
344 			IMSG_SIZE_CHECK(batchp);
345 
346 			fd = queue_open_message_file(batchp->message_id);
347 			imsg_compose_event(iev,  IMSG_QUEUE_MESSAGE_FD, 0, 0, fd, batchp,
348 			    sizeof(*batchp));
349 			break;
350 		}
351 
352 		case IMSG_QUEUE_MESSAGE_UPDATE: {
353 			imsg_compose_event(env->sc_ievs[PROC_RUNNER], IMSG_RUNNER_UPDATE_ENVELOPE,
354 			    0, 0, -1, imsg.data, sizeof(struct message));
355 			break;
356 		}
357 
358 		default:
359 			log_warnx("got imsg %d", imsg.hdr.type);
360 			fatalx("queue_dispatch_mda: unexpected imsg");
361 		}
362 		imsg_free(&imsg);
363 	}
364 	imsg_event_add(iev);
365 }
366 
367 void
368 queue_dispatch_lka(int sig, short event, void *p)
369 {
370 	struct smtpd		*env = p;
371 	struct imsgev		*iev;
372 	struct imsgbuf		*ibuf;
373 	struct imsg		 imsg;
374 	ssize_t			 n;
375 
376 	iev = env->sc_ievs[PROC_LKA];
377 	ibuf = &iev->ibuf;
378 
379 	if (event & EV_READ) {
380 		if ((n = imsg_read(ibuf)) == -1)
381 			fatal("imsg_read_error");
382 		if (n == 0) {
383 			/* this pipe is dead, so remove the event handler */
384 			event_del(&iev->ev);
385 			event_loopexit(NULL);
386 			return;
387 		}
388 	}
389 
390 	if (event & EV_WRITE) {
391 		if (msgbuf_write(&ibuf->w) == -1)
392 			fatal("msgbuf_write");
393 	}
394 
395 	for (;;) {
396 		if ((n = imsg_get(ibuf, &imsg)) == -1)
397 			fatal("queue_dispatch_lka: imsg_get error");
398 		if (n == 0)
399 			break;
400 
401 		switch (imsg.hdr.type) {
402 
403 		case IMSG_QUEUE_SUBMIT_ENVELOPE: {
404 			struct message		*messagep = imsg.data;
405 			struct submit_status	 ss;
406 			int (*f)(struct message *);
407 
408 			IMSG_SIZE_CHECK(messagep);
409 
410 			messagep->id = queue_generate_id();
411 			ss.id = messagep->session_id;
412 
413 			if (IS_MAILBOX(messagep->recipient.rule.r_action) ||
414 			    IS_EXT(messagep->recipient.rule.r_action))
415 				messagep->type = T_MDA_MESSAGE;
416 			else
417 				messagep->type = T_MTA_MESSAGE;
418 
419 			/* Write to disk */
420 			if (messagep->flags & F_MESSAGE_ENQUEUED)
421 				f = enqueue_record_envelope;
422 			else
423 				f = queue_record_incoming_envelope;
424 
425 			if (! f(messagep)) {
426 				ss.code = 421;
427 				imsg_compose_event(env->sc_ievs[PROC_SMTP],
428 				    IMSG_QUEUE_TEMPFAIL, 0, 0, -1, &ss,
429 				    sizeof(ss));
430 			}
431 
432 			break;
433 		}
434 
435 		case IMSG_QUEUE_COMMIT_ENVELOPES: {
436 			struct message		*messagep = imsg.data;
437 			struct submit_status	 ss;
438 
439 			IMSG_SIZE_CHECK(messagep);
440 
441 			ss.id = messagep->session_id;
442 			ss.code = 250;
443 
444 			imsg_compose_event(env->sc_ievs[PROC_SMTP], IMSG_QUEUE_COMMIT_ENVELOPES,
445 			    0, 0, -1, &ss, sizeof(ss));
446 
447 			break;
448 		}
449 
450 		default:
451 			log_warnx("got imsg %d", imsg.hdr.type);
452 			fatalx("queue_dispatch_lka: unexpected imsg");
453 		}
454 		imsg_free(&imsg);
455 	}
456 	imsg_event_add(iev);
457 }
458 
459 void
460 queue_dispatch_runner(int sig, short event, void *p)
461 {
462 	struct smtpd		*env = p;
463 	struct imsgev		*iev;
464 	struct imsgbuf		*ibuf;
465 	struct imsg		 imsg;
466 	ssize_t			 n;
467 
468 	iev = env->sc_ievs[PROC_RUNNER];
469 	ibuf = &iev->ibuf;
470 
471 	if (event & EV_READ) {
472 		if ((n = imsg_read(ibuf)) == -1)
473 			fatal("imsg_read_error");
474 		if (n == 0) {
475 			/* this pipe is dead, so remove the event handler */
476 			event_del(&iev->ev);
477 			event_loopexit(NULL);
478 			return;
479 		}
480 	}
481 
482 	if (event & EV_WRITE) {
483 		if (msgbuf_write(&ibuf->w) == -1)
484 			fatal("msgbuf_write");
485 	}
486 
487 	for (;;) {
488 		if ((n = imsg_get(ibuf, &imsg)) == -1)
489 			fatal("queue_dispatch_runner: imsg_get error");
490 		if (n == 0)
491 			break;
492 
493 		switch (imsg.hdr.type) {
494 		default:
495 			log_warnx("got imsg %d", imsg.hdr.type);
496 			fatalx("queue_dispatch_runner: unexpected imsg");
497 		}
498 		imsg_free(&imsg);
499 	}
500 	imsg_event_add(iev);
501 }
502 
/*
 * Terminate the queue process: emit one informational log line, then
 * leave immediately through _exit() with status 0.
 */
void
queue_shutdown(void)
{
	log_info("queue handler");

	_exit(0);
}
509 
/*
 * Hook called by queue() right before event_dispatch().
 * The body is empty: nothing is set up here at present.
 */
void
queue_setup_events(struct smtpd *env)
{
	/* intentionally empty */
}
514 
/*
 * Counterpart of queue_setup_events().
 * The body is empty: there is nothing to disable at present.
 */
void
queue_disable_events(struct smtpd *env)
{
	/* intentionally empty */
}
519 
520 pid_t
521 queue(struct smtpd *env)
522 {
523 	pid_t		 pid;
524 	struct passwd	*pw;
525 
526 	struct event	 ev_sigint;
527 	struct event	 ev_sigterm;
528 
529 	struct peer peers[] = {
530 		{ PROC_CONTROL,	queue_dispatch_control },
531 		{ PROC_SMTP,	queue_dispatch_smtp },
532 		{ PROC_MDA,	queue_dispatch_mda },
533 		{ PROC_MTA,	queue_dispatch_mta },
534 		{ PROC_LKA,	queue_dispatch_lka },
535 		{ PROC_RUNNER,	queue_dispatch_runner }
536 	};
537 
538 	switch (pid = fork()) {
539 	case -1:
540 		fatal("queue: cannot fork");
541 	case 0:
542 		break;
543 	default:
544 		return (pid);
545 	}
546 
547 	purge_config(env, PURGE_EVERYTHING);
548 
549 	pw = env->sc_pw;
550 
551 #ifndef DEBUG
552 	if (chroot(PATH_SPOOL) == -1)
553 		fatal("queue: chroot");
554 	if (chdir("/") == -1)
555 		fatal("queue: chdir(\"/\")");
556 #else
557 #warning disabling privilege revocation and chroot in DEBUG MODE
558 #endif
559 
560 	smtpd_process = PROC_QUEUE;
561 	setproctitle("%s", env->sc_title[smtpd_process]);
562 
563 #ifndef DEBUG
564 	if (setgroups(1, &pw->pw_gid) ||
565 	    setresgid(pw->pw_gid, pw->pw_gid, pw->pw_gid) ||
566 	    setresuid(pw->pw_uid, pw->pw_uid, pw->pw_uid))
567 		fatal("queue: cannot drop privileges");
568 #endif
569 
570 	event_init();
571 
572 	signal_set(&ev_sigint, SIGINT, queue_sig_handler, env);
573 	signal_set(&ev_sigterm, SIGTERM, queue_sig_handler, env);
574 	signal_add(&ev_sigint, NULL);
575 	signal_add(&ev_sigterm, NULL);
576 	signal(SIGPIPE, SIG_IGN);
577 	signal(SIGHUP, SIG_IGN);
578 
579 	config_pipes(env, peers, nitems(peers));
580 	config_peers(env, peers, nitems(peers));
581 
582 	queue_purge(PATH_INCOMING);
583 	queue_purge(PATH_ENQUEUE);
584 
585 	queue_setup_events(env);
586 	event_dispatch();
587 	queue_shutdown();
588 
589 	return (0);
590 }
591 
592 struct batch *
593 batch_by_id(struct smtpd *env, u_int64_t id)
594 {
595 	struct batch lookup;
596 
597 	lookup.id = id;
598 	return SPLAY_FIND(batchtree, &env->batch_queue, &lookup);
599 }
600 
601 
/*
 * Walk `queuepath' with qwalk() and, for every path it yields, call
 * queue_delete_layout_message() with the queue path and the basename of
 * that path.  The walker is closed once the walk is over.
 */
void
queue_purge(char *queuepath)
{
	struct qwalk	*walker;
	char		 entry[MAXPATHLEN];

	for (walker = qwalk_new(queuepath); qwalk(walker, entry); )
		queue_delete_layout_message(queuepath, basename(entry));

	qwalk_close(walker);
}
615