/*	$OpenBSD: scheduler.c,v 1.62 2021/06/14 17:58:16 eric Exp $	*/

/*
 * Copyright (c) 2008 Gilles Chehade <gilles@poolp.org>
 * Copyright (c) 2008 Pierre-Yves Ritschard <pyr@openbsd.org>
 * Copyright (c) 2008-2009 Jacek Masiulaniec <jacekm@dobremiasto.net>
 * Copyright (c) 2012 Eric Faurot <eric@openbsd.org>
 *
 * Permission to use, copy, modify, and distribute this software for any
 * purpose with or without fee is hereby granted, provided that the above
 * copyright notice and this permission notice appear in all copies.
 *
 * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
 * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
 * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
 * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
 * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
 * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
 * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
 */
21f6a38d58Sgilles
#include <inttypes.h>
#include <pwd.h>
#include <signal.h>
#include <string.h>
#include <unistd.h>

#include "smtpd.h"
#include "log.h"
29f6a38d58Sgilles
3065c4fdfbSgilles static void scheduler_imsg(struct mproc *, struct imsg *);
314ba24d34Sgilles static void scheduler_shutdown(void);
324ba24d34Sgilles static void scheduler_reset_events(void);
334ba24d34Sgilles static void scheduler_timeout(int, short, void *);
34f6a38d58Sgilles
354ba24d34Sgilles static struct scheduler_backend *backend = NULL;
36299c4efeSeric static struct event ev;
37acfdf0daSeric static size_t ninflight = 0;
38acfdf0daSeric static int *types;
395eb26528Seric static uint64_t *evpids;
405eb26528Seric static uint32_t *msgids;
415eb26528Seric static struct evpstate *state;
424ba24d34Sgilles
434ba24d34Sgilles extern const char *backend_scheduler;
444ba24d34Sgilles
454ba24d34Sgilles void
scheduler_imsg(struct mproc * p,struct imsg * imsg)4665c4fdfbSgilles scheduler_imsg(struct mproc *p, struct imsg *imsg)
47f6a38d58Sgilles {
4865c4fdfbSgilles struct bounce_req_msg req;
4965c4fdfbSgilles struct envelope evp;
504ba24d34Sgilles struct scheduler_info si;
5165c4fdfbSgilles struct msg m;
526dc81a07Seric uint64_t evpid, id, holdq;
535eb26528Seric uint32_t msgid;
54299c4efeSeric uint32_t inflight;
554fe02f32Seric size_t n, i;
5665c4fdfbSgilles time_t timestamp;
577eed50e8Seric int v, r, type;
58f6a38d58Sgilles
5943962b9cSeric if (imsg == NULL)
6043962b9cSeric scheduler_shutdown();
6143962b9cSeric
624ba24d34Sgilles switch (imsg->hdr.type) {
63913395bcSeric
64aa1d5973Seric case IMSG_QUEUE_ENVELOPE_SUBMIT:
6565c4fdfbSgilles m_msg(&m, imsg);
6665c4fdfbSgilles m_get_envelope(&m, &evp);
6765c4fdfbSgilles m_end(&m);
684744da7eSgilles log_trace(TRACE_SCHEDULER,
6965c4fdfbSgilles "scheduler: inserting evp:%016" PRIx64, evp.id);
70d6b3bcf4Seric scheduler_info(&si, &evp);
71be215280Seric stat_increment("scheduler.envelope.incoming", 1);
72913395bcSeric backend->insert(&si);
73913395bcSeric return;
74913395bcSeric
75aa1d5973Seric case IMSG_QUEUE_MESSAGE_COMMIT:
7665c4fdfbSgilles m_msg(&m, imsg);
7765c4fdfbSgilles m_get_msgid(&m, &msgid);
7865c4fdfbSgilles m_end(&m);
79913395bcSeric log_trace(TRACE_SCHEDULER,
8035e161d3Seric "scheduler: committing msg:%08" PRIx32, msgid);
81be215280Seric n = backend->commit(msgid);
82be215280Seric stat_decrement("scheduler.envelope.incoming", n);
83be215280Seric stat_increment("scheduler.envelope", n);
84913395bcSeric scheduler_reset_events();
85913395bcSeric return;
86913395bcSeric
87a9835440Ssunil case IMSG_QUEUE_DISCOVER_EVPID:
88a9835440Ssunil m_msg(&m, imsg);
89a9835440Ssunil m_get_envelope(&m, &evp);
90a9835440Ssunil m_end(&m);
91a9835440Ssunil r = backend->query(evp.id);
92a9835440Ssunil if (r) {
93a9835440Ssunil log_debug("debug: scheduler: evp:%016" PRIx64
94a9835440Ssunil " already scheduled", evp.id);
95a9835440Ssunil return;
96a9835440Ssunil }
97a9835440Ssunil log_trace(TRACE_SCHEDULER,
98a9835440Ssunil "scheduler: discovering evp:%016" PRIx64, evp.id);
99a9835440Ssunil scheduler_info(&si, &evp);
100a9835440Ssunil stat_increment("scheduler.envelope.incoming", 1);
101a9835440Ssunil backend->insert(&si);
102a9835440Ssunil return;
103a9835440Ssunil
104a9835440Ssunil case IMSG_QUEUE_DISCOVER_MSGID:
105a9835440Ssunil m_msg(&m, imsg);
106a9835440Ssunil m_get_msgid(&m, &msgid);
107a9835440Ssunil m_end(&m);
108a9835440Ssunil r = backend->query(msgid);
109a9835440Ssunil if (r) {
110a9835440Ssunil log_debug("debug: scheduler: msgid:%08" PRIx32
111a9835440Ssunil " already scheduled", msgid);
112a9835440Ssunil return;
113a9835440Ssunil }
114a9835440Ssunil log_trace(TRACE_SCHEDULER,
115a9835440Ssunil "scheduler: committing msg:%08" PRIx32, msgid);
116a9835440Ssunil n = backend->commit(msgid);
117a9835440Ssunil stat_decrement("scheduler.envelope.incoming", n);
118a9835440Ssunil stat_increment("scheduler.envelope", n);
119a9835440Ssunil scheduler_reset_events();
120a9835440Ssunil return;
121a9835440Ssunil
122aa1d5973Seric case IMSG_QUEUE_MESSAGE_ROLLBACK:
12365c4fdfbSgilles m_msg(&m, imsg);
12465c4fdfbSgilles m_get_msgid(&m, &msgid);
12565c4fdfbSgilles m_end(&m);
126913395bcSeric log_trace(TRACE_SCHEDULER, "scheduler: aborting msg:%08" PRIx32,
127913395bcSeric msgid);
128be215280Seric n = backend->rollback(msgid);
129be215280Seric stat_decrement("scheduler.envelope.incoming", n);
1304ba24d34Sgilles scheduler_reset_events();
1314ba24d34Sgilles return;
1324ba24d34Sgilles
133aa1d5973Seric case IMSG_QUEUE_ENVELOPE_REMOVE:
134299c4efeSeric m_msg(&m, imsg);
135299c4efeSeric m_get_evpid(&m, &evpid);
136299c4efeSeric m_get_u32(&m, &inflight);
137299c4efeSeric m_end(&m);
138299c4efeSeric log_trace(TRACE_SCHEDULER,
139299c4efeSeric "scheduler: queue requested removal of evp:%016" PRIx64,
140299c4efeSeric evpid);
141299c4efeSeric stat_decrement("scheduler.envelope", 1);
142299c4efeSeric if (!inflight)
143299c4efeSeric backend->remove(evpid);
144a56bd6deSeric else {
145299c4efeSeric backend->delete(evpid);
146a56bd6deSeric ninflight -= 1;
147a56bd6deSeric stat_decrement("scheduler.envelope.inflight", 1);
148a56bd6deSeric }
149299c4efeSeric
150299c4efeSeric scheduler_reset_events();
151299c4efeSeric return;
152299c4efeSeric
153acfdf0daSeric case IMSG_QUEUE_ENVELOPE_ACK:
154acfdf0daSeric m_msg(&m, imsg);
155acfdf0daSeric m_get_evpid(&m, &evpid);
156acfdf0daSeric m_end(&m);
157acfdf0daSeric log_trace(TRACE_SCHEDULER,
158acfdf0daSeric "scheduler: queue ack removal of evp:%016" PRIx64,
159acfdf0daSeric evpid);
160acfdf0daSeric ninflight -= 1;
161acfdf0daSeric stat_decrement("scheduler.envelope.inflight", 1);
162acfdf0daSeric scheduler_reset_events();
163acfdf0daSeric return;
164acfdf0daSeric
165aa1d5973Seric case IMSG_QUEUE_DELIVERY_OK:
16665c4fdfbSgilles m_msg(&m, imsg);
16765c4fdfbSgilles m_get_evpid(&m, &evpid);
16865c4fdfbSgilles m_end(&m);
1694744da7eSgilles log_trace(TRACE_SCHEDULER,
17065c4fdfbSgilles "scheduler: deleting evp:%016" PRIx64 " (ok)", evpid);
17165c4fdfbSgilles backend->delete(evpid);
172a56bd6deSeric ninflight -= 1;
173be215280Seric stat_increment("scheduler.delivery.ok", 1);
174be215280Seric stat_decrement("scheduler.envelope.inflight", 1);
175be215280Seric stat_decrement("scheduler.envelope", 1);
17666bc57beSeric scheduler_reset_events();
1774ba24d34Sgilles return;
1784ba24d34Sgilles
179aa1d5973Seric case IMSG_QUEUE_DELIVERY_TEMPFAIL:
18065c4fdfbSgilles m_msg(&m, imsg);
18165c4fdfbSgilles m_get_envelope(&m, &evp);
18265c4fdfbSgilles m_end(&m);
1834744da7eSgilles log_trace(TRACE_SCHEDULER,
18465c4fdfbSgilles "scheduler: updating evp:%016" PRIx64, evp.id);
185d6b3bcf4Seric scheduler_info(&si, &evp);
18666bc57beSeric backend->update(&si);
187a56bd6deSeric ninflight -= 1;
188be215280Seric stat_increment("scheduler.delivery.tempfail", 1);
189be215280Seric stat_decrement("scheduler.envelope.inflight", 1);
19065c4fdfbSgilles
19165c4fdfbSgilles for (i = 0; i < MAX_BOUNCE_WARN; i++) {
19265c4fdfbSgilles if (env->sc_bounce_warn[i] == 0)
19365c4fdfbSgilles break;
19465c4fdfbSgilles timestamp = si.creation + env->sc_bounce_warn[i];
19565c4fdfbSgilles if (si.nexttry >= timestamp &&
19665c4fdfbSgilles si.lastbounce < timestamp) {
19765c4fdfbSgilles req.evpid = evp.id;
19865c4fdfbSgilles req.timestamp = timestamp;
1995ed42fc8Ssunil req.bounce.type = B_DELAYED;
20065c4fdfbSgilles req.bounce.delay = env->sc_bounce_warn[i];
201a8e22235Sgilles req.bounce.ttl = si.ttl;
202aa1d5973Seric m_compose(p, IMSG_SCHED_ENVELOPE_BOUNCE, 0, 0, -1,
20365c4fdfbSgilles &req, sizeof req);
20465c4fdfbSgilles break;
20565c4fdfbSgilles }
20665c4fdfbSgilles }
2074ba24d34Sgilles scheduler_reset_events();
2084ba24d34Sgilles return;
2094ba24d34Sgilles
210aa1d5973Seric case IMSG_QUEUE_DELIVERY_PERMFAIL:
21165c4fdfbSgilles m_msg(&m, imsg);
21265c4fdfbSgilles m_get_evpid(&m, &evpid);
21365c4fdfbSgilles m_end(&m);
2144744da7eSgilles log_trace(TRACE_SCHEDULER,
21565c4fdfbSgilles "scheduler: deleting evp:%016" PRIx64 " (fail)", evpid);
21665c4fdfbSgilles backend->delete(evpid);
217a56bd6deSeric ninflight -= 1;
218be215280Seric stat_increment("scheduler.delivery.permfail", 1);
219be215280Seric stat_decrement("scheduler.envelope.inflight", 1);
220be215280Seric stat_decrement("scheduler.envelope", 1);
22166bc57beSeric scheduler_reset_events();
2224ba24d34Sgilles return;
2234ba24d34Sgilles
224aa1d5973Seric case IMSG_QUEUE_DELIVERY_LOOP:
22565c4fdfbSgilles m_msg(&m, imsg);
22665c4fdfbSgilles m_get_evpid(&m, &evpid);
22765c4fdfbSgilles m_end(&m);
228a7b2e833Seric log_trace(TRACE_SCHEDULER,
22965c4fdfbSgilles "scheduler: deleting evp:%016" PRIx64 " (loop)", evpid);
23065c4fdfbSgilles backend->delete(evpid);
231a56bd6deSeric ninflight -= 1;
232be215280Seric stat_increment("scheduler.delivery.loop", 1);
233be215280Seric stat_decrement("scheduler.envelope.inflight", 1);
234be215280Seric stat_decrement("scheduler.envelope", 1);
235a7b2e833Seric scheduler_reset_events();
236a7b2e833Seric return;
237a7b2e833Seric
238aa1d5973Seric case IMSG_QUEUE_HOLDQ_HOLD:
2396dc81a07Seric m_msg(&m, imsg);
2406dc81a07Seric m_get_evpid(&m, &evpid);
2416dc81a07Seric m_get_id(&m, &holdq);
2426dc81a07Seric m_end(&m);
2436dc81a07Seric log_trace(TRACE_SCHEDULER,
2446dc81a07Seric "scheduler: holding evp:%016" PRIx64 " on %016" PRIx64,
2456dc81a07Seric evpid, holdq);
2466dc81a07Seric backend->hold(evpid, holdq);
247a56bd6deSeric ninflight -= 1;
2486dc81a07Seric stat_decrement("scheduler.envelope.inflight", 1);
2496dc81a07Seric scheduler_reset_events();
2506dc81a07Seric return;
2516dc81a07Seric
252aa1d5973Seric case IMSG_QUEUE_HOLDQ_RELEASE:
2536dc81a07Seric m_msg(&m, imsg);
2547eed50e8Seric m_get_int(&m, &type);
2556dc81a07Seric m_get_id(&m, &holdq);
2566dc81a07Seric m_get_int(&m, &r);
2576dc81a07Seric m_end(&m);
2586dc81a07Seric log_trace(TRACE_SCHEDULER,
2595c9fb78eSeric "scheduler: releasing %d on holdq (%d, %016" PRIx64 ")",
2607eed50e8Seric r, type, holdq);
2617eed50e8Seric backend->release(type, holdq, r);
2626dc81a07Seric scheduler_reset_events();
2636dc81a07Seric return;
2646dc81a07Seric
26565c4fdfbSgilles case IMSG_CTL_PAUSE_MDA:
266913395bcSeric log_trace(TRACE_SCHEDULER, "scheduler: pausing mda");
2674ba24d34Sgilles env->sc_flags |= SMTPD_MDA_PAUSED;
2684ba24d34Sgilles return;
2694ba24d34Sgilles
27065c4fdfbSgilles case IMSG_CTL_RESUME_MDA:
271913395bcSeric log_trace(TRACE_SCHEDULER, "scheduler: resuming mda");
2724ba24d34Sgilles env->sc_flags &= ~SMTPD_MDA_PAUSED;
2734ba24d34Sgilles scheduler_reset_events();
2744ba24d34Sgilles return;
2754ba24d34Sgilles
27665c4fdfbSgilles case IMSG_CTL_PAUSE_MTA:
277913395bcSeric log_trace(TRACE_SCHEDULER, "scheduler: pausing mta");
2784ba24d34Sgilles env->sc_flags |= SMTPD_MTA_PAUSED;
2794ba24d34Sgilles return;
2804ba24d34Sgilles
28165c4fdfbSgilles case IMSG_CTL_RESUME_MTA:
282913395bcSeric log_trace(TRACE_SCHEDULER, "scheduler: resuming mta");
2834ba24d34Sgilles env->sc_flags &= ~SMTPD_MTA_PAUSED;
2844ba24d34Sgilles scheduler_reset_events();
2854ba24d34Sgilles return;
2864ba24d34Sgilles
2874ba24d34Sgilles case IMSG_CTL_VERBOSE:
28865c4fdfbSgilles m_msg(&m, imsg);
28965c4fdfbSgilles m_get_int(&m, &v);
29065c4fdfbSgilles m_end(&m);
291871fc12cSreyk log_setverbose(v);
2924ba24d34Sgilles return;
2934ba24d34Sgilles
294acfdf0daSeric case IMSG_CTL_PROFILE:
295acfdf0daSeric m_msg(&m, imsg);
296acfdf0daSeric m_get_int(&m, &v);
297acfdf0daSeric m_end(&m);
298acfdf0daSeric profiling = v;
299acfdf0daSeric return;
300acfdf0daSeric
30165c4fdfbSgilles case IMSG_CTL_LIST_MESSAGES:
3024fe02f32Seric msgid = *(uint32_t *)(imsg->data);
3035eb26528Seric n = backend->messages(msgid, msgids, env->sc_scheduler_max_msg_batch_size);
30465c4fdfbSgilles m_compose(p, IMSG_CTL_LIST_MESSAGES, imsg->hdr.peerid, 0, -1,
30565c4fdfbSgilles msgids, n * sizeof (*msgids));
3064fe02f32Seric return;
3074fe02f32Seric
30865c4fdfbSgilles case IMSG_CTL_LIST_ENVELOPES:
3094fe02f32Seric id = *(uint64_t *)(imsg->data);
3105eb26528Seric n = backend->envelopes(id, state, env->sc_scheduler_max_evp_batch_size);
3114fe02f32Seric for (i = 0; i < n; i++) {
31265c4fdfbSgilles m_create(p_queue, IMSG_CTL_LIST_ENVELOPES,
313299c4efeSeric imsg->hdr.peerid, 0, -1);
31465c4fdfbSgilles m_add_evpid(p_queue, state[i].evpid);
31565c4fdfbSgilles m_add_int(p_queue, state[i].flags);
31665c4fdfbSgilles m_add_time(p_queue, state[i].time);
31765c4fdfbSgilles m_close(p_queue);
3184fe02f32Seric }
31965c4fdfbSgilles m_compose(p_queue, IMSG_CTL_LIST_ENVELOPES,
32065c4fdfbSgilles imsg->hdr.peerid, 0, -1, NULL, 0);
3214fe02f32Seric return;
3224fe02f32Seric
32365c4fdfbSgilles case IMSG_CTL_SCHEDULE:
324913395bcSeric id = *(uint64_t *)(imsg->data);
3254fe02f32Seric if (id <= 0xffffffffL)
3264fe02f32Seric log_debug("debug: scheduler: "
3274fe02f32Seric "scheduling msg:%08" PRIx64, id);
328416152afSchl else
3294fe02f32Seric log_debug("debug: scheduler: "
3304fe02f32Seric "scheduling evp:%016" PRIx64, id);
3310fcb81a3Seric r = backend->schedule(id);
3324ba24d34Sgilles scheduler_reset_events();
3330fcb81a3Seric m_compose(p, r ? IMSG_CTL_OK : IMSG_CTL_FAIL, imsg->hdr.peerid,
3340fcb81a3Seric 0, -1, NULL, 0);
3354ba24d34Sgilles return;
3364ba24d34Sgilles
33706e5d42dSeric case IMSG_QUEUE_ENVELOPE_SCHEDULE:
33835e161d3Seric id = *(uint64_t *)(imsg->data);
33935e161d3Seric backend->schedule(id);
34035e161d3Seric scheduler_reset_events();
34135e161d3Seric return;
34235e161d3Seric
34365c4fdfbSgilles case IMSG_CTL_REMOVE:
344913395bcSeric id = *(uint64_t *)(imsg->data);
345913395bcSeric if (id <= 0xffffffffL)
3464fe02f32Seric log_debug("debug: scheduler: "
3474fe02f32Seric "removing msg:%08" PRIx64, id);
348913395bcSeric else
3494fe02f32Seric log_debug("debug: scheduler: "
3504fe02f32Seric "removing evp:%016" PRIx64, id);
3510fcb81a3Seric r = backend->remove(id);
3524ba24d34Sgilles scheduler_reset_events();
3530fcb81a3Seric m_compose(p, r ? IMSG_CTL_OK : IMSG_CTL_FAIL, imsg->hdr.peerid,
3540fcb81a3Seric 0, -1, NULL, 0);
3554ba24d34Sgilles return;
35635e161d3Seric
35735e161d3Seric case IMSG_CTL_PAUSE_EVP:
35835e161d3Seric id = *(uint64_t *)(imsg->data);
35935e161d3Seric if (id <= 0xffffffffL)
36035e161d3Seric log_debug("debug: scheduler: "
36135e161d3Seric "suspending msg:%08" PRIx64, id);
36235e161d3Seric else
36335e161d3Seric log_debug("debug: scheduler: "
36435e161d3Seric "suspending evp:%016" PRIx64, id);
3650fcb81a3Seric r = backend->suspend(id);
36635e161d3Seric scheduler_reset_events();
3670fcb81a3Seric m_compose(p, r ? IMSG_CTL_OK : IMSG_CTL_FAIL, imsg->hdr.peerid,
3680fcb81a3Seric 0, -1, NULL, 0);
36935e161d3Seric return;
37035e161d3Seric
37135e161d3Seric case IMSG_CTL_RESUME_EVP:
37235e161d3Seric id = *(uint64_t *)(imsg->data);
37335e161d3Seric if (id <= 0xffffffffL)
37435e161d3Seric log_debug("debug: scheduler: "
37535e161d3Seric "resuming msg:%08" PRIx64, id);
37635e161d3Seric else
37735e161d3Seric log_debug("debug: scheduler: "
37835e161d3Seric "resuming evp:%016" PRIx64, id);
3790fcb81a3Seric r = backend->resume(id);
38035e161d3Seric scheduler_reset_events();
3810fcb81a3Seric m_compose(p, r ? IMSG_CTL_OK : IMSG_CTL_FAIL, imsg->hdr.peerid,
3820fcb81a3Seric 0, -1, NULL, 0);
38335e161d3Seric return;
3844ba24d34Sgilles }
3854ba24d34Sgilles
386ff01b044Seric fatalx("scheduler_imsg: unexpected %s imsg",
3874fe02f32Seric imsg_to_str(imsg->hdr.type));
388f6a38d58Sgilles }
389f05a516fSgilles
/*
 * Terminate the scheduler process: emit a debug line, then leave
 * immediately through _exit() with status 0.
 */
static void
scheduler_shutdown(void)
{
	log_debug("debug: scheduler agent exiting");

	_exit(0);
}
3964ba24d34Sgilles
39766bc57beSeric static void
scheduler_reset_events(void)3984ba24d34Sgilles scheduler_reset_events(void)
3994ba24d34Sgilles {
4004ba24d34Sgilles struct timeval tv;
4014ba24d34Sgilles
402299c4efeSeric evtimer_del(&ev);
4034ba24d34Sgilles tv.tv_sec = 0;
40465c4fdfbSgilles tv.tv_usec = 0;
405299c4efeSeric evtimer_add(&ev, &tv);
4064ba24d34Sgilles }
4074ba24d34Sgilles
408b88ab68dSeric int
scheduler(void)4094ba24d34Sgilles scheduler(void)
4104ba24d34Sgilles {
4114ba24d34Sgilles struct passwd *pw;
4124ba24d34Sgilles
413299c4efeSeric backend = scheduler_backend_lookup(backend_scheduler);
414299c4efeSeric if (backend == NULL)
415ff01b044Seric fatalx("cannot find scheduler backend \"%s\"",
416299c4efeSeric backend_scheduler);
417299c4efeSeric
418a8e22235Sgilles purge_config(PURGE_EVERYTHING & ~PURGE_DISPATCHERS);
4194ba24d34Sgilles
42011d04e02Seric if ((pw = getpwnam(SMTPD_USER)) == NULL)
42111d04e02Seric fatalx("unknown user " SMTPD_USER);
42211d04e02Seric
423299c4efeSeric config_process(PROC_SCHEDULER);
424299c4efeSeric
425acfdf0daSeric backend->init(backend_scheduler);
426299c4efeSeric
42711d04e02Seric if (chroot(PATH_CHROOT) == -1)
4284ba24d34Sgilles fatal("scheduler: chroot");
4294ba24d34Sgilles if (chdir("/") == -1)
4304ba24d34Sgilles fatal("scheduler: chdir(\"/\")");
4314ba24d34Sgilles
4324ba24d34Sgilles if (setgroups(1, &pw->pw_gid) ||
4334ba24d34Sgilles setresgid(pw->pw_gid, pw->pw_gid, pw->pw_gid) ||
4344ba24d34Sgilles setresuid(pw->pw_uid, pw->pw_uid, pw->pw_uid))
4354ba24d34Sgilles fatal("scheduler: cannot drop privileges");
4364ba24d34Sgilles
437118c16f3Sgilles evpids = xcalloc(env->sc_scheduler_max_schedule, sizeof *evpids);
438118c16f3Sgilles types = xcalloc(env->sc_scheduler_max_schedule, sizeof *types);
439118c16f3Sgilles msgids = xcalloc(env->sc_scheduler_max_msg_batch_size, sizeof *msgids);
440118c16f3Sgilles state = xcalloc(env->sc_scheduler_max_evp_batch_size, sizeof *state);
4415eb26528Seric
4424ba24d34Sgilles imsg_callback = scheduler_imsg;
4434ba24d34Sgilles event_init();
4444ba24d34Sgilles
44543962b9cSeric signal(SIGINT, SIG_IGN);
44643962b9cSeric signal(SIGTERM, SIG_IGN);
4474ba24d34Sgilles signal(SIGPIPE, SIG_IGN);
4484ba24d34Sgilles signal(SIGHUP, SIG_IGN);
4494ba24d34Sgilles
45065c4fdfbSgilles config_peer(PROC_CONTROL);
45165c4fdfbSgilles config_peer(PROC_QUEUE);
4524ba24d34Sgilles
453299c4efeSeric evtimer_set(&ev, scheduler_timeout, NULL);
45466bc57beSeric scheduler_reset_events();
455575d9c9fSgilles
456575d9c9fSgilles if (pledge("stdio", NULL) == -1)
457ff01b044Seric fatal("pledge");
458575d9c9fSgilles
459f94528c3Seric event_dispatch();
460f94528c3Seric fatalx("exited event loop");
4614ba24d34Sgilles
4624ba24d34Sgilles return (0);
4634ba24d34Sgilles }
4644ba24d34Sgilles
46566bc57beSeric static void
scheduler_timeout(int fd,short event,void * p)4664ba24d34Sgilles scheduler_timeout(int fd, short event, void *p)
4674ba24d34Sgilles {
4684744da7eSgilles struct timeval tv;
469acfdf0daSeric size_t i;
470acfdf0daSeric size_t d_inflight;
471acfdf0daSeric size_t d_envelope;
472acfdf0daSeric size_t d_removed;
473acfdf0daSeric size_t d_expired;
474acfdf0daSeric size_t d_updated;
475acfdf0daSeric size_t count;
476acfdf0daSeric int mask, r, delay;
4774744da7eSgilles
47866bc57beSeric tv.tv_sec = 0;
4794ba24d34Sgilles tv.tv_usec = 0;
4804ba24d34Sgilles
481acfdf0daSeric mask = SCHED_UPDATE;
48266bc57beSeric
483acfdf0daSeric if (ninflight < env->sc_scheduler_max_inflight) {
484acfdf0daSeric mask |= SCHED_EXPIRE | SCHED_REMOVE | SCHED_BOUNCE;
485acfdf0daSeric if (!(env->sc_flags & SMTPD_MDA_PAUSED))
486acfdf0daSeric mask |= SCHED_MDA;
487acfdf0daSeric if (!(env->sc_flags & SMTPD_MTA_PAUSED))
488acfdf0daSeric mask |= SCHED_MTA;
489acfdf0daSeric }
490d6b3bcf4Seric
491acfdf0daSeric count = env->sc_scheduler_max_schedule;
492d6b3bcf4Seric
493acfdf0daSeric log_trace(TRACE_SCHEDULER, "scheduler: getting batch: mask=0x%x, count=%zu", mask, count);
494d6b3bcf4Seric
495acfdf0daSeric r = backend->batch(mask, &delay, &count, evpids, types);
496bc53cd6aSeric
497acfdf0daSeric log_trace(TRACE_SCHEDULER, "scheduler: got r=%i, delay=%i, count=%zu", r, delay, count);
498acfdf0daSeric
499acfdf0daSeric if (r < 0)
500acfdf0daSeric fatalx("scheduler: error in batch handler");
501acfdf0daSeric
502acfdf0daSeric if (r == 0) {
503acfdf0daSeric
504acfdf0daSeric if (delay < -1)
505acfdf0daSeric fatalx("scheduler: invalid delay %d", delay);
506acfdf0daSeric
507acfdf0daSeric if (delay == -1) {
508acfdf0daSeric log_trace(TRACE_SCHEDULER, "scheduler: sleeping");
509acfdf0daSeric return;
510acfdf0daSeric }
511acfdf0daSeric
512acfdf0daSeric tv.tv_sec = delay;
513acfdf0daSeric tv.tv_usec = 0;
514acfdf0daSeric log_trace(TRACE_SCHEDULER,
515acfdf0daSeric "scheduler: waiting for %s", duration_to_text(tv.tv_sec));
516acfdf0daSeric evtimer_add(&ev, &tv);
517acfdf0daSeric return;
518acfdf0daSeric }
519acfdf0daSeric
520acfdf0daSeric d_inflight = 0;
521acfdf0daSeric d_envelope = 0;
522acfdf0daSeric d_removed = 0;
523acfdf0daSeric d_expired = 0;
524acfdf0daSeric d_updated = 0;
525acfdf0daSeric
526acfdf0daSeric for (i = 0; i < count; i++) {
527acfdf0daSeric switch(types[i]) {
52866bc57beSeric case SCHED_REMOVE:
529acfdf0daSeric log_debug("debug: scheduler: evp:%016" PRIx64
530acfdf0daSeric " removed", evpids[i]);
531acfdf0daSeric m_create(p_queue, IMSG_SCHED_ENVELOPE_REMOVE, 0, 0, -1);
532acfdf0daSeric m_add_evpid(p_queue, evpids[i]);
533acfdf0daSeric m_close(p_queue);
534acfdf0daSeric d_envelope += 1;
535acfdf0daSeric d_removed += 1;
536acfdf0daSeric d_inflight += 1;
53766bc57beSeric break;
53866bc57beSeric
53966bc57beSeric case SCHED_EXPIRE:
540acfdf0daSeric log_debug("debug: scheduler: evp:%016" PRIx64
541acfdf0daSeric " expired", evpids[i]);
542acfdf0daSeric m_create(p_queue, IMSG_SCHED_ENVELOPE_EXPIRE, 0, 0, -1);
543acfdf0daSeric m_add_evpid(p_queue, evpids[i]);
544acfdf0daSeric m_close(p_queue);
545acfdf0daSeric d_envelope += 1;
546acfdf0daSeric d_expired += 1;
547acfdf0daSeric d_inflight += 1;
54866bc57beSeric break;
54966bc57beSeric
55081ab3bccSeric case SCHED_UPDATE:
551acfdf0daSeric log_debug("debug: scheduler: evp:%016" PRIx64
552acfdf0daSeric " scheduled (update)", evpids[i]);
553acfdf0daSeric d_updated += 1;
55481ab3bccSeric break;
55581ab3bccSeric
55666bc57beSeric case SCHED_BOUNCE:
557acfdf0daSeric log_debug("debug: scheduler: evp:%016" PRIx64
558acfdf0daSeric " scheduled (bounce)", evpids[i]);
559acfdf0daSeric m_create(p_queue, IMSG_SCHED_ENVELOPE_INJECT, 0, 0, -1);
560acfdf0daSeric m_add_evpid(p_queue, evpids[i]);
561acfdf0daSeric m_close(p_queue);
562acfdf0daSeric d_inflight += 1;
56366bc57beSeric break;
56466bc57beSeric
56566bc57beSeric case SCHED_MDA:
566acfdf0daSeric log_debug("debug: scheduler: evp:%016" PRIx64
567acfdf0daSeric " scheduled (mda)", evpids[i]);
568acfdf0daSeric m_create(p_queue, IMSG_SCHED_ENVELOPE_DELIVER, 0, 0, -1);
569acfdf0daSeric m_add_evpid(p_queue, evpids[i]);
570acfdf0daSeric m_close(p_queue);
571acfdf0daSeric d_inflight += 1;
57266bc57beSeric break;
57366bc57beSeric
57466bc57beSeric case SCHED_MTA:
575acfdf0daSeric log_debug("debug: scheduler: evp:%016" PRIx64
576acfdf0daSeric " scheduled (mta)", evpids[i]);
577acfdf0daSeric m_create(p_queue, IMSG_SCHED_ENVELOPE_TRANSFER, 0, 0, -1);
578acfdf0daSeric m_add_evpid(p_queue, evpids[i]);
579acfdf0daSeric m_close(p_queue);
580acfdf0daSeric d_inflight += 1;
581d6b3bcf4Seric break;
5824ba24d34Sgilles }
583acfdf0daSeric }
584d6b3bcf4Seric
585acfdf0daSeric stat_decrement("scheduler.envelope", d_envelope);
586acfdf0daSeric stat_increment("scheduler.envelope.inflight", d_inflight);
587acfdf0daSeric stat_increment("scheduler.envelope.expired", d_expired);
588acfdf0daSeric stat_increment("scheduler.envelope.removed", d_removed);
589acfdf0daSeric stat_increment("scheduler.envelope.updated", d_updated);
590d6b3bcf4Seric
591acfdf0daSeric ninflight += d_inflight;
592d6b3bcf4Seric
593d6b3bcf4Seric tv.tv_sec = 0;
594d6b3bcf4Seric tv.tv_usec = 0;
595299c4efeSeric evtimer_add(&ev, &tv);
5964ba24d34Sgilles }
597