xref: /openbsd-src/usr.sbin/smtpd/queue.c (revision fb8aa7497fded39583f40e800732f9c046411717)
1 /*	$OpenBSD: queue.c,v 1.178 2016/05/28 21:21:20 eric Exp $	*/
2 
3 /*
4  * Copyright (c) 2008 Gilles Chehade <gilles@poolp.org>
5  * Copyright (c) 2008 Pierre-Yves Ritschard <pyr@openbsd.org>
6  * Copyright (c) 2012 Eric Faurot <eric@openbsd.org>
7  *
8  * Permission to use, copy, modify, and distribute this software for any
9  * purpose with or without fee is hereby granted, provided that the above
10  * copyright notice and this permission notice appear in all copies.
11  *
12  * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
13  * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
14  * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
15  * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
16  * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
17  * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
18  * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
19  */
20 
21 #include <sys/types.h>
22 #include <sys/queue.h>
23 #include <sys/tree.h>
24 #include <sys/socket.h>
25 #include <sys/stat.h>
26 
27 #include <err.h>
28 #include <event.h>
29 #include <imsg.h>
30 #include <inttypes.h>
31 #include <libgen.h>
32 #include <pwd.h>
33 #include <signal.h>
34 #include <stdio.h>
35 #include <stdlib.h>
36 #include <string.h>
37 #include <time.h>
38 #include <unistd.h>
39 #include <limits.h>
40 
41 #include "smtpd.h"
42 #include "log.h"
43 
/* Internal entry points of the queue process. */
static void queue_imsg(struct mproc *, struct imsg *);
static void queue_timeout(int, short, void *);
static void queue_bounce(struct envelope *, struct delivery_bounce *);
static void queue_shutdown(void);
static void queue_sig_handler(int, short, void *);
static void queue_log(const struct envelope *, const char *, const char *);
static void queue_msgid_walk(int, short, void *);

/*
 * Flow-control watermarks (bytes queued on the respective mproc channel).
 * Above the hiwat the corresponding input is suspended; it is resumed
 * once the backlog drains below the lowat.
 */
static size_t	flow_agent_hiwat = 10 * 1024 * 1024;
static size_t	flow_agent_lowat =   1 * 1024 * 1024;
static size_t	flow_scheduler_hiwat = 10 * 1024 * 1024;
static size_t	flow_scheduler_lowat = 1 * 1024 * 1024;

/* Bits of "limit" below: which direction is currently throttled. */
#define LIMIT_AGENT	0x01
#define LIMIT_SCHEDULER	0x02

/* Current flow-control state, maintained by queue_flow_control(). */
static int limit = 0;
61 
/*
 * Main imsg dispatcher for the queue process.  Messages are grouped by
 * originating peer (pony/smtp, lka, scheduler, control); any message type
 * not handled for its peer falls through to the fatal errx() at the end.
 */
static void
queue_imsg(struct mproc *p, struct imsg *imsg)
{
	struct delivery_bounce	 bounce;
	struct msg_walkinfo	*wi;
	struct timeval		 tv;
	struct bounce_req_msg	*req_bounce;
	struct envelope		 evp;
	struct msg		 m;
	const char		*reason;
	uint64_t		 reqid, evpid, holdq;
	uint32_t		 msgid;
	time_t			 nexttry;
	size_t			 n_evp;
	int			 fd, mta_ext, ret, v, flags, code;

	memset(&bounce, 0, sizeof(struct delivery_bounce));
	if (p->proc == PROC_PONY) {

		switch (imsg->hdr.type) {
		case IMSG_SMTP_MESSAGE_CREATE:
			/* Allocate a new incoming message; reply with its msgid on success. */
			m_msg(&m, imsg);
			m_get_id(&m, &reqid);
			m_end(&m);

			ret = queue_message_create(&msgid);

			m_create(p, IMSG_SMTP_MESSAGE_CREATE, 0, 0, -1);
			m_add_id(p, reqid);
			if (ret == 0)
				m_add_int(p, 0);
			else {
				m_add_int(p, 1);
				m_add_msgid(p, msgid);
			}
			m_close(p);
			return;

		case IMSG_SMTP_MESSAGE_ROLLBACK:
			/* Session aborted: drop the message and tell the scheduler. */
			m_msg(&m, imsg);
			m_get_msgid(&m, &msgid);
			m_end(&m);

			queue_message_delete(msgid);

			m_create(p_scheduler, IMSG_QUEUE_MESSAGE_ROLLBACK,
			    0, 0, -1);
			m_add_msgid(p_scheduler, msgid);
			m_close(p_scheduler);
			return;

		case IMSG_SMTP_MESSAGE_COMMIT:
			/*
			 * Commit the message content to stable storage; only
			 * notify the scheduler when the commit succeeded.
			 */
			m_msg(&m, imsg);
			m_get_id(&m, &reqid);
			m_get_msgid(&m, &msgid);
			m_end(&m);

			ret = queue_message_commit(msgid);

			m_create(p, IMSG_SMTP_MESSAGE_COMMIT, 0, 0, -1);
			m_add_id(p, reqid);
			m_add_int(p, (ret == 0) ? 0 : 1);
			m_close(p);

			if (ret) {
				m_create(p_scheduler, IMSG_QUEUE_MESSAGE_COMMIT,
				    0, 0, -1);
				m_add_msgid(p_scheduler, msgid);
				m_close(p_scheduler);
			}
			return;

		case IMSG_SMTP_MESSAGE_OPEN:
			/* Hand a read/write fd for the message body back to smtp. */
			m_msg(&m, imsg);
			m_get_id(&m, &reqid);
			m_get_msgid(&m, &msgid);
			m_end(&m);

			fd = queue_message_fd_rw(msgid);

			m_create(p, IMSG_SMTP_MESSAGE_OPEN, 0, 0, fd);
			m_add_id(p, reqid);
			m_add_int(p, (fd == -1) ? 0 : 1);
			m_close(p);
			return;

		case IMSG_QUEUE_SMTP_SESSION:
			/* fd of a local smtp session used to submit bounces. */
			bounce_fd(imsg->fd);
			return;
		}
	}

	if (p->proc == PROC_LKA) {
		switch (imsg->hdr.type) {
		case IMSG_LKA_ENVELOPE_SUBMIT:
			/*
			 * Persist an expanded envelope; on success forward it
			 * to the scheduler so it becomes schedulable.
			 */
			m_msg(&m, imsg);
			m_get_id(&m, &reqid);
			m_get_envelope(&m, &evp);
			m_end(&m);

			if (evp.id == 0)
				log_warnx("warn: imsg_queue_submit_envelope: evpid=0");
			if (evpid_to_msgid(evp.id) == 0)
				log_warnx("warn: imsg_queue_submit_envelope: msgid=0, "
				    "evpid=%016"PRIx64, evp.id);
			ret = queue_envelope_create(&evp);
			m_create(p_pony, IMSG_QUEUE_ENVELOPE_SUBMIT, 0, 0, -1);
			m_add_id(p_pony, reqid);
			if (ret == 0)
				m_add_int(p_pony, 0);
			else {
				m_add_int(p_pony, 1);
				m_add_evpid(p_pony, evp.id);
			}
			m_close(p_pony);
			if (ret) {
				m_create(p_scheduler,
				    IMSG_QUEUE_ENVELOPE_SUBMIT, 0, 0, -1);
				m_add_envelope(p_scheduler, &evp);
				m_close(p_scheduler);

			}
			return;

		case IMSG_LKA_ENVELOPE_COMMIT:
			/* All envelopes for the transaction are in; ack to smtp. */
			m_msg(&m, imsg);
			m_get_id(&m, &reqid);
			m_end(&m);
			m_create(p_pony, IMSG_QUEUE_ENVELOPE_COMMIT, 0, 0, -1);
			m_add_id(p_pony, reqid);
			m_add_int(p_pony, 1);
			m_close(p_pony);
			return;
		}
	}

	if (p->proc == PROC_SCHEDULER) {
		switch (imsg->hdr.type) {
		case IMSG_SCHED_ENVELOPE_REMOVE:
			m_msg(&m, imsg);
			m_get_evpid(&m, &evpid);
			m_end(&m);

			/* ack before touching storage so the scheduler can move on */
			m_create(p_scheduler, IMSG_QUEUE_ENVELOPE_ACK, 0, 0, -1);
			m_add_evpid(p_scheduler, evpid);
			m_close(p_scheduler);

			/* already removed by scheduler */
			if (queue_envelope_load(evpid, &evp) == 0)
				return;

			queue_log(&evp, "Remove", "Removed by administrator");
			queue_envelope_delete(evpid);
			return;

		case IMSG_SCHED_ENVELOPE_EXPIRE:
			m_msg(&m, imsg);
			m_get_evpid(&m, &evpid);
			m_end(&m);

			m_create(p_scheduler, IMSG_QUEUE_ENVELOPE_ACK, 0, 0, -1);
			m_add_evpid(p_scheduler, evpid);
			m_close(p_scheduler);

			/* already removed by scheduler */
			if (queue_envelope_load(evpid, &evp) == 0)
				return;

			/* generate a temp-fail DSN bounce, then drop the envelope */
			bounce.type = B_ERROR;
			envelope_set_errormsg(&evp, "Envelope expired");
			envelope_set_esc_class(&evp, ESC_STATUS_TEMPFAIL);
			envelope_set_esc_code(&evp, ESC_DELIVERY_TIME_EXPIRED);
			queue_bounce(&evp, &bounce);
			queue_log(&evp, "Expire", "Envelope expired");
			queue_envelope_delete(evpid);
			return;

		case IMSG_SCHED_ENVELOPE_BOUNCE:
			/* fixed-size struct payload, validated before use */
			CHECK_IMSG_DATA_SIZE(imsg, sizeof *req_bounce);
			req_bounce = imsg->data;
			evpid = req_bounce->evpid;

			if (queue_envelope_load(evpid, &evp) == 0) {
				log_warnx("queue: bounce: failed to load envelope");
				m_create(p_scheduler, IMSG_QUEUE_ENVELOPE_REMOVE, 0, 0, -1);
				m_add_evpid(p_scheduler, evpid);
				m_add_u32(p_scheduler, 0); /* not in-flight */
				m_close(p_scheduler);
				return;
			}
			queue_bounce(&evp, &req_bounce->bounce);
			evp.lastbounce = req_bounce->timestamp;
			if (!queue_envelope_update(&evp))
				log_warnx("warn: could not update envelope %016"PRIx64, evpid);
			return;

		case IMSG_SCHED_ENVELOPE_DELIVER:
			/* hand the envelope to the local delivery agent */
			m_msg(&m, imsg);
			m_get_evpid(&m, &evpid);
			m_end(&m);
			if (queue_envelope_load(evpid, &evp) == 0) {
				log_warnx("queue: deliver: failed to load envelope");
				m_create(p_scheduler, IMSG_QUEUE_ENVELOPE_REMOVE, 0, 0, -1);
				m_add_evpid(p_scheduler, evpid);
				m_add_u32(p_scheduler, 1); /* in-flight */
				m_close(p_scheduler);
				return;
			}
			evp.lasttry = time(NULL);
			m_create(p_pony, IMSG_QUEUE_DELIVER, 0, 0, -1);
			m_add_envelope(p_pony, &evp);
			m_close(p_pony);
			return;

		case IMSG_SCHED_ENVELOPE_INJECT:
			/* envelope belongs to a bounce message being built */
			m_msg(&m, imsg);
			m_get_evpid(&m, &evpid);
			m_end(&m);
			bounce_add(evpid);
			return;

		case IMSG_SCHED_ENVELOPE_TRANSFER:
			/* hand the envelope to the remote transfer agent */
			m_msg(&m, imsg);
			m_get_evpid(&m, &evpid);
			m_end(&m);
			if (queue_envelope_load(evpid, &evp) == 0) {
				log_warnx("queue: failed to load envelope");
				m_create(p_scheduler, IMSG_QUEUE_ENVELOPE_REMOVE, 0, 0, -1);
				m_add_evpid(p_scheduler, evpid);
				m_add_u32(p_scheduler, 1); /* in-flight */
				m_close(p_scheduler);
				return;
			}
			evp.lasttry = time(NULL);
			m_create(p_pony, IMSG_QUEUE_TRANSFER, 0, 0, -1);
			m_add_envelope(p_pony, &evp);
			m_close(p_pony);
			return;

		case IMSG_CTL_LIST_ENVELOPES:
			/* empty payload marks the end of the listing */
			if (imsg->hdr.len == sizeof imsg->hdr) {
				m_forward(p_control, imsg);
				return;
			}

			m_msg(&m, imsg);
			m_get_evpid(&m, &evpid);
			m_get_int(&m, &flags);
			m_get_time(&m, &nexttry);
			m_end(&m);

			if (queue_envelope_load(evpid, &evp) == 0)
				return; /* Envelope is gone, drop it */

			/*
			 * XXX consistency: The envelope might already be on
			 * its way back to the scheduler.  We need to detect
			 * this properly and report that state.
			 */
			if (flags & EF_INFLIGHT) {
				/*
				 * Not exactly correct but pretty close: The
				 * value is not recorded on the envelope unless
				 * a tempfail occurs.
				 */
				evp.lasttry = nexttry;
			}

			m_create(p_control, IMSG_CTL_LIST_ENVELOPES,
			    imsg->hdr.peerid, 0, -1);
			m_add_int(p_control, flags);
			m_add_time(p_control, nexttry);
			m_add_envelope(p_control, &evp);
			m_close(p_control);
			return;
		}
	}

	/* second PROC_PONY group: mda/mta delivery results */
	if (p->proc == PROC_PONY) {
		switch (imsg->hdr.type) {
		case IMSG_MDA_OPEN_MESSAGE:
		case IMSG_MTA_OPEN_MESSAGE:
			/* read-only fd on the message body for the agent */
			m_msg(&m, imsg);
			m_get_id(&m, &reqid);
			m_get_msgid(&m, &msgid);
			m_end(&m);
			fd = queue_message_fd_r(msgid);
			m_create(p, imsg->hdr.type, 0, 0, fd);
			m_add_id(p, reqid);
			m_close(p);
			return;

		case IMSG_MDA_DELIVERY_OK:
		case IMSG_MTA_DELIVERY_OK:
			m_msg(&m, imsg);
			m_get_evpid(&m, &evpid);
			if (imsg->hdr.type == IMSG_MTA_DELIVERY_OK)
				m_get_int(&m, &mta_ext);
			m_end(&m);
			if (queue_envelope_load(evpid, &evp) == 0) {
				log_warn("queue: dsn: failed to load envelope");
				return;
			}
			/*
			 * Success DSN: always for local delivery; for relay
			 * only when the remote host did not support the DSN
			 * extension (otherwise it relays the notification).
			 */
			if (evp.dsn_notify & DSN_SUCCESS) {
				bounce.type = B_DSN;
				bounce.dsn_ret = evp.dsn_ret;
				envelope_set_esc_class(&evp, ESC_STATUS_OK);
				if (imsg->hdr.type == IMSG_MDA_DELIVERY_OK)
					queue_bounce(&evp, &bounce);
				else if (imsg->hdr.type == IMSG_MTA_DELIVERY_OK &&
				    (mta_ext & MTA_EXT_DSN) == 0) {
					bounce.mta_without_dsn = 1;
					queue_bounce(&evp, &bounce);
				}
			}
			queue_envelope_delete(evpid);
			m_create(p_scheduler, IMSG_QUEUE_DELIVERY_OK, 0, 0, -1);
			m_add_evpid(p_scheduler, evpid);
			m_close(p_scheduler);
			return;

		case IMSG_MDA_DELIVERY_TEMPFAIL:
		case IMSG_MTA_DELIVERY_TEMPFAIL:
			/* record the failure and bump the retry counter */
			m_msg(&m, imsg);
			m_get_evpid(&m, &evpid);
			m_get_string(&m, &reason);
			m_get_int(&m, &code);
			m_end(&m);
			if (queue_envelope_load(evpid, &evp) == 0) {
				log_warnx("queue: tempfail: failed to load envelope");
				m_create(p_scheduler, IMSG_QUEUE_ENVELOPE_REMOVE, 0, 0, -1);
				m_add_evpid(p_scheduler, evpid);
				m_add_u32(p_scheduler, 1); /* in-flight */
				m_close(p_scheduler);
				return;
			}
			envelope_set_errormsg(&evp, "%s", reason);
			envelope_set_esc_class(&evp, ESC_STATUS_TEMPFAIL);
			envelope_set_esc_code(&evp, code);
			evp.retry++;
			if (!queue_envelope_update(&evp))
				log_warnx("warn: could not update envelope %016"PRIx64, evpid);
			m_create(p_scheduler, IMSG_QUEUE_DELIVERY_TEMPFAIL, 0, 0, -1);
			m_add_envelope(p_scheduler, &evp);
			m_close(p_scheduler);
			return;

		case IMSG_MDA_DELIVERY_PERMFAIL:
		case IMSG_MTA_DELIVERY_PERMFAIL:
			/* bounce the envelope and remove it from the queue */
			m_msg(&m, imsg);
			m_get_evpid(&m, &evpid);
			m_get_string(&m, &reason);
			m_get_int(&m, &code);
			m_end(&m);
			if (queue_envelope_load(evpid, &evp) == 0) {
				log_warnx("queue: permfail: failed to load envelope");
				m_create(p_scheduler, IMSG_QUEUE_ENVELOPE_REMOVE, 0, 0, -1);
				m_add_evpid(p_scheduler, evpid);
				m_add_u32(p_scheduler, 1); /* in-flight */
				m_close(p_scheduler);
				return;
			}
			bounce.type = B_ERROR;
			envelope_set_errormsg(&evp, "%s", reason);
			envelope_set_esc_class(&evp, ESC_STATUS_PERMFAIL);
			envelope_set_esc_code(&evp, code);
			queue_bounce(&evp, &bounce);
			queue_envelope_delete(evpid);
			m_create(p_scheduler, IMSG_QUEUE_DELIVERY_PERMFAIL, 0, 0, -1);
			m_add_evpid(p_scheduler, evpid);
			m_close(p_scheduler);
			return;

		case IMSG_MDA_DELIVERY_LOOP:
		case IMSG_MTA_DELIVERY_LOOP:
			/* mail loop detected: treated as a permanent error */
			m_msg(&m, imsg);
			m_get_evpid(&m, &evpid);
			m_end(&m);
			if (queue_envelope_load(evpid, &evp) == 0) {
				log_warnx("queue: loop: failed to load envelope");
				m_create(p_scheduler, IMSG_QUEUE_ENVELOPE_REMOVE, 0, 0, -1);
				m_add_evpid(p_scheduler, evpid);
				m_add_u32(p_scheduler, 1); /* in-flight */
				m_close(p_scheduler);
				return;
			}
			envelope_set_errormsg(&evp, "%s", "Loop detected");
			envelope_set_esc_class(&evp, ESC_STATUS_TEMPFAIL);
			envelope_set_esc_code(&evp, ESC_ROUTING_LOOP_DETECTED);
			bounce.type = B_ERROR;
			queue_bounce(&evp, &bounce);
			queue_envelope_delete(evp.id);
			m_create(p_scheduler, IMSG_QUEUE_DELIVERY_LOOP, 0, 0, -1);
			m_add_evpid(p_scheduler, evp.id);
			m_close(p_scheduler);
			return;

		case IMSG_MTA_DELIVERY_HOLD:
		case IMSG_MDA_DELIVERY_HOLD:
			/* retype and relay to the scheduler's holdq */
			imsg->hdr.type = IMSG_QUEUE_HOLDQ_HOLD;
			m_forward(p_scheduler, imsg);
			return;

		case IMSG_MTA_SCHEDULE:
			imsg->hdr.type = IMSG_QUEUE_ENVELOPE_SCHEDULE;
			m_forward(p_scheduler, imsg);
			return;

		case IMSG_MTA_HOLDQ_RELEASE:
		case IMSG_MDA_HOLDQ_RELEASE:
			m_msg(&m, imsg);
			m_get_id(&m, &holdq);
			m_get_int(&m, &v);
			m_end(&m);
			m_create(p_scheduler, IMSG_QUEUE_HOLDQ_RELEASE, 0, 0, -1);
			if (imsg->hdr.type == IMSG_MTA_HOLDQ_RELEASE)
				m_add_int(p_scheduler, D_MTA);
			else
				m_add_int(p_scheduler, D_MDA);
			m_add_id(p_scheduler, holdq);
			m_add_int(p_scheduler, v);
			m_close(p_scheduler);
			return;
		}
	}

	if (p->proc == PROC_CONTROL) {
		switch (imsg->hdr.type) {
		case IMSG_CTL_PAUSE_MDA:
		case IMSG_CTL_PAUSE_MTA:
		case IMSG_CTL_RESUME_MDA:
		case IMSG_CTL_RESUME_MTA:
			m_forward(p_scheduler, imsg);
			return;

		case IMSG_CTL_VERBOSE:
			m_msg(&m, imsg);
			m_get_int(&m, &v);
			m_end(&m);
			log_verbose(v);
			return;

		case IMSG_CTL_PROFILE:
			m_msg(&m, imsg);
			m_get_int(&m, &v);
			m_end(&m);
			profiling = v;
			return;

		case IMSG_CTL_DISCOVER_EVPID:
			/* re-inject a single on-disk envelope into the scheduler */
			m_msg(&m, imsg);
			m_get_evpid(&m, &evpid);
			m_end(&m);
			if (queue_envelope_load(evpid, &evp) == 0) {
				log_warnx("queue: discover: failed to load "
				    "envelope %016" PRIx64, evpid);
				n_evp = 0;
				m_compose(p_control, imsg->hdr.type,
				    imsg->hdr.peerid, 0, -1,
				    &n_evp, sizeof n_evp);
				return;
			}

			m_create(p_scheduler, IMSG_QUEUE_DISCOVER_EVPID,
			    0, 0, -1);
			m_add_envelope(p_scheduler, &evp);
			m_close(p_scheduler);

			m_create(p_scheduler, IMSG_QUEUE_DISCOVER_MSGID,
			    0, 0, -1);
			m_add_msgid(p_scheduler, evpid_to_msgid(evpid));
			m_close(p_scheduler);
			n_evp = 1;
			m_compose(p_control, imsg->hdr.type, imsg->hdr.peerid,
			    0, -1, &n_evp, sizeof n_evp);
			return;

		case IMSG_CTL_DISCOVER_MSGID:
			m_msg(&m, imsg);
			m_get_msgid(&m, &msgid);
			m_end(&m);
			/* handle concurrent walk requests */
			/* NOTE(review): allocation label is misspelled ("queu_imsg") */
			wi = xcalloc(1, sizeof *wi, "queu_imsg");
			wi->msgid = msgid;
			wi->peerid = imsg->hdr.peerid;
			/* walk the message incrementally from the event loop */
			evtimer_set(&wi->ev, queue_msgid_walk, wi);
			tv.tv_sec = 0;
			tv.tv_usec = 10;
			evtimer_add(&wi->ev, &tv);
			return;

		case IMSG_CTL_UNCORRUPT_MSGID:
			m_msg(&m, imsg);
			m_get_msgid(&m, &msgid);
			m_end(&m);
			ret = queue_message_uncorrupt(msgid);
			m_compose(p_control, imsg->hdr.type, imsg->hdr.peerid,
			    0, -1, &ret, sizeof ret);
			return;
		}
	}

	errx(1, "queue_imsg: unexpected %s imsg", imsg_to_str(imsg->hdr.type));
}
566 
567 static void
568 queue_msgid_walk(int fd, short event, void *arg)
569 {
570 	struct envelope		 evp;
571 	struct timeval		 tv;
572 	struct msg_walkinfo	*wi = arg;
573 	int			 r;
574 
575 	r = queue_message_walk(&evp, wi->msgid, &wi->done, &wi->data);
576 	if (r == -1) {
577 		if (wi->n_evp) {
578 			m_create(p_scheduler, IMSG_QUEUE_DISCOVER_MSGID,
579 			    0, 0, -1);
580 			m_add_msgid(p_scheduler, wi->msgid);
581 			m_close(p_scheduler);
582 		}
583 
584 		m_compose(p_control, IMSG_CTL_DISCOVER_MSGID, wi->peerid, 0, -1,
585 		    &wi->n_evp, sizeof wi->n_evp);
586 		evtimer_del(&wi->ev);
587 		free(wi);
588 		return;
589 	}
590 
591 	if (r) {
592 		m_create(p_scheduler, IMSG_QUEUE_DISCOVER_EVPID, 0, 0, -1);
593 		m_add_envelope(p_scheduler, &evp);
594 		m_close(p_scheduler);
595 		wi->n_evp += 1;
596 	}
597 
598 	tv.tv_sec = 0;
599 	tv.tv_usec = 10;
600 	evtimer_set(&wi->ev, queue_msgid_walk, wi);
601 	evtimer_add(&wi->ev, &tv);
602 }
603 
604 static void
605 queue_bounce(struct envelope *e, struct delivery_bounce *d)
606 {
607 	struct envelope	b;
608 
609 	b = *e;
610 	b.type = D_BOUNCE;
611 	b.agent.bounce = *d;
612 	b.retry = 0;
613 	b.lasttry = 0;
614 	b.creation = time(NULL);
615 	b.expire = 3600 * 24 * 7;
616 
617 	if (e->dsn_notify & DSN_NEVER)
618 		return;
619 
620 	if (b.id == 0)
621 		log_warnx("warn: queue_bounce: evpid=0");
622 	if (evpid_to_msgid(b.id) == 0)
623 		log_warnx("warn: queue_bounce: msgid=0, evpid=%016"PRIx64,
624 			b.id);
625 	if (e->type == D_BOUNCE) {
626 		log_warnx("warn: queue: double bounce!");
627 	} else if (e->sender.user[0] == '\0') {
628 		log_warnx("warn: queue: no return path!");
629 	} else if (!queue_envelope_create(&b)) {
630 		log_warnx("warn: queue: cannot bounce!");
631 	} else {
632 		log_debug("debug: queue: bouncing evp:%016" PRIx64
633 		    " as evp:%016" PRIx64, e->id, b.id);
634 
635 		m_create(p_scheduler, IMSG_QUEUE_ENVELOPE_SUBMIT, 0, 0, -1);
636 		m_add_envelope(p_scheduler, &b);
637 		m_close(p_scheduler);
638 
639 		m_create(p_scheduler, IMSG_QUEUE_MESSAGE_COMMIT, 0, 0, -1);
640 		m_add_msgid(p_scheduler, evpid_to_msgid(b.id));
641 		m_close(p_scheduler);
642 
643 		stat_increment("queue.bounce", 1);
644 	}
645 }
646 
/*
 * Signal callback: SIGINT/SIGTERM trigger a clean shutdown, anything
 * else delivered here is a programming error.
 */
static void
queue_sig_handler(int sig, short event, void *p)
{
	if (sig == SIGINT || sig == SIGTERM)
		queue_shutdown();
	else
		fatalx("queue_sig_handler: unexpected signal");
}
659 
/* Flush queue backend state and terminate the process. */
static void
queue_shutdown(void)
{
	log_info("info: queue handler exiting");

	/* give the storage backend a chance to close cleanly */
	queue_close();

	_exit(0);
}
667 
/*
 * Entry point of the queue process.  Sets up the chroot, drops
 * privileges, wires up the imsg peers, schedules the initial queue
 * load and enters the event loop.  The order of operations below is
 * security-relevant: chroot happens before the privilege drop, and
 * pledge() is applied only after all setup needing wider rights.
 */
int
queue(void)
{
	struct passwd	*pw;
	struct timeval	 tv;
	struct event	 ev_qload;
	struct event	 ev_sigint;
	struct event	 ev_sigterm;

	purge_config(PURGE_EVERYTHING);

	/* prefer the dedicated queue user, fall back to the smtpd user */
	if ((pw = getpwnam(SMTPD_QUEUE_USER)) == NULL)
		if ((pw = getpwnam(SMTPD_USER)) == NULL)
			fatalx("unknown user " SMTPD_USER);

	env->sc_queue_flags |= QUEUE_EVPCACHE;
	env->sc_queue_evpcache_size = 1024;

	/* confine the process to the spool directory */
	if (chroot(PATH_SPOOL) == -1)
		fatal("queue: chroot");
	if (chdir("/") == -1)
		fatal("queue: chdir(\"/\")");

	config_process(PROC_QUEUE);

	if (env->sc_queue_flags & QUEUE_COMPRESSION)
		log_info("queue: queue compression enabled");

	if (env->sc_queue_key) {
		if (!crypto_setup(env->sc_queue_key, strlen(env->sc_queue_key)))
			fatalx("crypto_setup: invalid key for queue encryption");
		log_info("queue: queue encryption enabled");
	}

	/* drop root privileges after the chroot is in place */
	if (setgroups(1, &pw->pw_gid) ||
	    setresgid(pw->pw_gid, pw->pw_gid, pw->pw_gid) ||
	    setresuid(pw->pw_uid, pw->pw_uid, pw->pw_uid))
		fatal("queue: cannot drop privileges");

	imsg_callback = queue_imsg;
	event_init();

	signal_set(&ev_sigint, SIGINT, queue_sig_handler, NULL);
	signal_set(&ev_sigterm, SIGTERM, queue_sig_handler, NULL);
	signal_add(&ev_sigint, NULL);
	signal_add(&ev_sigterm, NULL);
	signal(SIGPIPE, SIG_IGN);
	signal(SIGHUP, SIG_IGN);

	config_peer(PROC_PARENT);
	config_peer(PROC_CONTROL);
	config_peer(PROC_LKA);
	config_peer(PROC_SCHEDULER);
	config_peer(PROC_PONY);
	config_done();

	/* setup queue loading task */
	evtimer_set(&ev_qload, queue_timeout, &ev_qload);
	tv.tv_sec = 0;
	tv.tv_usec = 10;
	evtimer_add(&ev_qload, &tv);

	/* restrict remaining rights to fs access and fd passing */
	if (pledge("stdio rpath wpath cpath flock recvfd sendfd", NULL) == -1)
		err(1, "pledge");

	if (event_dispatch() <  0)
		fatal("event_dispatch");
	queue_shutdown();

	return (0);
}
739 
740 static void
741 queue_timeout(int fd, short event, void *p)
742 {
743 	static uint32_t	 msgid = 0;
744 	struct envelope	 evp;
745 	struct event	*ev = p;
746 	struct timeval	 tv;
747 	int		 r;
748 
749 	r = queue_envelope_walk(&evp);
750 	if (r == -1) {
751 		if (msgid) {
752 			m_create(p_scheduler, IMSG_QUEUE_MESSAGE_COMMIT,
753 			    0, 0, -1);
754 			m_add_msgid(p_scheduler, msgid);
755 			m_close(p_scheduler);
756 		}
757 		log_debug("debug: queue: done loading queue into scheduler");
758 		return;
759 	}
760 
761 	if (r) {
762 		if (msgid && evpid_to_msgid(evp.id) != msgid) {
763 			m_create(p_scheduler, IMSG_QUEUE_MESSAGE_COMMIT,
764 			    0, 0, -1);
765 			m_add_msgid(p_scheduler, msgid);
766 			m_close(p_scheduler);
767 		}
768 		msgid = evpid_to_msgid(evp.id);
769 		m_create(p_scheduler, IMSG_QUEUE_ENVELOPE_SUBMIT, 0, 0, -1);
770 		m_add_envelope(p_scheduler, &evp);
771 		m_close(p_scheduler);
772 	}
773 
774 	tv.tv_sec = 0;
775 	tv.tv_usec = 10;
776 	evtimer_add(ev, &tv);
777 }
778 
779 static void
780 queue_log(const struct envelope *e, const char *prefix, const char *status)
781 {
782 	char rcpt[LINE_MAX];
783 
784 	(void)strlcpy(rcpt, "-", sizeof rcpt);
785 	if (strcmp(e->rcpt.user, e->dest.user) ||
786 	    strcmp(e->rcpt.domain, e->dest.domain))
787 		(void)snprintf(rcpt, sizeof rcpt, "%s@%s",
788 		    e->rcpt.user, e->rcpt.domain);
789 
790 	log_info("%s: %s for %016" PRIx64 ": from=<%s@%s>, to=<%s@%s>, "
791 	    "rcpt=<%s>, delay=%s, stat=%s",
792 	    e->type == D_MDA ? "delivery" : "relay",
793 	    prefix,
794 	    e->id, e->sender.user, e->sender.domain,
795 	    e->dest.user, e->dest.domain,
796 	    rcpt,
797 	    duration_to_text(time(NULL) - e->creation),
798 	    status);
799 }
800 
801 void
802 queue_flow_control(void)
803 {
804 	size_t	bufsz;
805 	int	oldlimit = limit;
806 	int	set, unset;
807 
808 	bufsz = p_pony->bytes_queued;
809 	if (bufsz <= flow_agent_lowat)
810 		limit &= ~LIMIT_AGENT;
811 	else if (bufsz > flow_agent_hiwat)
812 		limit |= LIMIT_AGENT;
813 
814 	if (p_scheduler->bytes_queued <= flow_scheduler_lowat)
815 		limit &= ~LIMIT_SCHEDULER;
816 	else if (p_scheduler->bytes_queued > flow_scheduler_hiwat)
817 		limit |= LIMIT_SCHEDULER;
818 
819 	set = limit & (limit ^ oldlimit);
820 	unset = oldlimit & (limit ^ oldlimit);
821 
822 	if (set & LIMIT_SCHEDULER) {
823 		log_warnx("warn: queue: Hiwat reached on scheduler buffer: "
824 		    "suspending transfer, delivery and lookup input");
825 		mproc_disable(p_pony);
826 		mproc_disable(p_lka);
827 	}
828 	else if (unset & LIMIT_SCHEDULER) {
829 		log_warnx("warn: queue: Down to lowat on scheduler buffer: "
830 		    "resuming transfer, delivery and lookup input");
831 		mproc_enable(p_pony);
832 		mproc_enable(p_lka);
833 	}
834 
835 	if (set & LIMIT_AGENT) {
836 		log_warnx("warn: queue: Hiwat reached on transfer and delivery "
837 		    "buffers: suspending scheduler input");
838 		mproc_disable(p_scheduler);
839 	}
840 	else if (unset & LIMIT_AGENT) {
841 		log_warnx("warn: queue: Down to lowat on transfer and delivery "
842 		    "buffers: resuming scheduler input");
843 		mproc_enable(p_scheduler);
844 	}
845 }
846