xref: /openbsd-src/usr.sbin/smtpd/queue.c (revision 50b7afb2c2c0993b0894d4e34bf857cb13ed9c80)
1 /*	$OpenBSD: queue.c,v 1.165 2014/07/10 15:54:55 eric Exp $	*/
2 
3 /*
4  * Copyright (c) 2008 Gilles Chehade <gilles@poolp.org>
5  * Copyright (c) 2008 Pierre-Yves Ritschard <pyr@openbsd.org>
6  * Copyright (c) 2012 Eric Faurot <eric@openbsd.org>
7  *
8  * Permission to use, copy, modify, and distribute this software for any
9  * purpose with or without fee is hereby granted, provided that the above
10  * copyright notice and this permission notice appear in all copies.
11  *
12  * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
13  * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
14  * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
15  * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
16  * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
17  * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
18  * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
19  */
20 
21 #include <sys/types.h>
22 #include <sys/queue.h>
23 #include <sys/tree.h>
24 #include <sys/socket.h>
25 #include <sys/stat.h>
26 
27 #include <err.h>
28 #include <event.h>
29 #include <imsg.h>
30 #include <inttypes.h>
31 #include <libgen.h>
32 #include <pwd.h>
33 #include <signal.h>
34 #include <stdio.h>
35 #include <stdlib.h>
36 #include <string.h>
37 #include <time.h>
38 #include <unistd.h>
39 
40 #include "smtpd.h"
41 #include "log.h"
42 
/* Forward declarations for the file-local queue helpers. */
static void queue_shutdown(void);
static void queue_sig_handler(int, short, void *);
static void queue_timeout(int, short, void *);
static void queue_imsg(struct mproc *, struct imsg *);
static void queue_bounce(struct envelope *, struct delivery_bounce *);
static void queue_log(const struct envelope *, const char *, const char *);

/*
 * Flow-control watermarks, in bytes.  queue_flow_control() compares them
 * with the bytes_queued counters of the pony and scheduler peers.
 */
static size_t	flow_agent_hiwat = 10 * 1024 * 1024;
static size_t	flow_agent_lowat =   1 * 1024 * 1024;
static size_t	flow_scheduler_hiwat = 10 * 1024 * 1024;
static size_t	flow_scheduler_lowat = 1 * 1024 * 1024;

/* Bits stored in "limit": which peer buffer is above its high watermark. */
#define LIMIT_AGENT	0x01
#define LIMIT_SCHEDULER	0x02

/* Currently active LIMIT_* bits; updated by queue_flow_control(). */
static int limit = 0;
59 
60 static void
61 queue_imsg(struct mproc *p, struct imsg *imsg)
62 {
63 	struct delivery_bounce	 bounce;
64 	struct bounce_req_msg	*req_bounce;
65 	struct envelope		 evp;
66 	struct msg		 m;
67 	const char		*reason;
68 	uint64_t		 reqid, evpid, holdq;
69 	uint32_t		 msgid;
70 	time_t			 nexttry;
71 	int			 fd, mta_ext, ret, v, flags, code;
72 
73 	memset(&bounce, 0, sizeof(struct delivery_bounce));
74 	if (p->proc == PROC_PONY) {
75 
76 		switch (imsg->hdr.type) {
77 		case IMSG_SMTP_MESSAGE_CREATE:
78 			m_msg(&m, imsg);
79 			m_get_id(&m, &reqid);
80 			m_end(&m);
81 
82 			ret = queue_message_create(&msgid);
83 
84 			m_create(p, IMSG_SMTP_MESSAGE_CREATE, 0, 0, -1);
85 			m_add_id(p, reqid);
86 			if (ret == 0)
87 				m_add_int(p, 0);
88 			else {
89 				m_add_int(p, 1);
90 				m_add_msgid(p, msgid);
91 			}
92 			m_close(p);
93 			return;
94 
95 		case IMSG_SMTP_MESSAGE_ROLLBACK:
96 			m_msg(&m, imsg);
97 			m_get_msgid(&m, &msgid);
98 			m_end(&m);
99 
100 			queue_message_delete(msgid);
101 
102 			m_create(p_scheduler, IMSG_QUEUE_MESSAGE_ROLLBACK,
103 			    0, 0, -1);
104 			m_add_msgid(p_scheduler, msgid);
105 			m_close(p_scheduler);
106 			return;
107 
108 		case IMSG_SMTP_MESSAGE_COMMIT:
109 			m_msg(&m, imsg);
110 			m_get_id(&m, &reqid);
111 			m_get_msgid(&m, &msgid);
112 			m_end(&m);
113 
114 			ret = queue_message_commit(msgid);
115 
116 			m_create(p, IMSG_SMTP_MESSAGE_COMMIT, 0, 0, -1);
117 			m_add_id(p, reqid);
118 			m_add_int(p, (ret == 0) ? 0 : 1);
119 			m_close(p);
120 
121 			if (ret) {
122 				m_create(p_scheduler, IMSG_QUEUE_MESSAGE_COMMIT,
123 				    0, 0, -1);
124 				m_add_msgid(p_scheduler, msgid);
125 				m_close(p_scheduler);
126 			}
127 			return;
128 
129 		case IMSG_SMTP_MESSAGE_OPEN:
130 			m_msg(&m, imsg);
131 			m_get_id(&m, &reqid);
132 			m_get_msgid(&m, &msgid);
133 			m_end(&m);
134 
135 			fd = queue_message_fd_rw(msgid);
136 
137 			m_create(p, IMSG_SMTP_MESSAGE_OPEN, 0, 0, fd);
138 			m_add_id(p, reqid);
139 			m_add_int(p, (fd == -1) ? 0 : 1);
140 			m_close(p);
141 			return;
142 
143 		case IMSG_QUEUE_SMTP_SESSION:
144 			bounce_fd(imsg->fd);
145 			return;
146 		}
147 	}
148 
149 	if (p->proc == PROC_LKA) {
150 		switch (imsg->hdr.type) {
151 		case IMSG_LKA_ENVELOPE_SUBMIT:
152 			m_msg(&m, imsg);
153 			m_get_id(&m, &reqid);
154 			m_get_envelope(&m, &evp);
155 			m_end(&m);
156 
157 			if (evp.id == 0)
158 				log_warnx("warn: imsg_queue_submit_envelope: evpid=0");
159 			if (evpid_to_msgid(evp.id) == 0)
160 				log_warnx("warn: imsg_queue_submit_envelope: msgid=0, "
161 				    "evpid=%016"PRIx64, evp.id);
162 			ret = queue_envelope_create(&evp);
163 			m_create(p_pony, IMSG_QUEUE_ENVELOPE_SUBMIT, 0, 0, -1);
164 			m_add_id(p_pony, reqid);
165 			if (ret == 0)
166 				m_add_int(p_pony, 0);
167 			else {
168 				m_add_int(p_pony, 1);
169 				m_add_evpid(p_pony, evp.id);
170 			}
171 			m_close(p_pony);
172 			if (ret) {
173 				m_create(p_scheduler,
174 				    IMSG_QUEUE_ENVELOPE_SUBMIT, 0, 0, -1);
175 				m_add_envelope(p_scheduler, &evp);
176 				m_close(p_scheduler);
177 
178 			}
179 			return;
180 
181 		case IMSG_LKA_ENVELOPE_COMMIT:
182 			m_msg(&m, imsg);
183 			m_get_id(&m, &reqid);
184 			m_end(&m);
185 			m_create(p_pony, IMSG_QUEUE_ENVELOPE_COMMIT, 0, 0, -1);
186 			m_add_id(p_pony, reqid);
187 			m_add_int(p_pony, 1);
188 			m_close(p_pony);
189 			return;
190 		}
191 	}
192 
193 	if (p->proc == PROC_SCHEDULER) {
194 		switch (imsg->hdr.type) {
195 		case IMSG_SCHED_ENVELOPE_REMOVE:
196 			m_msg(&m, imsg);
197 			m_get_evpid(&m, &evpid);
198 			m_end(&m);
199 
200 			m_create(p_scheduler, IMSG_QUEUE_ENVELOPE_ACK, 0, 0, -1);
201 			m_add_evpid(p_scheduler, evpid);
202 			m_close(p_scheduler);
203 
204 			/* already removed by scheduler */
205 			if (queue_envelope_load(evpid, &evp) == 0)
206 				return;
207 
208 			queue_log(&evp, "Remove", "Removed by administrator");
209 			queue_envelope_delete(evpid);
210 			return;
211 
212 		case IMSG_SCHED_ENVELOPE_EXPIRE:
213 			m_msg(&m, imsg);
214 			m_get_evpid(&m, &evpid);
215 			m_end(&m);
216 
217 			m_create(p_scheduler, IMSG_QUEUE_ENVELOPE_ACK, 0, 0, -1);
218 			m_add_evpid(p_scheduler, evpid);
219 			m_close(p_scheduler);
220 
221 			/* already removed by scheduler*/
222 			if (queue_envelope_load(evpid, &evp) == 0)
223 				return;
224 
225 			bounce.type = B_ERROR;
226 			envelope_set_errormsg(&evp, "Envelope expired");
227 			envelope_set_esc_class(&evp, ESC_STATUS_TEMPFAIL);
228 			envelope_set_esc_code(&evp, ESC_DELIVERY_TIME_EXPIRED);
229 			queue_bounce(&evp, &bounce);
230 			queue_log(&evp, "Expire", "Envelope expired");
231 			queue_envelope_delete(evpid);
232 			return;
233 
234 		case IMSG_SCHED_ENVELOPE_BOUNCE:
235 			req_bounce = imsg->data;
236 			evpid = req_bounce->evpid;
237 
238 			if (queue_envelope_load(evpid, &evp) == 0) {
239 				log_warnx("queue: bounce: failed to load envelope");
240 				m_create(p_scheduler, IMSG_QUEUE_ENVELOPE_REMOVE, 0, 0, -1);
241 				m_add_evpid(p_scheduler, evpid);
242 				m_add_u32(p_scheduler, 0); /* not in-flight */
243 				m_close(p_scheduler);
244 				return;
245 			}
246 			queue_bounce(&evp, &req_bounce->bounce);
247 			evp.lastbounce = req_bounce->timestamp;
248 			if (!queue_envelope_update(&evp))
249 				log_warnx("warn: could not update envelope %016"PRIx64, evpid);
250 			return;
251 
252 		case IMSG_SCHED_ENVELOPE_DELIVER:
253 			m_msg(&m, imsg);
254 			m_get_evpid(&m, &evpid);
255 			m_end(&m);
256 			if (queue_envelope_load(evpid, &evp) == 0) {
257 				log_warnx("queue: deliver: failed to load envelope");
258 				m_create(p_scheduler, IMSG_QUEUE_ENVELOPE_REMOVE, 0, 0, -1);
259 				m_add_evpid(p_scheduler, evpid);
260 				m_add_u32(p_scheduler, 1); /* in-flight */
261 				m_close(p_scheduler);
262 				return;
263 			}
264 			evp.lasttry = time(NULL);
265 			m_create(p_pony, IMSG_QUEUE_DELIVER, 0, 0, -1);
266 			m_add_envelope(p_pony, &evp);
267 			m_close(p_pony);
268 			return;
269 
270 		case IMSG_SCHED_ENVELOPE_INJECT:
271 			m_msg(&m, imsg);
272 			m_get_evpid(&m, &evpid);
273 			m_end(&m);
274 			bounce_add(evpid);
275 			return;
276 
277 		case IMSG_SCHED_ENVELOPE_TRANSFER:
278 			m_msg(&m, imsg);
279 			m_get_evpid(&m, &evpid);
280 			m_end(&m);
281 			if (queue_envelope_load(evpid, &evp) == 0) {
282 				log_warnx("queue: failed to load envelope");
283 				m_create(p_scheduler, IMSG_QUEUE_ENVELOPE_REMOVE, 0, 0, -1);
284 				m_add_evpid(p_scheduler, evpid);
285 				m_add_u32(p_scheduler, 1); /* in-flight */
286 				m_close(p_scheduler);
287 				return;
288 			}
289 			evp.lasttry = time(NULL);
290 			m_create(p_pony, IMSG_QUEUE_TRANSFER, 0, 0, -1);
291 			m_add_envelope(p_pony, &evp);
292 			m_close(p_pony);
293 			return;
294 
295 		case IMSG_CTL_LIST_ENVELOPES:
296 			if (imsg->hdr.len == sizeof imsg->hdr) {
297 				m_forward(p_control, imsg);
298 				return;
299 			}
300 
301 			m_msg(&m, imsg);
302 			m_get_evpid(&m, &evpid);
303 			m_get_int(&m, &flags);
304 			m_get_time(&m, &nexttry);
305 			m_end(&m);
306 
307 			if (queue_envelope_load(evpid, &evp) == 0)
308 				return; /* Envelope is gone, drop it */
309 
310 			/*
311 			 * XXX consistency: The envelope might already be on
312 			 * its way back to the scheduler.  We need to detect
313 			 * this properly and report that state.
314 			 */
315 			evp.flags |= flags;
316 			/* In the past if running or runnable */
317 			evp.nexttry = nexttry;
318 			if (flags & EF_INFLIGHT) {
319 				/*
320 				 * Not exactly correct but pretty close: The
321 				 * value is not recorded on the envelope unless
322 				 * a tempfail occurs.
323 				 */
324 				evp.lasttry = nexttry;
325 			}
326 			m_compose(p_control, IMSG_CTL_LIST_ENVELOPES,
327 			    imsg->hdr.peerid, 0, -1, &evp, sizeof evp);
328 			return;
329 		}
330 	}
331 
332 	if (p->proc == PROC_PONY) {
333 		switch (imsg->hdr.type) {
334 		case IMSG_MDA_OPEN_MESSAGE:
335 		case IMSG_MTA_OPEN_MESSAGE:
336 			m_msg(&m, imsg);
337 			m_get_id(&m, &reqid);
338 			m_get_msgid(&m, &msgid);
339 			m_end(&m);
340 			fd = queue_message_fd_r(msgid);
341 			m_create(p, imsg->hdr.type, 0, 0, fd);
342 			m_add_id(p, reqid);
343 			m_close(p);
344 			return;
345 
346 		case IMSG_MDA_DELIVERY_OK:
347 		case IMSG_MTA_DELIVERY_OK:
348 			m_msg(&m, imsg);
349 			m_get_evpid(&m, &evpid);
350 			if (imsg->hdr.type == IMSG_MTA_DELIVERY_OK)
351 				m_get_int(&m, &mta_ext);
352 			m_end(&m);
353 			if (queue_envelope_load(evpid, &evp) == 0) {
354 				log_warn("queue: dsn: failed to load envelope");
355 				return;
356 			}
357 			if (evp.dsn_notify & DSN_SUCCESS) {
358 				bounce.type = B_DSN;
359 				bounce.dsn_ret = evp.dsn_ret;
360 
361 				if (imsg->hdr.type == IMSG_MDA_DELIVERY_OK)
362 					queue_bounce(&evp, &bounce);
363 				else if (imsg->hdr.type == IMSG_MTA_DELIVERY_OK &&
364 				    (mta_ext & MTA_EXT_DSN) == 0) {
365 					bounce.mta_without_dsn = 1;
366 					queue_bounce(&evp, &bounce);
367 				}
368 			}
369 			queue_envelope_delete(evpid);
370 			m_create(p_scheduler, IMSG_QUEUE_DELIVERY_OK, 0, 0, -1);
371 			m_add_evpid(p_scheduler, evpid);
372 			m_close(p_scheduler);
373 			return;
374 
375 		case IMSG_MDA_DELIVERY_TEMPFAIL:
376 		case IMSG_MTA_DELIVERY_TEMPFAIL:
377 			m_msg(&m, imsg);
378 			m_get_evpid(&m, &evpid);
379 			m_get_string(&m, &reason);
380 			m_get_int(&m, &code);
381 			m_end(&m);
382 			if (queue_envelope_load(evpid, &evp) == 0) {
383 				log_warnx("queue: tempfail: failed to load envelope");
384 				m_create(p_scheduler, IMSG_QUEUE_ENVELOPE_REMOVE, 0, 0, -1);
385 				m_add_evpid(p_scheduler, evpid);
386 				m_add_u32(p_scheduler, 1); /* in-flight */
387 				m_close(p_scheduler);
388 				return;
389 			}
390 			envelope_set_errormsg(&evp, "%s", reason);
391 			envelope_set_esc_class(&evp, ESC_STATUS_TEMPFAIL);
392 			envelope_set_esc_code(&evp, code);
393 			evp.retry++;
394 			if (!queue_envelope_update(&evp))
395 				log_warnx("warn: could not update envelope %016"PRIx64, evpid);
396 			m_create(p_scheduler, IMSG_QUEUE_DELIVERY_TEMPFAIL, 0, 0, -1);
397 			m_add_envelope(p_scheduler, &evp);
398 			m_close(p_scheduler);
399 			return;
400 
401 		case IMSG_MDA_DELIVERY_PERMFAIL:
402 		case IMSG_MTA_DELIVERY_PERMFAIL:
403 			m_msg(&m, imsg);
404 			m_get_evpid(&m, &evpid);
405 			m_get_string(&m, &reason);
406 			m_get_int(&m, &code);
407 			m_end(&m);
408 			if (queue_envelope_load(evpid, &evp) == 0) {
409 				log_warnx("queue: permfail: failed to load envelope");
410 				m_create(p_scheduler, IMSG_QUEUE_ENVELOPE_REMOVE, 0, 0, -1);
411 				m_add_evpid(p_scheduler, evpid);
412 				m_add_u32(p_scheduler, 1); /* in-flight */
413 				m_close(p_scheduler);
414 				return;
415 			}
416 			bounce.type = B_ERROR;
417 			envelope_set_errormsg(&evp, "%s", reason);
418 			envelope_set_esc_class(&evp, ESC_STATUS_PERMFAIL);
419 			envelope_set_esc_code(&evp, code);
420 			queue_bounce(&evp, &bounce);
421 			queue_envelope_delete(evpid);
422 			m_create(p_scheduler, IMSG_QUEUE_DELIVERY_PERMFAIL, 0, 0, -1);
423 			m_add_evpid(p_scheduler, evpid);
424 			m_close(p_scheduler);
425 			return;
426 
427 		case IMSG_MDA_DELIVERY_LOOP:
428 		case IMSG_MTA_DELIVERY_LOOP:
429 			m_msg(&m, imsg);
430 			m_get_evpid(&m, &evpid);
431 			m_end(&m);
432 			if (queue_envelope_load(evpid, &evp) == 0) {
433 				log_warnx("queue: loop: failed to load envelope");
434 				m_create(p_scheduler, IMSG_QUEUE_ENVELOPE_REMOVE, 0, 0, -1);
435 				m_add_evpid(p_scheduler, evpid);
436 				m_add_u32(p_scheduler, 1); /* in-flight */
437 				m_close(p_scheduler);
438 				return;
439 			}
440 			envelope_set_errormsg(&evp, "%s", "Loop detected");
441 			envelope_set_esc_class(&evp, ESC_STATUS_TEMPFAIL);
442 			envelope_set_esc_code(&evp, ESC_ROUTING_LOOP_DETECTED);
443 			bounce.type = B_ERROR;
444 			queue_bounce(&evp, &bounce);
445 			queue_envelope_delete(evp.id);
446 			m_create(p_scheduler, IMSG_QUEUE_DELIVERY_LOOP, 0, 0, -1);
447 			m_add_evpid(p_scheduler, evp.id);
448 			m_close(p_scheduler);
449 			return;
450 
451 		case IMSG_MTA_DELIVERY_HOLD:
452 		case IMSG_MDA_DELIVERY_HOLD:
453 			imsg->hdr.type = IMSG_QUEUE_HOLDQ_HOLD;
454 			m_forward(p_scheduler, imsg);
455 			return;
456 
457 		case IMSG_MTA_SCHEDULE:
458 			imsg->hdr.type = IMSG_QUEUE_ENVELOPE_SCHEDULE;
459 			m_forward(p_scheduler, imsg);
460 			return;
461 
462 		case IMSG_MTA_HOLDQ_RELEASE:
463 		case IMSG_MDA_HOLDQ_RELEASE:
464 			m_msg(&m, imsg);
465 			m_get_id(&m, &holdq);
466 			m_get_int(&m, &v);
467 			m_end(&m);
468 			m_create(p_scheduler, IMSG_QUEUE_HOLDQ_RELEASE, 0, 0, -1);
469 			if (imsg->hdr.type == IMSG_MTA_HOLDQ_RELEASE)
470 				m_add_int(p_scheduler, D_MTA);
471 			else
472 				m_add_int(p_scheduler, D_MDA);
473 			m_add_id(p_scheduler, holdq);
474 			m_add_int(p_scheduler, v);
475 			m_close(p_scheduler);
476 			return;
477 		}
478 	}
479 
480 	if (p->proc == PROC_CONTROL) {
481 		switch (imsg->hdr.type) {
482 		case IMSG_CTL_PAUSE_MDA:
483 		case IMSG_CTL_PAUSE_MTA:
484 		case IMSG_CTL_RESUME_MDA:
485 		case IMSG_CTL_RESUME_MTA:
486 			m_forward(p_scheduler, imsg);
487 			return;
488 
489 		case IMSG_CTL_VERBOSE:
490 			m_msg(&m, imsg);
491 			m_get_int(&m, &v);
492 			m_end(&m);
493 			log_verbose(v);
494 			return;
495 
496 		case IMSG_CTL_PROFILE:
497 			m_msg(&m, imsg);
498 			m_get_int(&m, &v);
499 			m_end(&m);
500 			profiling = v;
501 			return;
502 		}
503 	}
504 
505 	errx(1, "queue_imsg: unexpected %s imsg", imsg_to_str(imsg->hdr.type));
506 }
507 
508 static void
509 queue_bounce(struct envelope *e, struct delivery_bounce *d)
510 {
511 	struct envelope	b;
512 
513 	b = *e;
514 	b.type = D_BOUNCE;
515 	b.agent.bounce = *d;
516 	b.retry = 0;
517 	b.lasttry = 0;
518 	b.creation = time(NULL);
519 	b.expire = 3600 * 24 * 7;
520 
521 	if (e->dsn_notify & DSN_NEVER)
522 		return;
523 
524 	if (b.id == 0)
525 		log_warnx("warn: queue_bounce: evpid=0");
526 	if (evpid_to_msgid(b.id) == 0)
527 		log_warnx("warn: queue_bounce: msgid=0, evpid=%016"PRIx64,
528 			b.id);
529 	if (e->type == D_BOUNCE) {
530 		log_warnx("warn: queue: double bounce!");
531 	} else if (e->sender.user[0] == '\0') {
532 		log_warnx("warn: queue: no return path!");
533 	} else if (!queue_envelope_create(&b)) {
534 		log_warnx("warn: queue: cannot bounce!");
535 	} else {
536 		log_debug("debug: queue: bouncing evp:%016" PRIx64
537 		    " as evp:%016" PRIx64, e->id, b.id);
538 
539 		m_create(p_scheduler, IMSG_QUEUE_ENVELOPE_SUBMIT, 0, 0, -1);
540 		m_add_envelope(p_scheduler, &b);
541 		m_close(p_scheduler);
542 
543 		m_create(p_scheduler, IMSG_QUEUE_MESSAGE_COMMIT, 0, 0, -1);
544 		m_add_msgid(p_scheduler, evpid_to_msgid(b.id));
545 		m_close(p_scheduler);
546 
547 		stat_increment("queue.bounce", 1);
548 	}
549 }
550 
/*
 * Signal event callback: SIGINT and SIGTERM shut the queue process down,
 * any other signal is reported through fatalx().
 */
static void
queue_sig_handler(int sig, short event, void *p)
{
	if (sig == SIGINT || sig == SIGTERM)
		queue_shutdown();
	else
		fatalx("queue_sig_handler: unexpected signal");
}
563 
/*
 * Log the shutdown, close the queue through queue_close() and leave the
 * process with status 0 via _exit().
 */
static void
queue_shutdown(void)
{
	log_info("info: queue handler exiting");

	queue_close();

	_exit(0);
}
571 
572 pid_t
573 queue(void)
574 {
575 	pid_t		 pid;
576 	struct passwd	*pw;
577 	struct timeval	 tv;
578 	struct event	 ev_qload;
579 	struct event	 ev_sigint;
580 	struct event	 ev_sigterm;
581 
582 	switch (pid = fork()) {
583 	case -1:
584 		fatal("queue: cannot fork");
585 	case 0:
586 		post_fork(PROC_QUEUE);
587 		break;
588 	default:
589 		return (pid);
590 	}
591 
592 	purge_config(PURGE_EVERYTHING);
593 
594 	if ((pw = getpwnam(SMTPD_QUEUE_USER)) == NULL)
595 		if ((pw = getpwnam(SMTPD_USER)) == NULL)
596 			fatalx("unknown user " SMTPD_USER);
597 
598 	env->sc_queue_flags |= QUEUE_EVPCACHE;
599 	env->sc_queue_evpcache_size = 1024;
600 
601 	if (chroot(PATH_SPOOL) == -1)
602 		fatal("queue: chroot");
603 	if (chdir("/") == -1)
604 		fatal("queue: chdir(\"/\")");
605 
606 	config_process(PROC_QUEUE);
607 
608 	if (env->sc_queue_flags & QUEUE_COMPRESSION)
609 		log_info("queue: queue compression enabled");
610 
611 	if (env->sc_queue_key) {
612 		if (! crypto_setup(env->sc_queue_key, strlen(env->sc_queue_key)))
613 			fatalx("crypto_setup: invalid key for queue encryption");
614 		log_info("queue: queue encryption enabled");
615 	}
616 
617 	if (setgroups(1, &pw->pw_gid) ||
618 	    setresgid(pw->pw_gid, pw->pw_gid, pw->pw_gid) ||
619 	    setresuid(pw->pw_uid, pw->pw_uid, pw->pw_uid))
620 		fatal("queue: cannot drop privileges");
621 
622 	imsg_callback = queue_imsg;
623 	event_init();
624 
625 	signal_set(&ev_sigint, SIGINT, queue_sig_handler, NULL);
626 	signal_set(&ev_sigterm, SIGTERM, queue_sig_handler, NULL);
627 	signal_add(&ev_sigint, NULL);
628 	signal_add(&ev_sigterm, NULL);
629 	signal(SIGPIPE, SIG_IGN);
630 	signal(SIGHUP, SIG_IGN);
631 
632 	config_peer(PROC_PARENT);
633 	config_peer(PROC_CONTROL);
634 	config_peer(PROC_LKA);
635 	config_peer(PROC_SCHEDULER);
636 	config_peer(PROC_PONY);
637 	config_done();
638 
639 	/* setup queue loading task */
640 	evtimer_set(&ev_qload, queue_timeout, &ev_qload);
641 	tv.tv_sec = 0;
642 	tv.tv_usec = 10;
643 	evtimer_add(&ev_qload, &tv);
644 
645 	if (event_dispatch() <  0)
646 		fatal("event_dispatch");
647 	queue_shutdown();
648 
649 	return (0);
650 }
651 
652 static void
653 queue_timeout(int fd, short event, void *p)
654 {
655 	static uint32_t	 msgid = 0;
656 	struct envelope	 evp;
657 	struct event	*ev = p;
658 	struct timeval	 tv;
659 	int		 r;
660 
661 	r = queue_envelope_walk(&evp);
662 	if (r == -1) {
663 		if (msgid) {
664 			m_create(p_scheduler, IMSG_QUEUE_MESSAGE_COMMIT,
665 			    0, 0, -1);
666 			m_add_msgid(p_scheduler, msgid);
667 			m_close(p_scheduler);
668 		}
669 		log_debug("debug: queue: done loading queue into scheduler");
670 		return;
671 	}
672 
673 	if (r) {
674 		if (msgid && evpid_to_msgid(evp.id) != msgid) {
675 			m_create(p_scheduler, IMSG_QUEUE_MESSAGE_COMMIT,
676 			    0, 0, -1);
677 			m_add_msgid(p_scheduler, msgid);
678 			m_close(p_scheduler);
679 		}
680 		msgid = evpid_to_msgid(evp.id);
681 		m_create(p_scheduler, IMSG_QUEUE_ENVELOPE_SUBMIT, 0, 0, -1);
682 		m_add_envelope(p_scheduler, &evp);
683 		m_close(p_scheduler);
684 	}
685 
686 	tv.tv_sec = 0;
687 	tv.tv_usec = 10;
688 	evtimer_add(ev, &tv);
689 }
690 
691 static void
692 queue_log(const struct envelope *e, const char *prefix, const char *status)
693 {
694 	char rcpt[SMTPD_MAXLINESIZE];
695 
696 	(void)strlcpy(rcpt, "-", sizeof rcpt);
697 	if (strcmp(e->rcpt.user, e->dest.user) ||
698 	    strcmp(e->rcpt.domain, e->dest.domain))
699 		(void)snprintf(rcpt, sizeof rcpt, "%s@%s",
700 		    e->rcpt.user, e->rcpt.domain);
701 
702 	log_info("%s: %s for %016" PRIx64 ": from=<%s@%s>, to=<%s@%s>, "
703 	    "rcpt=<%s>, delay=%s, stat=%s",
704 	    e->type == D_MDA ? "delivery" : "relay",
705 	    prefix,
706 	    e->id, e->sender.user, e->sender.domain,
707 	    e->dest.user, e->dest.domain,
708 	    rcpt,
709 	    duration_to_text(time(NULL) - e->creation),
710 	    status);
711 }
712 
713 void
714 queue_flow_control(void)
715 {
716 	size_t	bufsz;
717 	int	oldlimit = limit;
718 	int	set, unset;
719 
720 	bufsz = p_pony->bytes_queued;
721 	if (bufsz <= flow_agent_lowat)
722 		limit &= ~LIMIT_AGENT;
723 	else if (bufsz > flow_agent_hiwat)
724 		limit |= LIMIT_AGENT;
725 
726 	if (p_scheduler->bytes_queued <= flow_scheduler_lowat)
727 		limit &= ~LIMIT_SCHEDULER;
728 	else if (p_scheduler->bytes_queued > flow_scheduler_hiwat)
729 		limit |= LIMIT_SCHEDULER;
730 
731 	set = limit & (limit ^ oldlimit);
732 	unset = oldlimit & (limit ^ oldlimit);
733 
734 	if (set & LIMIT_SCHEDULER) {
735 		log_warnx("warn: queue: Hiwat reached on scheduler buffer: "
736 		    "suspending transfer, delivery and lookup input");
737 		mproc_disable(p_pony);
738 		mproc_disable(p_lka);
739 	}
740 	else if (unset & LIMIT_SCHEDULER) {
741 		log_warnx("warn: queue: Down to lowat on scheduler buffer: "
742 		    "resuming transfer, delivery and lookup input");
743 		mproc_enable(p_pony);
744 		mproc_enable(p_lka);
745 	}
746 
747 	if (set & LIMIT_AGENT) {
748 		log_warnx("warn: queue: Hiwat reached on transfer and delivery "
749 		    "buffers: suspending scheduler input");
750 		mproc_disable(p_scheduler);
751 	}
752 	else if (unset & LIMIT_AGENT) {
753 		log_warnx("warn: queue: Down to lowat on transfer and delivery "
754 		    "buffers: resuming scheduler input");
755 		mproc_enable(p_scheduler);
756 	}
757 }
758