/*	$OpenBSD: scheduler.c,v 1.47 2014/07/10 14:45:02 eric Exp $	*/

/*
 * Copyright (c) 2008 Gilles Chehade <gilles@poolp.org>
 * Copyright (c) 2008 Pierre-Yves Ritschard <pyr@openbsd.org>
 * Copyright (c) 2008-2009 Jacek Masiulaniec <jacekm@dobremiasto.net>
 * Copyright (c) 2012 Eric Faurot <eric@openbsd.org>
 *
 * Permission to use, copy, modify, and distribute this software for any
 * purpose with or without fee is hereby granted, provided that the above
 * copyright notice and this permission notice appear in all copies.
 *
 * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
 * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
 * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
 * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
 * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
 * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
 * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
 */

#include <sys/types.h>
#include <sys/queue.h>
#include <sys/tree.h>
#include <sys/socket.h>
#include <sys/stat.h>

#include <ctype.h>
#include <dirent.h>
#include <err.h>
#include <errno.h>
#include <event.h>
#include <imsg.h>
#include <inttypes.h>
#include <libgen.h>
#include <pwd.h>
#include <signal.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>
#include <unistd.h>

#include "smtpd.h"
#include "log.h"

static void scheduler_imsg(struct mproc *, struct imsg *);
static void scheduler_shutdown(void);
static void scheduler_sig_handler(int, short, void *);
static void scheduler_reset_events(void);
static void scheduler_timeout(int, short, void *);

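/*
 * Scheduler process state: "backend" is the active scheduler backend,
 * "ev" drives scheduler_timeout(), and "ninflight" counts envelopes
 * handed to the queue process and not yet acked back.  The arrays are
 * scratch buffers for batch and listing operations, allocated at
 * startup from the configured limits.
 */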
static struct scheduler_backend *backend = NULL;
static struct event		 ev;
static size_t			 ninflight = 0;
static int			*types;
static uint64_t			*evpids;
static uint32_t			*msgids;
static struct evpstate		*state;

extern const char *backend_scheduler;

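/*
 * Dispatch an imsg from the queue or control process: feed envelope
 * life-cycle events and delivery results to the backend, keep the
 * statistics counters in sync, and answer the various control
 * requests (list/schedule/remove/pause/resume).  Most cases end with
 * scheduler_reset_events() so that a new batch is computed right away.
 */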
void
scheduler_imsg(struct mproc *p, struct imsg *imsg)
{
	struct bounce_req_msg	 req;
	struct envelope		 evp;
	struct scheduler_info	 si;
	struct msg		 m;
	uint64_t		 evpid, id, holdq;
	uint32_t		 msgid;
	uint32_t		 inflight;
	size_t			 n, i;
	time_t			 timestamp;
	int			 v, r, type;

	switch (imsg->hdr.type) {

	case IMSG_QUEUE_ENVELOPE_SUBMIT:
		m_msg(&m, imsg);
		m_get_envelope(&m, &evp);
		m_end(&m);
		log_trace(TRACE_SCHEDULER,
		    "scheduler: inserting evp:%016" PRIx64, evp.id);
		scheduler_info(&si, &evp);
		stat_increment("scheduler.envelope.incoming", 1);
		backend->insert(&si);
		return;

	case IMSG_QUEUE_MESSAGE_COMMIT:
		m_msg(&m, imsg);
		m_get_msgid(&m, &msgid);
		m_end(&m);
		log_trace(TRACE_SCHEDULER,
		    "scheduler: committing msg:%08" PRIx32, msgid);
		n = backend->commit(msgid);
		stat_decrement("scheduler.envelope.incoming", n);
		stat_increment("scheduler.envelope", n);
		scheduler_reset_events();
		return;

	case IMSG_QUEUE_MESSAGE_ROLLBACK:
		m_msg(&m, imsg);
		m_get_msgid(&m, &msgid);
		m_end(&m);
		log_trace(TRACE_SCHEDULER, "scheduler: aborting msg:%08" PRIx32,
		    msgid);
		n = backend->rollback(msgid);
		stat_decrement("scheduler.envelope.incoming", n);
		scheduler_reset_events();
		return;

	case IMSG_QUEUE_ENVELOPE_REMOVE:
		m_msg(&m, imsg);
		m_get_evpid(&m, &evpid);
		m_get_u32(&m, &inflight);
		m_end(&m);
		log_trace(TRACE_SCHEDULER,
		    "scheduler: queue requested removal of evp:%016" PRIx64,
		    evpid);
		stat_decrement("scheduler.envelope", 1);
		if (! inflight)
			backend->remove(evpid);
		else {
			backend->delete(evpid);
			ninflight -= 1;
			stat_decrement("scheduler.envelope.inflight", 1);
		}

		scheduler_reset_events();
		return;

	case IMSG_QUEUE_ENVELOPE_ACK:
		m_msg(&m, imsg);
		m_get_evpid(&m, &evpid);
		m_end(&m);
		log_trace(TRACE_SCHEDULER,
		    "scheduler: queue ack removal of evp:%016" PRIx64,
		    evpid);
		ninflight -= 1;
		stat_decrement("scheduler.envelope.inflight", 1);
		scheduler_reset_events();
		return;

	case IMSG_QUEUE_DELIVERY_OK:
		m_msg(&m, imsg);
		m_get_evpid(&m, &evpid);
		m_end(&m);
		log_trace(TRACE_SCHEDULER,
		    "scheduler: deleting evp:%016" PRIx64 " (ok)", evpid);
		backend->delete(evpid);
		ninflight -= 1;
		stat_increment("scheduler.delivery.ok", 1);
		stat_decrement("scheduler.envelope.inflight", 1);
		stat_decrement("scheduler.envelope", 1);
		scheduler_reset_events();
		return;

	case IMSG_QUEUE_DELIVERY_TEMPFAIL:
		m_msg(&m, imsg);
		m_get_envelope(&m, &evp);
		m_end(&m);
		log_trace(TRACE_SCHEDULER,
		    "scheduler: updating evp:%016" PRIx64, evp.id);
		scheduler_info(&si, &evp);
		backend->update(&si);
		ninflight -= 1;
		stat_increment("scheduler.delivery.tempfail", 1);
		stat_decrement("scheduler.envelope.inflight", 1);

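		/*
		 * Walk the configured bounce-warning intervals and, if
		 * this envelope's next retry crosses a warning
		 * threshold that has not been bounced on yet, ask the
		 * queue to generate a delayed-delivery warning report.
		 */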
		for (i = 0; i < MAX_BOUNCE_WARN; i++) {
			if (env->sc_bounce_warn[i] == 0)
				break;
			timestamp = si.creation + env->sc_bounce_warn[i];
			if (si.nexttry >= timestamp &&
			    si.lastbounce < timestamp) {
				req.evpid = evp.id;
				req.timestamp = timestamp;
				req.bounce.type = B_WARNING;
				req.bounce.delay = env->sc_bounce_warn[i];
				req.bounce.expire = si.expire;
				m_compose(p, IMSG_SCHED_ENVELOPE_BOUNCE, 0, 0, -1,
				    &req, sizeof req);
				break;
			}
		}
		scheduler_reset_events();
		return;

	case IMSG_QUEUE_DELIVERY_PERMFAIL:
		m_msg(&m, imsg);
		m_get_evpid(&m, &evpid);
		m_end(&m);
		log_trace(TRACE_SCHEDULER,
		    "scheduler: deleting evp:%016" PRIx64 " (fail)", evpid);
		backend->delete(evpid);
		ninflight -= 1;
		stat_increment("scheduler.delivery.permfail", 1);
		stat_decrement("scheduler.envelope.inflight", 1);
		stat_decrement("scheduler.envelope", 1);
		scheduler_reset_events();
		return;

	case IMSG_QUEUE_DELIVERY_LOOP:
		m_msg(&m, imsg);
		m_get_evpid(&m, &evpid);
		m_end(&m);
		log_trace(TRACE_SCHEDULER,
		    "scheduler: deleting evp:%016" PRIx64 " (loop)", evpid);
		backend->delete(evpid);
		ninflight -= 1;
		stat_increment("scheduler.delivery.loop", 1);
		stat_decrement("scheduler.envelope.inflight", 1);
		stat_decrement("scheduler.envelope", 1);
		scheduler_reset_events();
		return;

	case IMSG_QUEUE_HOLDQ_HOLD:
		m_msg(&m, imsg);
		m_get_evpid(&m, &evpid);
		m_get_id(&m, &holdq);
		m_end(&m);
		log_trace(TRACE_SCHEDULER,
		    "scheduler: holding evp:%016" PRIx64 " on %016" PRIx64,
		    evpid, holdq);
		backend->hold(evpid, holdq);
		ninflight -= 1;
		stat_decrement("scheduler.envelope.inflight", 1);
		scheduler_reset_events();
		return;

	case IMSG_QUEUE_HOLDQ_RELEASE:
		m_msg(&m, imsg);
		m_get_int(&m, &type);
		m_get_id(&m, &holdq);
		m_get_int(&m, &r);
		m_end(&m);
		log_trace(TRACE_SCHEDULER,
		    "scheduler: releasing %d on holdq (%d, %016" PRIx64 ")",
		    r, type, holdq);
		backend->release(type, holdq, r);
		scheduler_reset_events();
		return;

	case IMSG_CTL_PAUSE_MDA:
		log_trace(TRACE_SCHEDULER, "scheduler: pausing mda");
		env->sc_flags |= SMTPD_MDA_PAUSED;
		return;

	case IMSG_CTL_RESUME_MDA:
		log_trace(TRACE_SCHEDULER, "scheduler: resuming mda");
		env->sc_flags &= ~SMTPD_MDA_PAUSED;
		scheduler_reset_events();
		return;

	case IMSG_CTL_PAUSE_MTA:
		log_trace(TRACE_SCHEDULER, "scheduler: pausing mta");
		env->sc_flags |= SMTPD_MTA_PAUSED;
		return;

	case IMSG_CTL_RESUME_MTA:
		log_trace(TRACE_SCHEDULER, "scheduler: resuming mta");
		env->sc_flags &= ~SMTPD_MTA_PAUSED;
		scheduler_reset_events();
		return;

	case IMSG_CTL_VERBOSE:
		m_msg(&m, imsg);
		m_get_int(&m, &v);
		m_end(&m);
		log_verbose(v);
		return;

	case IMSG_CTL_PROFILE:
		m_msg(&m, imsg);
		m_get_int(&m, &v);
		m_end(&m);
		profiling = v;
		return;

	case IMSG_CTL_LIST_MESSAGES:
		msgid = *(uint32_t *)(imsg->data);
		n = backend->messages(msgid, msgids,
		    env->sc_scheduler_max_msg_batch_size);
		m_compose(p, IMSG_CTL_LIST_MESSAGES, imsg->hdr.peerid, 0, -1,
		    msgids, n * sizeof (*msgids));
		return;

	case IMSG_CTL_LIST_ENVELOPES:
		id = *(uint64_t *)(imsg->data);
		n = backend->envelopes(id, state,
		    env->sc_scheduler_max_evp_batch_size);
		for (i = 0; i < n; i++) {
			m_create(p_queue, IMSG_CTL_LIST_ENVELOPES,
			    imsg->hdr.peerid, 0, -1);
			m_add_evpid(p_queue, state[i].evpid);
			m_add_int(p_queue, state[i].flags);
			m_add_time(p_queue, state[i].time);
			m_close(p_queue);
		}
		m_compose(p_queue, IMSG_CTL_LIST_ENVELOPES,
		    imsg->hdr.peerid, 0, -1, NULL, 0);
		return;

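	/*
	 * For the commands below, an id that fits in 32 bits is a
	 * message id and applies to all of that message's envelopes;
	 * a larger id names a single envelope (the msgid lives in the
	 * upper 32 bits of an evpid).
	 */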
	case IMSG_CTL_SCHEDULE:
		id = *(uint64_t *)(imsg->data);
		if (id <= 0xffffffffL)
			log_debug("debug: scheduler: "
			    "scheduling msg:%08" PRIx64, id);
		else
			log_debug("debug: scheduler: "
			    "scheduling evp:%016" PRIx64, id);
		r = backend->schedule(id);
		scheduler_reset_events();
		m_compose(p, r ? IMSG_CTL_OK : IMSG_CTL_FAIL, imsg->hdr.peerid,
		    0, -1, NULL, 0);
		return;

	case IMSG_QUEUE_ENVELOPE_SCHEDULE:
		id = *(uint64_t *)(imsg->data);
		backend->schedule(id);
		scheduler_reset_events();
		return;

	case IMSG_CTL_REMOVE:
		id = *(uint64_t *)(imsg->data);
		if (id <= 0xffffffffL)
			log_debug("debug: scheduler: "
			    "removing msg:%08" PRIx64, id);
		else
			log_debug("debug: scheduler: "
			    "removing evp:%016" PRIx64, id);
		r = backend->remove(id);
		scheduler_reset_events();
		m_compose(p, r ? IMSG_CTL_OK : IMSG_CTL_FAIL, imsg->hdr.peerid,
		    0, -1, NULL, 0);
		return;

	case IMSG_CTL_PAUSE_EVP:
		id = *(uint64_t *)(imsg->data);
		if (id <= 0xffffffffL)
			log_debug("debug: scheduler: "
			    "suspending msg:%08" PRIx64, id);
		else
			log_debug("debug: scheduler: "
			    "suspending evp:%016" PRIx64, id);
		r = backend->suspend(id);
		scheduler_reset_events();
		m_compose(p, r ? IMSG_CTL_OK : IMSG_CTL_FAIL, imsg->hdr.peerid,
		    0, -1, NULL, 0);
		return;

	case IMSG_CTL_RESUME_EVP:
		id = *(uint64_t *)(imsg->data);
		if (id <= 0xffffffffL)
			log_debug("debug: scheduler: "
			    "resuming msg:%08" PRIx64, id);
		else
			log_debug("debug: scheduler: "
			    "resuming evp:%016" PRIx64, id);
		r = backend->resume(id);
		scheduler_reset_events();
		m_compose(p, r ? IMSG_CTL_OK : IMSG_CTL_FAIL, imsg->hdr.peerid,
		    0, -1, NULL, 0);
		return;
	}

	errx(1, "scheduler_imsg: unexpected %s imsg",
	    imsg_to_str(imsg->hdr.type));
}

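/*
 * Terminate cleanly on SIGINT/SIGTERM; any other signal delivered here
 * is a bug.
 */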
static void
scheduler_sig_handler(int sig, short event, void *p)
{
	switch (sig) {
	case SIGINT:
	case SIGTERM:
		scheduler_shutdown();
		break;
	default:
		fatalx("scheduler_sig_handler: unexpected signal");
	}
}

static void
scheduler_shutdown(void)
{
	log_info("info: scheduler handler exiting");
	_exit(0);
}

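/*
 * Rearm the batch timer with a zero timeout so scheduler_timeout()
 * runs again on the next event loop iteration.  Called whenever the
 * backend state has changed and a new batch may be schedulable.
 */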
static void
scheduler_reset_events(void)
{
	struct timeval	 tv;

	evtimer_del(&ev);
	tv.tv_sec = 0;
	tv.tv_usec = 0;
	evtimer_add(&ev, &tv);
}

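/*
 * Fork the scheduler process: look up the configured backend, drop
 * privileges into the chroot, allocate the batch buffers, wire up the
 * imsg peers and signal handlers, then run the event loop until told
 * to shut down.  Returns the child pid to the parent.
 */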
pid_t
scheduler(void)
{
	pid_t		 pid;
	struct passwd	*pw;
	struct event	 ev_sigint;
	struct event	 ev_sigterm;

	backend = scheduler_backend_lookup(backend_scheduler);
	if (backend == NULL)
		errx(1, "cannot find scheduler backend \"%s\"",
		    backend_scheduler);

	switch (pid = fork()) {
	case -1:
		fatal("scheduler: cannot fork");
	case 0:
		post_fork(PROC_SCHEDULER);
		break;
	default:
		return (pid);
	}

	purge_config(PURGE_EVERYTHING);

	if ((pw = getpwnam(SMTPD_USER)) == NULL)
		fatalx("unknown user " SMTPD_USER);

	config_process(PROC_SCHEDULER);

	backend->init(backend_scheduler);

	if (chroot(PATH_CHROOT) == -1)
		fatal("scheduler: chroot");
	if (chdir("/") == -1)
		fatal("scheduler: chdir(\"/\")");

	if (setgroups(1, &pw->pw_gid) ||
	    setresgid(pw->pw_gid, pw->pw_gid, pw->pw_gid) ||
	    setresuid(pw->pw_uid, pw->pw_uid, pw->pw_uid))
		fatal("scheduler: cannot drop privileges");

	evpids = xcalloc(env->sc_scheduler_max_schedule, sizeof *evpids,
	    "scheduler: init evpids");
	types = xcalloc(env->sc_scheduler_max_schedule, sizeof *types,
	    "scheduler: init types");
	msgids = xcalloc(env->sc_scheduler_max_msg_batch_size, sizeof *msgids,
	    "scheduler: list msg");
	state = xcalloc(env->sc_scheduler_max_evp_batch_size, sizeof *state,
	    "scheduler: list evp");

	imsg_callback = scheduler_imsg;
	event_init();

	signal_set(&ev_sigint, SIGINT, scheduler_sig_handler, NULL);
	signal_set(&ev_sigterm, SIGTERM, scheduler_sig_handler, NULL);
	signal_add(&ev_sigint, NULL);
	signal_add(&ev_sigterm, NULL);
	signal(SIGPIPE, SIG_IGN);
	signal(SIGHUP, SIG_IGN);

	config_peer(PROC_CONTROL);
	config_peer(PROC_QUEUE);
	config_done();

	evtimer_set(&ev, scheduler_timeout, NULL);
	scheduler_reset_events();
	if (event_dispatch() < 0)
		fatal("event_dispatch");
	scheduler_shutdown();

	return (0);
}

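/*
 * Main scheduling pass: ask the backend for a batch of schedulable
 * envelopes and dispatch each one to the queue process.  If nothing is
 * schedulable, either go idle until an imsg wakes us up, or rearm the
 * timer for the delay suggested by the backend.
 */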
static void
scheduler_timeout(int fd, short event, void *p)
{
	struct timeval		tv;
	size_t			i;
	size_t			d_inflight;
	size_t			d_envelope;
	size_t			d_removed;
	size_t			d_expired;
	size_t			d_updated;
	size_t			count;
	int			mask, r, delay;

	tv.tv_sec = 0;
	tv.tv_usec = 0;

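	/*
	 * Updates are always accepted.  Expirations, removals and
	 * bounces are only requested while under the inflight limit,
	 * and mda/mta batches only when the corresponding agent is
	 * not paused.
	 */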
	mask = SCHED_UPDATE;

	if (ninflight < env->sc_scheduler_max_inflight) {
		mask |= SCHED_EXPIRE | SCHED_REMOVE | SCHED_BOUNCE;
		if (!(env->sc_flags & SMTPD_MDA_PAUSED))
			mask |= SCHED_MDA;
		if (!(env->sc_flags & SMTPD_MTA_PAUSED))
			mask |= SCHED_MTA;
	}

	count = env->sc_scheduler_max_schedule;

	log_trace(TRACE_SCHEDULER,
	    "scheduler: getting batch: mask=0x%x, count=%zu", mask, count);

	r = backend->batch(mask, &delay, &count, evpids, types);

	log_trace(TRACE_SCHEDULER,
	    "scheduler: got r=%i, delay=%i, count=%zu", r, delay, count);

	if (r < 0)
		fatalx("scheduler: error in batch handler");

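	/*
	 * Nothing schedulable right now: a delay of -1 means sleep
	 * until an imsg triggers scheduler_reset_events(); otherwise
	 * retry after the delay suggested by the backend.
	 */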
	if (r == 0) {

		if (delay < -1)
			fatalx("scheduler: invalid delay %d", delay);

		if (delay == -1) {
			log_trace(TRACE_SCHEDULER, "scheduler: sleeping");
			return;
		}

		tv.tv_sec = delay;
		tv.tv_usec = 0;
		log_trace(TRACE_SCHEDULER,
		    "scheduler: waiting for %s", duration_to_text(tv.tv_sec));
		evtimer_add(&ev, &tv);
		return;
	}

	d_inflight = 0;
	d_envelope = 0;
	d_removed = 0;
	d_expired = 0;
	d_updated = 0;

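	/*
	 * Hand each envelope of the batch over to the queue process
	 * according to its batch type, accumulating the stat deltas as
	 * we go.  Everything sent to the queue counts as inflight
	 * until acked back.
	 */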
	for (i = 0; i < count; i++) {
		switch (types[i]) {
		case SCHED_REMOVE:
			log_debug("debug: scheduler: evp:%016" PRIx64
			    " removed", evpids[i]);
			m_create(p_queue, IMSG_SCHED_ENVELOPE_REMOVE, 0, 0, -1);
			m_add_evpid(p_queue, evpids[i]);
			m_close(p_queue);
			d_envelope += 1;
			d_removed += 1;
			d_inflight += 1;
			break;

		case SCHED_EXPIRE:
			log_debug("debug: scheduler: evp:%016" PRIx64
			    " expired", evpids[i]);
			m_create(p_queue, IMSG_SCHED_ENVELOPE_EXPIRE, 0, 0, -1);
			m_add_evpid(p_queue, evpids[i]);
			m_close(p_queue);
			d_envelope += 1;
			d_expired += 1;
			d_inflight += 1;
			break;

		case SCHED_UPDATE:
			log_debug("debug: scheduler: evp:%016" PRIx64
			    " scheduled (update)", evpids[i]);
			d_updated += 1;
			break;

		case SCHED_BOUNCE:
			log_debug("debug: scheduler: evp:%016" PRIx64
			    " scheduled (bounce)", evpids[i]);
			m_create(p_queue, IMSG_SCHED_ENVELOPE_INJECT, 0, 0, -1);
			m_add_evpid(p_queue, evpids[i]);
			m_close(p_queue);
			d_inflight += 1;
			break;

		case SCHED_MDA:
			log_debug("debug: scheduler: evp:%016" PRIx64
			    " scheduled (mda)", evpids[i]);
			m_create(p_queue, IMSG_SCHED_ENVELOPE_DELIVER, 0, 0, -1);
			m_add_evpid(p_queue, evpids[i]);
			m_close(p_queue);
			d_inflight += 1;
			break;

		case SCHED_MTA:
			log_debug("debug: scheduler: evp:%016" PRIx64
			    " scheduled (mta)", evpids[i]);
			m_create(p_queue, IMSG_SCHED_ENVELOPE_TRANSFER, 0, 0, -1);
			m_add_evpid(p_queue, evpids[i]);
			m_close(p_queue);
			d_inflight += 1;
			break;
		}
	}

	stat_decrement("scheduler.envelope", d_envelope);
	stat_increment("scheduler.envelope.inflight", d_inflight);
	stat_increment("scheduler.envelope.expired", d_expired);
	stat_increment("scheduler.envelope.removed", d_removed);
	stat_increment("scheduler.envelope.updated", d_updated);

	ninflight += d_inflight;

	tv.tv_sec = 0;
	tv.tv_usec = 0;
	evtimer_add(&ev, &tv);
}
603