xref: /openbsd-src/usr.sbin/smtpd/queue.c (revision 5054e3e78af0749a9bb00ba9a024b3ee2d90290f)
1 /*	$OpenBSD: queue.c,v 1.73 2009/11/08 21:40:05 gilles Exp $	*/
2 
3 /*
4  * Copyright (c) 2008 Gilles Chehade <gilles@openbsd.org>
5  * Copyright (c) 2008 Pierre-Yves Ritschard <pyr@openbsd.org>
6  *
7  * Permission to use, copy, modify, and distribute this software for any
8  * purpose with or without fee is hereby granted, provided that the above
9  * copyright notice and this permission notice appear in all copies.
10  *
11  * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
12  * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
13  * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
14  * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
15  * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
16  * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
17  * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
18  */
19 
20 #include <sys/types.h>
21 #include <sys/queue.h>
22 #include <sys/tree.h>
23 #include <sys/param.h>
24 #include <sys/socket.h>
25 #include <sys/stat.h>
26 
27 #include <errno.h>
28 #include <event.h>
29 #include <fcntl.h>
30 #include <libgen.h>
31 #include <pwd.h>
32 #include <signal.h>
33 #include <stdio.h>
34 #include <stdlib.h>
35 #include <string.h>
36 #include <unistd.h>
37 
38 #include "smtpd.h"
39 
40 __dead void	queue_shutdown(void);
41 void		queue_sig_handler(int, short, void *);
42 void		queue_dispatch_control(int, short, void *);
43 void		queue_dispatch_smtp(int, short, void *);
44 void		queue_dispatch_mda(int, short, void *);
45 void		queue_dispatch_mta(int, short, void *);
46 void		queue_dispatch_lka(int, short, void *);
47 void		queue_dispatch_runner(int, short, void *);
48 void		queue_setup_events(struct smtpd *);
49 void		queue_disable_events(struct smtpd *);
50 void		queue_purge(char *);
51 
52 int		queue_create_layout_message(char *, char *);
53 void		queue_delete_layout_message(char *, char *);
54 int		queue_record_layout_envelope(char *, struct message *);
55 int		queue_remove_layout_envelope(char *, struct message *);
56 int		queue_commit_layout_message(char *, struct message *);
57 int		queue_open_layout_messagefile(char *, struct message *);
58 
59 void		queue_submit_envelope(struct smtpd *, struct message *);
60 void	        queue_commit_envelopes(struct smtpd *, struct message*);
61 
/*
 * Signal callback for the queue process.
 *
 * SIGINT and SIGTERM shut the process down; any other signal number is
 * treated as a fatal programming error.  event and p are not used.
 */
void
queue_sig_handler(int sig, short event, void *p)
{
	if (sig == SIGINT || sig == SIGTERM)
		queue_shutdown();
	else
		fatalx("queue_sig_handler: unexpected signal");
}
74 
75 void
76 queue_dispatch_control(int sig, short event, void *p)
77 {
78 	struct smtpd		*env = p;
79 	struct imsgev		*iev;
80 	struct imsgbuf		*ibuf;
81 	struct imsg		 imsg;
82 	ssize_t			 n;
83 
84 	iev = env->sc_ievs[PROC_CONTROL];
85 	ibuf = &iev->ibuf;
86 
87 	if (event & EV_READ) {
88 		if ((n = imsg_read(ibuf)) == -1)
89 			fatal("imsg_read_error");
90 		if (n == 0) {
91 			/* this pipe is dead, so remove the event handler */
92 			event_del(&iev->ev);
93 			event_loopexit(NULL);
94 			return;
95 		}
96 	}
97 
98 	if (event & EV_WRITE) {
99 		if (msgbuf_write(&ibuf->w) == -1)
100 			fatal("msgbuf_write");
101 	}
102 
103 	for (;;) {
104 		if ((n = imsg_get(ibuf, &imsg)) == -1)
105 			fatal("queue_dispatch_control: imsg_get error");
106 		if (n == 0)
107 			break;
108 
109 		switch (imsg.hdr.type) {
110 		default:
111 			log_warnx("queue_dispatch_control: got imsg %d",
112 			    imsg.hdr.type);
113 			fatalx("queue_dispatch_control: unexpected imsg");
114 		}
115 		imsg_free(&imsg);
116 	}
117 	imsg_event_add(iev);
118 }
119 
120 void
121 queue_dispatch_smtp(int sig, short event, void *p)
122 {
123 	struct smtpd		*env = p;
124 	struct imsgev		*iev;
125 	struct imsgbuf		*ibuf;
126 	struct imsg		 imsg;
127 	ssize_t			 n;
128 
129 	iev = env->sc_ievs[PROC_SMTP];
130 	ibuf = &iev->ibuf;
131 
132 	if (event & EV_READ) {
133 		if ((n = imsg_read(ibuf)) == -1)
134 			fatal("imsg_read_error");
135 		if (n == 0) {
136 			/* this pipe is dead, so remove the event handler */
137 			event_del(&iev->ev);
138 			event_loopexit(NULL);
139 			return;
140 		}
141 	}
142 
143 	if (event & EV_WRITE) {
144 		if (msgbuf_write(&ibuf->w) == -1)
145 			fatal("msgbuf_write");
146 	}
147 
148 	for (;;) {
149 		if ((n = imsg_get(ibuf, &imsg)) == -1)
150 			fatal("queue_dispatch_smtp: imsg_get error");
151 		if (n == 0)
152 			break;
153 
154 		switch (imsg.hdr.type) {
155 		case IMSG_QUEUE_CREATE_MESSAGE: {
156 			struct message		*messagep = imsg.data;
157 			struct submit_status	 ss;
158 			int			(*f)(char *);
159 
160 			log_debug("queue_dispatch_smtp: creating message file");
161 
162 			IMSG_SIZE_CHECK(messagep);
163 
164 			ss.id = messagep->session_id;
165 			ss.code = 250;
166 			bzero(ss.u.msgid, MAX_ID_SIZE);
167 
168 			if (messagep->flags & F_MESSAGE_ENQUEUED)
169 				f = enqueue_create_layout;
170 			else
171 				f = queue_create_incoming_layout;
172 
173 			if (! f(ss.u.msgid))
174 				ss.code = 421;
175 
176 			imsg_compose_event(iev, IMSG_QUEUE_CREATE_MESSAGE, 0, 0, -1,
177 			    &ss, sizeof(ss));
178 			break;
179 		}
180 		case IMSG_QUEUE_REMOVE_MESSAGE: {
181 			struct message		*messagep = imsg.data;
182 			void			(*f)(char *);
183 
184 			IMSG_SIZE_CHECK(messagep);
185 
186 			if (messagep->flags & F_MESSAGE_ENQUEUED)
187 				f = enqueue_delete_message;
188 			else
189 				f = queue_delete_incoming_message;
190 
191 			f(messagep->message_id);
192 
193 			break;
194 		}
195 		case IMSG_QUEUE_COMMIT_MESSAGE: {
196 			struct message		*messagep = imsg.data;
197 			struct submit_status	 ss;
198 			size_t			*counter;
199 			int			(*f)(struct message *);
200 
201 			IMSG_SIZE_CHECK(messagep);
202 
203 			ss.id = messagep->session_id;
204 
205 			if (messagep->flags & F_MESSAGE_ENQUEUED) {
206 				f = enqueue_commit_message;
207 				counter = &env->stats->queue.inserts_local;
208 			} else {
209 				f = queue_commit_incoming_message;
210 				counter = &env->stats->queue.inserts_remote;
211 			}
212 
213 			if (f(messagep))
214 				(*counter)++;
215 			else
216 				ss.code = 421;
217 
218 			imsg_compose_event(iev, IMSG_QUEUE_COMMIT_MESSAGE, 0, 0, -1,
219 			    &ss, sizeof(ss));
220 
221 			break;
222 		}
223 		case IMSG_QUEUE_MESSAGE_FILE: {
224 			struct message		*messagep = imsg.data;
225 			struct submit_status	 ss;
226 			int fd;
227 			int			(*f)(struct message *);
228 
229 			IMSG_SIZE_CHECK(messagep);
230 
231 			ss.id = messagep->session_id;
232 
233 			if (messagep->flags & F_MESSAGE_ENQUEUED)
234 				f = enqueue_open_messagefile;
235 			else
236 				f = queue_open_incoming_message_file;
237 
238 			fd = f(messagep);
239 			if (fd == -1)
240 				ss.code = 421;
241 
242 			imsg_compose_event(iev, IMSG_QUEUE_MESSAGE_FILE, 0, 0, fd,
243 			    &ss, sizeof(ss));
244 			break;
245 		}
246 		default:
247 			log_warnx("queue_dispatch_smtp: got imsg %d",
248 			    imsg.hdr.type);
249 			fatalx("queue_dispatch_smtp: unexpected imsg");
250 		}
251 		imsg_free(&imsg);
252 	}
253 	imsg_event_add(iev);
254 }
255 
256 void
257 queue_dispatch_mda(int sig, short event, void *p)
258 {
259 	struct smtpd		*env = p;
260 	struct imsgev		*iev;
261 	struct imsgbuf		*ibuf;
262 	struct imsg		 imsg;
263 	ssize_t			 n;
264 
265 	iev = env->sc_ievs[PROC_MDA];
266 	ibuf = &iev->ibuf;
267 
268 	if (event & EV_READ) {
269 		if ((n = imsg_read(ibuf)) == -1)
270 			fatal("imsg_read_error");
271 		if (n == 0) {
272 			/* this pipe is dead, so remove the event handler */
273 			event_del(&iev->ev);
274 			event_loopexit(NULL);
275 			return;
276 		}
277 	}
278 
279 	if (event & EV_WRITE) {
280 		if (msgbuf_write(&ibuf->w) == -1)
281 			fatal("msgbuf_write");
282 	}
283 
284 	for (;;) {
285 		if ((n = imsg_get(ibuf, &imsg)) == -1)
286 			fatal("queue_dispatch_mda: imsg_get error");
287 		if (n == 0)
288 			break;
289 
290 		switch (imsg.hdr.type) {
291 
292 		case IMSG_QUEUE_MESSAGE_UPDATE: {
293 			imsg_compose_event(env->sc_ievs[PROC_RUNNER], IMSG_RUNNER_UPDATE_ENVELOPE,
294 			    0, 0, -1, imsg.data, sizeof(struct message));
295 			break;
296 		}
297 
298 		default:
299 			log_warnx("got imsg %d", imsg.hdr.type);
300 			fatalx("queue_dispatch_mda: unexpected imsg");
301 		}
302 		imsg_free(&imsg);
303 	}
304 	imsg_event_add(iev);
305 }
306 
307 void
308 queue_dispatch_mta(int sig, short event, void *p)
309 {
310 	struct smtpd		*env = p;
311 	struct imsgev		*iev;
312 	struct imsgbuf		*ibuf;
313 	struct imsg		 imsg;
314 	ssize_t			 n;
315 
316 	iev = env->sc_ievs[PROC_MTA];
317 	ibuf = &iev->ibuf;
318 
319 	if (event & EV_READ) {
320 		if ((n = imsg_read(ibuf)) == -1)
321 			fatal("imsg_read_error");
322 		if (n == 0) {
323 			/* this pipe is dead, so remove the event handler */
324 			event_del(&iev->ev);
325 			event_loopexit(NULL);
326 			return;
327 		}
328 	}
329 
330 	if (event & EV_WRITE) {
331 		if (msgbuf_write(&ibuf->w) == -1)
332 			fatal("msgbuf_write");
333 	}
334 
335 	for (;;) {
336 		if ((n = imsg_get(ibuf, &imsg)) == -1)
337 			fatal("queue_dispatch_mta: imsg_get error");
338 		if (n == 0)
339 			break;
340 
341 		switch (imsg.hdr.type) {
342 
343 		case IMSG_QUEUE_MESSAGE_FD: {
344 			struct batch *batchp = imsg.data;
345 			int fd;
346 
347 			IMSG_SIZE_CHECK(batchp);
348 
349 			fd = queue_open_message_file(batchp->message_id);
350 			imsg_compose_event(iev,  IMSG_QUEUE_MESSAGE_FD, 0, 0, fd, batchp,
351 			    sizeof(*batchp));
352 			break;
353 		}
354 
355 		case IMSG_QUEUE_MESSAGE_UPDATE: {
356 			imsg_compose_event(env->sc_ievs[PROC_RUNNER], IMSG_RUNNER_UPDATE_ENVELOPE,
357 			    0, 0, -1, imsg.data, sizeof(struct message));
358 			break;
359 		}
360 
361 		default:
362 			log_warnx("got imsg %d", imsg.hdr.type);
363 			fatalx("queue_dispatch_mda: unexpected imsg");
364 		}
365 		imsg_free(&imsg);
366 	}
367 	imsg_event_add(iev);
368 }
369 
370 void
371 queue_dispatch_lka(int sig, short event, void *p)
372 {
373 	struct smtpd		*env = p;
374 	struct imsgev		*iev;
375 	struct imsgbuf		*ibuf;
376 	struct imsg		 imsg;
377 	ssize_t			 n;
378 
379 	iev = env->sc_ievs[PROC_LKA];
380 	ibuf = &iev->ibuf;
381 
382 	if (event & EV_READ) {
383 		if ((n = imsg_read(ibuf)) == -1)
384 			fatal("imsg_read_error");
385 		if (n == 0) {
386 			/* this pipe is dead, so remove the event handler */
387 			event_del(&iev->ev);
388 			event_loopexit(NULL);
389 			return;
390 		}
391 	}
392 
393 	if (event & EV_WRITE) {
394 		if (msgbuf_write(&ibuf->w) == -1)
395 			fatal("msgbuf_write");
396 	}
397 
398 	for (;;) {
399 		if ((n = imsg_get(ibuf, &imsg)) == -1)
400 			fatal("queue_dispatch_lka: imsg_get error");
401 		if (n == 0)
402 			break;
403 
404 		switch (imsg.hdr.type) {
405 
406 		case IMSG_QUEUE_SUBMIT_ENVELOPE: {
407 			struct message		*messagep = imsg.data;
408 			struct submit_status	 ss;
409 			int (*f)(struct message *);
410 
411 			IMSG_SIZE_CHECK(messagep);
412 
413 			messagep->id = generate_uid();
414 			ss.id = messagep->session_id;
415 
416 			if (IS_MAILBOX(messagep->recipient) ||
417 			    IS_EXT(messagep->recipient))
418 				messagep->type = T_MDA_MESSAGE;
419 			else
420 				messagep->type = T_MTA_MESSAGE;
421 
422 			/* Write to disk */
423 			if (messagep->flags & F_MESSAGE_ENQUEUED)
424 				f = enqueue_record_envelope;
425 			else
426 				f = queue_record_incoming_envelope;
427 
428 			if (! f(messagep)) {
429 				ss.code = 421;
430 				imsg_compose_event(env->sc_ievs[PROC_SMTP],
431 				    IMSG_QUEUE_TEMPFAIL, 0, 0, -1, &ss,
432 				    sizeof(ss));
433 			}
434 
435 			break;
436 		}
437 
438 		case IMSG_QUEUE_COMMIT_ENVELOPES: {
439 			struct message		*messagep = imsg.data;
440 			struct submit_status	 ss;
441 
442 			IMSG_SIZE_CHECK(messagep);
443 
444 			ss.id = messagep->session_id;
445 			ss.code = 250;
446 
447 			imsg_compose_event(env->sc_ievs[PROC_SMTP], IMSG_QUEUE_COMMIT_ENVELOPES,
448 			    0, 0, -1, &ss, sizeof(ss));
449 
450 			break;
451 		}
452 
453 		default:
454 			log_warnx("got imsg %d", imsg.hdr.type);
455 			fatalx("queue_dispatch_lka: unexpected imsg");
456 		}
457 		imsg_free(&imsg);
458 	}
459 	imsg_event_add(iev);
460 }
461 
462 void
463 queue_dispatch_runner(int sig, short event, void *p)
464 {
465 	struct smtpd		*env = p;
466 	struct imsgev		*iev;
467 	struct imsgbuf		*ibuf;
468 	struct imsg		 imsg;
469 	ssize_t			 n;
470 
471 	iev = env->sc_ievs[PROC_RUNNER];
472 	ibuf = &iev->ibuf;
473 
474 	if (event & EV_READ) {
475 		if ((n = imsg_read(ibuf)) == -1)
476 			fatal("imsg_read_error");
477 		if (n == 0) {
478 			/* this pipe is dead, so remove the event handler */
479 			event_del(&iev->ev);
480 			event_loopexit(NULL);
481 			return;
482 		}
483 	}
484 
485 	if (event & EV_WRITE) {
486 		if (msgbuf_write(&ibuf->w) == -1)
487 			fatal("msgbuf_write");
488 	}
489 
490 	for (;;) {
491 		if ((n = imsg_get(ibuf, &imsg)) == -1)
492 			fatal("queue_dispatch_runner: imsg_get error");
493 		if (n == 0)
494 			break;
495 
496 		switch (imsg.hdr.type) {
497 		default:
498 			log_warnx("got imsg %d", imsg.hdr.type);
499 			fatalx("queue_dispatch_runner: unexpected imsg");
500 		}
501 		imsg_free(&imsg);
502 	}
503 	imsg_event_add(iev);
504 }
505 
/*
 * Terminate the queue process: log the exit, then leave immediately
 * through _exit() with status 0.  Does not return.
 */
void
queue_shutdown(void)
{
	log_info("queue handler exiting");

	_exit(0);
}
512 
/*
 * Hook called by queue() right before it enters the event loop.
 * Currently there is nothing to set up.
 */
void
queue_setup_events(struct smtpd *env)
{
	/* intentionally empty */
}
517 
/*
 * Counterpart of queue_setup_events().  Currently there is nothing to
 * disable.
 */
void
queue_disable_events(struct smtpd *env)
{
	/* intentionally empty */
}
522 
523 pid_t
524 queue(struct smtpd *env)
525 {
526 	pid_t		 pid;
527 	struct passwd	*pw;
528 
529 	struct event	 ev_sigint;
530 	struct event	 ev_sigterm;
531 
532 	struct peer peers[] = {
533 		{ PROC_CONTROL,	queue_dispatch_control },
534 		{ PROC_SMTP,	queue_dispatch_smtp },
535 		{ PROC_MDA,	queue_dispatch_mda },
536 		{ PROC_MTA,	queue_dispatch_mta },
537 		{ PROC_LKA,	queue_dispatch_lka },
538 		{ PROC_RUNNER,	queue_dispatch_runner }
539 	};
540 
541 	switch (pid = fork()) {
542 	case -1:
543 		fatal("queue: cannot fork");
544 	case 0:
545 		break;
546 	default:
547 		return (pid);
548 	}
549 
550 	purge_config(env, PURGE_EVERYTHING);
551 
552 	pw = env->sc_pw;
553 
554 #ifndef DEBUG
555 	if (chroot(PATH_SPOOL) == -1)
556 		fatal("queue: chroot");
557 	if (chdir("/") == -1)
558 		fatal("queue: chdir(\"/\")");
559 #else
560 #warning disabling privilege revocation and chroot in DEBUG MODE
561 #endif
562 
563 	smtpd_process = PROC_QUEUE;
564 	setproctitle("%s", env->sc_title[smtpd_process]);
565 
566 #ifndef DEBUG
567 	if (setgroups(1, &pw->pw_gid) ||
568 	    setresgid(pw->pw_gid, pw->pw_gid, pw->pw_gid) ||
569 	    setresuid(pw->pw_uid, pw->pw_uid, pw->pw_uid))
570 		fatal("queue: cannot drop privileges");
571 #endif
572 
573 	event_init();
574 
575 	signal_set(&ev_sigint, SIGINT, queue_sig_handler, env);
576 	signal_set(&ev_sigterm, SIGTERM, queue_sig_handler, env);
577 	signal_add(&ev_sigint, NULL);
578 	signal_add(&ev_sigterm, NULL);
579 	signal(SIGPIPE, SIG_IGN);
580 	signal(SIGHUP, SIG_IGN);
581 
582 	config_pipes(env, peers, nitems(peers));
583 	config_peers(env, peers, nitems(peers));
584 
585 	queue_purge(PATH_INCOMING);
586 	queue_purge(PATH_ENQUEUE);
587 
588 	queue_setup_events(env);
589 	event_dispatch();
590 	queue_shutdown();
591 
592 	return (0);
593 }
594 
595 struct batch *
596 batch_by_id(struct smtpd *env, u_int64_t id)
597 {
598 	struct batch lookup;
599 
600 	lookup.id = id;
601 	return SPLAY_FIND(batchtree, &env->batch_queue, &lookup);
602 }
603 
604 
/*
 * Purge the queue rooted at queuepath.
 *
 * Walks queuepath with qwalk and, for each path the walk yields, calls
 * queue_delete_layout_message() with the path's final component.
 */
void
queue_purge(char *queuepath)
{
	struct qwalk	*walker;
	char		 entry[MAXPATHLEN];

	walker = qwalk_new(queuepath);

	for (;;) {
		if (!qwalk(walker, entry))
			break;
		queue_delete_layout_message(queuepath, basename(entry));
	}

	qwalk_close(walker);
}
618 
619 void
620 queue_submit_envelope(struct smtpd *env, struct message *message)
621 {
622 	imsg_compose_event(env->sc_ievs[PROC_QUEUE],
623 	    IMSG_QUEUE_SUBMIT_ENVELOPE, 0, 0, -1,
624 	    message, sizeof(struct message));
625 }
626 
627 void
628 queue_commit_envelopes(struct smtpd *env, struct message *message)
629 {
630 	imsg_compose_event(env->sc_ievs[PROC_QUEUE],
631 	    IMSG_QUEUE_COMMIT_ENVELOPES, 0, 0, -1,
632 	    message, sizeof(struct message));
633 }
634