xref: /openbsd-src/usr.sbin/smtpd/queue.c (revision 2b0358df1d88d06ef4139321dd05bd5e05d91eaf)
1 /*	$OpenBSD: queue.c,v 1.58 2009/03/29 14:18:20 jacekm Exp $	*/
2 
3 /*
4  * Copyright (c) 2008 Gilles Chehade <gilles@openbsd.org>
5  * Copyright (c) 2008 Pierre-Yves Ritschard <pyr@openbsd.org>
6  *
7  * Permission to use, copy, modify, and distribute this software for any
8  * purpose with or without fee is hereby granted, provided that the above
9  * copyright notice and this permission notice appear in all copies.
10  *
11  * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
12  * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
13  * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
14  * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
15  * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
16  * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
17  * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
18  */
19 
20 #include <sys/types.h>
21 #include <sys/queue.h>
22 #include <sys/tree.h>
23 #include <sys/param.h>
24 #include <sys/socket.h>
25 #include <sys/stat.h>
26 
27 #include <errno.h>
28 #include <event.h>
29 #include <fcntl.h>
30 #include <libgen.h>
31 #include <pwd.h>
32 #include <signal.h>
33 #include <stdio.h>
34 #include <stdlib.h>
35 #include <string.h>
36 #include <unistd.h>
37 
38 #include "smtpd.h"
39 
/*
 * Process teardown and signal handling.  queue_shutdown() is marked
 * __dead: it ends in _exit() and does not return.
 */
__dead void	queue_shutdown(void);
void		queue_sig_handler(int, short, void *);
/*
 * Event callbacks, one per peer process; queue() pairs each of them
 * with its peer in the peers[] table.
 */
void		queue_dispatch_control(int, short, void *);
void		queue_dispatch_smtp(int, short, void *);
void		queue_dispatch_mda(int, short, void *);
void		queue_dispatch_mta(int, short, void *);
void		queue_dispatch_lka(int, short, void *);
void		queue_dispatch_runner(int, short, void *);
/* Event setup/teardown hooks (both bodies are currently empty). */
void		queue_setup_events(struct smtpd *);
void		queue_disable_events(struct smtpd *);
/* Deletes every entry found under the given queue path. */
void		queue_purge(char *);

/*
 * Layout helpers.  Only queue_delete_layout_message() is called from the
 * code visible in this file (by queue_purge()); none of them is defined
 * in the visible part of this file.
 */
int		queue_create_layout_message(char *, char *);
void		queue_delete_layout_message(char *, char *);
int		queue_record_layout_envelope(char *, struct message *);
int		queue_remove_layout_envelope(char *, struct message *);
int		queue_commit_layout_message(char *, struct message *);
int		queue_open_layout_messagefile(char *, struct message *);

/*
 * Queue counters: inserts_local / inserts_remote are incremented on
 * successful commits, and the whole struct is copied into IMSG_STATS
 * replies by queue_dispatch_control().
 */
struct s_queue	s_queue;
60 
/*
 * Signal event callback for the queue process.
 *
 * SIGINT and SIGTERM both lead to queue_shutdown(); any other signal
 * number is reported through fatalx().  The event and p arguments are
 * not used.
 */
void
queue_sig_handler(int sig, short event, void *p)
{
	if (sig == SIGINT || sig == SIGTERM)
		queue_shutdown();
	else
		fatalx("queue_sig_handler: unexpected signal");
}
73 
74 void
75 queue_dispatch_control(int sig, short event, void *p)
76 {
77 	struct smtpd		*env = p;
78 	struct imsgbuf		*ibuf;
79 	struct imsg		 imsg;
80 	ssize_t			 n;
81 
82 	ibuf = env->sc_ibufs[PROC_CONTROL];
83 	switch (event) {
84 	case EV_READ:
85 		if ((n = imsg_read(ibuf)) == -1)
86 			fatal("imsg_read_error");
87 		if (n == 0) {
88 			/* this pipe is dead, so remove the event handler */
89 			event_del(&ibuf->ev);
90 			event_loopexit(NULL);
91 			return;
92 		}
93 		break;
94 	case EV_WRITE:
95 		if (msgbuf_write(&ibuf->w) == -1)
96 			fatal("msgbuf_write");
97 		imsg_event_add(ibuf);
98 		return;
99 	default:
100 		fatalx("unknown event");
101 	}
102 
103 	for (;;) {
104 		if ((n = imsg_get(ibuf, &imsg)) == -1)
105 			fatal("queue_dispatch_control: imsg_read error");
106 		if (n == 0)
107 			break;
108 
109 		switch (imsg.hdr.type) {
110 		case IMSG_QUEUE_CREATE_MESSAGE: {
111 			struct message		*messagep;
112 			struct submit_status	 ss;
113 
114 			log_debug("queue_dispatch_control: creating message file");
115 			messagep = imsg.data;
116 
117 			ss.id = messagep->session_id;
118 			ss.code = 250;
119 			bzero(ss.u.msgid, MAX_ID_SIZE);
120 
121 			if (! enqueue_create_layout(ss.u.msgid))
122 				ss.code = 421;
123 
124 			imsg_compose(ibuf, IMSG_QUEUE_CREATE_MESSAGE, 0, 0, -1,
125 			    &ss, sizeof(ss));
126 
127 			break;
128 		}
129 		case IMSG_QUEUE_MESSAGE_FILE: {
130 			int fd;
131 			struct submit_status ss;
132 			struct message *messagep;
133 
134 			messagep = imsg.data;
135 			ss.msg = *messagep;
136 			ss.id = messagep->session_id;
137 			ss.code = 250;
138 			fd = enqueue_open_messagefile(messagep);
139 			if (fd == -1)
140 				ss.code = 421;
141 
142 			imsg_compose(ibuf, IMSG_QUEUE_MESSAGE_FILE, 0, 0, fd, &ss,
143 			    sizeof(ss));
144 
145 			break;
146 		}
147 		case IMSG_QUEUE_COMMIT_MESSAGE: {
148 			struct message		*messagep;
149 			struct submit_status	 ss;
150 
151 			messagep = imsg.data;
152 			ss.id = messagep->session_id;
153 
154 			ss.code = 250;
155 			if (enqueue_commit_message(messagep))
156 				s_queue.inserts_local++;
157 			else
158 				ss.code = 421;
159 
160 			imsg_compose(ibuf, IMSG_QUEUE_COMMIT_MESSAGE, 0, 0, -1,
161 			    &ss, sizeof(ss));
162 
163 			break;
164 		}
165 		case IMSG_STATS: {
166 			struct stats *s;
167 
168 			s = imsg.data;
169 			s->u.queue = s_queue;
170 			imsg_compose(ibuf, IMSG_STATS, 0, 0, -1, s, sizeof(*s));
171 			break;
172 		}
173 		default:
174 			log_warnx("queue_dispatch_control: got imsg %d",
175 			    imsg.hdr.type);
176 			fatalx("queue_dispatch_control: unexpected imsg");
177 		}
178 		imsg_free(&imsg);
179 	}
180 	imsg_event_add(ibuf);
181 }
182 
183 void
184 queue_dispatch_smtp(int sig, short event, void *p)
185 {
186 	struct smtpd		*env = p;
187 	struct imsgbuf		*ibuf;
188 	struct imsg		 imsg;
189 	ssize_t			 n;
190 
191 	ibuf = env->sc_ibufs[PROC_SMTP];
192 	switch (event) {
193 	case EV_READ:
194 		if ((n = imsg_read(ibuf)) == -1)
195 			fatal("imsg_read_error");
196 		if (n == 0) {
197 			/* this pipe is dead, so remove the event handler */
198 			event_del(&ibuf->ev);
199 			event_loopexit(NULL);
200 			return;
201 		}
202 		break;
203 	case EV_WRITE:
204 		if (msgbuf_write(&ibuf->w) == -1)
205 			fatal("msgbuf_write");
206 		imsg_event_add(ibuf);
207 		return;
208 	default:
209 		fatalx("unknown event");
210 	}
211 
212 	for (;;) {
213 		if ((n = imsg_get(ibuf, &imsg)) == -1)
214 			fatal("queue_dispatch_smtp: imsg_read error");
215 		if (n == 0)
216 			break;
217 
218 		switch (imsg.hdr.type) {
219 		case IMSG_QUEUE_CREATE_MESSAGE: {
220 			struct message		*messagep;
221 			struct submit_status	 ss;
222 
223 			log_debug("queue_dispatch_smtp: creating message file");
224 			messagep = imsg.data;
225 			ss.id = messagep->session_id;
226 			ss.code = 250;
227 			bzero(ss.u.msgid, MAX_ID_SIZE);
228 
229 			if (! queue_create_incoming_layout(ss.u.msgid))
230 				ss.code = 421;
231 
232 			imsg_compose(ibuf, IMSG_QUEUE_CREATE_MESSAGE, 0, 0, -1,
233 			    &ss, sizeof(ss));
234 			break;
235 		}
236 		case IMSG_QUEUE_REMOVE_MESSAGE: {
237 			struct message		*messagep;
238 
239 			messagep = imsg.data;
240 			queue_delete_incoming_message(messagep->message_id);
241 			break;
242 		}
243 		case IMSG_QUEUE_COMMIT_MESSAGE: {
244 			struct message		*messagep;
245 			struct submit_status	 ss;
246 
247 			messagep = imsg.data;
248 			ss.id = messagep->session_id;
249 
250 			if (queue_commit_incoming_message(messagep))
251 				s_queue.inserts_remote++;
252 			else
253 				ss.code = 421;
254 
255 			imsg_compose(ibuf, IMSG_QUEUE_COMMIT_MESSAGE, 0, 0, -1,
256 			    &ss, sizeof(ss));
257 
258 			break;
259 		}
260 		case IMSG_QUEUE_MESSAGE_FILE: {
261 			struct message		*messagep;
262 			struct submit_status	 ss;
263 			int fd;
264 
265 			messagep = imsg.data;
266 			ss.id = messagep->session_id;
267 
268 			fd = queue_open_incoming_message_file(messagep);
269 			if (fd == -1)
270 				ss.code = 421;
271 
272 			imsg_compose(ibuf, IMSG_QUEUE_MESSAGE_FILE, 0, 0, fd,
273 			    &ss, sizeof(ss));
274 			break;
275 		}
276 		default:
277 			log_warnx("queue_dispatch_smtp: got imsg %d",
278 			    imsg.hdr.type);
279 			fatalx("queue_dispatch_smtp: unexpected imsg");
280 		}
281 		imsg_free(&imsg);
282 	}
283 	imsg_event_add(ibuf);
284 }
285 
286 void
287 queue_dispatch_mda(int sig, short event, void *p)
288 {
289 	struct smtpd		*env = p;
290 	struct imsgbuf		*ibuf;
291 	struct imsg		 imsg;
292 	ssize_t			 n;
293 
294 	ibuf = env->sc_ibufs[PROC_MDA];
295 	switch (event) {
296 	case EV_READ:
297 		if ((n = imsg_read(ibuf)) == -1)
298 			fatal("imsg_read_error");
299 		if (n == 0) {
300 			/* this pipe is dead, so remove the event handler */
301 			event_del(&ibuf->ev);
302 			event_loopexit(NULL);
303 			return;
304 		}
305 		break;
306 	case EV_WRITE:
307 		if (msgbuf_write(&ibuf->w) == -1)
308 			fatal("msgbuf_write");
309 		imsg_event_add(ibuf);
310 		return;
311 	default:
312 		fatalx("unknown event");
313 	}
314 
315 	for (;;) {
316 		if ((n = imsg_get(ibuf, &imsg)) == -1)
317 			fatal("queue_dispatch_mda: imsg_read error");
318 		if (n == 0)
319 			break;
320 
321 		switch (imsg.hdr.type) {
322 
323 		case IMSG_QUEUE_MESSAGE_UPDATE: {
324 			imsg_compose(env->sc_ibufs[PROC_RUNNER], IMSG_RUNNER_UPDATE_ENVELOPE,
325 			    0, 0, -1, imsg.data, sizeof(struct message));
326 			break;
327 		}
328 
329 		default:
330 			log_warnx("got imsg %d", imsg.hdr.type);
331 			fatalx("queue_dispatch_mda: unexpected imsg");
332 		}
333 		imsg_free(&imsg);
334 	}
335 	imsg_event_add(ibuf);
336 }
337 
338 void
339 queue_dispatch_mta(int sig, short event, void *p)
340 {
341 	struct smtpd		*env = p;
342 	struct imsgbuf		*ibuf;
343 	struct imsg		 imsg;
344 	ssize_t			 n;
345 
346 	ibuf = env->sc_ibufs[PROC_MTA];
347 	switch (event) {
348 	case EV_READ:
349 		if ((n = imsg_read(ibuf)) == -1)
350 			fatal("imsg_read_error");
351 		if (n == 0) {
352 			/* this pipe is dead, so remove the event handler */
353 			event_del(&ibuf->ev);
354 			event_loopexit(NULL);
355 			return;
356 		}
357 		break;
358 	case EV_WRITE:
359 		if (msgbuf_write(&ibuf->w) == -1)
360 			fatal("msgbuf_write");
361 		imsg_event_add(ibuf);
362 		return;
363 	default:
364 		fatalx("unknown event");
365 	}
366 
367 	for (;;) {
368 		if ((n = imsg_get(ibuf, &imsg)) == -1)
369 			fatal("queue_dispatch_mda: imsg_read error");
370 		if (n == 0)
371 			break;
372 
373 		switch (imsg.hdr.type) {
374 
375 		case IMSG_QUEUE_MESSAGE_FD: {
376 			int fd;
377 			struct batch *batchp;
378 
379 			batchp = imsg.data;
380 			fd = queue_open_message_file(batchp->message_id);
381 			imsg_compose(ibuf,  IMSG_QUEUE_MESSAGE_FD, 0, 0, fd, batchp,
382 			    sizeof(*batchp));
383 			break;
384 		}
385 
386 		case IMSG_QUEUE_MESSAGE_UPDATE: {
387 			imsg_compose(env->sc_ibufs[PROC_RUNNER], IMSG_RUNNER_UPDATE_ENVELOPE,
388 			    0, 0, -1, imsg.data, sizeof(struct message));
389 			break;
390 		}
391 
392 		default:
393 			log_warnx("got imsg %d", imsg.hdr.type);
394 			fatalx("queue_dispatch_mda: unexpected imsg");
395 		}
396 		imsg_free(&imsg);
397 	}
398 	imsg_event_add(ibuf);
399 }
400 
401 void
402 queue_dispatch_lka(int sig, short event, void *p)
403 {
404 	struct smtpd		*env = p;
405 	struct imsgbuf		*ibuf;
406 	struct imsg		 imsg;
407 	ssize_t			 n;
408 
409 	ibuf = env->sc_ibufs[PROC_LKA];
410 	switch (event) {
411 	case EV_READ:
412 		if ((n = imsg_read(ibuf)) == -1)
413 			fatal("imsg_read_error");
414 		if (n == 0) {
415 			/* this pipe is dead, so remove the event handler */
416 			event_del(&ibuf->ev);
417 			event_loopexit(NULL);
418 			return;
419 		}
420 		break;
421 	case EV_WRITE:
422 		if (msgbuf_write(&ibuf->w) == -1)
423 			fatal("msgbuf_write");
424 		imsg_event_add(ibuf);
425 		return;
426  	default:
427 		fatalx("unknown event");
428 	}
429 
430 	for (;;) {
431 		if ((n = imsg_get(ibuf, &imsg)) == -1)
432 			fatal("queue_dispatch_lka: imsg_read error");
433 		if (n == 0)
434 			break;
435 
436 		switch (imsg.hdr.type) {
437 
438 		case IMSG_QUEUE_SUBMIT_ENVELOPE: {
439 			struct message		*messagep;
440 			struct submit_status	 ss;
441 			int (*f)(struct message *);
442 			enum smtp_proc_type peer;
443 
444 			messagep = imsg.data;
445 			messagep->id = queue_generate_id();
446 			ss.id = messagep->session_id;
447 
448 			if (IS_MAILBOX(messagep->recipient.rule.r_action) ||
449 			    IS_EXT(messagep->recipient.rule.r_action))
450 				messagep->type = T_MDA_MESSAGE;
451 			else
452 				messagep->type = T_MTA_MESSAGE;
453 
454 			/* Write to disk */
455 			if (messagep->flags & F_MESSAGE_ENQUEUED) {
456 				f = enqueue_record_envelope;
457 				peer = PROC_CONTROL;
458 			}
459 			else {
460 				f = queue_record_incoming_envelope;
461 				peer = PROC_SMTP;
462 			}
463 
464 			if (! f(messagep)) {
465 				ss.code = 421;
466 				imsg_compose(env->sc_ibufs[peer], IMSG_QUEUE_TEMPFAIL,
467 				    0, 0, -1, &ss, sizeof(ss));
468 			}
469 
470 			break;
471 		}
472 
473 		case IMSG_QUEUE_COMMIT_ENVELOPES: {
474 			struct message		*messagep;
475 			struct submit_status	 ss;
476 			enum smtp_proc_type	 peer;
477 
478 			messagep = imsg.data;
479 			ss.id = messagep->session_id;
480 			ss.code = 250;
481 
482 			if (messagep->flags & F_MESSAGE_ENQUEUED)
483 				peer = PROC_CONTROL;
484 			else
485 				peer = PROC_SMTP;
486 
487 			imsg_compose(env->sc_ibufs[peer], IMSG_QUEUE_COMMIT_ENVELOPES,
488 			    0, 0, -1, &ss, sizeof(ss));
489 
490 			break;
491 		}
492 
493 		default:
494 			log_warnx("got imsg %d", imsg.hdr.type);
495 			fatalx("queue_dispatch_lka: unexpected imsg");
496 		}
497 		imsg_free(&imsg);
498 	}
499 	imsg_event_add(ibuf);
500 }
501 
502 void
503 queue_dispatch_runner(int sig, short event, void *p)
504 {
505 	struct smtpd		*env = p;
506 	struct imsgbuf		*ibuf;
507 	struct imsg		 imsg;
508 	ssize_t			 n;
509 
510 	ibuf = env->sc_ibufs[PROC_RUNNER];
511 	switch (event) {
512 	case EV_READ:
513 		if ((n = imsg_read(ibuf)) == -1)
514 			fatal("imsg_read_error");
515 		if (n == 0) {
516 			/* this pipe is dead, so remove the event handler */
517 			event_del(&ibuf->ev);
518 			event_loopexit(NULL);
519 			return;
520 		}
521 		break;
522 	case EV_WRITE:
523 		if (msgbuf_write(&ibuf->w) == -1)
524 			fatal("msgbuf_write");
525 		imsg_event_add(ibuf);
526 		return;
527 	default:
528 		fatalx("unknown event");
529 	}
530 
531 	for (;;) {
532 		if ((n = imsg_get(ibuf, &imsg)) == -1)
533 			fatal("queue_dispatch_runner: imsg_read error");
534 		if (n == 0)
535 			break;
536 
537 		switch (imsg.hdr.type) {
538 		default:
539 			log_warnx("got imsg %d", imsg.hdr.type);
540 			fatalx("queue_dispatch_runner: unexpected imsg");
541 		}
542 		imsg_free(&imsg);
543 	}
544 	imsg_event_add(ibuf);
545 }
546 
/*
 * Terminate the queue process: emit "queue handler" through log_info()
 * and leave immediately with _exit(0).  Reached from queue_sig_handler()
 * on SIGINT/SIGTERM and from queue() once event_dispatch() returns.
 */
void
queue_shutdown(void)
{
	log_info("queue handler");
	_exit(0);
}
553 
/*
 * Event setup hook, called by queue() right before event_dispatch().
 * The body is empty, so the call currently has no effect; env is unused.
 */
void
queue_setup_events(struct smtpd *env)
{
}
558 
/*
 * Counterpart of queue_setup_events().  The body is empty and nothing in
 * the visible part of this file calls it; env is unused.
 */
void
queue_disable_events(struct smtpd *env)
{
}
563 
564 pid_t
565 queue(struct smtpd *env)
566 {
567 	pid_t		 pid;
568 	struct passwd	*pw;
569 
570 	struct event	 ev_sigint;
571 	struct event	 ev_sigterm;
572 
573 	struct peer peers[] = {
574 		{ PROC_CONTROL,	queue_dispatch_control },
575 		{ PROC_SMTP,	queue_dispatch_smtp },
576 		{ PROC_MDA,	queue_dispatch_mda },
577 		{ PROC_MTA,	queue_dispatch_mta },
578 		{ PROC_LKA,	queue_dispatch_lka },
579 		{ PROC_RUNNER,	queue_dispatch_runner }
580 	};
581 
582 	switch (pid = fork()) {
583 	case -1:
584 		fatal("queue: cannot fork");
585 	case 0:
586 		break;
587 	default:
588 		return (pid);
589 	}
590 
591 	purge_config(env, PURGE_EVERYTHING);
592 
593 	pw = env->sc_pw;
594 
595 #ifndef DEBUG
596 	if (chroot(PATH_SPOOL) == -1)
597 		fatal("queue: chroot");
598 	if (chdir("/") == -1)
599 		fatal("queue: chdir(\"/\")");
600 #else
601 #warning disabling privilege revocation and chroot in DEBUG MODE
602 #endif
603 
604 	setproctitle("queue handler");
605 	smtpd_process = PROC_QUEUE;
606 
607 #ifndef DEBUG
608 	if (setgroups(1, &pw->pw_gid) ||
609 	    setresgid(pw->pw_gid, pw->pw_gid, pw->pw_gid) ||
610 	    setresuid(pw->pw_uid, pw->pw_uid, pw->pw_uid))
611 		fatal("queue: cannot drop privileges");
612 #endif
613 
614 	event_init();
615 
616 	signal_set(&ev_sigint, SIGINT, queue_sig_handler, env);
617 	signal_set(&ev_sigterm, SIGTERM, queue_sig_handler, env);
618 	signal_add(&ev_sigint, NULL);
619 	signal_add(&ev_sigterm, NULL);
620 	signal(SIGPIPE, SIG_IGN);
621 	signal(SIGHUP, SIG_IGN);
622 
623 	config_pipes(env, peers, 6);
624 	config_peers(env, peers, 6);
625 
626 	queue_purge(PATH_INCOMING);
627 	queue_purge(PATH_ENQUEUE);
628 
629 	queue_setup_events(env);
630 	event_dispatch();
631 	queue_shutdown();
632 
633 	return (0);
634 }
635 
636 int
637 queue_remove_batch_message(struct smtpd *env, struct batch *batchp, struct message *messagep)
638 {
639 	TAILQ_REMOVE(&batchp->messages, messagep, entry);
640 	bzero(messagep, sizeof(struct message));
641 	free(messagep);
642 
643 	if (TAILQ_FIRST(&batchp->messages) == NULL) {
644 		SPLAY_REMOVE(batchtree, &env->batch_queue, batchp);
645 		bzero(batchp, sizeof(struct batch));
646 		free(batchp);
647 		return 1;
648 	}
649 	return 0;
650 }
651 
652 struct batch *
653 batch_by_id(struct smtpd *env, u_int64_t id)
654 {
655 	struct batch lookup;
656 
657 	lookup.id = id;
658 	return SPLAY_FIND(batchtree, &env->batch_queue, &lookup);
659 }
660 
661 
662 struct message *
663 message_by_id(struct smtpd *env, struct batch *batchp, u_int64_t id)
664 {
665 	struct message *messagep;
666 
667 	if (batchp != NULL) {
668 		TAILQ_FOREACH(messagep, &batchp->messages, entry) {
669 			if (messagep->id == id)
670 				break;
671 		}
672 		return messagep;
673 	}
674 
675 	SPLAY_FOREACH(batchp, batchtree, &env->batch_queue) {
676 		TAILQ_FOREACH(messagep, &batchp->messages, entry) {
677 			if (messagep->id == id)
678 				return messagep;
679 		}
680 	}
681 	return NULL;
682 }
683 
/*
 * Delete everything found under queuepath: each path produced by
 * qwalk() is reduced to its basename and handed, together with
 * queuepath, to queue_delete_layout_message().  queue() runs this for
 * PATH_INCOMING and PATH_ENQUEUE before entering the event loop.
 */
void
queue_purge(char *queuepath)
{
	struct qwalk	*walker;
	char		 found[MAXPATHLEN];

	walker = qwalk_new(queuepath);

	for (;;) {
		if (!qwalk(walker, found))
			break;
		queue_delete_layout_message(queuepath, basename(found));
	}

	qwalk_close(walker);
}
697