xref: /openbsd-src/usr.sbin/smtpd/queue.c (revision a28daedfc357b214be5c701aa8ba8adb29a7f1c2)
1 /*	$OpenBSD: queue.c,v 1.61 2009/04/21 14:37:32 eric Exp $	*/
2 
3 /*
4  * Copyright (c) 2008 Gilles Chehade <gilles@openbsd.org>
5  * Copyright (c) 2008 Pierre-Yves Ritschard <pyr@openbsd.org>
6  *
7  * Permission to use, copy, modify, and distribute this software for any
8  * purpose with or without fee is hereby granted, provided that the above
9  * copyright notice and this permission notice appear in all copies.
10  *
11  * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
12  * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
13  * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
14  * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
15  * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
16  * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
17  * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
18  */
19 
20 #include <sys/types.h>
21 #include <sys/queue.h>
22 #include <sys/tree.h>
23 #include <sys/param.h>
24 #include <sys/socket.h>
25 #include <sys/stat.h>
26 
27 #include <errno.h>
28 #include <event.h>
29 #include <fcntl.h>
30 #include <libgen.h>
31 #include <pwd.h>
32 #include <signal.h>
33 #include <stdio.h>
34 #include <stdlib.h>
35 #include <string.h>
36 #include <unistd.h>
37 
38 #include "smtpd.h"
39 
40 __dead void	queue_shutdown(void);
41 void		queue_sig_handler(int, short, void *);
42 void		queue_dispatch_control(int, short, void *);
43 void		queue_dispatch_smtp(int, short, void *);
44 void		queue_dispatch_mda(int, short, void *);
45 void		queue_dispatch_mta(int, short, void *);
46 void		queue_dispatch_lka(int, short, void *);
47 void		queue_dispatch_runner(int, short, void *);
48 void		queue_setup_events(struct smtpd *);
49 void		queue_disable_events(struct smtpd *);
50 void		queue_purge(char *);
51 
52 int		queue_create_layout_message(char *, char *);
53 void		queue_delete_layout_message(char *, char *);
54 int		queue_record_layout_envelope(char *, struct message *);
55 int		queue_remove_layout_envelope(char *, struct message *);
56 int		queue_commit_layout_message(char *, struct message *);
57 int		queue_open_layout_messagefile(char *, struct message *);
58 
59 struct s_queue	s_queue;
60 
61 void
62 queue_sig_handler(int sig, short event, void *p)
63 {
64 	switch (sig) {
65 	case SIGINT:
66 	case SIGTERM:
67 		queue_shutdown();
68 		break;
69 	default:
70 		fatalx("queue_sig_handler: unexpected signal");
71 	}
72 }
73 
74 void
75 queue_dispatch_control(int sig, short event, void *p)
76 {
77 	struct smtpd		*env = p;
78 	struct imsgbuf		*ibuf;
79 	struct imsg		 imsg;
80 	ssize_t			 n;
81 
82 	ibuf = env->sc_ibufs[PROC_CONTROL];
83 	switch (event) {
84 	case EV_READ:
85 		if ((n = imsg_read(ibuf)) == -1)
86 			fatal("imsg_read_error");
87 		if (n == 0) {
88 			/* this pipe is dead, so remove the event handler */
89 			event_del(&ibuf->ev);
90 			event_loopexit(NULL);
91 			return;
92 		}
93 		break;
94 	case EV_WRITE:
95 		if (msgbuf_write(&ibuf->w) == -1)
96 			fatal("msgbuf_write");
97 		imsg_event_add(ibuf);
98 		return;
99 	default:
100 		fatalx("unknown event");
101 	}
102 
103 	for (;;) {
104 		if ((n = imsg_get(ibuf, &imsg)) == -1)
105 			fatalx("queue_dispatch_control: imsg_get error");
106 		if (n == 0)
107 			break;
108 
109 		switch (imsg.hdr.type) {
110 		case IMSG_STATS: {
111 			struct stats *s;
112 
113 			s = imsg.data;
114 			s->u.queue = s_queue;
115 			imsg_compose(ibuf, IMSG_STATS, 0, 0, -1, s, sizeof(*s));
116 			break;
117 		}
118 		default:
119 			log_warnx("queue_dispatch_control: got imsg %d",
120 			    imsg.hdr.type);
121 			fatalx("queue_dispatch_control: unexpected imsg");
122 		}
123 		imsg_free(&imsg);
124 	}
125 	imsg_event_add(ibuf);
126 }
127 
128 void
129 queue_dispatch_smtp(int sig, short event, void *p)
130 {
131 	struct smtpd		*env = p;
132 	struct imsgbuf		*ibuf;
133 	struct imsg		 imsg;
134 	ssize_t			 n;
135 
136 	ibuf = env->sc_ibufs[PROC_SMTP];
137 	switch (event) {
138 	case EV_READ:
139 		if ((n = imsg_read(ibuf)) == -1)
140 			fatal("imsg_read_error");
141 		if (n == 0) {
142 			/* this pipe is dead, so remove the event handler */
143 			event_del(&ibuf->ev);
144 			event_loopexit(NULL);
145 			return;
146 		}
147 		break;
148 	case EV_WRITE:
149 		if (msgbuf_write(&ibuf->w) == -1)
150 			fatal("msgbuf_write");
151 		imsg_event_add(ibuf);
152 		return;
153 	default:
154 		fatalx("unknown event");
155 	}
156 
157 	for (;;) {
158 		if ((n = imsg_get(ibuf, &imsg)) == -1)
159 			fatalx("queue_dispatch_smtp: imsg_get error");
160 		if (n == 0)
161 			break;
162 
163 		switch (imsg.hdr.type) {
164 		case IMSG_QUEUE_CREATE_MESSAGE: {
165 			struct message		*messagep;
166 			struct submit_status	 ss;
167 			int			(*f)(char *);
168 
169 			log_debug("queue_dispatch_smtp: creating message file");
170 			messagep = imsg.data;
171 			ss.id = messagep->session_id;
172 			ss.code = 250;
173 			bzero(ss.u.msgid, MAX_ID_SIZE);
174 
175 			if (messagep->flags & F_MESSAGE_ENQUEUED)
176 				f = enqueue_create_layout;
177 			else
178 				f = queue_create_incoming_layout;
179 
180 			if (! f(ss.u.msgid))
181 				ss.code = 421;
182 
183 			imsg_compose(ibuf, IMSG_QUEUE_CREATE_MESSAGE, 0, 0, -1,
184 			    &ss, sizeof(ss));
185 			break;
186 		}
187 		case IMSG_QUEUE_REMOVE_MESSAGE: {
188 			struct message		*messagep;
189 			void			(*f)(char *);
190 
191 			messagep = imsg.data;
192 			if (messagep->flags & F_MESSAGE_ENQUEUED)
193 				f = enqueue_delete_message;
194 			else
195 				f = queue_delete_incoming_message;
196 
197 			f(messagep->message_id);
198 
199 			break;
200 		}
201 		case IMSG_QUEUE_COMMIT_MESSAGE: {
202 			struct message		*messagep;
203 			struct submit_status	 ss;
204 			size_t			*counter;
205 			int			(*f)(struct message *);
206 
207 			messagep = imsg.data;
208 			ss.id = messagep->session_id;
209 
210 			if (messagep->flags & F_MESSAGE_ENQUEUED) {
211 				f = enqueue_commit_message;
212 				counter = &s_queue.inserts_local;
213 			} else {
214 				f = queue_commit_incoming_message;
215 				counter = &s_queue.inserts_remote;
216 			}
217 
218 			if (f(messagep))
219 				(*counter)++;
220 			else
221 				ss.code = 421;
222 
223 			imsg_compose(ibuf, IMSG_QUEUE_COMMIT_MESSAGE, 0, 0, -1,
224 			    &ss, sizeof(ss));
225 
226 			break;
227 		}
228 		case IMSG_QUEUE_MESSAGE_FILE: {
229 			struct message		*messagep;
230 			struct submit_status	 ss;
231 			int fd;
232 			int			(*f)(struct message *);
233 
234 			messagep = imsg.data;
235 			ss.id = messagep->session_id;
236 
237 			if (messagep->flags & F_MESSAGE_ENQUEUED)
238 				f = enqueue_open_messagefile;
239 			else
240 				f = queue_open_incoming_message_file;
241 
242 			fd = f(messagep);
243 			if (fd == -1)
244 				ss.code = 421;
245 
246 			imsg_compose(ibuf, IMSG_QUEUE_MESSAGE_FILE, 0, 0, fd,
247 			    &ss, sizeof(ss));
248 			break;
249 		}
250 		default:
251 			log_warnx("queue_dispatch_smtp: got imsg %d",
252 			    imsg.hdr.type);
253 			fatalx("queue_dispatch_smtp: unexpected imsg");
254 		}
255 		imsg_free(&imsg);
256 	}
257 	imsg_event_add(ibuf);
258 }
259 
260 void
261 queue_dispatch_mda(int sig, short event, void *p)
262 {
263 	struct smtpd		*env = p;
264 	struct imsgbuf		*ibuf;
265 	struct imsg		 imsg;
266 	ssize_t			 n;
267 
268 	ibuf = env->sc_ibufs[PROC_MDA];
269 	switch (event) {
270 	case EV_READ:
271 		if ((n = imsg_read(ibuf)) == -1)
272 			fatal("imsg_read_error");
273 		if (n == 0) {
274 			/* this pipe is dead, so remove the event handler */
275 			event_del(&ibuf->ev);
276 			event_loopexit(NULL);
277 			return;
278 		}
279 		break;
280 	case EV_WRITE:
281 		if (msgbuf_write(&ibuf->w) == -1)
282 			fatal("msgbuf_write");
283 		imsg_event_add(ibuf);
284 		return;
285 	default:
286 		fatalx("unknown event");
287 	}
288 
289 	for (;;) {
290 		if ((n = imsg_get(ibuf, &imsg)) == -1)
291 			fatalx("queue_dispatch_mda: imsg_get error");
292 		if (n == 0)
293 			break;
294 
295 		switch (imsg.hdr.type) {
296 
297 		case IMSG_QUEUE_MESSAGE_UPDATE: {
298 			imsg_compose(env->sc_ibufs[PROC_RUNNER], IMSG_RUNNER_UPDATE_ENVELOPE,
299 			    0, 0, -1, imsg.data, sizeof(struct message));
300 			break;
301 		}
302 
303 		default:
304 			log_warnx("got imsg %d", imsg.hdr.type);
305 			fatalx("queue_dispatch_mda: unexpected imsg");
306 		}
307 		imsg_free(&imsg);
308 	}
309 	imsg_event_add(ibuf);
310 }
311 
312 void
313 queue_dispatch_mta(int sig, short event, void *p)
314 {
315 	struct smtpd		*env = p;
316 	struct imsgbuf		*ibuf;
317 	struct imsg		 imsg;
318 	ssize_t			 n;
319 
320 	ibuf = env->sc_ibufs[PROC_MTA];
321 	switch (event) {
322 	case EV_READ:
323 		if ((n = imsg_read(ibuf)) == -1)
324 			fatal("imsg_read_error");
325 		if (n == 0) {
326 			/* this pipe is dead, so remove the event handler */
327 			event_del(&ibuf->ev);
328 			event_loopexit(NULL);
329 			return;
330 		}
331 		break;
332 	case EV_WRITE:
333 		if (msgbuf_write(&ibuf->w) == -1)
334 			fatal("msgbuf_write");
335 		imsg_event_add(ibuf);
336 		return;
337 	default:
338 		fatalx("unknown event");
339 	}
340 
341 	for (;;) {
342 		if ((n = imsg_get(ibuf, &imsg)) == -1)
343 			fatalx("queue_dispatch_mta: imsg_get error");
344 		if (n == 0)
345 			break;
346 
347 		switch (imsg.hdr.type) {
348 
349 		case IMSG_QUEUE_MESSAGE_FD: {
350 			int fd;
351 			struct batch *batchp;
352 
353 			batchp = imsg.data;
354 			fd = queue_open_message_file(batchp->message_id);
355 			imsg_compose(ibuf,  IMSG_QUEUE_MESSAGE_FD, 0, 0, fd, batchp,
356 			    sizeof(*batchp));
357 			break;
358 		}
359 
360 		case IMSG_QUEUE_MESSAGE_UPDATE: {
361 			imsg_compose(env->sc_ibufs[PROC_RUNNER], IMSG_RUNNER_UPDATE_ENVELOPE,
362 			    0, 0, -1, imsg.data, sizeof(struct message));
363 			break;
364 		}
365 
366 		default:
367 			log_warnx("got imsg %d", imsg.hdr.type);
368 			fatalx("queue_dispatch_mda: unexpected imsg");
369 		}
370 		imsg_free(&imsg);
371 	}
372 	imsg_event_add(ibuf);
373 }
374 
375 void
376 queue_dispatch_lka(int sig, short event, void *p)
377 {
378 	struct smtpd		*env = p;
379 	struct imsgbuf		*ibuf;
380 	struct imsg		 imsg;
381 	ssize_t			 n;
382 
383 	ibuf = env->sc_ibufs[PROC_LKA];
384 	switch (event) {
385 	case EV_READ:
386 		if ((n = imsg_read(ibuf)) == -1)
387 			fatal("imsg_read_error");
388 		if (n == 0) {
389 			/* this pipe is dead, so remove the event handler */
390 			event_del(&ibuf->ev);
391 			event_loopexit(NULL);
392 			return;
393 		}
394 		break;
395 	case EV_WRITE:
396 		if (msgbuf_write(&ibuf->w) == -1)
397 			fatal("msgbuf_write");
398 		imsg_event_add(ibuf);
399 		return;
400  	default:
401 		fatalx("unknown event");
402 	}
403 
404 	for (;;) {
405 		if ((n = imsg_get(ibuf, &imsg)) == -1)
406 			fatalx("queue_dispatch_lka: imsg_get error");
407 		if (n == 0)
408 			break;
409 
410 		switch (imsg.hdr.type) {
411 
412 		case IMSG_QUEUE_SUBMIT_ENVELOPE: {
413 			struct message		*messagep;
414 			struct submit_status	 ss;
415 			int (*f)(struct message *);
416 
417 			messagep = imsg.data;
418 			messagep->id = queue_generate_id();
419 			ss.id = messagep->session_id;
420 
421 			if (IS_MAILBOX(messagep->recipient.rule.r_action) ||
422 			    IS_EXT(messagep->recipient.rule.r_action))
423 				messagep->type = T_MDA_MESSAGE;
424 			else
425 				messagep->type = T_MTA_MESSAGE;
426 
427 			/* Write to disk */
428 			if (messagep->flags & F_MESSAGE_ENQUEUED)
429 				f = enqueue_record_envelope;
430 			else
431 				f = queue_record_incoming_envelope;
432 
433 			if (! f(messagep)) {
434 				ss.code = 421;
435 				imsg_compose(env->sc_ibufs[PROC_SMTP],
436 				    IMSG_QUEUE_TEMPFAIL, 0, 0, -1, &ss,
437 				    sizeof(ss));
438 			}
439 
440 			break;
441 		}
442 
443 		case IMSG_QUEUE_COMMIT_ENVELOPES: {
444 			struct message		*messagep;
445 			struct submit_status	 ss;
446 
447 			messagep = imsg.data;
448 			ss.id = messagep->session_id;
449 			ss.code = 250;
450 
451 			imsg_compose(env->sc_ibufs[PROC_SMTP], IMSG_QUEUE_COMMIT_ENVELOPES,
452 			    0, 0, -1, &ss, sizeof(ss));
453 
454 			break;
455 		}
456 
457 		default:
458 			log_warnx("got imsg %d", imsg.hdr.type);
459 			fatalx("queue_dispatch_lka: unexpected imsg");
460 		}
461 		imsg_free(&imsg);
462 	}
463 	imsg_event_add(ibuf);
464 }
465 
466 void
467 queue_dispatch_runner(int sig, short event, void *p)
468 {
469 	struct smtpd		*env = p;
470 	struct imsgbuf		*ibuf;
471 	struct imsg		 imsg;
472 	ssize_t			 n;
473 
474 	ibuf = env->sc_ibufs[PROC_RUNNER];
475 	switch (event) {
476 	case EV_READ:
477 		if ((n = imsg_read(ibuf)) == -1)
478 			fatal("imsg_read_error");
479 		if (n == 0) {
480 			/* this pipe is dead, so remove the event handler */
481 			event_del(&ibuf->ev);
482 			event_loopexit(NULL);
483 			return;
484 		}
485 		break;
486 	case EV_WRITE:
487 		if (msgbuf_write(&ibuf->w) == -1)
488 			fatal("msgbuf_write");
489 		imsg_event_add(ibuf);
490 		return;
491 	default:
492 		fatalx("unknown event");
493 	}
494 
495 	for (;;) {
496 		if ((n = imsg_get(ibuf, &imsg)) == -1)
497 			fatalx("queue_dispatch_runner: imsg_get error");
498 		if (n == 0)
499 			break;
500 
501 		switch (imsg.hdr.type) {
502 		default:
503 			log_warnx("got imsg %d", imsg.hdr.type);
504 			fatalx("queue_dispatch_runner: unexpected imsg");
505 		}
506 		imsg_free(&imsg);
507 	}
508 	imsg_event_add(ibuf);
509 }
510 
511 void
512 queue_shutdown(void)
513 {
514 	log_info("queue handler");
515 	_exit(0);
516 }
517 
518 void
519 queue_setup_events(struct smtpd *env)
520 {
521 }
522 
523 void
524 queue_disable_events(struct smtpd *env)
525 {
526 }
527 
528 pid_t
529 queue(struct smtpd *env)
530 {
531 	pid_t		 pid;
532 	struct passwd	*pw;
533 
534 	struct event	 ev_sigint;
535 	struct event	 ev_sigterm;
536 
537 	struct peer peers[] = {
538 		{ PROC_CONTROL,	queue_dispatch_control },
539 		{ PROC_SMTP,	queue_dispatch_smtp },
540 		{ PROC_MDA,	queue_dispatch_mda },
541 		{ PROC_MTA,	queue_dispatch_mta },
542 		{ PROC_LKA,	queue_dispatch_lka },
543 		{ PROC_RUNNER,	queue_dispatch_runner }
544 	};
545 
546 	switch (pid = fork()) {
547 	case -1:
548 		fatal("queue: cannot fork");
549 	case 0:
550 		break;
551 	default:
552 		return (pid);
553 	}
554 
555 	purge_config(env, PURGE_EVERYTHING);
556 
557 	pw = env->sc_pw;
558 
559 #ifndef DEBUG
560 	if (chroot(PATH_SPOOL) == -1)
561 		fatal("queue: chroot");
562 	if (chdir("/") == -1)
563 		fatal("queue: chdir(\"/\")");
564 #else
565 #warning disabling privilege revocation and chroot in DEBUG MODE
566 #endif
567 
568 	setproctitle("queue handler");
569 	smtpd_process = PROC_QUEUE;
570 
571 #ifndef DEBUG
572 	if (setgroups(1, &pw->pw_gid) ||
573 	    setresgid(pw->pw_gid, pw->pw_gid, pw->pw_gid) ||
574 	    setresuid(pw->pw_uid, pw->pw_uid, pw->pw_uid))
575 		fatal("queue: cannot drop privileges");
576 #endif
577 
578 	event_init();
579 
580 	signal_set(&ev_sigint, SIGINT, queue_sig_handler, env);
581 	signal_set(&ev_sigterm, SIGTERM, queue_sig_handler, env);
582 	signal_add(&ev_sigint, NULL);
583 	signal_add(&ev_sigterm, NULL);
584 	signal(SIGPIPE, SIG_IGN);
585 	signal(SIGHUP, SIG_IGN);
586 
587 	config_pipes(env, peers, 6);
588 	config_peers(env, peers, 6);
589 
590 	queue_purge(PATH_INCOMING);
591 	queue_purge(PATH_ENQUEUE);
592 
593 	queue_setup_events(env);
594 	event_dispatch();
595 	queue_shutdown();
596 
597 	return (0);
598 }
599 
600 int
601 queue_remove_batch_message(struct smtpd *env, struct batch *batchp, struct message *messagep)
602 {
603 	TAILQ_REMOVE(&batchp->messages, messagep, entry);
604 	bzero(messagep, sizeof(struct message));
605 	free(messagep);
606 
607 	if (TAILQ_FIRST(&batchp->messages) == NULL) {
608 		SPLAY_REMOVE(batchtree, &env->batch_queue, batchp);
609 		bzero(batchp, sizeof(struct batch));
610 		free(batchp);
611 		return 1;
612 	}
613 	return 0;
614 }
615 
616 struct batch *
617 batch_by_id(struct smtpd *env, u_int64_t id)
618 {
619 	struct batch lookup;
620 
621 	lookup.id = id;
622 	return SPLAY_FIND(batchtree, &env->batch_queue, &lookup);
623 }
624 
625 
626 struct message *
627 message_by_id(struct smtpd *env, struct batch *batchp, u_int64_t id)
628 {
629 	struct message *messagep;
630 
631 	if (batchp != NULL) {
632 		TAILQ_FOREACH(messagep, &batchp->messages, entry) {
633 			if (messagep->id == id)
634 				break;
635 		}
636 		return messagep;
637 	}
638 
639 	SPLAY_FOREACH(batchp, batchtree, &env->batch_queue) {
640 		TAILQ_FOREACH(messagep, &batchp->messages, entry) {
641 			if (messagep->id == id)
642 				return messagep;
643 		}
644 	}
645 	return NULL;
646 }
647 
648 void
649 queue_purge(char *queuepath)
650 {
651 	char		 path[MAXPATHLEN];
652 	struct qwalk	*q;
653 
654 	q = qwalk_new(queuepath);
655 
656 	while (qwalk(q, path))
657 		queue_delete_layout_message(queuepath, basename(path));
658 
659 	qwalk_close(q);
660 }
661