/*	$OpenBSD: scheduler_ramqueue.c,v 1.49 2024/09/03 18:27:04 op Exp $	*/

/*
 * Copyright (c) 2012 Gilles Chehade <gilles@poolp.org>
 * Copyright (c) 2012 Eric Faurot <eric@openbsd.org>
 *
 * Permission to use, copy, modify, and distribute this software for any
 * purpose with or without fee is hereby granted, provided that the above
 * copyright notice and this permission notice appear in all copies.
 *
 * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
 * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
 * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
 * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
 * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
 * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
 * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
 */

#include <inttypes.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>

#include "smtpd.h"
#include "log.h"

TAILQ_HEAD(evplist, rq_envelope);

struct rq_message {
	uint32_t		 msgid;
	struct tree		 envelopes;
};

struct rq_envelope {
	TAILQ_ENTRY(rq_envelope) entry;
	SPLAY_ENTRY(rq_envelope) t_entry;

	uint64_t		 evpid;
	uint64_t		 holdq;
	enum delivery_type	 type;

#define	RQ_EVPSTATE_PENDING	 0
#define	RQ_EVPSTATE_SCHEDULED	 1
#define	RQ_EVPSTATE_INFLIGHT	 2
#define	RQ_EVPSTATE_HELD	 3
	uint8_t			 state;

#define	RQ_ENVELOPE_EXPIRED	 0x01
#define	RQ_ENVELOPE_REMOVED	 0x02
#define	RQ_ENVELOPE_SUSPEND	 0x04
#define	RQ_ENVELOPE_UPDATE	 0x08
#define	RQ_ENVELOPE_OVERFLOW	 0x10
	uint8_t			 flags;

	time_t			 ctime;
	time_t			 sched;
	time_t			 expire;

	struct rq_message	*message;

	time_t			 t_inflight;
	time_t			 t_scheduled;
};

struct rq_holdq {
	struct evplist		 q;
	size_t			 count;
};

struct rq_queue {
	size_t			 evpcount;
	struct tree		 messages;
	SPLAY_HEAD(prioqtree, rq_envelope) q_priotree;

	struct evplist		 q_pending;
	struct evplist		 q_inflight;

	struct evplist		 q_mta;
	struct evplist		 q_mda;
	struct evplist		 q_bounce;
	struct evplist		 q_update;
	struct evplist		 q_expired;
	struct evplist		 q_removed;
};
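/*
 * Pending envelopes live in two structures at once: the q_pending list,
 * kept ordered by next scheduling time, and the q_priotree splay tree,
 * which sorted_insert() uses to find the insertion point in that list
 * without scanning it (ordering defined by rq_envelope_cmp() below).
 */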
static int rq_envelope_cmp(struct rq_envelope *, struct rq_envelope *);

SPLAY_PROTOTYPE(prioqtree, rq_envelope, t_entry, rq_envelope_cmp);

static int scheduler_ram_init(const char *);
static int scheduler_ram_insert(struct scheduler_info *);
static size_t scheduler_ram_commit(uint32_t);
static size_t scheduler_ram_rollback(uint32_t);
static int scheduler_ram_update(struct scheduler_info *);
static int scheduler_ram_delete(uint64_t);
static int scheduler_ram_hold(uint64_t, uint64_t);
static int scheduler_ram_release(int, uint64_t, int);
static int scheduler_ram_batch(int, int *, size_t *, uint64_t *, int *);
static size_t scheduler_ram_messages(uint32_t, uint32_t *, size_t);
static size_t scheduler_ram_envelopes(uint64_t, struct evpstate *, size_t);
static int scheduler_ram_schedule(uint64_t);
static int scheduler_ram_remove(uint64_t);
static int scheduler_ram_suspend(uint64_t);
static int scheduler_ram_resume(uint64_t);
static int scheduler_ram_query(uint64_t);

static void sorted_insert(struct rq_queue *, struct rq_envelope *);

static void rq_queue_init(struct rq_queue *);
static void rq_queue_merge(struct rq_queue *, struct rq_queue *);
static void rq_queue_dump(struct rq_queue *, const char *);
static void rq_queue_schedule(struct rq_queue *rq);
static struct evplist *rq_envelope_list(struct rq_queue *, struct rq_envelope *);
static void rq_envelope_schedule(struct rq_queue *, struct rq_envelope *);
static int rq_envelope_remove(struct rq_queue *, struct rq_envelope *);
static int rq_envelope_suspend(struct rq_queue *, struct rq_envelope *);
static int rq_envelope_resume(struct rq_queue *, struct rq_envelope *);
static void rq_envelope_delete(struct rq_queue *, struct rq_envelope *);
static const char *rq_envelope_to_text(struct rq_envelope *);

struct scheduler_backend scheduler_backend_ramqueue = {
	scheduler_ram_init,

	scheduler_ram_insert,
	scheduler_ram_commit,
	scheduler_ram_rollback,

	scheduler_ram_update,
	scheduler_ram_delete,
	scheduler_ram_hold,
	scheduler_ram_release,

	scheduler_ram_batch,

	scheduler_ram_messages,
	scheduler_ram_envelopes,
	scheduler_ram_schedule,
	scheduler_ram_remove,
	scheduler_ram_suspend,
	scheduler_ram_resume,
	scheduler_ram_query,
};

static struct rq_queue	ramqueue;
static struct tree	updates;
static struct tree	holdqs[3]; /* delivery type */

static time_t		currtime;

#define BACKOFF_TRANSFER	400
#define BACKOFF_DELIVERY	10
#define BACKOFF_OVERFLOW	3
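/*
 * Retry schedule: an envelope is next tried at t0 + base * step^2, with
 * base BACKOFF_TRANSFER (400s) for transfers and BACKOFF_DELIVERY (10s)
 * for local deliveries and bounces.  An MTA envelope is thus retried
 * 400s, 1600s, 3600s, ... after its creation time.
 */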
static time_t
scheduler_backoff(time_t t0, time_t base, uint32_t step)
{
	return (t0 + base * step * step);
}

static time_t
scheduler_next(time_t t0, time_t base, uint32_t step)
{
	time_t t;

	/* XXX be more efficient */
	while ((t = scheduler_backoff(t0, base, step)) <= currtime)
		step++;

	return (t);
}

static int
scheduler_ram_init(const char *arg)
{
	rq_queue_init(&ramqueue);
	tree_init(&updates);
	tree_init(&holdqs[D_MDA]);
	tree_init(&holdqs[D_MTA]);
	tree_init(&holdqs[D_BOUNCE]);

	return (1);
}

static int
scheduler_ram_insert(struct scheduler_info *si)
{
	struct rq_queue		*update;
	struct rq_message	*message;
	struct rq_envelope	*envelope;
	uint32_t		 msgid;

	currtime = time(NULL);

	msgid = evpid_to_msgid(si->evpid);

	/* find/prepare a ramqueue update */
	if ((update = tree_get(&updates, msgid)) == NULL) {
		update = xcalloc(1, sizeof *update);
		stat_increment("scheduler.ramqueue.update", 1);
		rq_queue_init(update);
		tree_xset(&updates, msgid, update);
	}

	/* find/prepare the msgtree message in ramqueue update */
	if ((message = tree_get(&update->messages, msgid)) == NULL) {
		message = xcalloc(1, sizeof *message);
		message->msgid = msgid;
		tree_init(&message->envelopes);
		tree_xset(&update->messages, msgid, message);
		stat_increment("scheduler.ramqueue.message", 1);
	}

	/* create envelope in ramqueue message */
	envelope = xcalloc(1, sizeof *envelope);
	envelope->evpid = si->evpid;
	envelope->type = si->type;
	envelope->message = message;
	envelope->ctime = si->creation;
	envelope->expire = si->creation + si->ttl;
	envelope->sched = scheduler_backoff(si->creation,
	    (si->type == D_MTA) ? BACKOFF_TRANSFER : BACKOFF_DELIVERY,
	    si->retry);
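	/*
	 * Register the envelope in the update transaction: it is merged
	 * into the live ramqueue only when the message is committed (see
	 * scheduler_ram_commit() and rq_queue_merge()).
	 */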
	tree_xset(&message->envelopes, envelope->evpid, envelope);

	update->evpcount++;
	stat_increment("scheduler.ramqueue.envelope", 1);

	envelope->state = RQ_EVPSTATE_PENDING;
	TAILQ_INSERT_TAIL(&update->q_pending, envelope, entry);

	si->nexttry = envelope->sched;

	return (1);
}

static size_t
scheduler_ram_commit(uint32_t msgid)
{
	struct rq_queue	*update;
	size_t		 r;

	currtime = time(NULL);

	update = tree_xpop(&updates, msgid);
	r = update->evpcount;

	if (tracing & TRACE_SCHEDULER)
		rq_queue_dump(update, "update to commit");

	rq_queue_merge(&ramqueue, update);

	if (tracing & TRACE_SCHEDULER)
		rq_queue_dump(&ramqueue, "resulting queue");

	rq_queue_schedule(&ramqueue);

	free(update);
	stat_decrement("scheduler.ramqueue.update", 1);

	return (r);
}

static size_t
scheduler_ram_rollback(uint32_t msgid)
{
	struct rq_queue		*update;
	struct rq_envelope	*evp;
	size_t			 r;

	currtime = time(NULL);

	if ((update = tree_pop(&updates, msgid)) == NULL)
		return (0);
	r = update->evpcount;

	while ((evp = TAILQ_FIRST(&update->q_pending))) {
		TAILQ_REMOVE(&update->q_pending, evp, entry);
		rq_envelope_delete(update, evp);
	}

	free(update);
	stat_decrement("scheduler.ramqueue.update", 1);

	return (r);
}
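/*
 * Reschedule an envelope that just came back from an in-flight delivery
 * attempt: compute its next retry time and put it back in the pending
 * queue, unless it was removed or suspended in the meantime.
 */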
static int
scheduler_ram_update(struct scheduler_info *si)
{
	struct rq_message	*msg;
	struct rq_envelope	*evp;
	uint32_t		 msgid;

	currtime = time(NULL);

	msgid = evpid_to_msgid(si->evpid);
	msg = tree_xget(&ramqueue.messages, msgid);
	evp = tree_xget(&msg->envelopes, si->evpid);

	/* it *must* be in-flight */
	if (evp->state != RQ_EVPSTATE_INFLIGHT)
		fatalx("evp:%016" PRIx64 " not in-flight", si->evpid);

	TAILQ_REMOVE(&ramqueue.q_inflight, evp, entry);

	/*
	 * If the envelope was removed while inflight, schedule it for
	 * removal immediately.
	 */
	if (evp->flags & RQ_ENVELOPE_REMOVED) {
		TAILQ_INSERT_TAIL(&ramqueue.q_removed, evp, entry);
		evp->state = RQ_EVPSTATE_SCHEDULED;
		evp->t_scheduled = currtime;
		return (1);
	}

	evp->sched = scheduler_next(evp->ctime,
	    (si->type == D_MTA) ? BACKOFF_TRANSFER : BACKOFF_DELIVERY,
	    si->retry);

	evp->state = RQ_EVPSTATE_PENDING;
	if (!(evp->flags & RQ_ENVELOPE_SUSPEND))
		sorted_insert(&ramqueue, evp);

	si->nexttry = evp->sched;

	return (1);
}

static int
scheduler_ram_delete(uint64_t evpid)
{
	struct rq_message	*msg;
	struct rq_envelope	*evp;
	uint32_t		 msgid;

	currtime = time(NULL);

	msgid = evpid_to_msgid(evpid);
	msg = tree_xget(&ramqueue.messages, msgid);
	evp = tree_xget(&msg->envelopes, evpid);

	/* it *must* be in-flight */
	if (evp->state != RQ_EVPSTATE_INFLIGHT)
		fatalx("evp:%016" PRIx64 " not in-flight", evpid);

	TAILQ_REMOVE(&ramqueue.q_inflight, evp, entry);

	rq_envelope_delete(&ramqueue, evp);

	return (1);
}
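/*
 * Hold queues: an in-flight envelope can be parked on a holdq, keyed by
 * an opaque 64-bit identifier per delivery type, until it is released
 * with scheduler_ram_release().  A holdq is capped at HOLDQ_MAXSIZE
 * envelopes; past that, the envelope is pushed back to the pending queue
 * and flagged so that its next try is recomputed with the short
 * BACKOFF_OVERFLOW base.
 */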
#define HOLDQ_MAXSIZE	1000

static int
scheduler_ram_hold(uint64_t evpid, uint64_t holdq)
{
	struct rq_holdq		*hq;
	struct rq_message	*msg;
	struct rq_envelope	*evp;
	uint32_t		 msgid;

	currtime = time(NULL);

	msgid = evpid_to_msgid(evpid);
	msg = tree_xget(&ramqueue.messages, msgid);
	evp = tree_xget(&msg->envelopes, evpid);

	/* it *must* be in-flight */
	if (evp->state != RQ_EVPSTATE_INFLIGHT)
		fatalx("evp:%016" PRIx64 " not in-flight", evpid);

	TAILQ_REMOVE(&ramqueue.q_inflight, evp, entry);

	/* If the envelope is suspended, just mark it as pending */
	if (evp->flags & RQ_ENVELOPE_SUSPEND) {
		evp->state = RQ_EVPSTATE_PENDING;
		return (0);
	}

	hq = tree_get(&holdqs[evp->type], holdq);
	if (hq == NULL) {
		hq = xcalloc(1, sizeof(*hq));
		TAILQ_INIT(&hq->q);
		tree_xset(&holdqs[evp->type], holdq, hq);
		stat_increment("scheduler.ramqueue.holdq", 1);
	}

	/* If the holdq is full, just "tempfail" the envelope */
	if (hq->count >= HOLDQ_MAXSIZE) {
		evp->state = RQ_EVPSTATE_PENDING;
		evp->flags |= RQ_ENVELOPE_UPDATE;
		evp->flags |= RQ_ENVELOPE_OVERFLOW;
		sorted_insert(&ramqueue, evp);
		stat_increment("scheduler.ramqueue.hold-overflow", 1);
		return (0);
	}

	evp->state = RQ_EVPSTATE_HELD;
	evp->holdq = holdq;
	/* This is an optimization: upon release, the envelopes will be
	 * inserted in the pending queue from the first element to the last.
	 * Since elements already in the queue were received first, they
	 * were scheduled first, so they will be reinserted before the
	 * current element.
	 */
	TAILQ_INSERT_HEAD(&hq->q, evp, entry);
	hq->count += 1;
	stat_increment("scheduler.ramqueue.hold", 1);

	return (1);
}
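/*
 * Release up to n envelopes from a holdq back to the pending queue.
 * n == -1 means "release everything" and additionally flags each
 * envelope with RQ_ENVELOPE_UPDATE so that its schedule is recomputed.
 */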
static int
scheduler_ram_release(int type, uint64_t holdq, int n)
{
	struct rq_holdq		*hq;
	struct rq_envelope	*evp;
	int			 i, update;

	currtime = time(NULL);

	hq = tree_get(&holdqs[type], holdq);
	if (hq == NULL)
		return (0);

	if (n == -1) {
		n = 0;
		update = 1;
	}
	else
		update = 0;

	for (i = 0; n == 0 || i < n; i++) {
		evp = TAILQ_FIRST(&hq->q);
		if (evp == NULL)
			break;

		TAILQ_REMOVE(&hq->q, evp, entry);
		hq->count -= 1;
		evp->holdq = 0;

		/* When released, all envelopes are put in the pending queue
		 * and will be rescheduled immediately. As an optimization,
		 * we could just schedule them directly.
		 */
		evp->state = RQ_EVPSTATE_PENDING;
		if (update)
			evp->flags |= RQ_ENVELOPE_UPDATE;
		sorted_insert(&ramqueue, evp);
	}

	if (TAILQ_EMPTY(&hq->q)) {
		tree_xpop(&holdqs[type], holdq);
		free(hq);
		stat_decrement("scheduler.ramqueue.holdq", 1);
	}
	stat_decrement("scheduler.ramqueue.hold", i);

	return (i);
}

static int
scheduler_ram_batch(int mask, int *delay, size_t *count, uint64_t *evpids, int *types)
{
	struct rq_envelope	*evp;
	size_t			 i, n;
	time_t			 t;

	currtime = time(NULL);

	rq_queue_schedule(&ramqueue);
	if (tracing & TRACE_SCHEDULER)
		rq_queue_dump(&ramqueue, "scheduler_ram_batch()");
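	/*
	 * Round-robin over the scheduled queues, filling the caller's
	 * evpids[]/types[] arrays until *count entries are used or a
	 * full round yields nothing new.
	 */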
	i = 0;
	n = 0;

	for (;;) {

		if (mask & SCHED_REMOVE && (evp = TAILQ_FIRST(&ramqueue.q_removed))) {
			TAILQ_REMOVE(&ramqueue.q_removed, evp, entry);
			types[i] = SCHED_REMOVE;
			evpids[i] = evp->evpid;
			rq_envelope_delete(&ramqueue, evp);

			if (++i == *count)
				break;
		}

		if (mask & SCHED_EXPIRE && (evp = TAILQ_FIRST(&ramqueue.q_expired))) {
			TAILQ_REMOVE(&ramqueue.q_expired, evp, entry);
			types[i] = SCHED_EXPIRE;
			evpids[i] = evp->evpid;
			rq_envelope_delete(&ramqueue, evp);

			if (++i == *count)
				break;
		}

		if (mask & SCHED_UPDATE && (evp = TAILQ_FIRST(&ramqueue.q_update))) {
			TAILQ_REMOVE(&ramqueue.q_update, evp, entry);
			types[i] = SCHED_UPDATE;
			evpids[i] = evp->evpid;

			if (evp->flags & RQ_ENVELOPE_OVERFLOW)
				t = BACKOFF_OVERFLOW;
			else if (evp->type == D_MTA)
				t = BACKOFF_TRANSFER;
			else
				t = BACKOFF_DELIVERY;

			evp->sched = scheduler_next(evp->ctime, t, 0);
			evp->flags &= ~(RQ_ENVELOPE_UPDATE|RQ_ENVELOPE_OVERFLOW);
			evp->state = RQ_EVPSTATE_PENDING;
			if (!(evp->flags & RQ_ENVELOPE_SUSPEND))
				sorted_insert(&ramqueue, evp);

			if (++i == *count)
				break;
		}

		if (mask & SCHED_BOUNCE && (evp = TAILQ_FIRST(&ramqueue.q_bounce))) {
			TAILQ_REMOVE(&ramqueue.q_bounce, evp, entry);
			types[i] = SCHED_BOUNCE;
			evpids[i] = evp->evpid;

			TAILQ_INSERT_TAIL(&ramqueue.q_inflight, evp, entry);
			evp->state = RQ_EVPSTATE_INFLIGHT;
			evp->t_inflight = currtime;

			if (++i == *count)
				break;
		}

		if (mask & SCHED_MDA && (evp = TAILQ_FIRST(&ramqueue.q_mda))) {
			TAILQ_REMOVE(&ramqueue.q_mda, evp, entry);
			types[i] = SCHED_MDA;
			evpids[i] = evp->evpid;

			TAILQ_INSERT_TAIL(&ramqueue.q_inflight, evp, entry);
			evp->state = RQ_EVPSTATE_INFLIGHT;
			evp->t_inflight = currtime;

			if (++i == *count)
				break;
		}

		if (mask & SCHED_MTA && (evp = TAILQ_FIRST(&ramqueue.q_mta))) {
			TAILQ_REMOVE(&ramqueue.q_mta, evp, entry);
			types[i] = SCHED_MTA;
			evpids[i] = evp->evpid;

			TAILQ_INSERT_TAIL(&ramqueue.q_inflight, evp, entry);
			evp->state = RQ_EVPSTATE_INFLIGHT;
			evp->t_inflight = currtime;

			if (++i == *count)
				break;
		}

		/* nothing seen this round */
		if (i == n)
			break;

		n = i;
	}

	if (i) {
		*count = i;
		return (1);
	}

	if ((evp = TAILQ_FIRST(&ramqueue.q_pending))) {
		if (evp->sched < evp->expire)
			t = evp->sched;
		else
			t = evp->expire;
		*delay = (t < currtime) ? 0 : (t - currtime);
	}
	else
		*delay = -1;

	return (0);
}
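/*
 * Listing helpers: copy up to "size" message ids (resp. envelope states)
 * starting from "from" into the caller-provided array.
 */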
static size_t
scheduler_ram_messages(uint32_t from, uint32_t *dst, size_t size)
{
	uint64_t	 id;
	size_t		 n;
	void		*i;

	for (n = 0, i = NULL; n < size; n++) {
		if (tree_iterfrom(&ramqueue.messages, &i, from, &id, NULL) == 0)
			break;
		dst[n] = id;
	}

	return (n);
}

static size_t
scheduler_ram_envelopes(uint64_t from, struct evpstate *dst, size_t size)
{
	struct rq_message	*msg;
	struct rq_envelope	*evp;
	void			*i;
	size_t			 n;

	if ((msg = tree_get(&ramqueue.messages, evpid_to_msgid(from))) == NULL)
		return (0);

	for (n = 0, i = NULL; n < size; ) {

		if (tree_iterfrom(&msg->envelopes, &i, from, NULL,
		    (void**)&evp) == 0)
			break;

		if (evp->flags & (RQ_ENVELOPE_REMOVED | RQ_ENVELOPE_EXPIRED))
			continue;

		dst[n].evpid = evp->evpid;
		dst[n].flags = 0;
		dst[n].retry = 0;
		dst[n].time = 0;

		if (evp->state == RQ_EVPSTATE_PENDING) {
			dst[n].time = evp->sched;
			dst[n].flags = EF_PENDING;
		}
		else if (evp->state == RQ_EVPSTATE_SCHEDULED) {
			dst[n].time = evp->t_scheduled;
			dst[n].flags = EF_PENDING;
		}
		else if (evp->state == RQ_EVPSTATE_INFLIGHT) {
			dst[n].time = evp->t_inflight;
			dst[n].flags = EF_INFLIGHT;
		}
		else if (evp->state == RQ_EVPSTATE_HELD) {
			/* same as scheduled */
			dst[n].time = evp->t_scheduled;
			dst[n].flags = EF_PENDING;
			dst[n].flags |= EF_HOLD;
		}
		if (evp->flags & RQ_ENVELOPE_SUSPEND)
			dst[n].flags |= EF_SUSPEND;

		n++;
	}

	return (n);
}

static int
scheduler_ram_schedule(uint64_t evpid)
{
	struct rq_message	*msg;
	struct rq_envelope	*evp;
	uint32_t		 msgid;
	void			*i;
	int			 r;

	currtime = time(NULL);

	if (evpid > 0xffffffff) {
		msgid = evpid_to_msgid(evpid);
		if ((msg = tree_get(&ramqueue.messages, msgid)) == NULL)
			return (0);
		if ((evp = tree_get(&msg->envelopes, evpid)) == NULL)
			return (0);
		if (evp->state == RQ_EVPSTATE_INFLIGHT)
			return (0);
		rq_envelope_schedule(&ramqueue, evp);
		return (1);
	}
	else {
		msgid = evpid;
		if ((msg = tree_get(&ramqueue.messages, msgid)) == NULL)
			return (0);
		i = NULL;
		r = 0;
		while (tree_iter(&msg->envelopes, &i, NULL, (void*)(&evp))) {
			if (evp->state == RQ_EVPSTATE_INFLIGHT)
				continue;
			rq_envelope_schedule(&ramqueue, evp);
			r++;
		}
		return (r);
	}
}

static int
scheduler_ram_remove(uint64_t evpid)
{
	struct rq_message	*msg;
	struct rq_envelope	*evp;
	uint32_t		 msgid;
	void			*i;
	int			 r;

	currtime = time(NULL);

	if (evpid > 0xffffffff) {
		msgid = evpid_to_msgid(evpid);
		if ((msg = tree_get(&ramqueue.messages, msgid)) == NULL)
			return (0);
		if ((evp = tree_get(&msg->envelopes, evpid)) == NULL)
			return (0);
		if (rq_envelope_remove(&ramqueue, evp))
			return (1);
		return (0);
	}
	else {
		msgid = evpid;
		if ((msg = tree_get(&ramqueue.messages, msgid)) == NULL)
			return (0);
		i = NULL;
		r = 0;
		while (tree_iter(&msg->envelopes, &i, NULL, (void*)(&evp)))
			if (rq_envelope_remove(&ramqueue, evp))
				r++;
		return (r);
	}
}
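/*
 * Suspend/resume toggle the RQ_ENVELOPE_SUSPEND flag on a single
 * envelope, or on every envelope of a message when given a message id;
 * a suspended envelope stays off the work queues until resumed.
 */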
static int
scheduler_ram_suspend(uint64_t evpid)
{
	struct rq_message	*msg;
	struct rq_envelope	*evp;
	uint32_t		 msgid;
	void			*i;
	int			 r;

	currtime = time(NULL);

	if (evpid > 0xffffffff) {
		msgid = evpid_to_msgid(evpid);
		if ((msg = tree_get(&ramqueue.messages, msgid)) == NULL)
			return (0);
		if ((evp = tree_get(&msg->envelopes, evpid)) == NULL)
			return (0);
		if (rq_envelope_suspend(&ramqueue, evp))
			return (1);
		return (0);
	}
	else {
		msgid = evpid;
		if ((msg = tree_get(&ramqueue.messages, msgid)) == NULL)
			return (0);
		i = NULL;
		r = 0;
		while (tree_iter(&msg->envelopes, &i, NULL, (void*)(&evp)))
			if (rq_envelope_suspend(&ramqueue, evp))
				r++;
		return (r);
	}
}

static int
scheduler_ram_resume(uint64_t evpid)
{
	struct rq_message	*msg;
	struct rq_envelope	*evp;
	uint32_t		 msgid;
	void			*i;
	int			 r;

	currtime = time(NULL);

	if (evpid > 0xffffffff) {
		msgid = evpid_to_msgid(evpid);
		if ((msg = tree_get(&ramqueue.messages, msgid)) == NULL)
			return (0);
		if ((evp = tree_get(&msg->envelopes, evpid)) == NULL)
			return (0);
		if (rq_envelope_resume(&ramqueue, evp))
			return (1);
		return (0);
	}
	else {
		msgid = evpid;
		if ((msg = tree_get(&ramqueue.messages, msgid)) == NULL)
			return (0);
		i = NULL;
		r = 0;
		while (tree_iter(&msg->envelopes, &i, NULL, (void*)(&evp)))
			if (rq_envelope_resume(&ramqueue, evp))
				r++;
		return (r);
	}
}

static int
scheduler_ram_query(uint64_t evpid)
{
	uint32_t	msgid;

	if (evpid > 0xffffffff)
		msgid = evpid_to_msgid(evpid);
	else
		msgid = evpid;

	if (tree_get(&ramqueue.messages, msgid) == NULL)
		return (0);

	return (1);
}
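/*
 * Insert an envelope in the pending queue, keeping it ordered as defined
 * by rq_envelope_cmp(): the splay tree yields the successor, and the
 * envelope is linked just before it (or at the tail if it has none).
 */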
static void
sorted_insert(struct rq_queue *rq, struct rq_envelope *evp)
{
	struct rq_envelope	*evp2;

	SPLAY_INSERT(prioqtree, &rq->q_priotree, evp);
	evp2 = SPLAY_NEXT(prioqtree, &rq->q_priotree, evp);
	if (evp2)
		TAILQ_INSERT_BEFORE(evp2, evp, entry);
	else
		TAILQ_INSERT_TAIL(&rq->q_pending, evp, entry);
}

static void
rq_queue_init(struct rq_queue *rq)
{
	memset(rq, 0, sizeof *rq);
	tree_init(&rq->messages);
	TAILQ_INIT(&rq->q_pending);
	TAILQ_INIT(&rq->q_inflight);
	TAILQ_INIT(&rq->q_mta);
	TAILQ_INIT(&rq->q_mda);
	TAILQ_INIT(&rq->q_bounce);
	TAILQ_INIT(&rq->q_update);
	TAILQ_INIT(&rq->q_expired);
	TAILQ_INIT(&rq->q_removed);
	SPLAY_INIT(&rq->q_priotree);
}

static void
rq_queue_merge(struct rq_queue *rq, struct rq_queue *update)
{
	struct rq_message	*message, *tomessage;
	struct rq_envelope	*envelope;
	uint64_t		 id;
	void			*i;

	while (tree_poproot(&update->messages, &id, (void*)&message)) {
		if ((tomessage = tree_get(&rq->messages, id)) == NULL) {
			/* message does not exist. reuse structure */
			tree_xset(&rq->messages, id, message);
			continue;
		}
		/* need to re-link all envelopes before merging them */
		i = NULL;
		while ((tree_iter(&message->envelopes, &i, &id,
		    (void*)&envelope)))
			envelope->message = tomessage;
		tree_merge(&tomessage->envelopes, &message->envelopes);
		free(message);
		stat_decrement("scheduler.ramqueue.message", 1);
	}

	/* Sorted insert in the pending queue */
	while ((envelope = TAILQ_FIRST(&update->q_pending))) {
		TAILQ_REMOVE(&update->q_pending, envelope, entry);
		sorted_insert(rq, envelope);
	}

	rq->evpcount += update->evpcount;
}
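/*
 * Move pending envelopes that are due (or expired) to their respective
 * scheduled queues, at most SCHEDULEMAX per call so that a single pass
 * stays bounded.
 */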
#define SCHEDULEMAX	1024

static void
rq_queue_schedule(struct rq_queue *rq)
{
	struct rq_envelope	*evp;
	size_t			 n;

	n = 0;
	while ((evp = TAILQ_FIRST(&rq->q_pending))) {
		if (evp->sched > currtime && evp->expire > currtime)
			break;

		if (n == SCHEDULEMAX)
			break;

		if (evp->state != RQ_EVPSTATE_PENDING)
			fatalx("evp:%016" PRIx64 " flags=0x%x", evp->evpid,
			    evp->flags);

		if (evp->expire <= currtime) {
			TAILQ_REMOVE(&rq->q_pending, evp, entry);
			SPLAY_REMOVE(prioqtree, &rq->q_priotree, evp);
			TAILQ_INSERT_TAIL(&rq->q_expired, evp, entry);
			evp->state = RQ_EVPSTATE_SCHEDULED;
			evp->flags |= RQ_ENVELOPE_EXPIRED;
			evp->t_scheduled = currtime;
			continue;
		}
		rq_envelope_schedule(rq, evp);
		n += 1;
	}
}

static struct evplist *
rq_envelope_list(struct rq_queue *rq, struct rq_envelope *evp)
{
	switch (evp->state) {
	case RQ_EVPSTATE_PENDING:
		return &rq->q_pending;

	case RQ_EVPSTATE_SCHEDULED:
		if (evp->flags & RQ_ENVELOPE_EXPIRED)
			return &rq->q_expired;
		if (evp->flags & RQ_ENVELOPE_REMOVED)
			return &rq->q_removed;
		if (evp->flags & RQ_ENVELOPE_UPDATE)
			return &rq->q_update;
		if (evp->type == D_MTA)
			return &rq->q_mta;
		if (evp->type == D_MDA)
			return &rq->q_mda;
		if (evp->type == D_BOUNCE)
			return &rq->q_bounce;
		fatalx("%016" PRIx64 " bad evp type %d", evp->evpid, evp->type);

	case RQ_EVPSTATE_INFLIGHT:
		return &rq->q_inflight;

	case RQ_EVPSTATE_HELD:
		return (NULL);
	}

	fatalx("%016" PRIx64 " bad state %d", evp->evpid, evp->state);
	return (NULL);
}

static void
rq_envelope_schedule(struct rq_queue *rq, struct rq_envelope *evp)
{
	struct rq_holdq	*hq;
	struct evplist	*q = NULL;

	switch (evp->type) {
	case D_MTA:
		q = &rq->q_mta;
		break;
	case D_MDA:
		q = &rq->q_mda;
		break;
	case D_BOUNCE:
		q = &rq->q_bounce;
		break;
	}

	if (evp->flags & RQ_ENVELOPE_UPDATE)
		q = &rq->q_update;

	if (evp->state == RQ_EVPSTATE_HELD) {
		hq = tree_xget(&holdqs[evp->type], evp->holdq);
		TAILQ_REMOVE(&hq->q, evp, entry);
		hq->count -= 1;
		if (TAILQ_EMPTY(&hq->q)) {
			tree_xpop(&holdqs[evp->type], evp->holdq);
			free(hq);
		}
		evp->holdq = 0;
		stat_decrement("scheduler.ramqueue.hold", 1);
	}
	else if (!(evp->flags & RQ_ENVELOPE_SUSPEND)) {
		TAILQ_REMOVE(&rq->q_pending, evp, entry);
		SPLAY_REMOVE(prioqtree, &rq->q_priotree, evp);
	}

	TAILQ_INSERT_TAIL(q, evp, entry);
	evp->state = RQ_EVPSTATE_SCHEDULED;
	evp->t_scheduled = currtime;
}
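/*
 * Queue an envelope for removal.  In-flight envelopes are only flagged
 * here; they are moved to the removed queue once the delivery attempt
 * reports back (see scheduler_ram_update()).
 */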
static int
rq_envelope_remove(struct rq_queue *rq, struct rq_envelope *evp)
{
	struct rq_holdq	*hq;
	struct evplist	*evl;

	if (evp->flags & (RQ_ENVELOPE_REMOVED | RQ_ENVELOPE_EXPIRED))
		return (0);
	/*
	 * If the envelope is inflight, mark it for removal.
	 */
	if (evp->state == RQ_EVPSTATE_INFLIGHT) {
		evp->flags |= RQ_ENVELOPE_REMOVED;
		return (1);
	}

	if (evp->state == RQ_EVPSTATE_HELD) {
		hq = tree_xget(&holdqs[evp->type], evp->holdq);
		TAILQ_REMOVE(&hq->q, evp, entry);
		hq->count -= 1;
		if (TAILQ_EMPTY(&hq->q)) {
			tree_xpop(&holdqs[evp->type], evp->holdq);
			free(hq);
		}
		evp->holdq = 0;
		stat_decrement("scheduler.ramqueue.hold", 1);
	}
	else if (!(evp->flags & RQ_ENVELOPE_SUSPEND)) {
		evl = rq_envelope_list(rq, evp);
		TAILQ_REMOVE(evl, evp, entry);
		if (evl == &rq->q_pending)
			SPLAY_REMOVE(prioqtree, &rq->q_priotree, evp);
	}

	TAILQ_INSERT_TAIL(&rq->q_removed, evp, entry);
	evp->state = RQ_EVPSTATE_SCHEDULED;
	evp->flags |= RQ_ENVELOPE_REMOVED;
	evp->t_scheduled = currtime;

	return (1);
}

static int
rq_envelope_suspend(struct rq_queue *rq, struct rq_envelope *evp)
{
	struct rq_holdq	*hq;
	struct evplist	*evl;

	if (evp->flags & RQ_ENVELOPE_SUSPEND)
		return (0);

	if (evp->state == RQ_EVPSTATE_HELD) {
		hq = tree_xget(&holdqs[evp->type], evp->holdq);
		TAILQ_REMOVE(&hq->q, evp, entry);
		hq->count -= 1;
		if (TAILQ_EMPTY(&hq->q)) {
			tree_xpop(&holdqs[evp->type], evp->holdq);
			free(hq);
		}
		evp->holdq = 0;
		evp->state = RQ_EVPSTATE_PENDING;
		stat_decrement("scheduler.ramqueue.hold", 1);
	}
	else if (evp->state != RQ_EVPSTATE_INFLIGHT) {
		evl = rq_envelope_list(rq, evp);
		TAILQ_REMOVE(evl, evp, entry);
		if (evl == &rq->q_pending)
			SPLAY_REMOVE(prioqtree, &rq->q_priotree, evp);
	}

	evp->flags |= RQ_ENVELOPE_SUSPEND;

	return (1);
}

static int
rq_envelope_resume(struct rq_queue *rq, struct rq_envelope *evp)
{
	struct evplist	*evl;

	if (!(evp->flags & RQ_ENVELOPE_SUSPEND))
		return (0);

	if (evp->state != RQ_EVPSTATE_INFLIGHT) {
		evl = rq_envelope_list(rq, evp);
		if (evl == &rq->q_pending)
			sorted_insert(rq, evp);
		else
			TAILQ_INSERT_TAIL(evl, evp, entry);
	}

	evp->flags &= ~RQ_ENVELOPE_SUSPEND;

	return (1);
}
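/*
 * Free an envelope; the containing message is freed too once its last
 * envelope is gone.
 */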
static void
rq_envelope_delete(struct rq_queue *rq, struct rq_envelope *evp)
{
	tree_xpop(&evp->message->envelopes, evp->evpid);
	if (tree_empty(&evp->message->envelopes)) {
		tree_xpop(&rq->messages, evp->message->msgid);
		free(evp->message);
		stat_decrement("scheduler.ramqueue.message", 1);
	}

	free(evp);
	rq->evpcount--;
	stat_decrement("scheduler.ramqueue.envelope", 1);
}

static const char *
rq_envelope_to_text(struct rq_envelope *e)
{
	static char	buf[256];
	char		t[64];

	(void)snprintf(buf, sizeof buf, "evp:%016" PRIx64 " [", e->evpid);

	if (e->type == D_BOUNCE)
		(void)strlcat(buf, "bounce", sizeof buf);
	else if (e->type == D_MDA)
		(void)strlcat(buf, "mda", sizeof buf);
	else if (e->type == D_MTA)
		(void)strlcat(buf, "mta", sizeof buf);

	(void)snprintf(t, sizeof t, ",expire=%s",
	    duration_to_text(e->expire - currtime));
	(void)strlcat(buf, t, sizeof buf);

	switch (e->state) {
	case RQ_EVPSTATE_PENDING:
		(void)snprintf(t, sizeof t, ",pending=%s",
		    duration_to_text(e->sched - currtime));
		(void)strlcat(buf, t, sizeof buf);
		break;

	case RQ_EVPSTATE_SCHEDULED:
		(void)snprintf(t, sizeof t, ",scheduled=%s",
		    duration_to_text(currtime - e->t_scheduled));
		(void)strlcat(buf, t, sizeof buf);
		break;

	case RQ_EVPSTATE_INFLIGHT:
		(void)snprintf(t, sizeof t, ",inflight=%s",
		    duration_to_text(currtime - e->t_inflight));
		(void)strlcat(buf, t, sizeof buf);
		break;

	case RQ_EVPSTATE_HELD:
		(void)snprintf(t, sizeof t, ",held=%s",
		    duration_to_text(currtime - e->t_inflight));
		(void)strlcat(buf, t, sizeof buf);
		break;
	default:
		fatalx("%016" PRIx64 " bad state %d", e->evpid, e->state);
	}

	if (e->flags & RQ_ENVELOPE_REMOVED)
		(void)strlcat(buf, ",removed", sizeof buf);
	if (e->flags & RQ_ENVELOPE_EXPIRED)
		(void)strlcat(buf, ",expired", sizeof buf);
	if (e->flags & RQ_ENVELOPE_SUSPEND)
		(void)strlcat(buf, ",suspended", sizeof buf);

	(void)strlcat(buf, "]", sizeof buf);

	return (buf);
}
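/*
 * Dump the whole ramqueue to the debug log, one line per message and one
 * per envelope; only called when scheduler tracing is enabled.
 */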
static void
rq_queue_dump(struct rq_queue *rq, const char *name)
{
	struct rq_message	*message;
	struct rq_envelope	*envelope;
	void			*i, *j;
	uint64_t		 id;

	log_debug("debug: /--- ramqueue: %s", name);

	i = NULL;
	while ((tree_iter(&rq->messages, &i, &id, (void*)&message))) {
		log_debug("debug: | msg:%08" PRIx32, message->msgid);
		j = NULL;
		while ((tree_iter(&message->envelopes, &j, &id,
		    (void*)&envelope)))
			log_debug("debug: | %s",
			    rq_envelope_to_text(envelope));
	}
	log_debug("debug: \\---");
}

static int
rq_envelope_cmp(struct rq_envelope *e1, struct rq_envelope *e2)
{
	time_t	ref1, ref2;

	ref1 = (e1->sched < e1->expire) ? e1->sched : e1->expire;
	ref2 = (e2->sched < e2->expire) ? e2->sched : e2->expire;
	if (ref1 != ref2)
		return (ref1 < ref2) ? -1 : 1;

	if (e1->evpid != e2->evpid)
		return (e1->evpid < e2->evpid) ? -1 : 1;

	return 0;
}

SPLAY_GENERATE(prioqtree, rq_envelope, t_entry, rq_envelope_cmp);