/*	$OpenBSD: scheduler_ramqueue.c,v 1.49 2024/09/03 18:27:04 op Exp $	*/

/*
 * Copyright (c) 2012 Gilles Chehade <gilles@poolp.org>
 * Copyright (c) 2012 Eric Faurot <eric@openbsd.org>
 *
 * Permission to use, copy, modify, and distribute this software for any
 * purpose with or without fee is hereby granted, provided that the above
 * copyright notice and this permission notice appear in all copies.
 *
 * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
 * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
 * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
 * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
 * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
 * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
 * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
 */

#include <inttypes.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>

#include "smtpd.h"
#include "log.h"

TAILQ_HEAD(evplist, rq_envelope);

struct rq_message {
	uint32_t		 msgid;
	struct tree		 envelopes;
};

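/*
 * An envelope moves through four states: PENDING (waiting in the sorted
 * pending queue for its scheduling time), SCHEDULED (placed on one of the
 * work queues, ready to be handed out by a batch), INFLIGHT (handed to a
 * delivery agent) and HELD (parked on a hold queue).  The flags below are
 * orthogonal markers layered on top of the state: expiration, removal,
 * suspension, a pending update and hold-queue overflow.
 */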
struct rq_envelope {
	TAILQ_ENTRY(rq_envelope) entry;
	SPLAY_ENTRY(rq_envelope) t_entry;

	uint64_t		 evpid;
	uint64_t		 holdq;
	enum delivery_type	 type;

#define	RQ_EVPSTATE_PENDING	 0
#define	RQ_EVPSTATE_SCHEDULED	 1
#define	RQ_EVPSTATE_INFLIGHT	 2
#define	RQ_EVPSTATE_HELD	 3
	uint8_t			 state;

#define	RQ_ENVELOPE_EXPIRED	 0x01
#define	RQ_ENVELOPE_REMOVED	 0x02
#define	RQ_ENVELOPE_SUSPEND	 0x04
#define	RQ_ENVELOPE_UPDATE	 0x08
#define	RQ_ENVELOPE_OVERFLOW	 0x10
	uint8_t			 flags;

	time_t			 ctime;
	time_t			 sched;
	time_t			 expire;

	struct rq_message	*message;

	time_t			 t_inflight;
	time_t			 t_scheduled;
};

struct rq_holdq {
	struct evplist		 q;
	size_t			 count;
};

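/*
 * A queue instance holds the message tree (msgid -> rq_message, each with
 * its own envelope tree) and the scheduling lists: q_pending is kept in
 * priority order and mirrored by the q_priotree splay tree, q_inflight
 * tracks envelopes handed to delivery agents, and the remaining lists are
 * per-outcome work queues drained by scheduler_ram_batch().
 */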
struct rq_queue {
	size_t			 evpcount;
	struct tree		 messages;
	SPLAY_HEAD(prioqtree, rq_envelope)	q_priotree;

	struct evplist		 q_pending;
	struct evplist		 q_inflight;

	struct evplist		 q_mta;
	struct evplist		 q_mda;
	struct evplist		 q_bounce;
	struct evplist		 q_update;
	struct evplist		 q_expired;
	struct evplist		 q_removed;
};

static int rq_envelope_cmp(struct rq_envelope *, struct rq_envelope *);

SPLAY_PROTOTYPE(prioqtree, rq_envelope, t_entry, rq_envelope_cmp);
static int scheduler_ram_init(const char *);
static int scheduler_ram_insert(struct scheduler_info *);
static size_t scheduler_ram_commit(uint32_t);
static size_t scheduler_ram_rollback(uint32_t);
static int scheduler_ram_update(struct scheduler_info *);
static int scheduler_ram_delete(uint64_t);
static int scheduler_ram_hold(uint64_t, uint64_t);
static int scheduler_ram_release(int, uint64_t, int);
static int scheduler_ram_batch(int, int *, size_t *, uint64_t *, int *);
static size_t scheduler_ram_messages(uint32_t, uint32_t *, size_t);
static size_t scheduler_ram_envelopes(uint64_t, struct evpstate *, size_t);
static int scheduler_ram_schedule(uint64_t);
static int scheduler_ram_remove(uint64_t);
static int scheduler_ram_suspend(uint64_t);
static int scheduler_ram_resume(uint64_t);
static int scheduler_ram_query(uint64_t);

static void sorted_insert(struct rq_queue *, struct rq_envelope *);

static void rq_queue_init(struct rq_queue *);
static void rq_queue_merge(struct rq_queue *, struct rq_queue *);
static void rq_queue_dump(struct rq_queue *, const char *);
static void rq_queue_schedule(struct rq_queue *rq);
static struct evplist *rq_envelope_list(struct rq_queue *, struct rq_envelope *);
static void rq_envelope_schedule(struct rq_queue *, struct rq_envelope *);
static int rq_envelope_remove(struct rq_queue *, struct rq_envelope *);
static int rq_envelope_suspend(struct rq_queue *, struct rq_envelope *);
static int rq_envelope_resume(struct rq_queue *, struct rq_envelope *);
static void rq_envelope_delete(struct rq_queue *, struct rq_envelope *);
static const char *rq_envelope_to_text(struct rq_envelope *);

struct scheduler_backend scheduler_backend_ramqueue = {
	scheduler_ram_init,

	scheduler_ram_insert,
	scheduler_ram_commit,
	scheduler_ram_rollback,

	scheduler_ram_update,
	scheduler_ram_delete,
	scheduler_ram_hold,
	scheduler_ram_release,

	scheduler_ram_batch,

	scheduler_ram_messages,
	scheduler_ram_envelopes,
	scheduler_ram_schedule,
	scheduler_ram_remove,
	scheduler_ram_suspend,
	scheduler_ram_resume,
	scheduler_ram_query,
};

static struct rq_queue	ramqueue;
static struct tree	updates;
static struct tree	holdqs[3]; /* delivery type */

static time_t		currtime;

#define BACKOFF_TRANSFER	400
#define BACKOFF_DELIVERY	10
#define BACKOFF_OVERFLOW	3

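/*
 * Retries follow a quadratic backoff: the next attempt is scheduled at
 * t0 + base * step^2.  For example, with BACKOFF_TRANSFER (400) an MTA
 * envelope is due roughly 0, 400, 1600 and 3600 seconds after creation
 * for retries 0 through 3; BACKOFF_DELIVERY (10) gives 0, 10, 40 and 90
 * seconds for local deliveries and bounces.
 */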
static time_t
scheduler_backoff(time_t t0, time_t base, uint32_t step)
{
	return (t0 + base * step * step);
}

static time_t
scheduler_next(time_t t0, time_t base, uint32_t step)
{
	time_t t;

	/* XXX be more efficient */
	while ((t = scheduler_backoff(t0, base, step)) <= currtime)
		step++;

	return (t);
}

static int
scheduler_ram_init(const char *arg)
{
	rq_queue_init(&ramqueue);
	tree_init(&updates);
	tree_init(&holdqs[D_MDA]);
	tree_init(&holdqs[D_MTA]);
	tree_init(&holdqs[D_BOUNCE]);

	return (1);
}

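/*
 * Insertion is transactional: envelopes accumulate in a per-message
 * "update" queue (the updates tree, keyed by msgid) and only enter the
 * live ramqueue when the message is committed; a rollback throws them
 * away.
 */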
static int
scheduler_ram_insert(struct scheduler_info *si)
{
	struct rq_queue		*update;
	struct rq_message	*message;
	struct rq_envelope	*envelope;
	uint32_t		 msgid;

	currtime = time(NULL);

	msgid = evpid_to_msgid(si->evpid);

	/* find/prepare a ramqueue update */
	if ((update = tree_get(&updates, msgid)) == NULL) {
		update = xcalloc(1, sizeof *update);
		stat_increment("scheduler.ramqueue.update", 1);
		rq_queue_init(update);
		tree_xset(&updates, msgid, update);
	}

	/* find/prepare the msgtree message in ramqueue update */
	if ((message = tree_get(&update->messages, msgid)) == NULL) {
		message = xcalloc(1, sizeof *message);
		message->msgid = msgid;
		tree_init(&message->envelopes);
		tree_xset(&update->messages, msgid, message);
		stat_increment("scheduler.ramqueue.message", 1);
	}

	/* create envelope in ramqueue message */
	envelope = xcalloc(1, sizeof *envelope);
	envelope->evpid = si->evpid;
	envelope->type = si->type;
	envelope->message = message;
	envelope->ctime = si->creation;
	envelope->expire = si->creation + si->ttl;
	envelope->sched = scheduler_backoff(si->creation,
	    (si->type == D_MTA) ? BACKOFF_TRANSFER : BACKOFF_DELIVERY, si->retry);
	tree_xset(&message->envelopes, envelope->evpid, envelope);

	update->evpcount++;
	stat_increment("scheduler.ramqueue.envelope", 1);

	envelope->state = RQ_EVPSTATE_PENDING;
	TAILQ_INSERT_TAIL(&update->q_pending, envelope, entry);

	si->nexttry = envelope->sched;

	return (1);
}

static size_t
scheduler_ram_commit(uint32_t msgid)
{
	struct rq_queue	*update;
	size_t		 r;

	currtime = time(NULL);

	update = tree_xpop(&updates, msgid);
	r = update->evpcount;

	if (tracing & TRACE_SCHEDULER)
		rq_queue_dump(update, "update to commit");

	rq_queue_merge(&ramqueue, update);

	if (tracing & TRACE_SCHEDULER)
		rq_queue_dump(&ramqueue, "resulting queue");

	rq_queue_schedule(&ramqueue);

	free(update);
	stat_decrement("scheduler.ramqueue.update", 1);

	return (r);
}

static size_t
scheduler_ram_rollback(uint32_t msgid)
{
	struct rq_queue		*update;
	struct rq_envelope	*evp;
	size_t			 r;

	currtime = time(NULL);

	if ((update = tree_pop(&updates, msgid)) == NULL)
		return (0);
	r = update->evpcount;

	while ((evp = TAILQ_FIRST(&update->q_pending))) {
		TAILQ_REMOVE(&update->q_pending, evp, entry);
		rq_envelope_delete(update, evp);
	}

	free(update);
	stat_decrement("scheduler.ramqueue.update", 1);

	return (r);
}

static int
scheduler_ram_update(struct scheduler_info *si)
{
	struct rq_message	*msg;
	struct rq_envelope	*evp;
	uint32_t		 msgid;

	currtime = time(NULL);

	msgid = evpid_to_msgid(si->evpid);
	msg = tree_xget(&ramqueue.messages, msgid);
	evp = tree_xget(&msg->envelopes, si->evpid);

	/* it *must* be in-flight */
	if (evp->state != RQ_EVPSTATE_INFLIGHT)
		fatalx("evp:%016" PRIx64 " not in-flight", si->evpid);

	TAILQ_REMOVE(&ramqueue.q_inflight, evp, entry);

	/*
	 * If the envelope was removed while inflight, schedule it for
	 * removal immediately.
	 */
	if (evp->flags & RQ_ENVELOPE_REMOVED) {
		TAILQ_INSERT_TAIL(&ramqueue.q_removed, evp, entry);
		evp->state = RQ_EVPSTATE_SCHEDULED;
		evp->t_scheduled = currtime;
		return (1);
	}

	evp->sched = scheduler_next(evp->ctime,
	    (si->type == D_MTA) ? BACKOFF_TRANSFER : BACKOFF_DELIVERY, si->retry);

	evp->state = RQ_EVPSTATE_PENDING;
	if (!(evp->flags & RQ_ENVELOPE_SUSPEND))
		sorted_insert(&ramqueue, evp);

	si->nexttry = evp->sched;

	return (1);
}

static int
scheduler_ram_delete(uint64_t evpid)
{
	struct rq_message	*msg;
	struct rq_envelope	*evp;
	uint32_t		 msgid;

	currtime = time(NULL);

	msgid = evpid_to_msgid(evpid);
	msg = tree_xget(&ramqueue.messages, msgid);
	evp = tree_xget(&msg->envelopes, evpid);

	/* it *must* be in-flight */
	if (evp->state != RQ_EVPSTATE_INFLIGHT)
		fatalx("evp:%016" PRIx64 " not in-flight", evpid);

	TAILQ_REMOVE(&ramqueue.q_inflight, evp, entry);

	rq_envelope_delete(&ramqueue, evp);

	return (1);
}

#define HOLDQ_MAXSIZE	1000

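/*
 * Take an in-flight envelope and park it on the hold queue identified by
 * (type, holdq).  A full hold queue is treated as a temporary failure:
 * the envelope goes back to pending with the UPDATE and OVERFLOW flags
 * set, so it is retried on the short overflow backoff instead.
 */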
static int
scheduler_ram_hold(uint64_t evpid, uint64_t holdq)
{
	struct rq_holdq		*hq;
	struct rq_message	*msg;
	struct rq_envelope	*evp;
	uint32_t		 msgid;

	currtime = time(NULL);

	msgid = evpid_to_msgid(evpid);
	msg = tree_xget(&ramqueue.messages, msgid);
	evp = tree_xget(&msg->envelopes, evpid);

	/* it *must* be in-flight */
	if (evp->state != RQ_EVPSTATE_INFLIGHT)
		fatalx("evp:%016" PRIx64 " not in-flight", evpid);

	TAILQ_REMOVE(&ramqueue.q_inflight, evp, entry);

	/* If the envelope is suspended, just mark it as pending */
	if (evp->flags & RQ_ENVELOPE_SUSPEND) {
		evp->state = RQ_EVPSTATE_PENDING;
		return (0);
	}

	hq = tree_get(&holdqs[evp->type], holdq);
	if (hq == NULL) {
		hq = xcalloc(1, sizeof(*hq));
		TAILQ_INIT(&hq->q);
		tree_xset(&holdqs[evp->type], holdq, hq);
		stat_increment("scheduler.ramqueue.holdq", 1);
	}

	/* If the holdq is full, just "tempfail" the envelope */
	if (hq->count >= HOLDQ_MAXSIZE) {
		evp->state = RQ_EVPSTATE_PENDING;
		evp->flags |= RQ_ENVELOPE_UPDATE;
		evp->flags |= RQ_ENVELOPE_OVERFLOW;
		sorted_insert(&ramqueue, evp);
		stat_increment("scheduler.ramqueue.hold-overflow", 1);
		return (0);
	}

	evp->state = RQ_EVPSTATE_HELD;
	evp->holdq = holdq;
	/* This is an optimization: upon release, the envelopes will be
	 * inserted in the pending queue from the first element to the last.
	 * Since elements already in the queue were received first, they
	 * were scheduled first, so they will be reinserted before the
	 * current element.
	 */
	TAILQ_INSERT_HEAD(&hq->q, evp, entry);
	hq->count += 1;
	stat_increment("scheduler.ramqueue.hold", 1);

	return (1);
}

static int
scheduler_ram_release(int type, uint64_t holdq, int n)
{
	struct rq_holdq		*hq;
	struct rq_envelope	*evp;
	int			 i, update;

	currtime = time(NULL);

	hq = tree_get(&holdqs[type], holdq);
	if (hq == NULL)
		return (0);

	if (n == -1) {
		n = 0;
		update = 1;
	}
	else
		update = 0;

	for (i = 0; n == 0 || i < n; i++) {
		evp = TAILQ_FIRST(&hq->q);
		if (evp == NULL)
			break;

		TAILQ_REMOVE(&hq->q, evp, entry);
		hq->count -= 1;
		evp->holdq = 0;

		/* When released, all envelopes are put in the pending queue
		 * and will be rescheduled immediately.  As an optimization,
		 * we could just schedule them directly.
		 */
		evp->state = RQ_EVPSTATE_PENDING;
		if (update)
			evp->flags |= RQ_ENVELOPE_UPDATE;
		sorted_insert(&ramqueue, evp);
	}

	if (TAILQ_EMPTY(&hq->q)) {
		tree_xpop(&holdqs[type], holdq);
		free(hq);
		stat_decrement("scheduler.ramqueue.holdq", 1);
	}
	stat_decrement("scheduler.ramqueue.hold", i);

	return (i);
}

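/*
 * Fill the caller's arrays with up to *count envelopes, draining the work
 * queues in a fixed order: removals, expirations, updates, bounces, local
 * deliveries, then relayed mail.  When nothing is schedulable, report
 * through *delay how long to wait for the next pending envelope, or -1
 * if the pending queue is empty.
 */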
static int
scheduler_ram_batch(int mask, int *delay, size_t *count, uint64_t *evpids, int *types)
{
	struct rq_envelope	*evp;
	size_t			 i, n;
	time_t			 t;

	currtime = time(NULL);

	rq_queue_schedule(&ramqueue);
	if (tracing & TRACE_SCHEDULER)
		rq_queue_dump(&ramqueue, "scheduler_ram_batch()");

	i = 0;
	n = 0;

	for (;;) {

		if (mask & SCHED_REMOVE && (evp = TAILQ_FIRST(&ramqueue.q_removed))) {
			TAILQ_REMOVE(&ramqueue.q_removed, evp, entry);
			types[i] = SCHED_REMOVE;
			evpids[i] = evp->evpid;
			rq_envelope_delete(&ramqueue, evp);

			if (++i == *count)
				break;
		}

		if (mask & SCHED_EXPIRE && (evp = TAILQ_FIRST(&ramqueue.q_expired))) {
			TAILQ_REMOVE(&ramqueue.q_expired, evp, entry);
			types[i] = SCHED_EXPIRE;
			evpids[i] = evp->evpid;
			rq_envelope_delete(&ramqueue, evp);

			if (++i == *count)
				break;
		}

		if (mask & SCHED_UPDATE && (evp = TAILQ_FIRST(&ramqueue.q_update))) {
			TAILQ_REMOVE(&ramqueue.q_update, evp, entry);
			types[i] = SCHED_UPDATE;
			evpids[i] = evp->evpid;

			if (evp->flags & RQ_ENVELOPE_OVERFLOW)
				t = BACKOFF_OVERFLOW;
			else if (evp->type == D_MTA)
				t = BACKOFF_TRANSFER;
			else
				t = BACKOFF_DELIVERY;

			evp->sched = scheduler_next(evp->ctime, t, 0);
			evp->flags &= ~(RQ_ENVELOPE_UPDATE|RQ_ENVELOPE_OVERFLOW);
			evp->state = RQ_EVPSTATE_PENDING;
			if (!(evp->flags & RQ_ENVELOPE_SUSPEND))
				sorted_insert(&ramqueue, evp);

			if (++i == *count)
				break;
		}

		if (mask & SCHED_BOUNCE && (evp = TAILQ_FIRST(&ramqueue.q_bounce))) {
			TAILQ_REMOVE(&ramqueue.q_bounce, evp, entry);
			types[i] = SCHED_BOUNCE;
			evpids[i] = evp->evpid;

			TAILQ_INSERT_TAIL(&ramqueue.q_inflight, evp, entry);
			evp->state = RQ_EVPSTATE_INFLIGHT;
			evp->t_inflight = currtime;

			if (++i == *count)
				break;
		}

		if (mask & SCHED_MDA && (evp = TAILQ_FIRST(&ramqueue.q_mda))) {
			TAILQ_REMOVE(&ramqueue.q_mda, evp, entry);
			types[i] = SCHED_MDA;
			evpids[i] = evp->evpid;

			TAILQ_INSERT_TAIL(&ramqueue.q_inflight, evp, entry);
			evp->state = RQ_EVPSTATE_INFLIGHT;
			evp->t_inflight = currtime;

			if (++i == *count)
				break;
		}

		if (mask & SCHED_MTA && (evp = TAILQ_FIRST(&ramqueue.q_mta))) {
			TAILQ_REMOVE(&ramqueue.q_mta, evp, entry);
			types[i] = SCHED_MTA;
			evpids[i] = evp->evpid;

			TAILQ_INSERT_TAIL(&ramqueue.q_inflight, evp, entry);
			evp->state = RQ_EVPSTATE_INFLIGHT;
			evp->t_inflight = currtime;

			if (++i == *count)
				break;
		}

		/* nothing seen this round */
		if (i == n)
			break;

		n = i;
	}

	if (i) {
		*count = i;
		return (1);
	}

	if ((evp = TAILQ_FIRST(&ramqueue.q_pending))) {
		if (evp->sched < evp->expire)
			t = evp->sched;
		else
			t = evp->expire;
		*delay = (t < currtime) ? 0 : (t - currtime);
	}
	else
		*delay = -1;

	return (0);
}

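/*
 * Listing helpers: walk the message tree (or a message's envelope tree)
 * starting after 'from' and fill at most 'size' entries.  Envelopes that
 * are already marked expired or removed are skipped.
 */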
static size_t
scheduler_ram_messages(uint32_t from, uint32_t *dst, size_t size)
{
	uint64_t id;
	size_t	 n;
	void	*i;

	for (n = 0, i = NULL; n < size; n++) {
		if (tree_iterfrom(&ramqueue.messages, &i, from, &id, NULL) == 0)
			break;
		dst[n] = id;
	}

	return (n);
}

static size_t
scheduler_ram_envelopes(uint64_t from, struct evpstate *dst, size_t size)
{
	struct rq_message	*msg;
	struct rq_envelope	*evp;
	void			*i;
	size_t			 n;

	if ((msg = tree_get(&ramqueue.messages, evpid_to_msgid(from))) == NULL)
		return (0);

	for (n = 0, i = NULL; n < size; ) {

		if (tree_iterfrom(&msg->envelopes, &i, from, NULL,
		    (void**)&evp) == 0)
			break;

		if (evp->flags & (RQ_ENVELOPE_REMOVED | RQ_ENVELOPE_EXPIRED))
			continue;

		dst[n].evpid = evp->evpid;
		dst[n].flags = 0;
		dst[n].retry = 0;
		dst[n].time = 0;

		if (evp->state == RQ_EVPSTATE_PENDING) {
			dst[n].time = evp->sched;
			dst[n].flags = EF_PENDING;
		}
		else if (evp->state == RQ_EVPSTATE_SCHEDULED) {
			dst[n].time = evp->t_scheduled;
			dst[n].flags = EF_PENDING;
		}
		else if (evp->state == RQ_EVPSTATE_INFLIGHT) {
			dst[n].time = evp->t_inflight;
			dst[n].flags = EF_INFLIGHT;
		}
		else if (evp->state == RQ_EVPSTATE_HELD) {
			/* same as scheduled */
			dst[n].time = evp->t_scheduled;
			dst[n].flags = EF_PENDING;
			dst[n].flags |= EF_HOLD;
		}
		if (evp->flags & RQ_ENVELOPE_SUSPEND)
			dst[n].flags |= EF_SUSPEND;

		n++;
	}

	return (n);
}

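/*
 * The control operations below accept either a full 64-bit envelope id or
 * a bare 32-bit message id: values that fit in 32 bits address every
 * envelope of the message, anything larger addresses a single envelope.
 */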
static int
scheduler_ram_schedule(uint64_t evpid)
{
	struct rq_message	*msg;
	struct rq_envelope	*evp;
	uint32_t		 msgid;
	void			*i;
	int			 r;

	currtime = time(NULL);

	if (evpid > 0xffffffff) {
		msgid = evpid_to_msgid(evpid);
		if ((msg = tree_get(&ramqueue.messages, msgid)) == NULL)
			return (0);
		if ((evp = tree_get(&msg->envelopes, evpid)) == NULL)
			return (0);
		if (evp->state == RQ_EVPSTATE_INFLIGHT)
			return (0);
		rq_envelope_schedule(&ramqueue, evp);
		return (1);
	}
	else {
		msgid = evpid;
		if ((msg = tree_get(&ramqueue.messages, msgid)) == NULL)
			return (0);
		i = NULL;
		r = 0;
		while (tree_iter(&msg->envelopes, &i, NULL, (void*)(&evp))) {
			if (evp->state == RQ_EVPSTATE_INFLIGHT)
				continue;
			rq_envelope_schedule(&ramqueue, evp);
			r++;
		}
		return (r);
	}
}

static int
scheduler_ram_remove(uint64_t evpid)
{
	struct rq_message	*msg;
	struct rq_envelope	*evp;
	uint32_t		 msgid;
	void			*i;
	int			 r;

	currtime = time(NULL);

	if (evpid > 0xffffffff) {
		msgid = evpid_to_msgid(evpid);
		if ((msg = tree_get(&ramqueue.messages, msgid)) == NULL)
			return (0);
		if ((evp = tree_get(&msg->envelopes, evpid)) == NULL)
			return (0);
		if (rq_envelope_remove(&ramqueue, evp))
			return (1);
		return (0);
	}
	else {
		msgid = evpid;
		if ((msg = tree_get(&ramqueue.messages, msgid)) == NULL)
			return (0);
		i = NULL;
		r = 0;
		while (tree_iter(&msg->envelopes, &i, NULL, (void*)(&evp)))
			if (rq_envelope_remove(&ramqueue, evp))
				r++;
		return (r);
	}
}

static int
scheduler_ram_suspend(uint64_t evpid)
{
	struct rq_message	*msg;
	struct rq_envelope	*evp;
	uint32_t		 msgid;
	void			*i;
	int			 r;

	currtime = time(NULL);

	if (evpid > 0xffffffff) {
		msgid = evpid_to_msgid(evpid);
		if ((msg = tree_get(&ramqueue.messages, msgid)) == NULL)
			return (0);
		if ((evp = tree_get(&msg->envelopes, evpid)) == NULL)
			return (0);
		if (rq_envelope_suspend(&ramqueue, evp))
			return (1);
		return (0);
	}
	else {
		msgid = evpid;
		if ((msg = tree_get(&ramqueue.messages, msgid)) == NULL)
			return (0);
		i = NULL;
		r = 0;
		while (tree_iter(&msg->envelopes, &i, NULL, (void*)(&evp)))
			if (rq_envelope_suspend(&ramqueue, evp))
				r++;
		return (r);
	}
}

static int
scheduler_ram_resume(uint64_t evpid)
{
	struct rq_message	*msg;
	struct rq_envelope	*evp;
	uint32_t		 msgid;
	void			*i;
	int			 r;

	currtime = time(NULL);

	if (evpid > 0xffffffff) {
		msgid = evpid_to_msgid(evpid);
		if ((msg = tree_get(&ramqueue.messages, msgid)) == NULL)
			return (0);
		if ((evp = tree_get(&msg->envelopes, evpid)) == NULL)
			return (0);
		if (rq_envelope_resume(&ramqueue, evp))
			return (1);
		return (0);
	}
	else {
		msgid = evpid;
		if ((msg = tree_get(&ramqueue.messages, msgid)) == NULL)
			return (0);
		i = NULL;
		r = 0;
		while (tree_iter(&msg->envelopes, &i, NULL, (void*)(&evp)))
			if (rq_envelope_resume(&ramqueue, evp))
				r++;
		return (r);
	}
}

static int
scheduler_ram_query(uint64_t evpid)
{
	uint32_t msgid;

	if (evpid > 0xffffffff)
		msgid = evpid_to_msgid(evpid);
	else
		msgid = evpid;

	if (tree_get(&ramqueue.messages, msgid) == NULL)
		return (0);

	return (1);
}

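/*
 * The pending list is kept sorted through the splay tree: insert the
 * envelope in the tree, look up its in-order successor and link the
 * envelope just before it, so q_pending always mirrors the tree order
 * without a linear scan of the list.
 */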
static void
sorted_insert(struct rq_queue *rq, struct rq_envelope *evp)
{
	struct rq_envelope	*evp2;

	SPLAY_INSERT(prioqtree, &rq->q_priotree, evp);
	evp2 = SPLAY_NEXT(prioqtree, &rq->q_priotree, evp);
	if (evp2)
		TAILQ_INSERT_BEFORE(evp2, evp, entry);
	else
		TAILQ_INSERT_TAIL(&rq->q_pending, evp, entry);
}

static void
rq_queue_init(struct rq_queue *rq)
{
	memset(rq, 0, sizeof *rq);
	tree_init(&rq->messages);
	TAILQ_INIT(&rq->q_pending);
	TAILQ_INIT(&rq->q_inflight);
	TAILQ_INIT(&rq->q_mta);
	TAILQ_INIT(&rq->q_mda);
	TAILQ_INIT(&rq->q_bounce);
	TAILQ_INIT(&rq->q_update);
	TAILQ_INIT(&rq->q_expired);
	TAILQ_INIT(&rq->q_removed);
	SPLAY_INIT(&rq->q_priotree);
}

static void
rq_queue_merge(struct rq_queue *rq, struct rq_queue *update)
{
	struct rq_message	*message, *tomessage;
	struct rq_envelope	*envelope;
	uint64_t		 id;
	void			*i;

	while (tree_poproot(&update->messages, &id, (void*)&message)) {
		if ((tomessage = tree_get(&rq->messages, id)) == NULL) {
			/* message does not exist. reuse structure */
			tree_xset(&rq->messages, id, message);
			continue;
		}
		/* need to re-link all envelopes before merging them */
		i = NULL;
		while ((tree_iter(&message->envelopes, &i, &id,
		    (void*)&envelope)))
			envelope->message = tomessage;
		tree_merge(&tomessage->envelopes, &message->envelopes);
		free(message);
		stat_decrement("scheduler.ramqueue.message", 1);
	}

	/* Sorted insert in the pending queue */
	while ((envelope = TAILQ_FIRST(&update->q_pending))) {
		TAILQ_REMOVE(&update->q_pending, envelope, entry);
		sorted_insert(rq, envelope);
	}

	rq->evpcount += update->evpcount;
}

#define SCHEDULEMAX	1024

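/*
 * Move envelopes that are due (or expired) off the head of the pending
 * queue, at most SCHEDULEMAX per pass.  Expired envelopes are diverted
 * to q_expired; the others are dispatched to their work queue.
 */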
87466bc57beSeric static void
875010bda1bSeric rq_queue_schedule(struct rq_queue *rq)
87666bc57beSeric {
877010bda1bSeric 	struct rq_envelope	*evp;
878acfdf0daSeric 	size_t			 n;
87966bc57beSeric 
880acfdf0daSeric 	n = 0;
8814fe02f32Seric 	while ((evp = TAILQ_FIRST(&rq->q_pending))) {
882010bda1bSeric 		if (evp->sched > currtime && evp->expire > currtime)
883010bda1bSeric 			break;
88466bc57beSeric 
885acfdf0daSeric 		if (n == SCHEDULEMAX)
886acfdf0daSeric 			break;
887acfdf0daSeric 
888d17ec9a9Seric 		if (evp->state != RQ_EVPSTATE_PENDING)
889ff01b044Seric 			fatalx("evp:%016" PRIx64 " flags=0x%x", evp->evpid,
890010bda1bSeric 			    evp->flags);
89166bc57beSeric 
892010bda1bSeric 		if (evp->expire <= currtime) {
8934fe02f32Seric 			TAILQ_REMOVE(&rq->q_pending, evp, entry);
894a98c8336Seric 			SPLAY_REMOVE(prioqtree, &rq->q_priotree, evp);
8954fe02f32Seric 			TAILQ_INSERT_TAIL(&rq->q_expired, evp, entry);
896d17ec9a9Seric 			evp->state = RQ_EVPSTATE_SCHEDULED;
897010bda1bSeric 			evp->flags |= RQ_ENVELOPE_EXPIRED;
898010bda1bSeric 			evp->t_scheduled = currtime;
899010bda1bSeric 			continue;
900010bda1bSeric 		}
901010bda1bSeric 		rq_envelope_schedule(rq, evp);
902acfdf0daSeric 		n += 1;
903010bda1bSeric 	}
904010bda1bSeric }
905010bda1bSeric 
90635e161d3Seric static struct evplist *
90735e161d3Seric rq_envelope_list(struct rq_queue *rq, struct rq_envelope *evp)
90835e161d3Seric {
909d17ec9a9Seric 	switch (evp->state) {
910d17ec9a9Seric 	case RQ_EVPSTATE_PENDING:
911d17ec9a9Seric 		return &rq->q_pending;
912d17ec9a9Seric 
913d17ec9a9Seric 	case RQ_EVPSTATE_SCHEDULED:
91435e161d3Seric 		if (evp->flags & RQ_ENVELOPE_EXPIRED)
91535e161d3Seric 			return &rq->q_expired;
91635e161d3Seric 		if (evp->flags & RQ_ENVELOPE_REMOVED)
91735e161d3Seric 			return &rq->q_removed;
91881ab3bccSeric 		if (evp->flags & RQ_ENVELOPE_UPDATE)
91981ab3bccSeric 			return &rq->q_update;
92035e161d3Seric 		if (evp->type == D_MTA)
921d17ec9a9Seric 			return &rq->q_mta;
92235e161d3Seric 		if (evp->type == D_MDA)
92335e161d3Seric 			return &rq->q_mda;
92435e161d3Seric 		if (evp->type == D_BOUNCE)
92535e161d3Seric 			return &rq->q_bounce;
926ff01b044Seric 		fatalx("%016" PRIx64 " bad evp type %d", evp->evpid, evp->type);
927d17ec9a9Seric 
928d17ec9a9Seric 	case RQ_EVPSTATE_INFLIGHT:
929d17ec9a9Seric 		return &rq->q_inflight;
9306dc81a07Seric 
9316dc81a07Seric 	case RQ_EVPSTATE_HELD:
9326dc81a07Seric 		return (NULL);
93335e161d3Seric 	}
93435e161d3Seric 
935ff01b044Seric 	fatalx("%016" PRIx64 " bad state %d", evp->evpid, evp->state);
93635e161d3Seric 	return (NULL);
93735e161d3Seric }
93835e161d3Seric 
939010bda1bSeric static void
940010bda1bSeric rq_envelope_schedule(struct rq_queue *rq, struct rq_envelope *evp)
941010bda1bSeric {
9426dc81a07Seric 	struct rq_holdq	*hq;
9434fe02f32Seric 	struct evplist	*q = NULL;
944010bda1bSeric 
945d17ec9a9Seric 	switch (evp->type) {
946d17ec9a9Seric 	case D_MTA:
947d17ec9a9Seric 		q = &rq->q_mta;
948d17ec9a9Seric 		break;
949d17ec9a9Seric 	case D_MDA:
9504fe02f32Seric 		q = &rq->q_mda;
951d17ec9a9Seric 		break;
952d17ec9a9Seric 	case D_BOUNCE:
9534fe02f32Seric 		q = &rq->q_bounce;
954d17ec9a9Seric 		break;
955d17ec9a9Seric 	}
9564fe02f32Seric 
95781ab3bccSeric 	if (evp->flags & RQ_ENVELOPE_UPDATE)
95881ab3bccSeric 		q = &rq->q_update;
95981ab3bccSeric 
9606dc81a07Seric 	if (evp->state == RQ_EVPSTATE_HELD) {
9617eed50e8Seric 		hq = tree_xget(&holdqs[evp->type], evp->holdq);
9626dc81a07Seric 		TAILQ_REMOVE(&hq->q, evp, entry);
963acfdf0daSeric 		hq->count -= 1;
9646dc81a07Seric 		if (TAILQ_EMPTY(&hq->q)) {
9657eed50e8Seric 			tree_xpop(&holdqs[evp->type], evp->holdq);
9666dc81a07Seric 			free(hq);
9676dc81a07Seric 		}
9686dc81a07Seric 		evp->holdq = 0;
9696dc81a07Seric 		stat_decrement("scheduler.ramqueue.hold", 1);
9706dc81a07Seric 	}
971a98c8336Seric 	else if (!(evp->flags & RQ_ENVELOPE_SUSPEND)) {
9724fe02f32Seric 		TAILQ_REMOVE(&rq->q_pending, evp, entry);
973a98c8336Seric 		SPLAY_REMOVE(prioqtree, &rq->q_priotree, evp);
974a98c8336Seric 	}
975d17ec9a9Seric 
9764fe02f32Seric 	TAILQ_INSERT_TAIL(q, evp, entry);
977d17ec9a9Seric 	evp->state = RQ_EVPSTATE_SCHEDULED;
978010bda1bSeric 	evp->t_scheduled = currtime;
979010bda1bSeric }
980010bda1bSeric 
981299c4efeSeric static int
982010bda1bSeric rq_envelope_remove(struct rq_queue *rq, struct rq_envelope *evp)
983010bda1bSeric {
9846dc81a07Seric 	struct rq_holdq	*hq;
985a98c8336Seric 	struct evplist	*evl;
9866dc81a07Seric 
9874fe02f32Seric 	if (evp->flags & (RQ_ENVELOPE_REMOVED | RQ_ENVELOPE_EXPIRED))
988299c4efeSeric 		return (0);
	/*
	 * If the envelope is in-flight, just mark it for removal.
	 */
	if (evp->state == RQ_EVPSTATE_INFLIGHT) {
		evp->flags |= RQ_ENVELOPE_REMOVED;
		return (1);
	}

	if (evp->state == RQ_EVPSTATE_HELD) {
		hq = tree_xget(&holdqs[evp->type], evp->holdq);
		TAILQ_REMOVE(&hq->q, evp, entry);
		hq->count -= 1;
		if (TAILQ_EMPTY(&hq->q)) {
			tree_xpop(&holdqs[evp->type], evp->holdq);
			free(hq);
		}
		evp->holdq = 0;
		stat_decrement("scheduler.ramqueue.hold", 1);
	}
	else if (!(evp->flags & RQ_ENVELOPE_SUSPEND)) {
		evl = rq_envelope_list(rq, evp);
		TAILQ_REMOVE(evl, evp, entry);
		if (evl == &rq->q_pending)
			SPLAY_REMOVE(prioqtree, &rq->q_priotree, evp);
	}

	TAILQ_INSERT_TAIL(&rq->q_removed, evp, entry);
	evp->state = RQ_EVPSTATE_SCHEDULED;
	evp->flags |= RQ_ENVELOPE_REMOVED;
	evp->t_scheduled = currtime;

	return (1);
}

static int
rq_envelope_suspend(struct rq_queue *rq, struct rq_envelope *evp)
{
	struct rq_holdq	*hq;
	struct evplist	*evl;

	if (evp->flags & RQ_ENVELOPE_SUSPEND)
		return (0);

	if (evp->state == RQ_EVPSTATE_HELD) {
		hq = tree_xget(&holdqs[evp->type], evp->holdq);
		TAILQ_REMOVE(&hq->q, evp, entry);
		hq->count -= 1;
		if (TAILQ_EMPTY(&hq->q)) {
			tree_xpop(&holdqs[evp->type], evp->holdq);
			free(hq);
		}
		evp->holdq = 0;
		evp->state = RQ_EVPSTATE_PENDING;
		stat_decrement("scheduler.ramqueue.hold", 1);
	}
	else if (evp->state != RQ_EVPSTATE_INFLIGHT) {
		evl = rq_envelope_list(rq, evp);
		TAILQ_REMOVE(evl, evp, entry);
		if (evl == &rq->q_pending)
			SPLAY_REMOVE(prioqtree, &rq->q_priotree, evp);
	}

	evp->flags |= RQ_ENVELOPE_SUSPEND;

	return (1);
}

static int
rq_envelope_resume(struct rq_queue *rq, struct rq_envelope *evp)
{
	struct evplist	*evl;

	if (!(evp->flags & RQ_ENVELOPE_SUSPEND))
		return (0);

	if (evp->state != RQ_EVPSTATE_INFLIGHT) {
		evl = rq_envelope_list(rq, evp);
		if (evl == &rq->q_pending)
			sorted_insert(rq, evp);
		else
			TAILQ_INSERT_TAIL(evl, evp, entry);
	}

	evp->flags &= ~RQ_ENVELOPE_SUSPEND;

	return (1);
}

static void
rq_envelope_delete(struct rq_queue *rq, struct rq_envelope *evp)
{
	tree_xpop(&evp->message->envelopes, evp->evpid);
	if (tree_empty(&evp->message->envelopes)) {
		tree_xpop(&rq->messages, evp->message->msgid);
		free(evp->message);
		stat_decrement("scheduler.ramqueue.message", 1);
	}

	free(evp);
	rq->evpcount--;
	stat_decrement("scheduler.ramqueue.envelope", 1);
}

static const char *
rq_envelope_to_text(struct rq_envelope *e)
{
	static char	buf[256];
	char		t[64];

	(void)snprintf(buf, sizeof buf, "evp:%016" PRIx64 " [", e->evpid);

	if (e->type == D_BOUNCE)
		(void)strlcat(buf, "bounce", sizeof buf);
	else if (e->type == D_MDA)
		(void)strlcat(buf, "mda", sizeof buf);
	else if (e->type == D_MTA)
		(void)strlcat(buf, "mta", sizeof buf);

	(void)snprintf(t, sizeof t, ",expire=%s",
	    duration_to_text(e->expire - currtime));
	(void)strlcat(buf, t, sizeof buf);

	switch (e->state) {
	case RQ_EVPSTATE_PENDING:
		(void)snprintf(t, sizeof t, ",pending=%s",
		    duration_to_text(e->sched - currtime));
		(void)strlcat(buf, t, sizeof buf);
		break;

	case RQ_EVPSTATE_SCHEDULED:
		(void)snprintf(t, sizeof t, ",scheduled=%s",
		    duration_to_text(currtime - e->t_scheduled));
		(void)strlcat(buf, t, sizeof buf);
		break;

	case RQ_EVPSTATE_INFLIGHT:
		(void)snprintf(t, sizeof t, ",inflight=%s",
		    duration_to_text(currtime - e->t_inflight));
		(void)strlcat(buf, t, sizeof buf);
		break;

	case RQ_EVPSTATE_HELD:
		(void)snprintf(t, sizeof t, ",held=%s",
		    duration_to_text(currtime - e->t_inflight));
		(void)strlcat(buf, t, sizeof buf);
		break;
	default:
		fatalx("%016" PRIx64 " bad state %d", e->evpid, e->state);
	}

	if (e->flags & RQ_ENVELOPE_REMOVED)
		(void)strlcat(buf, ",removed", sizeof buf);
	if (e->flags & RQ_ENVELOPE_EXPIRED)
		(void)strlcat(buf, ",expired", sizeof buf);
	if (e->flags & RQ_ENVELOPE_SUSPEND)
		(void)strlcat(buf, ",suspended", sizeof buf);

	(void)strlcat(buf, "]", sizeof buf);

	return (buf);
}

static void
rq_queue_dump(struct rq_queue *rq, const char * name)
{
	struct rq_message	*message;
	struct rq_envelope	*envelope;
	void			*i, *j;
	uint64_t		 id;

	log_debug("debug: /--- ramqueue: %s", name);

	i = NULL;
	while ((tree_iter(&rq->messages, &i, &id, (void*)&message))) {
		log_debug("debug: | msg:%08" PRIx32, message->msgid);
		j = NULL;
		while ((tree_iter(&message->envelopes, &j, &id,
		    (void*)&envelope)))
			log_debug("debug: |   %s",
			    rq_envelope_to_text(envelope));
	}
	log_debug("debug: \\---");
}

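/*
 * Envelopes are ordered by whichever comes first of their next scheduling
 * time and their expiration time, with the evpid as a tie-breaker so that
 * the ordering is total.
 */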
static int
rq_envelope_cmp(struct rq_envelope *e1, struct rq_envelope *e2)
{
	time_t	ref1, ref2;

	ref1 = (e1->sched < e1->expire) ? e1->sched : e1->expire;
	ref2 = (e2->sched < e2->expire) ? e2->sched : e2->expire;
	if (ref1 != ref2)
		return (ref1 < ref2) ? -1 : 1;

	if (e1->evpid != e2->evpid)
		return (e1->evpid < e2->evpid) ? -1 : 1;

	return 0;
}

SPLAY_GENERATE(prioqtree, rq_envelope, t_entry, rq_envelope_cmp);