xref: /openbsd-src/usr.sbin/smtpd/queue.c (revision d13be5d47e4149db2549a9828e244d59dbc43f15)
1 /*	$OpenBSD: queue.c,v 1.106 2011/09/01 19:56:49 eric Exp $	*/
2 
3 /*
4  * Copyright (c) 2008 Gilles Chehade <gilles@openbsd.org>
5  * Copyright (c) 2008 Pierre-Yves Ritschard <pyr@openbsd.org>
6  *
7  * Permission to use, copy, modify, and distribute this software for any
8  * purpose with or without fee is hereby granted, provided that the above
9  * copyright notice and this permission notice appear in all copies.
10  *
11  * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
12  * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
13  * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
14  * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
15  * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
16  * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
17  * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
18  */
19 
20 #include <sys/types.h>
21 #include <sys/queue.h>
22 #include <sys/tree.h>
23 #include <sys/param.h>
24 #include <sys/socket.h>
25 #include <sys/stat.h>
26 
27 #include <event.h>
28 #include <imsg.h>
29 #include <libgen.h>
30 #include <pwd.h>
31 #include <signal.h>
32 #include <stdio.h>
33 #include <stdlib.h>
34 #include <string.h>
35 #include <unistd.h>
36 
37 #include "smtpd.h"
38 #include "log.h"
39 
40 static void queue_imsg(struct imsgev *, struct imsg *);
41 static void queue_pass_to_runner(struct imsgev *, struct imsg *);
42 static void queue_shutdown(void);
43 static void queue_sig_handler(int, short, void *);
44 static void queue_purge(enum queue_kind, char *);
45 
46 static void
47 queue_imsg(struct imsgev *iev, struct imsg *imsg)
48 {
49 	struct submit_status	 ss;
50 	struct envelope		*e;
51 	struct ramqueue_batch	*rq_batch;
52 	int			 fd, ret;
53 
54 	if (iev->proc == PROC_SMTP) {
55 		e = imsg->data;
56 
57 		switch (imsg->hdr.type) {
58 		case IMSG_QUEUE_CREATE_MESSAGE:
59 			ss.id = e->session_id;
60 			ss.code = 250;
61 			ss.u.msgid = 0;
62 			if (e->delivery.flags & DF_ENQUEUED)
63 				ret = queue_message_create(Q_ENQUEUE, &ss.u.msgid);
64 			else
65 				ret = queue_message_create(Q_INCOMING, &ss.u.msgid);
66 			if (ret == 0)
67 				ss.code = 421;
68 			imsg_compose_event(iev, IMSG_QUEUE_CREATE_MESSAGE, 0, 0, -1,
69 			    &ss, sizeof ss);
70 			return;
71 
72 		case IMSG_QUEUE_REMOVE_MESSAGE:
73 			if (e->delivery.flags & DF_ENQUEUED)
74 				queue_message_purge(Q_ENQUEUE, evpid_to_msgid(e->delivery.id));
75 			else
76 				queue_message_purge(Q_INCOMING, evpid_to_msgid(e->delivery.id));
77 			return;
78 
79 		case IMSG_QUEUE_COMMIT_MESSAGE:
80 			ss.id = e->session_id;
81 			if (e->delivery.flags & DF_ENQUEUED) {
82 				if (queue_message_commit(Q_ENQUEUE, evpid_to_msgid(e->delivery.id)))
83 					stat_increment(STATS_QUEUE_LOCAL);
84 				else
85 					ss.code = 421;
86 			} else {
87 				if (queue_message_commit(Q_INCOMING, evpid_to_msgid(e->delivery.id)))
88 					stat_increment(STATS_QUEUE_REMOTE);
89 				else
90 					ss.code = 421;
91 			}
92 			imsg_compose_event(iev, IMSG_QUEUE_COMMIT_MESSAGE, 0, 0, -1,
93 			    &ss, sizeof ss);
94 
95 			if (ss.code != 421)
96 				queue_pass_to_runner(iev, imsg);
97 
98 			return;
99 
100 		case IMSG_QUEUE_MESSAGE_FILE:
101 			ss.id = e->session_id;
102 			if (e->delivery.flags & DF_ENQUEUED)
103 				fd = queue_message_fd_rw(Q_ENQUEUE, evpid_to_msgid(e->delivery.id));
104 			else
105 				fd = queue_message_fd_rw(Q_INCOMING, evpid_to_msgid(e->delivery.id));
106 			if (fd == -1)
107 				ss.code = 421;
108 			imsg_compose_event(iev, IMSG_QUEUE_MESSAGE_FILE, 0, 0, fd,
109 			    &ss, sizeof ss);
110 			return;
111 
112 		case IMSG_SMTP_ENQUEUE:
113 			queue_pass_to_runner(iev, imsg);
114 			return;
115 		}
116 	}
117 
118 	if (iev->proc == PROC_LKA) {
119 		e = imsg->data;
120 
121 		switch (imsg->hdr.type) {
122 		case IMSG_QUEUE_SUBMIT_ENVELOPE:
123 			ss.id = e->session_id;
124 
125 			/* Write to disk */
126 			if (e->delivery.flags & DF_ENQUEUED)
127 				ret = queue_envelope_create(Q_ENQUEUE, e);
128 			else
129 				ret = queue_envelope_create(Q_INCOMING, e);
130 
131 			if (ret == 0) {
132 				ss.code = 421;
133 				imsg_compose_event(env->sc_ievs[PROC_SMTP],
134 				    IMSG_QUEUE_TEMPFAIL, 0, 0, -1, &ss,
135 				    sizeof ss);
136 			}
137 			return;
138 
139 		case IMSG_QUEUE_COMMIT_ENVELOPES:
140 			ss.id = e->session_id;
141 			ss.code = 250;
142 			imsg_compose_event(env->sc_ievs[PROC_SMTP],
143 			    IMSG_QUEUE_COMMIT_ENVELOPES, 0, 0, -1, &ss,
144 			    sizeof ss);
145 			return;
146 		}
147 	}
148 
149 	if (iev->proc == PROC_RUNNER) {
150 		/* forward imsgs from runner on its behalf */
151 		imsg_compose_event(env->sc_ievs[imsg->hdr.peerid], imsg->hdr.type,
152 		    0, imsg->hdr.pid, imsg->fd, (char *)imsg->data,
153 		    imsg->hdr.len - sizeof imsg->hdr);
154 		return;
155 	}
156 
157 	if (iev->proc == PROC_MTA) {
158 		switch (imsg->hdr.type) {
159 		case IMSG_QUEUE_MESSAGE_FD:
160 			rq_batch = imsg->data;
161 			fd = queue_message_fd_r(Q_QUEUE, rq_batch->msgid);
162 			imsg_compose_event(iev,  IMSG_QUEUE_MESSAGE_FD, 0, 0,
163 			    fd, rq_batch, sizeof *rq_batch);
164 			return;
165 
166 		case IMSG_QUEUE_MESSAGE_UPDATE:
167 		case IMSG_BATCH_DONE:
168 			queue_pass_to_runner(iev, imsg);
169 			return;
170 		}
171 	}
172 
173 	if (iev->proc == PROC_MDA) {
174 		switch (imsg->hdr.type) {
175 		case IMSG_QUEUE_MESSAGE_UPDATE:
176 		case IMSG_MDA_SESS_NEW:
177 			queue_pass_to_runner(iev, imsg);
178 			return;
179 		}
180 	}
181 
182 	if (iev->proc == PROC_CONTROL) {
183 		switch (imsg->hdr.type) {
184 		case IMSG_QUEUE_PAUSE_LOCAL:
185 		case IMSG_QUEUE_PAUSE_OUTGOING:
186 		case IMSG_QUEUE_RESUME_LOCAL:
187 		case IMSG_QUEUE_RESUME_OUTGOING:
188 		case IMSG_QUEUE_SCHEDULE:
189 		case IMSG_QUEUE_REMOVE:
190 			queue_pass_to_runner(iev, imsg);
191 			return;
192 		}
193 	}
194 
195 	if (iev->proc == PROC_PARENT) {
196 		switch (imsg->hdr.type) {
197 		case IMSG_PARENT_ENQUEUE_OFFLINE:
198 			queue_pass_to_runner(iev, imsg);
199 			return;
200 
201 		case IMSG_CTL_VERBOSE:
202 			log_verbose(*(int *)imsg->data);
203 			queue_pass_to_runner(iev, imsg);
204 			return;
205 		}
206 	}
207 
208 	fatalx("queue_imsg: unexpected imsg");
209 }
210 
211 static void
212 queue_pass_to_runner(struct imsgev *iev, struct imsg *imsg)
213 {
214 	imsg_compose_event(env->sc_ievs[PROC_RUNNER], imsg->hdr.type,
215 	    iev->proc, imsg->hdr.pid, imsg->fd, imsg->data,
216 	    imsg->hdr.len - sizeof imsg->hdr);
217 }
218 
219 static void
220 queue_sig_handler(int sig, short event, void *p)
221 {
222 	switch (sig) {
223 	case SIGINT:
224 	case SIGTERM:
225 		queue_shutdown();
226 		break;
227 	default:
228 		fatalx("queue_sig_handler: unexpected signal");
229 	}
230 }
231 
232 static void
233 queue_shutdown(void)
234 {
235 	log_info("queue handler exiting");
236 	_exit(0);
237 }
238 
239 pid_t
240 queue(void)
241 {
242 	pid_t		 pid;
243 	struct passwd	*pw;
244 
245 	struct event	 ev_sigint;
246 	struct event	 ev_sigterm;
247 
248 	struct peer peers[] = {
249 		{ PROC_PARENT,	imsg_dispatch },
250 		{ PROC_CONTROL,	imsg_dispatch },
251 		{ PROC_SMTP,	imsg_dispatch },
252 		{ PROC_MDA,	imsg_dispatch },
253 		{ PROC_MTA,	imsg_dispatch },
254 		{ PROC_LKA,	imsg_dispatch },
255 		{ PROC_RUNNER,	imsg_dispatch }
256 	};
257 
258 	switch (pid = fork()) {
259 	case -1:
260 		fatal("queue: cannot fork");
261 	case 0:
262 		break;
263 	default:
264 		return (pid);
265 	}
266 
267 	purge_config(PURGE_EVERYTHING);
268 
269 	pw = env->sc_pw;
270 
271 	if (chroot(PATH_SPOOL) == -1)
272 		fatal("queue: chroot");
273 	if (chdir("/") == -1)
274 		fatal("queue: chdir(\"/\")");
275 
276 	smtpd_process = PROC_QUEUE;
277 	setproctitle("%s", env->sc_title[smtpd_process]);
278 
279 	if (setgroups(1, &pw->pw_gid) ||
280 	    setresgid(pw->pw_gid, pw->pw_gid, pw->pw_gid) ||
281 	    setresuid(pw->pw_uid, pw->pw_uid, pw->pw_uid))
282 		fatal("queue: cannot drop privileges");
283 
284 	imsg_callback = queue_imsg;
285 	event_init();
286 
287 	signal_set(&ev_sigint, SIGINT, queue_sig_handler, NULL);
288 	signal_set(&ev_sigterm, SIGTERM, queue_sig_handler, NULL);
289 	signal_add(&ev_sigint, NULL);
290 	signal_add(&ev_sigterm, NULL);
291 	signal(SIGPIPE, SIG_IGN);
292 	signal(SIGHUP, SIG_IGN);
293 
294 	/*
295 	 * queue opens fds for four purposes: smtp, mta, mda, and bounces.
296 	 * Therefore, use all available fd space and set the maxconn (=max
297 	 * session count for mta and mda) to a quarter of this value.
298 	 */
299 	fdlimit(1.0);
300 	if ((env->sc_maxconn = availdesc() / 4) < 1)
301 		fatalx("runner: fd starvation");
302 
303 	config_pipes(peers, nitems(peers));
304 	config_peers(peers, nitems(peers));
305 
306 	queue_purge(Q_INCOMING, PATH_INCOMING);
307 	queue_purge(Q_ENQUEUE, PATH_ENQUEUE);
308 
309 	if (event_dispatch() <  0)
310 		fatal("event_dispatch");
311 	queue_shutdown();
312 
313 	return (0);
314 }
315 
316 static void
317 queue_purge(enum queue_kind qkind, char *queuepath)
318 {
319 	char		 path[MAXPATHLEN];
320 	struct qwalk	*q;
321 
322 	q = qwalk_new(queuepath);
323 
324 	while (qwalk(q, path)) {
325 		u_int32_t msgid;
326 
327 		if ((msgid = filename_to_msgid(basename(path))) == 0) {
328 			log_warnx("queue_purge: invalid evpid");
329 			continue;
330 		}
331 		queue_message_purge(qkind, msgid);
332 	}
333 
334 	qwalk_close(q);
335 }
336 
337 void
338 queue_submit_envelope(struct envelope *ep)
339 {
340 	imsg_compose_event(env->sc_ievs[PROC_QUEUE],
341 	    IMSG_QUEUE_SUBMIT_ENVELOPE, 0, 0, -1,
342 	    ep, sizeof(*ep));
343 }
344 
345 void
346 queue_commit_envelopes(struct envelope *ep)
347 {
348 	imsg_compose_event(env->sc_ievs[PROC_QUEUE],
349 	    IMSG_QUEUE_COMMIT_ENVELOPES, 0, 0, -1,
350 	    ep, sizeof(*ep));
351 }
352