/* xref: /openbsd-src/usr.sbin/smtpd/scheduler.c (revision d3140113bef2b86d3af61dd20c05a8630ff966c2) */
/*	$OpenBSD: scheduler.c,v 1.62 2021/06/14 17:58:16 eric Exp $	*/

/*
 * Copyright (c) 2008 Gilles Chehade <gilles@poolp.org>
 * Copyright (c) 2008 Pierre-Yves Ritschard <pyr@openbsd.org>
 * Copyright (c) 2008-2009 Jacek Masiulaniec <jacekm@dobremiasto.net>
 * Copyright (c) 2012 Eric Faurot <eric@openbsd.org>
 *
 * Permission to use, copy, modify, and distribute this software for any
 * purpose with or without fee is hereby granted, provided that the above
 * copyright notice and this permission notice appear in all copies.
 *
 * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
 * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
 * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
 * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
 * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
 * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
 * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
 */

224ba24d34Sgilles #include <inttypes.h>
234ba24d34Sgilles #include <pwd.h>
244ba24d34Sgilles #include <signal.h>
254ba24d34Sgilles #include <unistd.h>
26f6a38d58Sgilles 
27f6a38d58Sgilles #include "smtpd.h"
28f6a38d58Sgilles #include "log.h"
29f6a38d58Sgilles 
3065c4fdfbSgilles static void scheduler_imsg(struct mproc *, struct imsg *);
314ba24d34Sgilles static void scheduler_shutdown(void);
324ba24d34Sgilles static void scheduler_reset_events(void);
334ba24d34Sgilles static void scheduler_timeout(int, short, void *);
34f6a38d58Sgilles 
354ba24d34Sgilles static struct scheduler_backend *backend = NULL;
36299c4efeSeric static struct event		 ev;
37acfdf0daSeric static size_t			 ninflight = 0;
38acfdf0daSeric static int			*types;
395eb26528Seric static uint64_t			*evpids;
405eb26528Seric static uint32_t			*msgids;
415eb26528Seric static struct evpstate		*state;
424ba24d34Sgilles 
434ba24d34Sgilles extern const char *backend_scheduler;
444ba24d34Sgilles 
454ba24d34Sgilles void
scheduler_imsg(struct mproc * p,struct imsg * imsg)4665c4fdfbSgilles scheduler_imsg(struct mproc *p, struct imsg *imsg)
47f6a38d58Sgilles {
4865c4fdfbSgilles 	struct bounce_req_msg	 req;
4965c4fdfbSgilles 	struct envelope		 evp;
504ba24d34Sgilles 	struct scheduler_info	 si;
5165c4fdfbSgilles 	struct msg		 m;
526dc81a07Seric 	uint64_t		 evpid, id, holdq;
535eb26528Seric 	uint32_t		 msgid;
54299c4efeSeric 	uint32_t       		 inflight;
554fe02f32Seric 	size_t			 n, i;
5665c4fdfbSgilles 	time_t			 timestamp;
577eed50e8Seric 	int			 v, r, type;
58f6a38d58Sgilles 
5943962b9cSeric 	if (imsg == NULL)
6043962b9cSeric 		scheduler_shutdown();
6143962b9cSeric 
624ba24d34Sgilles 	switch (imsg->hdr.type) {
63913395bcSeric 
64aa1d5973Seric 	case IMSG_QUEUE_ENVELOPE_SUBMIT:
6565c4fdfbSgilles 		m_msg(&m, imsg);
6665c4fdfbSgilles 		m_get_envelope(&m, &evp);
6765c4fdfbSgilles 		m_end(&m);
684744da7eSgilles 		log_trace(TRACE_SCHEDULER,
6965c4fdfbSgilles 		    "scheduler: inserting evp:%016" PRIx64, evp.id);
70d6b3bcf4Seric 		scheduler_info(&si, &evp);
71be215280Seric 		stat_increment("scheduler.envelope.incoming", 1);
72913395bcSeric 		backend->insert(&si);
73913395bcSeric 		return;
74913395bcSeric 
75aa1d5973Seric 	case IMSG_QUEUE_MESSAGE_COMMIT:
7665c4fdfbSgilles 		m_msg(&m, imsg);
7765c4fdfbSgilles 		m_get_msgid(&m, &msgid);
7865c4fdfbSgilles 		m_end(&m);
79913395bcSeric 		log_trace(TRACE_SCHEDULER,
8035e161d3Seric 		    "scheduler: committing msg:%08" PRIx32, msgid);
81be215280Seric 		n = backend->commit(msgid);
82be215280Seric 		stat_decrement("scheduler.envelope.incoming", n);
83be215280Seric 		stat_increment("scheduler.envelope", n);
84913395bcSeric 		scheduler_reset_events();
85913395bcSeric 		return;
86913395bcSeric 
87a9835440Ssunil 	case IMSG_QUEUE_DISCOVER_EVPID:
88a9835440Ssunil 		m_msg(&m, imsg);
89a9835440Ssunil 		m_get_envelope(&m, &evp);
90a9835440Ssunil 		m_end(&m);
91a9835440Ssunil 		r = backend->query(evp.id);
92a9835440Ssunil 		if (r) {
93a9835440Ssunil 			log_debug("debug: scheduler: evp:%016" PRIx64
94a9835440Ssunil 			    " already scheduled", evp.id);
95a9835440Ssunil 			return;
96a9835440Ssunil 		}
97a9835440Ssunil 		log_trace(TRACE_SCHEDULER,
98a9835440Ssunil 		    "scheduler: discovering evp:%016" PRIx64, evp.id);
99a9835440Ssunil 		scheduler_info(&si, &evp);
100a9835440Ssunil 		stat_increment("scheduler.envelope.incoming", 1);
101a9835440Ssunil 		backend->insert(&si);
102a9835440Ssunil 		return;
103a9835440Ssunil 
104a9835440Ssunil 	case IMSG_QUEUE_DISCOVER_MSGID:
105a9835440Ssunil 		m_msg(&m, imsg);
106a9835440Ssunil 		m_get_msgid(&m, &msgid);
107a9835440Ssunil 		m_end(&m);
108a9835440Ssunil 		r = backend->query(msgid);
109a9835440Ssunil 		if (r) {
110a9835440Ssunil 			log_debug("debug: scheduler: msgid:%08" PRIx32
111a9835440Ssunil 			    " already scheduled", msgid);
112a9835440Ssunil 			return;
113a9835440Ssunil 		}
114a9835440Ssunil 		log_trace(TRACE_SCHEDULER,
115a9835440Ssunil 		    "scheduler: committing msg:%08" PRIx32, msgid);
116a9835440Ssunil 		n = backend->commit(msgid);
117a9835440Ssunil 		stat_decrement("scheduler.envelope.incoming", n);
118a9835440Ssunil 		stat_increment("scheduler.envelope", n);
119a9835440Ssunil 		scheduler_reset_events();
120a9835440Ssunil 		return;
121a9835440Ssunil 
122aa1d5973Seric 	case IMSG_QUEUE_MESSAGE_ROLLBACK:
12365c4fdfbSgilles 		m_msg(&m, imsg);
12465c4fdfbSgilles 		m_get_msgid(&m, &msgid);
12565c4fdfbSgilles 		m_end(&m);
126913395bcSeric 		log_trace(TRACE_SCHEDULER, "scheduler: aborting msg:%08" PRIx32,
127913395bcSeric 		    msgid);
128be215280Seric 		n = backend->rollback(msgid);
129be215280Seric 		stat_decrement("scheduler.envelope.incoming", n);
1304ba24d34Sgilles 		scheduler_reset_events();
1314ba24d34Sgilles 		return;
1324ba24d34Sgilles 
133aa1d5973Seric 	case IMSG_QUEUE_ENVELOPE_REMOVE:
134299c4efeSeric 		m_msg(&m, imsg);
135299c4efeSeric 		m_get_evpid(&m, &evpid);
136299c4efeSeric 		m_get_u32(&m, &inflight);
137299c4efeSeric 		m_end(&m);
138299c4efeSeric 		log_trace(TRACE_SCHEDULER,
139299c4efeSeric 		    "scheduler: queue requested removal of evp:%016" PRIx64,
140299c4efeSeric 		    evpid);
141299c4efeSeric 		stat_decrement("scheduler.envelope", 1);
142299c4efeSeric 		if (!inflight)
143299c4efeSeric 			backend->remove(evpid);
144a56bd6deSeric 		else {
145299c4efeSeric 			backend->delete(evpid);
146a56bd6deSeric 			ninflight -= 1;
147a56bd6deSeric 			stat_decrement("scheduler.envelope.inflight", 1);
148a56bd6deSeric 		}
149299c4efeSeric 
150299c4efeSeric 		scheduler_reset_events();
151299c4efeSeric 		return;
152299c4efeSeric 
153acfdf0daSeric 	case IMSG_QUEUE_ENVELOPE_ACK:
154acfdf0daSeric 		m_msg(&m, imsg);
155acfdf0daSeric 		m_get_evpid(&m, &evpid);
156acfdf0daSeric 		m_end(&m);
157acfdf0daSeric 		log_trace(TRACE_SCHEDULER,
158acfdf0daSeric 		    "scheduler: queue ack removal of evp:%016" PRIx64,
159acfdf0daSeric 		    evpid);
160acfdf0daSeric 		ninflight -= 1;
161acfdf0daSeric 		stat_decrement("scheduler.envelope.inflight", 1);
162acfdf0daSeric 		scheduler_reset_events();
163acfdf0daSeric 		return;
164acfdf0daSeric 
165aa1d5973Seric 	case IMSG_QUEUE_DELIVERY_OK:
16665c4fdfbSgilles 		m_msg(&m, imsg);
16765c4fdfbSgilles 		m_get_evpid(&m, &evpid);
16865c4fdfbSgilles 		m_end(&m);
1694744da7eSgilles 		log_trace(TRACE_SCHEDULER,
17065c4fdfbSgilles 		    "scheduler: deleting evp:%016" PRIx64 " (ok)", evpid);
17165c4fdfbSgilles 		backend->delete(evpid);
172a56bd6deSeric 		ninflight -= 1;
173be215280Seric 		stat_increment("scheduler.delivery.ok", 1);
174be215280Seric 		stat_decrement("scheduler.envelope.inflight", 1);
175be215280Seric 		stat_decrement("scheduler.envelope", 1);
17666bc57beSeric 		scheduler_reset_events();
1774ba24d34Sgilles 		return;
1784ba24d34Sgilles 
179aa1d5973Seric 	case IMSG_QUEUE_DELIVERY_TEMPFAIL:
18065c4fdfbSgilles 		m_msg(&m, imsg);
18165c4fdfbSgilles 		m_get_envelope(&m, &evp);
18265c4fdfbSgilles 		m_end(&m);
1834744da7eSgilles 		log_trace(TRACE_SCHEDULER,
18465c4fdfbSgilles 		    "scheduler: updating evp:%016" PRIx64, evp.id);
185d6b3bcf4Seric 		scheduler_info(&si, &evp);
18666bc57beSeric 		backend->update(&si);
187a56bd6deSeric 		ninflight -= 1;
188be215280Seric 		stat_increment("scheduler.delivery.tempfail", 1);
189be215280Seric 		stat_decrement("scheduler.envelope.inflight", 1);
19065c4fdfbSgilles 
19165c4fdfbSgilles 		for (i = 0; i < MAX_BOUNCE_WARN; i++) {
19265c4fdfbSgilles 			if (env->sc_bounce_warn[i] == 0)
19365c4fdfbSgilles 				break;
19465c4fdfbSgilles 			timestamp = si.creation + env->sc_bounce_warn[i];
19565c4fdfbSgilles 			if (si.nexttry >= timestamp &&
19665c4fdfbSgilles 			    si.lastbounce < timestamp) {
19765c4fdfbSgilles 	    			req.evpid = evp.id;
19865c4fdfbSgilles 				req.timestamp = timestamp;
1995ed42fc8Ssunil 				req.bounce.type = B_DELAYED;
20065c4fdfbSgilles 				req.bounce.delay = env->sc_bounce_warn[i];
201a8e22235Sgilles 				req.bounce.ttl = si.ttl;
202aa1d5973Seric 				m_compose(p, IMSG_SCHED_ENVELOPE_BOUNCE, 0, 0, -1,
20365c4fdfbSgilles 				    &req, sizeof req);
20465c4fdfbSgilles 				break;
20565c4fdfbSgilles 			}
20665c4fdfbSgilles 		}
2074ba24d34Sgilles 		scheduler_reset_events();
2084ba24d34Sgilles 		return;
2094ba24d34Sgilles 
210aa1d5973Seric 	case IMSG_QUEUE_DELIVERY_PERMFAIL:
21165c4fdfbSgilles 		m_msg(&m, imsg);
21265c4fdfbSgilles 		m_get_evpid(&m, &evpid);
21365c4fdfbSgilles 		m_end(&m);
2144744da7eSgilles 		log_trace(TRACE_SCHEDULER,
21565c4fdfbSgilles 		    "scheduler: deleting evp:%016" PRIx64 " (fail)", evpid);
21665c4fdfbSgilles 		backend->delete(evpid);
217a56bd6deSeric 		ninflight -= 1;
218be215280Seric 		stat_increment("scheduler.delivery.permfail", 1);
219be215280Seric 		stat_decrement("scheduler.envelope.inflight", 1);
220be215280Seric 		stat_decrement("scheduler.envelope", 1);
22166bc57beSeric 		scheduler_reset_events();
2224ba24d34Sgilles 		return;
2234ba24d34Sgilles 
224aa1d5973Seric 	case IMSG_QUEUE_DELIVERY_LOOP:
22565c4fdfbSgilles 		m_msg(&m, imsg);
22665c4fdfbSgilles 		m_get_evpid(&m, &evpid);
22765c4fdfbSgilles 		m_end(&m);
228a7b2e833Seric 		log_trace(TRACE_SCHEDULER,
22965c4fdfbSgilles 		    "scheduler: deleting evp:%016" PRIx64 " (loop)", evpid);
23065c4fdfbSgilles 		backend->delete(evpid);
231a56bd6deSeric 		ninflight -= 1;
232be215280Seric 		stat_increment("scheduler.delivery.loop", 1);
233be215280Seric 		stat_decrement("scheduler.envelope.inflight", 1);
234be215280Seric 		stat_decrement("scheduler.envelope", 1);
235a7b2e833Seric 		scheduler_reset_events();
236a7b2e833Seric 		return;
237a7b2e833Seric 
238aa1d5973Seric 	case IMSG_QUEUE_HOLDQ_HOLD:
2396dc81a07Seric 		m_msg(&m, imsg);
2406dc81a07Seric 		m_get_evpid(&m, &evpid);
2416dc81a07Seric 		m_get_id(&m, &holdq);
2426dc81a07Seric 		m_end(&m);
2436dc81a07Seric 		log_trace(TRACE_SCHEDULER,
2446dc81a07Seric 		    "scheduler: holding evp:%016" PRIx64 " on %016" PRIx64,
2456dc81a07Seric 		    evpid, holdq);
2466dc81a07Seric 		backend->hold(evpid, holdq);
247a56bd6deSeric 		ninflight -= 1;
2486dc81a07Seric 		stat_decrement("scheduler.envelope.inflight", 1);
2496dc81a07Seric 		scheduler_reset_events();
2506dc81a07Seric 		return;
2516dc81a07Seric 
252aa1d5973Seric 	case IMSG_QUEUE_HOLDQ_RELEASE:
2536dc81a07Seric 		m_msg(&m, imsg);
2547eed50e8Seric 		m_get_int(&m, &type);
2556dc81a07Seric 		m_get_id(&m, &holdq);
2566dc81a07Seric 		m_get_int(&m, &r);
2576dc81a07Seric 		m_end(&m);
2586dc81a07Seric 		log_trace(TRACE_SCHEDULER,
2595c9fb78eSeric 		    "scheduler: releasing %d on holdq (%d, %016" PRIx64 ")",
2607eed50e8Seric 		    r, type, holdq);
2617eed50e8Seric 		backend->release(type, holdq, r);
2626dc81a07Seric 		scheduler_reset_events();
2636dc81a07Seric 		return;
2646dc81a07Seric 
26565c4fdfbSgilles 	case IMSG_CTL_PAUSE_MDA:
266913395bcSeric 		log_trace(TRACE_SCHEDULER, "scheduler: pausing mda");
2674ba24d34Sgilles 		env->sc_flags |= SMTPD_MDA_PAUSED;
2684ba24d34Sgilles 		return;
2694ba24d34Sgilles 
27065c4fdfbSgilles 	case IMSG_CTL_RESUME_MDA:
271913395bcSeric 		log_trace(TRACE_SCHEDULER, "scheduler: resuming mda");
2724ba24d34Sgilles 		env->sc_flags &= ~SMTPD_MDA_PAUSED;
2734ba24d34Sgilles 		scheduler_reset_events();
2744ba24d34Sgilles 		return;
2754ba24d34Sgilles 
27665c4fdfbSgilles 	case IMSG_CTL_PAUSE_MTA:
277913395bcSeric 		log_trace(TRACE_SCHEDULER, "scheduler: pausing mta");
2784ba24d34Sgilles 		env->sc_flags |= SMTPD_MTA_PAUSED;
2794ba24d34Sgilles 		return;
2804ba24d34Sgilles 
28165c4fdfbSgilles 	case IMSG_CTL_RESUME_MTA:
282913395bcSeric 		log_trace(TRACE_SCHEDULER, "scheduler: resuming mta");
2834ba24d34Sgilles 		env->sc_flags &= ~SMTPD_MTA_PAUSED;
2844ba24d34Sgilles 		scheduler_reset_events();
2854ba24d34Sgilles 		return;
2864ba24d34Sgilles 
2874ba24d34Sgilles 	case IMSG_CTL_VERBOSE:
28865c4fdfbSgilles 		m_msg(&m, imsg);
28965c4fdfbSgilles 		m_get_int(&m, &v);
29065c4fdfbSgilles 		m_end(&m);
291871fc12cSreyk 		log_setverbose(v);
2924ba24d34Sgilles 		return;
2934ba24d34Sgilles 
294acfdf0daSeric 	case IMSG_CTL_PROFILE:
295acfdf0daSeric 		m_msg(&m, imsg);
296acfdf0daSeric 		m_get_int(&m, &v);
297acfdf0daSeric 		m_end(&m);
298acfdf0daSeric 		profiling = v;
299acfdf0daSeric 		return;
300acfdf0daSeric 
30165c4fdfbSgilles 	case IMSG_CTL_LIST_MESSAGES:
3024fe02f32Seric 		msgid = *(uint32_t *)(imsg->data);
3035eb26528Seric 		n = backend->messages(msgid, msgids, env->sc_scheduler_max_msg_batch_size);
30465c4fdfbSgilles 		m_compose(p, IMSG_CTL_LIST_MESSAGES, imsg->hdr.peerid, 0, -1,
30565c4fdfbSgilles 		    msgids, n * sizeof (*msgids));
3064fe02f32Seric 		return;
3074fe02f32Seric 
30865c4fdfbSgilles 	case IMSG_CTL_LIST_ENVELOPES:
3094fe02f32Seric 		id = *(uint64_t *)(imsg->data);
3105eb26528Seric 		n = backend->envelopes(id, state, env->sc_scheduler_max_evp_batch_size);
3114fe02f32Seric 		for (i = 0; i < n; i++) {
31265c4fdfbSgilles 			m_create(p_queue, IMSG_CTL_LIST_ENVELOPES,
313299c4efeSeric 			    imsg->hdr.peerid, 0, -1);
31465c4fdfbSgilles 			m_add_evpid(p_queue, state[i].evpid);
31565c4fdfbSgilles 			m_add_int(p_queue, state[i].flags);
31665c4fdfbSgilles 			m_add_time(p_queue, state[i].time);
31765c4fdfbSgilles 			m_close(p_queue);
3184fe02f32Seric 		}
31965c4fdfbSgilles 		m_compose(p_queue, IMSG_CTL_LIST_ENVELOPES,
32065c4fdfbSgilles 		    imsg->hdr.peerid, 0, -1, NULL, 0);
3214fe02f32Seric 		return;
3224fe02f32Seric 
32365c4fdfbSgilles 	case IMSG_CTL_SCHEDULE:
324913395bcSeric 		id = *(uint64_t *)(imsg->data);
3254fe02f32Seric 		if (id <= 0xffffffffL)
3264fe02f32Seric 			log_debug("debug: scheduler: "
3274fe02f32Seric 			    "scheduling msg:%08" PRIx64, id);
328416152afSchl 		else
3294fe02f32Seric 			log_debug("debug: scheduler: "
3304fe02f32Seric 			    "scheduling evp:%016" PRIx64, id);
3310fcb81a3Seric 		r = backend->schedule(id);
3324ba24d34Sgilles 		scheduler_reset_events();
3330fcb81a3Seric 		m_compose(p, r ? IMSG_CTL_OK : IMSG_CTL_FAIL, imsg->hdr.peerid,
3340fcb81a3Seric 		    0, -1, NULL, 0);
3354ba24d34Sgilles 		return;
3364ba24d34Sgilles 
33706e5d42dSeric 	case IMSG_QUEUE_ENVELOPE_SCHEDULE:
33835e161d3Seric 		id = *(uint64_t *)(imsg->data);
33935e161d3Seric 		backend->schedule(id);
34035e161d3Seric 		scheduler_reset_events();
34135e161d3Seric 		return;
34235e161d3Seric 
34365c4fdfbSgilles 	case IMSG_CTL_REMOVE:
344913395bcSeric 		id = *(uint64_t *)(imsg->data);
345913395bcSeric 		if (id <= 0xffffffffL)
3464fe02f32Seric 			log_debug("debug: scheduler: "
3474fe02f32Seric 			    "removing msg:%08" PRIx64, id);
348913395bcSeric 		else
3494fe02f32Seric 			log_debug("debug: scheduler: "
3504fe02f32Seric 			    "removing evp:%016" PRIx64, id);
3510fcb81a3Seric 		r = backend->remove(id);
3524ba24d34Sgilles 		scheduler_reset_events();
3530fcb81a3Seric 		m_compose(p, r ? IMSG_CTL_OK : IMSG_CTL_FAIL, imsg->hdr.peerid,
3540fcb81a3Seric 		    0, -1, NULL, 0);
3554ba24d34Sgilles 		return;
35635e161d3Seric 
35735e161d3Seric 	case IMSG_CTL_PAUSE_EVP:
35835e161d3Seric 		id = *(uint64_t *)(imsg->data);
35935e161d3Seric 		if (id <= 0xffffffffL)
36035e161d3Seric 			log_debug("debug: scheduler: "
36135e161d3Seric 			    "suspending msg:%08" PRIx64, id);
36235e161d3Seric 		else
36335e161d3Seric 			log_debug("debug: scheduler: "
36435e161d3Seric 			    "suspending evp:%016" PRIx64, id);
3650fcb81a3Seric 		r = backend->suspend(id);
36635e161d3Seric 		scheduler_reset_events();
3670fcb81a3Seric 		m_compose(p, r ? IMSG_CTL_OK : IMSG_CTL_FAIL, imsg->hdr.peerid,
3680fcb81a3Seric 		    0, -1, NULL, 0);
36935e161d3Seric 		return;
37035e161d3Seric 
37135e161d3Seric 	case IMSG_CTL_RESUME_EVP:
37235e161d3Seric 		id = *(uint64_t *)(imsg->data);
37335e161d3Seric 		if (id <= 0xffffffffL)
37435e161d3Seric 			log_debug("debug: scheduler: "
37535e161d3Seric 			    "resuming msg:%08" PRIx64, id);
37635e161d3Seric 		else
37735e161d3Seric 			log_debug("debug: scheduler: "
37835e161d3Seric 			    "resuming evp:%016" PRIx64, id);
3790fcb81a3Seric 		r = backend->resume(id);
38035e161d3Seric 		scheduler_reset_events();
3810fcb81a3Seric 		m_compose(p, r ? IMSG_CTL_OK : IMSG_CTL_FAIL, imsg->hdr.peerid,
3820fcb81a3Seric 		    0, -1, NULL, 0);
38335e161d3Seric 		return;
3844ba24d34Sgilles 	}
3854ba24d34Sgilles 
386ff01b044Seric 	fatalx("scheduler_imsg: unexpected %s imsg",
3874fe02f32Seric 	    imsg_to_str(imsg->hdr.type));
388f6a38d58Sgilles }
389f05a516fSgilles 
/*
 * Log that the scheduler is going away and terminate the process
 * immediately with status 0.  _exit() is used, so this never returns.
 */
static void
scheduler_shutdown(void)
{
	log_debug("debug: scheduler agent exiting");
	_exit(0);
}

39766bc57beSeric static void
scheduler_reset_events(void)3984ba24d34Sgilles scheduler_reset_events(void)
3994ba24d34Sgilles {
4004ba24d34Sgilles 	struct timeval	 tv;
4014ba24d34Sgilles 
402299c4efeSeric 	evtimer_del(&ev);
4034ba24d34Sgilles 	tv.tv_sec = 0;
40465c4fdfbSgilles 	tv.tv_usec = 0;
405299c4efeSeric 	evtimer_add(&ev, &tv);
4064ba24d34Sgilles }
4074ba24d34Sgilles 
408b88ab68dSeric int
scheduler(void)4094ba24d34Sgilles scheduler(void)
4104ba24d34Sgilles {
4114ba24d34Sgilles 	struct passwd	*pw;
4124ba24d34Sgilles 
413299c4efeSeric 	backend = scheduler_backend_lookup(backend_scheduler);
414299c4efeSeric 	if (backend == NULL)
415ff01b044Seric 		fatalx("cannot find scheduler backend \"%s\"",
416299c4efeSeric 		    backend_scheduler);
417299c4efeSeric 
418a8e22235Sgilles 	purge_config(PURGE_EVERYTHING & ~PURGE_DISPATCHERS);
4194ba24d34Sgilles 
42011d04e02Seric 	if ((pw = getpwnam(SMTPD_USER)) == NULL)
42111d04e02Seric 		fatalx("unknown user " SMTPD_USER);
42211d04e02Seric 
423299c4efeSeric 	config_process(PROC_SCHEDULER);
424299c4efeSeric 
425acfdf0daSeric 	backend->init(backend_scheduler);
426299c4efeSeric 
42711d04e02Seric 	if (chroot(PATH_CHROOT) == -1)
4284ba24d34Sgilles 		fatal("scheduler: chroot");
4294ba24d34Sgilles 	if (chdir("/") == -1)
4304ba24d34Sgilles 		fatal("scheduler: chdir(\"/\")");
4314ba24d34Sgilles 
4324ba24d34Sgilles 	if (setgroups(1, &pw->pw_gid) ||
4334ba24d34Sgilles 	    setresgid(pw->pw_gid, pw->pw_gid, pw->pw_gid) ||
4344ba24d34Sgilles 	    setresuid(pw->pw_uid, pw->pw_uid, pw->pw_uid))
4354ba24d34Sgilles 		fatal("scheduler: cannot drop privileges");
4364ba24d34Sgilles 
437118c16f3Sgilles 	evpids = xcalloc(env->sc_scheduler_max_schedule, sizeof *evpids);
438118c16f3Sgilles 	types = xcalloc(env->sc_scheduler_max_schedule, sizeof *types);
439118c16f3Sgilles 	msgids = xcalloc(env->sc_scheduler_max_msg_batch_size, sizeof *msgids);
440118c16f3Sgilles 	state = xcalloc(env->sc_scheduler_max_evp_batch_size, sizeof *state);
4415eb26528Seric 
4424ba24d34Sgilles 	imsg_callback = scheduler_imsg;
4434ba24d34Sgilles 	event_init();
4444ba24d34Sgilles 
44543962b9cSeric 	signal(SIGINT, SIG_IGN);
44643962b9cSeric 	signal(SIGTERM, SIG_IGN);
4474ba24d34Sgilles 	signal(SIGPIPE, SIG_IGN);
4484ba24d34Sgilles 	signal(SIGHUP, SIG_IGN);
4494ba24d34Sgilles 
45065c4fdfbSgilles 	config_peer(PROC_CONTROL);
45165c4fdfbSgilles 	config_peer(PROC_QUEUE);
4524ba24d34Sgilles 
453299c4efeSeric 	evtimer_set(&ev, scheduler_timeout, NULL);
45466bc57beSeric 	scheduler_reset_events();
455575d9c9fSgilles 
456575d9c9fSgilles 	if (pledge("stdio", NULL) == -1)
457ff01b044Seric 		fatal("pledge");
458575d9c9fSgilles 
459f94528c3Seric 	event_dispatch();
460f94528c3Seric 	fatalx("exited event loop");
4614ba24d34Sgilles 
4624ba24d34Sgilles 	return (0);
4634ba24d34Sgilles }
4644ba24d34Sgilles 
46566bc57beSeric static void
scheduler_timeout(int fd,short event,void * p)4664ba24d34Sgilles scheduler_timeout(int fd, short event, void *p)
4674ba24d34Sgilles {
4684744da7eSgilles 	struct timeval		tv;
469acfdf0daSeric 	size_t			i;
470acfdf0daSeric 	size_t			d_inflight;
471acfdf0daSeric 	size_t			d_envelope;
472acfdf0daSeric 	size_t			d_removed;
473acfdf0daSeric 	size_t			d_expired;
474acfdf0daSeric 	size_t			d_updated;
475acfdf0daSeric 	size_t			count;
476acfdf0daSeric 	int			mask, r, delay;
4774744da7eSgilles 
47866bc57beSeric 	tv.tv_sec = 0;
4794ba24d34Sgilles 	tv.tv_usec = 0;
4804ba24d34Sgilles 
481acfdf0daSeric 	mask = SCHED_UPDATE;
48266bc57beSeric 
483acfdf0daSeric 	if (ninflight <  env->sc_scheduler_max_inflight) {
484acfdf0daSeric 		mask |= SCHED_EXPIRE | SCHED_REMOVE | SCHED_BOUNCE;
485acfdf0daSeric 		if (!(env->sc_flags & SMTPD_MDA_PAUSED))
486acfdf0daSeric 			mask |= SCHED_MDA;
487acfdf0daSeric 		if (!(env->sc_flags & SMTPD_MTA_PAUSED))
488acfdf0daSeric 			mask |= SCHED_MTA;
489acfdf0daSeric 	}
490d6b3bcf4Seric 
491acfdf0daSeric 	count = env->sc_scheduler_max_schedule;
492d6b3bcf4Seric 
493acfdf0daSeric 	log_trace(TRACE_SCHEDULER, "scheduler: getting batch: mask=0x%x, count=%zu", mask, count);
494d6b3bcf4Seric 
495acfdf0daSeric 	r = backend->batch(mask, &delay, &count, evpids, types);
496bc53cd6aSeric 
497acfdf0daSeric 	log_trace(TRACE_SCHEDULER, "scheduler: got r=%i, delay=%i, count=%zu", r, delay, count);
498acfdf0daSeric 
499acfdf0daSeric 	if (r < 0)
500acfdf0daSeric 		fatalx("scheduler: error in batch handler");
501acfdf0daSeric 
502acfdf0daSeric 	if (r == 0) {
503acfdf0daSeric 
504acfdf0daSeric 		if (delay < -1)
505acfdf0daSeric 			fatalx("scheduler: invalid delay %d", delay);
506acfdf0daSeric 
507acfdf0daSeric 		if (delay == -1) {
508acfdf0daSeric 			log_trace(TRACE_SCHEDULER, "scheduler: sleeping");
509acfdf0daSeric 			return;
510acfdf0daSeric 		}
511acfdf0daSeric 
512acfdf0daSeric 		tv.tv_sec = delay;
513acfdf0daSeric 		tv.tv_usec = 0;
514acfdf0daSeric 		log_trace(TRACE_SCHEDULER,
515acfdf0daSeric 		    "scheduler: waiting for %s", duration_to_text(tv.tv_sec));
516acfdf0daSeric 		evtimer_add(&ev, &tv);
517acfdf0daSeric 		return;
518acfdf0daSeric 	}
519acfdf0daSeric 
520acfdf0daSeric 	d_inflight = 0;
521acfdf0daSeric 	d_envelope = 0;
522acfdf0daSeric 	d_removed = 0;
523acfdf0daSeric 	d_expired = 0;
524acfdf0daSeric 	d_updated = 0;
525acfdf0daSeric 
526acfdf0daSeric 	for (i = 0; i < count; i++) {
527acfdf0daSeric 		switch(types[i]) {
52866bc57beSeric 		case SCHED_REMOVE:
529acfdf0daSeric 			log_debug("debug: scheduler: evp:%016" PRIx64
530acfdf0daSeric 			    " removed", evpids[i]);
531acfdf0daSeric 			m_create(p_queue, IMSG_SCHED_ENVELOPE_REMOVE, 0, 0, -1);
532acfdf0daSeric 			m_add_evpid(p_queue, evpids[i]);
533acfdf0daSeric 			m_close(p_queue);
534acfdf0daSeric 			d_envelope += 1;
535acfdf0daSeric 			d_removed += 1;
536acfdf0daSeric 			d_inflight += 1;
53766bc57beSeric 			break;
53866bc57beSeric 
53966bc57beSeric 		case SCHED_EXPIRE:
540acfdf0daSeric 			log_debug("debug: scheduler: evp:%016" PRIx64
541acfdf0daSeric 			    " expired", evpids[i]);
542acfdf0daSeric 			m_create(p_queue, IMSG_SCHED_ENVELOPE_EXPIRE, 0, 0, -1);
543acfdf0daSeric 			m_add_evpid(p_queue, evpids[i]);
544acfdf0daSeric 			m_close(p_queue);
545acfdf0daSeric 			d_envelope += 1;
546acfdf0daSeric 			d_expired += 1;
547acfdf0daSeric 			d_inflight += 1;
54866bc57beSeric 			break;
54966bc57beSeric 
55081ab3bccSeric 		case SCHED_UPDATE:
551acfdf0daSeric 			log_debug("debug: scheduler: evp:%016" PRIx64
552acfdf0daSeric 			    " scheduled (update)", evpids[i]);
553acfdf0daSeric 			d_updated += 1;
55481ab3bccSeric 			break;
55581ab3bccSeric 
55666bc57beSeric 		case SCHED_BOUNCE:
557acfdf0daSeric 			log_debug("debug: scheduler: evp:%016" PRIx64
558acfdf0daSeric 			    " scheduled (bounce)", evpids[i]);
559acfdf0daSeric 			m_create(p_queue, IMSG_SCHED_ENVELOPE_INJECT, 0, 0, -1);
560acfdf0daSeric 			m_add_evpid(p_queue, evpids[i]);
561acfdf0daSeric 			m_close(p_queue);
562acfdf0daSeric 			d_inflight += 1;
56366bc57beSeric 			break;
56466bc57beSeric 
56566bc57beSeric 		case SCHED_MDA:
566acfdf0daSeric 			log_debug("debug: scheduler: evp:%016" PRIx64
567acfdf0daSeric 			    " scheduled (mda)", evpids[i]);
568acfdf0daSeric 			m_create(p_queue, IMSG_SCHED_ENVELOPE_DELIVER, 0, 0, -1);
569acfdf0daSeric 			m_add_evpid(p_queue, evpids[i]);
570acfdf0daSeric 			m_close(p_queue);
571acfdf0daSeric 			d_inflight += 1;
57266bc57beSeric 			break;
57366bc57beSeric 
57466bc57beSeric 		case SCHED_MTA:
575acfdf0daSeric 			log_debug("debug: scheduler: evp:%016" PRIx64
576acfdf0daSeric 			    " scheduled (mta)", evpids[i]);
577acfdf0daSeric 			m_create(p_queue, IMSG_SCHED_ENVELOPE_TRANSFER, 0, 0, -1);
578acfdf0daSeric 			m_add_evpid(p_queue, evpids[i]);
579acfdf0daSeric 			m_close(p_queue);
580acfdf0daSeric 			d_inflight += 1;
581d6b3bcf4Seric 			break;
5824ba24d34Sgilles 		}
583acfdf0daSeric 	}
584d6b3bcf4Seric 
585acfdf0daSeric 	stat_decrement("scheduler.envelope", d_envelope);
586acfdf0daSeric 	stat_increment("scheduler.envelope.inflight", d_inflight);
587acfdf0daSeric 	stat_increment("scheduler.envelope.expired", d_expired);
588acfdf0daSeric 	stat_increment("scheduler.envelope.removed", d_removed);
589acfdf0daSeric 	stat_increment("scheduler.envelope.updated", d_updated);
590d6b3bcf4Seric 
591acfdf0daSeric 	ninflight += d_inflight;
592d6b3bcf4Seric 
593d6b3bcf4Seric 	tv.tv_sec = 0;
594d6b3bcf4Seric 	tv.tv_usec = 0;
595299c4efeSeric 	evtimer_add(&ev, &tv);
5964ba24d34Sgilles }
597