xref: /openbsd-src/usr.sbin/smtpd/queue.c (revision f2da64fbbbf1b03f09f390ab01267c93dfd77c4c)
1 /*	$OpenBSD: queue.c,v 1.182 2016/09/08 12:06:43 eric Exp $	*/
2 
3 /*
4  * Copyright (c) 2008 Gilles Chehade <gilles@poolp.org>
5  * Copyright (c) 2008 Pierre-Yves Ritschard <pyr@openbsd.org>
6  * Copyright (c) 2012 Eric Faurot <eric@openbsd.org>
7  *
8  * Permission to use, copy, modify, and distribute this software for any
9  * purpose with or without fee is hereby granted, provided that the above
10  * copyright notice and this permission notice appear in all copies.
11  *
12  * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
13  * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
14  * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
15  * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
16  * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
17  * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
18  * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
19  */
20 
21 #include <sys/types.h>
22 #include <sys/queue.h>
23 #include <sys/tree.h>
24 #include <sys/socket.h>
25 #include <sys/stat.h>
26 
27 #include <err.h>
28 #include <event.h>
29 #include <imsg.h>
30 #include <inttypes.h>
31 #include <libgen.h>
32 #include <pwd.h>
33 #include <signal.h>
34 #include <stdio.h>
35 #include <stdlib.h>
36 #include <string.h>
37 #include <time.h>
38 #include <unistd.h>
39 #include <limits.h>
40 
41 #include "smtpd.h"
42 #include "log.h"
43 
44 static void queue_imsg(struct mproc *, struct imsg *);
45 static void queue_timeout(int, short, void *);
46 static void queue_bounce(struct envelope *, struct delivery_bounce *);
47 static void queue_shutdown(void);
48 static void queue_log(const struct envelope *, const char *, const char *);
49 static void queue_msgid_walk(int, short, void *);
50 
51 
52 static void
53 queue_imsg(struct mproc *p, struct imsg *imsg)
54 {
55 	struct delivery_bounce	 bounce;
56 	struct msg_walkinfo	*wi;
57 	struct timeval		 tv;
58 	struct bounce_req_msg	*req_bounce;
59 	struct envelope		 evp;
60 	struct msg		 m;
61 	const char		*reason;
62 	uint64_t		 reqid, evpid, holdq;
63 	uint32_t		 msgid;
64 	time_t			 nexttry;
65 	size_t			 n_evp;
66 	int			 fd, mta_ext, ret, v, flags, code;
67 
68 	if (imsg == NULL)
69 		queue_shutdown();
70 
71 	memset(&bounce, 0, sizeof(struct delivery_bounce));
72 	if (p->proc == PROC_PONY) {
73 
74 		switch (imsg->hdr.type) {
75 		case IMSG_SMTP_MESSAGE_CREATE:
76 			m_msg(&m, imsg);
77 			m_get_id(&m, &reqid);
78 			m_end(&m);
79 
80 			ret = queue_message_create(&msgid);
81 
82 			m_create(p, IMSG_SMTP_MESSAGE_CREATE, 0, 0, -1);
83 			m_add_id(p, reqid);
84 			if (ret == 0)
85 				m_add_int(p, 0);
86 			else {
87 				m_add_int(p, 1);
88 				m_add_msgid(p, msgid);
89 			}
90 			m_close(p);
91 			return;
92 
93 		case IMSG_SMTP_MESSAGE_ROLLBACK:
94 			m_msg(&m, imsg);
95 			m_get_msgid(&m, &msgid);
96 			m_end(&m);
97 
98 			queue_message_delete(msgid);
99 
100 			m_create(p_scheduler, IMSG_QUEUE_MESSAGE_ROLLBACK,
101 			    0, 0, -1);
102 			m_add_msgid(p_scheduler, msgid);
103 			m_close(p_scheduler);
104 			return;
105 
106 		case IMSG_SMTP_MESSAGE_COMMIT:
107 			m_msg(&m, imsg);
108 			m_get_id(&m, &reqid);
109 			m_get_msgid(&m, &msgid);
110 			m_end(&m);
111 
112 			ret = queue_message_commit(msgid);
113 
114 			m_create(p, IMSG_SMTP_MESSAGE_COMMIT, 0, 0, -1);
115 			m_add_id(p, reqid);
116 			m_add_int(p, (ret == 0) ? 0 : 1);
117 			m_close(p);
118 
119 			if (ret) {
120 				m_create(p_scheduler, IMSG_QUEUE_MESSAGE_COMMIT,
121 				    0, 0, -1);
122 				m_add_msgid(p_scheduler, msgid);
123 				m_close(p_scheduler);
124 			}
125 			return;
126 
127 		case IMSG_SMTP_MESSAGE_OPEN:
128 			m_msg(&m, imsg);
129 			m_get_id(&m, &reqid);
130 			m_get_msgid(&m, &msgid);
131 			m_end(&m);
132 
133 			fd = queue_message_fd_rw(msgid);
134 
135 			m_create(p, IMSG_SMTP_MESSAGE_OPEN, 0, 0, fd);
136 			m_add_id(p, reqid);
137 			m_add_int(p, (fd == -1) ? 0 : 1);
138 			m_close(p);
139 			return;
140 
141 		case IMSG_QUEUE_SMTP_SESSION:
142 			bounce_fd(imsg->fd);
143 			return;
144 		}
145 	}
146 
147 	if (p->proc == PROC_LKA) {
148 		switch (imsg->hdr.type) {
149 		case IMSG_LKA_ENVELOPE_SUBMIT:
150 			m_msg(&m, imsg);
151 			m_get_id(&m, &reqid);
152 			m_get_envelope(&m, &evp);
153 			m_end(&m);
154 
155 			if (evp.id == 0)
156 				log_warnx("warn: imsg_queue_submit_envelope: evpid=0");
157 			if (evpid_to_msgid(evp.id) == 0)
158 				log_warnx("warn: imsg_queue_submit_envelope: msgid=0, "
159 				    "evpid=%016"PRIx64, evp.id);
160 			ret = queue_envelope_create(&evp);
161 			m_create(p_pony, IMSG_QUEUE_ENVELOPE_SUBMIT, 0, 0, -1);
162 			m_add_id(p_pony, reqid);
163 			if (ret == 0)
164 				m_add_int(p_pony, 0);
165 			else {
166 				m_add_int(p_pony, 1);
167 				m_add_evpid(p_pony, evp.id);
168 			}
169 			m_close(p_pony);
170 			if (ret) {
171 				m_create(p_scheduler,
172 				    IMSG_QUEUE_ENVELOPE_SUBMIT, 0, 0, -1);
173 				m_add_envelope(p_scheduler, &evp);
174 				m_close(p_scheduler);
175 
176 			}
177 			return;
178 
179 		case IMSG_LKA_ENVELOPE_COMMIT:
180 			m_msg(&m, imsg);
181 			m_get_id(&m, &reqid);
182 			m_end(&m);
183 			m_create(p_pony, IMSG_QUEUE_ENVELOPE_COMMIT, 0, 0, -1);
184 			m_add_id(p_pony, reqid);
185 			m_add_int(p_pony, 1);
186 			m_close(p_pony);
187 			return;
188 		}
189 	}
190 
191 	if (p->proc == PROC_SCHEDULER) {
192 		switch (imsg->hdr.type) {
193 		case IMSG_SCHED_ENVELOPE_REMOVE:
194 			m_msg(&m, imsg);
195 			m_get_evpid(&m, &evpid);
196 			m_end(&m);
197 
198 			m_create(p_scheduler, IMSG_QUEUE_ENVELOPE_ACK, 0, 0, -1);
199 			m_add_evpid(p_scheduler, evpid);
200 			m_close(p_scheduler);
201 
202 			/* already removed by scheduler */
203 			if (queue_envelope_load(evpid, &evp) == 0)
204 				return;
205 
206 			queue_log(&evp, "Remove", "Removed by administrator");
207 			queue_envelope_delete(evpid);
208 			return;
209 
210 		case IMSG_SCHED_ENVELOPE_EXPIRE:
211 			m_msg(&m, imsg);
212 			m_get_evpid(&m, &evpid);
213 			m_end(&m);
214 
215 			m_create(p_scheduler, IMSG_QUEUE_ENVELOPE_ACK, 0, 0, -1);
216 			m_add_evpid(p_scheduler, evpid);
217 			m_close(p_scheduler);
218 
219 			/* already removed by scheduler*/
220 			if (queue_envelope_load(evpid, &evp) == 0)
221 				return;
222 
223 			bounce.type = B_ERROR;
224 			envelope_set_errormsg(&evp, "Envelope expired");
225 			envelope_set_esc_class(&evp, ESC_STATUS_TEMPFAIL);
226 			envelope_set_esc_code(&evp, ESC_DELIVERY_TIME_EXPIRED);
227 			queue_bounce(&evp, &bounce);
228 			queue_log(&evp, "Expire", "Envelope expired");
229 			queue_envelope_delete(evpid);
230 			return;
231 
232 		case IMSG_SCHED_ENVELOPE_BOUNCE:
233 			CHECK_IMSG_DATA_SIZE(imsg, sizeof *req_bounce);
234 			req_bounce = imsg->data;
235 			evpid = req_bounce->evpid;
236 
237 			if (queue_envelope_load(evpid, &evp) == 0) {
238 				log_warnx("queue: bounce: failed to load envelope");
239 				m_create(p_scheduler, IMSG_QUEUE_ENVELOPE_REMOVE, 0, 0, -1);
240 				m_add_evpid(p_scheduler, evpid);
241 				m_add_u32(p_scheduler, 0); /* not in-flight */
242 				m_close(p_scheduler);
243 				return;
244 			}
245 			queue_bounce(&evp, &req_bounce->bounce);
246 			evp.lastbounce = req_bounce->timestamp;
247 			if (!queue_envelope_update(&evp))
248 				log_warnx("warn: could not update envelope %016"PRIx64, evpid);
249 			return;
250 
251 		case IMSG_SCHED_ENVELOPE_DELIVER:
252 			m_msg(&m, imsg);
253 			m_get_evpid(&m, &evpid);
254 			m_end(&m);
255 			if (queue_envelope_load(evpid, &evp) == 0) {
256 				log_warnx("queue: deliver: failed to load envelope");
257 				m_create(p_scheduler, IMSG_QUEUE_ENVELOPE_REMOVE, 0, 0, -1);
258 				m_add_evpid(p_scheduler, evpid);
259 				m_add_u32(p_scheduler, 1); /* in-flight */
260 				m_close(p_scheduler);
261 				return;
262 			}
263 			evp.lasttry = time(NULL);
264 			m_create(p_pony, IMSG_QUEUE_DELIVER, 0, 0, -1);
265 			m_add_envelope(p_pony, &evp);
266 			m_close(p_pony);
267 			return;
268 
269 		case IMSG_SCHED_ENVELOPE_INJECT:
270 			m_msg(&m, imsg);
271 			m_get_evpid(&m, &evpid);
272 			m_end(&m);
273 			bounce_add(evpid);
274 			return;
275 
276 		case IMSG_SCHED_ENVELOPE_TRANSFER:
277 			m_msg(&m, imsg);
278 			m_get_evpid(&m, &evpid);
279 			m_end(&m);
280 			if (queue_envelope_load(evpid, &evp) == 0) {
281 				log_warnx("queue: failed to load envelope");
282 				m_create(p_scheduler, IMSG_QUEUE_ENVELOPE_REMOVE, 0, 0, -1);
283 				m_add_evpid(p_scheduler, evpid);
284 				m_add_u32(p_scheduler, 1); /* in-flight */
285 				m_close(p_scheduler);
286 				return;
287 			}
288 			evp.lasttry = time(NULL);
289 			m_create(p_pony, IMSG_QUEUE_TRANSFER, 0, 0, -1);
290 			m_add_envelope(p_pony, &evp);
291 			m_close(p_pony);
292 			return;
293 
294 		case IMSG_CTL_LIST_ENVELOPES:
295 			if (imsg->hdr.len == sizeof imsg->hdr) {
296 				m_forward(p_control, imsg);
297 				return;
298 			}
299 
300 			m_msg(&m, imsg);
301 			m_get_evpid(&m, &evpid);
302 			m_get_int(&m, &flags);
303 			m_get_time(&m, &nexttry);
304 			m_end(&m);
305 
306 			if (queue_envelope_load(evpid, &evp) == 0)
307 				return; /* Envelope is gone, drop it */
308 
309 			/*
310 			 * XXX consistency: The envelope might already be on
311 			 * its way back to the scheduler.  We need to detect
312 			 * this properly and report that state.
313 			 */
314 			if (flags & EF_INFLIGHT) {
315 				/*
316 				 * Not exactly correct but pretty close: The
317 				 * value is not recorded on the envelope unless
318 				 * a tempfail occurs.
319 				 */
320 				evp.lasttry = nexttry;
321 			}
322 
323 			m_create(p_control, IMSG_CTL_LIST_ENVELOPES,
324 			    imsg->hdr.peerid, 0, -1);
325 			m_add_int(p_control, flags);
326 			m_add_time(p_control, nexttry);
327 			m_add_envelope(p_control, &evp);
328 			m_close(p_control);
329 			return;
330 		}
331 	}
332 
333 	if (p->proc == PROC_PONY) {
334 		switch (imsg->hdr.type) {
335 		case IMSG_MDA_OPEN_MESSAGE:
336 		case IMSG_MTA_OPEN_MESSAGE:
337 			m_msg(&m, imsg);
338 			m_get_id(&m, &reqid);
339 			m_get_msgid(&m, &msgid);
340 			m_end(&m);
341 			fd = queue_message_fd_r(msgid);
342 			m_create(p, imsg->hdr.type, 0, 0, fd);
343 			m_add_id(p, reqid);
344 			m_close(p);
345 			return;
346 
347 		case IMSG_MDA_DELIVERY_OK:
348 		case IMSG_MTA_DELIVERY_OK:
349 			m_msg(&m, imsg);
350 			m_get_evpid(&m, &evpid);
351 			if (imsg->hdr.type == IMSG_MTA_DELIVERY_OK)
352 				m_get_int(&m, &mta_ext);
353 			m_end(&m);
354 			if (queue_envelope_load(evpid, &evp) == 0) {
355 				log_warn("queue: dsn: failed to load envelope");
356 				return;
357 			}
358 			if (evp.dsn_notify & DSN_SUCCESS) {
359 				bounce.type = B_DSN;
360 				bounce.dsn_ret = evp.dsn_ret;
361 				envelope_set_esc_class(&evp, ESC_STATUS_OK);
362 				if (imsg->hdr.type == IMSG_MDA_DELIVERY_OK)
363 					queue_bounce(&evp, &bounce);
364 				else if (imsg->hdr.type == IMSG_MTA_DELIVERY_OK &&
365 				    (mta_ext & MTA_EXT_DSN) == 0) {
366 					bounce.mta_without_dsn = 1;
367 					queue_bounce(&evp, &bounce);
368 				}
369 			}
370 			queue_envelope_delete(evpid);
371 			m_create(p_scheduler, IMSG_QUEUE_DELIVERY_OK, 0, 0, -1);
372 			m_add_evpid(p_scheduler, evpid);
373 			m_close(p_scheduler);
374 			return;
375 
376 		case IMSG_MDA_DELIVERY_TEMPFAIL:
377 		case IMSG_MTA_DELIVERY_TEMPFAIL:
378 			m_msg(&m, imsg);
379 			m_get_evpid(&m, &evpid);
380 			m_get_string(&m, &reason);
381 			m_get_int(&m, &code);
382 			m_end(&m);
383 			if (queue_envelope_load(evpid, &evp) == 0) {
384 				log_warnx("queue: tempfail: failed to load envelope");
385 				m_create(p_scheduler, IMSG_QUEUE_ENVELOPE_REMOVE, 0, 0, -1);
386 				m_add_evpid(p_scheduler, evpid);
387 				m_add_u32(p_scheduler, 1); /* in-flight */
388 				m_close(p_scheduler);
389 				return;
390 			}
391 			envelope_set_errormsg(&evp, "%s", reason);
392 			envelope_set_esc_class(&evp, ESC_STATUS_TEMPFAIL);
393 			envelope_set_esc_code(&evp, code);
394 			evp.retry++;
395 			if (!queue_envelope_update(&evp))
396 				log_warnx("warn: could not update envelope %016"PRIx64, evpid);
397 			m_create(p_scheduler, IMSG_QUEUE_DELIVERY_TEMPFAIL, 0, 0, -1);
398 			m_add_envelope(p_scheduler, &evp);
399 			m_close(p_scheduler);
400 			return;
401 
402 		case IMSG_MDA_DELIVERY_PERMFAIL:
403 		case IMSG_MTA_DELIVERY_PERMFAIL:
404 			m_msg(&m, imsg);
405 			m_get_evpid(&m, &evpid);
406 			m_get_string(&m, &reason);
407 			m_get_int(&m, &code);
408 			m_end(&m);
409 			if (queue_envelope_load(evpid, &evp) == 0) {
410 				log_warnx("queue: permfail: failed to load envelope");
411 				m_create(p_scheduler, IMSG_QUEUE_ENVELOPE_REMOVE, 0, 0, -1);
412 				m_add_evpid(p_scheduler, evpid);
413 				m_add_u32(p_scheduler, 1); /* in-flight */
414 				m_close(p_scheduler);
415 				return;
416 			}
417 			bounce.type = B_ERROR;
418 			envelope_set_errormsg(&evp, "%s", reason);
419 			envelope_set_esc_class(&evp, ESC_STATUS_PERMFAIL);
420 			envelope_set_esc_code(&evp, code);
421 			queue_bounce(&evp, &bounce);
422 			queue_envelope_delete(evpid);
423 			m_create(p_scheduler, IMSG_QUEUE_DELIVERY_PERMFAIL, 0, 0, -1);
424 			m_add_evpid(p_scheduler, evpid);
425 			m_close(p_scheduler);
426 			return;
427 
428 		case IMSG_MDA_DELIVERY_LOOP:
429 		case IMSG_MTA_DELIVERY_LOOP:
430 			m_msg(&m, imsg);
431 			m_get_evpid(&m, &evpid);
432 			m_end(&m);
433 			if (queue_envelope_load(evpid, &evp) == 0) {
434 				log_warnx("queue: loop: failed to load envelope");
435 				m_create(p_scheduler, IMSG_QUEUE_ENVELOPE_REMOVE, 0, 0, -1);
436 				m_add_evpid(p_scheduler, evpid);
437 				m_add_u32(p_scheduler, 1); /* in-flight */
438 				m_close(p_scheduler);
439 				return;
440 			}
441 			envelope_set_errormsg(&evp, "%s", "Loop detected");
442 			envelope_set_esc_class(&evp, ESC_STATUS_TEMPFAIL);
443 			envelope_set_esc_code(&evp, ESC_ROUTING_LOOP_DETECTED);
444 			bounce.type = B_ERROR;
445 			queue_bounce(&evp, &bounce);
446 			queue_envelope_delete(evp.id);
447 			m_create(p_scheduler, IMSG_QUEUE_DELIVERY_LOOP, 0, 0, -1);
448 			m_add_evpid(p_scheduler, evp.id);
449 			m_close(p_scheduler);
450 			return;
451 
452 		case IMSG_MTA_DELIVERY_HOLD:
453 		case IMSG_MDA_DELIVERY_HOLD:
454 			imsg->hdr.type = IMSG_QUEUE_HOLDQ_HOLD;
455 			m_forward(p_scheduler, imsg);
456 			return;
457 
458 		case IMSG_MTA_SCHEDULE:
459 			imsg->hdr.type = IMSG_QUEUE_ENVELOPE_SCHEDULE;
460 			m_forward(p_scheduler, imsg);
461 			return;
462 
463 		case IMSG_MTA_HOLDQ_RELEASE:
464 		case IMSG_MDA_HOLDQ_RELEASE:
465 			m_msg(&m, imsg);
466 			m_get_id(&m, &holdq);
467 			m_get_int(&m, &v);
468 			m_end(&m);
469 			m_create(p_scheduler, IMSG_QUEUE_HOLDQ_RELEASE, 0, 0, -1);
470 			if (imsg->hdr.type == IMSG_MTA_HOLDQ_RELEASE)
471 				m_add_int(p_scheduler, D_MTA);
472 			else
473 				m_add_int(p_scheduler, D_MDA);
474 			m_add_id(p_scheduler, holdq);
475 			m_add_int(p_scheduler, v);
476 			m_close(p_scheduler);
477 			return;
478 		}
479 	}
480 
481 	if (p->proc == PROC_CONTROL) {
482 		switch (imsg->hdr.type) {
483 		case IMSG_CTL_PAUSE_MDA:
484 		case IMSG_CTL_PAUSE_MTA:
485 		case IMSG_CTL_RESUME_MDA:
486 		case IMSG_CTL_RESUME_MTA:
487 			m_forward(p_scheduler, imsg);
488 			return;
489 
490 		case IMSG_CTL_VERBOSE:
491 			m_msg(&m, imsg);
492 			m_get_int(&m, &v);
493 			m_end(&m);
494 			log_verbose(v);
495 			return;
496 
497 		case IMSG_CTL_PROFILE:
498 			m_msg(&m, imsg);
499 			m_get_int(&m, &v);
500 			m_end(&m);
501 			profiling = v;
502 			return;
503 
504 		case IMSG_CTL_DISCOVER_EVPID:
505 			m_msg(&m, imsg);
506 			m_get_evpid(&m, &evpid);
507 			m_end(&m);
508 			if (queue_envelope_load(evpid, &evp) == 0) {
509 				log_warnx("queue: discover: failed to load "
510 				    "envelope %016" PRIx64, evpid);
511 				n_evp = 0;
512 				m_compose(p_control, imsg->hdr.type,
513 				    imsg->hdr.peerid, 0, -1,
514 				    &n_evp, sizeof n_evp);
515 				return;
516 			}
517 
518 			m_create(p_scheduler, IMSG_QUEUE_DISCOVER_EVPID,
519 			    0, 0, -1);
520 			m_add_envelope(p_scheduler, &evp);
521 			m_close(p_scheduler);
522 
523 			m_create(p_scheduler, IMSG_QUEUE_DISCOVER_MSGID,
524 			    0, 0, -1);
525 			m_add_msgid(p_scheduler, evpid_to_msgid(evpid));
526 			m_close(p_scheduler);
527 			n_evp = 1;
528 			m_compose(p_control, imsg->hdr.type, imsg->hdr.peerid,
529 			    0, -1, &n_evp, sizeof n_evp);
530 			return;
531 
532 		case IMSG_CTL_DISCOVER_MSGID:
533 			m_msg(&m, imsg);
534 			m_get_msgid(&m, &msgid);
535 			m_end(&m);
536 			/* handle concurrent walk requests */
537 			wi = xcalloc(1, sizeof *wi, "queu_imsg");
538 			wi->msgid = msgid;
539 			wi->peerid = imsg->hdr.peerid;
540 			evtimer_set(&wi->ev, queue_msgid_walk, wi);
541 			tv.tv_sec = 0;
542 			tv.tv_usec = 10;
543 			evtimer_add(&wi->ev, &tv);
544 			return;
545 
546 		case IMSG_CTL_UNCORRUPT_MSGID:
547 			m_msg(&m, imsg);
548 			m_get_msgid(&m, &msgid);
549 			m_end(&m);
550 			ret = queue_message_uncorrupt(msgid);
551 			m_compose(p_control, imsg->hdr.type, imsg->hdr.peerid,
552 			    0, -1, &ret, sizeof ret);
553 			return;
554 		}
555 	}
556 
557 	errx(1, "queue_imsg: unexpected %s imsg", imsg_to_str(imsg->hdr.type));
558 }
559 
560 static void
561 queue_msgid_walk(int fd, short event, void *arg)
562 {
563 	struct envelope		 evp;
564 	struct timeval		 tv;
565 	struct msg_walkinfo	*wi = arg;
566 	int			 r;
567 
568 	r = queue_message_walk(&evp, wi->msgid, &wi->done, &wi->data);
569 	if (r == -1) {
570 		if (wi->n_evp) {
571 			m_create(p_scheduler, IMSG_QUEUE_DISCOVER_MSGID,
572 			    0, 0, -1);
573 			m_add_msgid(p_scheduler, wi->msgid);
574 			m_close(p_scheduler);
575 		}
576 
577 		m_compose(p_control, IMSG_CTL_DISCOVER_MSGID, wi->peerid, 0, -1,
578 		    &wi->n_evp, sizeof wi->n_evp);
579 		evtimer_del(&wi->ev);
580 		free(wi);
581 		return;
582 	}
583 
584 	if (r) {
585 		m_create(p_scheduler, IMSG_QUEUE_DISCOVER_EVPID, 0, 0, -1);
586 		m_add_envelope(p_scheduler, &evp);
587 		m_close(p_scheduler);
588 		wi->n_evp += 1;
589 	}
590 
591 	tv.tv_sec = 0;
592 	tv.tv_usec = 10;
593 	evtimer_set(&wi->ev, queue_msgid_walk, wi);
594 	evtimer_add(&wi->ev, &tv);
595 }
596 
597 static void
598 queue_bounce(struct envelope *e, struct delivery_bounce *d)
599 {
600 	struct envelope	b;
601 
602 	b = *e;
603 	b.type = D_BOUNCE;
604 	b.agent.bounce = *d;
605 	b.retry = 0;
606 	b.lasttry = 0;
607 	b.creation = time(NULL);
608 	b.expire = 3600 * 24 * 7;
609 
610 	if (e->dsn_notify & DSN_NEVER)
611 		return;
612 
613 	if (b.id == 0)
614 		log_warnx("warn: queue_bounce: evpid=0");
615 	if (evpid_to_msgid(b.id) == 0)
616 		log_warnx("warn: queue_bounce: msgid=0, evpid=%016"PRIx64,
617 			b.id);
618 	if (e->type == D_BOUNCE) {
619 		log_warnx("warn: queue: double bounce!");
620 	} else if (e->sender.user[0] == '\0') {
621 		log_warnx("warn: queue: no return path!");
622 	} else if (!queue_envelope_create(&b)) {
623 		log_warnx("warn: queue: cannot bounce!");
624 	} else {
625 		log_debug("debug: queue: bouncing evp:%016" PRIx64
626 		    " as evp:%016" PRIx64, e->id, b.id);
627 
628 		m_create(p_scheduler, IMSG_QUEUE_ENVELOPE_SUBMIT, 0, 0, -1);
629 		m_add_envelope(p_scheduler, &b);
630 		m_close(p_scheduler);
631 
632 		m_create(p_scheduler, IMSG_QUEUE_MESSAGE_COMMIT, 0, 0, -1);
633 		m_add_msgid(p_scheduler, evpid_to_msgid(b.id));
634 		m_close(p_scheduler);
635 
636 		stat_increment("queue.bounce", 1);
637 	}
638 }
639 
640 static void
641 queue_shutdown(void)
642 {
643 	log_debug("debug: queue agent exiting");
644 	queue_close();
645 	_exit(0);
646 }
647 
648 int
649 queue(void)
650 {
651 	struct passwd	*pw;
652 	struct timeval	 tv;
653 	struct event	 ev_qload;
654 
655 	purge_config(PURGE_EVERYTHING);
656 
657 	if ((pw = getpwnam(SMTPD_QUEUE_USER)) == NULL)
658 		if ((pw = getpwnam(SMTPD_USER)) == NULL)
659 			fatalx("unknown user " SMTPD_USER);
660 
661 	env->sc_queue_flags |= QUEUE_EVPCACHE;
662 	env->sc_queue_evpcache_size = 1024;
663 
664 	if (chroot(PATH_SPOOL) == -1)
665 		fatal("queue: chroot");
666 	if (chdir("/") == -1)
667 		fatal("queue: chdir(\"/\")");
668 
669 	config_process(PROC_QUEUE);
670 
671 	if (env->sc_queue_flags & QUEUE_COMPRESSION)
672 		log_info("queue: queue compression enabled");
673 
674 	if (env->sc_queue_key) {
675 		if (!crypto_setup(env->sc_queue_key, strlen(env->sc_queue_key)))
676 			fatalx("crypto_setup: invalid key for queue encryption");
677 		log_info("queue: queue encryption enabled");
678 	}
679 
680 	if (setgroups(1, &pw->pw_gid) ||
681 	    setresgid(pw->pw_gid, pw->pw_gid, pw->pw_gid) ||
682 	    setresuid(pw->pw_uid, pw->pw_uid, pw->pw_uid))
683 		fatal("queue: cannot drop privileges");
684 
685 	imsg_callback = queue_imsg;
686 	event_init();
687 
688 	signal(SIGINT, SIG_IGN);
689 	signal(SIGTERM, SIG_IGN);
690 	signal(SIGPIPE, SIG_IGN);
691 	signal(SIGHUP, SIG_IGN);
692 
693 	config_peer(PROC_PARENT);
694 	config_peer(PROC_CONTROL);
695 	config_peer(PROC_LKA);
696 	config_peer(PROC_SCHEDULER);
697 	config_peer(PROC_PONY);
698 
699 	/* setup queue loading task */
700 	evtimer_set(&ev_qload, queue_timeout, &ev_qload);
701 	tv.tv_sec = 0;
702 	tv.tv_usec = 10;
703 	evtimer_add(&ev_qload, &tv);
704 
705 	if (pledge("stdio rpath wpath cpath flock recvfd sendfd", NULL) == -1)
706 		err(1, "pledge");
707 
708 	event_dispatch();
709 	fatalx("exited event loop");
710 
711 	return (0);
712 }
713 
714 static void
715 queue_timeout(int fd, short event, void *p)
716 {
717 	static uint32_t	 msgid = 0;
718 	struct envelope	 evp;
719 	struct event	*ev = p;
720 	struct timeval	 tv;
721 	int		 r;
722 
723 	r = queue_envelope_walk(&evp);
724 	if (r == -1) {
725 		if (msgid) {
726 			m_create(p_scheduler, IMSG_QUEUE_MESSAGE_COMMIT,
727 			    0, 0, -1);
728 			m_add_msgid(p_scheduler, msgid);
729 			m_close(p_scheduler);
730 		}
731 		log_debug("debug: queue: done loading queue into scheduler");
732 		return;
733 	}
734 
735 	if (r) {
736 		if (msgid && evpid_to_msgid(evp.id) != msgid) {
737 			m_create(p_scheduler, IMSG_QUEUE_MESSAGE_COMMIT,
738 			    0, 0, -1);
739 			m_add_msgid(p_scheduler, msgid);
740 			m_close(p_scheduler);
741 		}
742 		msgid = evpid_to_msgid(evp.id);
743 		m_create(p_scheduler, IMSG_QUEUE_ENVELOPE_SUBMIT, 0, 0, -1);
744 		m_add_envelope(p_scheduler, &evp);
745 		m_close(p_scheduler);
746 	}
747 
748 	tv.tv_sec = 0;
749 	tv.tv_usec = 10;
750 	evtimer_add(ev, &tv);
751 }
752 
753 static void
754 queue_log(const struct envelope *e, const char *prefix, const char *status)
755 {
756 	char rcpt[LINE_MAX];
757 
758 	(void)strlcpy(rcpt, "-", sizeof rcpt);
759 	if (strcmp(e->rcpt.user, e->dest.user) ||
760 	    strcmp(e->rcpt.domain, e->dest.domain))
761 		(void)snprintf(rcpt, sizeof rcpt, "%s@%s",
762 		    e->rcpt.user, e->rcpt.domain);
763 
764 	log_info("%s: %s for %016" PRIx64 ": from=<%s@%s>, to=<%s@%s>, "
765 	    "rcpt=<%s>, delay=%s, stat=%s",
766 	    e->type == D_MDA ? "delivery" : "relay",
767 	    prefix,
768 	    e->id, e->sender.user, e->sender.domain,
769 	    e->dest.user, e->dest.domain,
770 	    rcpt,
771 	    duration_to_text(time(NULL) - e->creation),
772 	    status);
773 }
774