/*	$OpenBSD: scheduler_ramqueue.c,v 1.40 2014/07/10 14:45:02 eric Exp $	*/

/*
 * Copyright (c) 2012 Gilles Chehade <gilles@poolp.org>
 * Copyright (c) 2012 Eric Faurot <eric@openbsd.org>
 *
 * Permission to use, copy, modify, and distribute this software for any
 * purpose with or without fee is hereby granted, provided that the above
 * copyright notice and this permission notice appear in all copies.
 *
 * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
 * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
 * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
 * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
 * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
 * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
 * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
 */

#include <sys/types.h>
#include <sys/queue.h>
#include <sys/tree.h>
#include <sys/socket.h>

#include <ctype.h>
#include <err.h>
#include <event.h>
#include <fcntl.h>
#include <imsg.h>
#include <inttypes.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>

#include "smtpd.h"
#include "log.h"

TAILQ_HEAD(evplist, rq_envelope);

struct rq_message {
	uint32_t		 msgid;
	struct tree		 envelopes;
};

struct rq_envelope {
	TAILQ_ENTRY(rq_envelope) entry;
	SPLAY_ENTRY(rq_envelope) t_entry;

	uint64_t		 evpid;
	uint64_t		 holdq;
	enum delivery_type	 type;

#define	RQ_EVPSTATE_PENDING	 0
#define	RQ_EVPSTATE_SCHEDULED	 1
#define	RQ_EVPSTATE_INFLIGHT	 2
#define	RQ_EVPSTATE_HELD	 3
	uint8_t			 state;

#define	RQ_ENVELOPE_EXPIRED	 0x01
#define	RQ_ENVELOPE_REMOVED	 0x02
#define	RQ_ENVELOPE_SUSPEND	 0x04
#define	RQ_ENVELOPE_UPDATE	 0x08
#define	RQ_ENVELOPE_OVERFLOW	 0x10
	uint8_t			 flags;

	time_t			 ctime;
	time_t			 sched;
	time_t			 expire;

	struct rq_message	*message;

	time_t			 t_inflight;
	time_t			 t_scheduled;
};

struct rq_holdq {
	struct evplist		 q;
	size_t			 count;
};

struct rq_queue {
	size_t			 evpcount;
	struct tree		 messages;
	SPLAY_HEAD(prioqtree, rq_envelope)	q_priotree;

	struct evplist		 q_pending;
	struct evplist		 q_inflight;

	struct evplist		 q_mta;
	struct evplist		 q_mda;
	struct evplist		 q_bounce;
	struct evplist		 q_update;
	struct evplist		 q_expired;
	struct evplist		 q_removed;
};

static int rq_envelope_cmp(struct rq_envelope *, struct rq_envelope *);

SPLAY_PROTOTYPE(prioqtree, rq_envelope, t_entry, rq_envelope_cmp);
static int scheduler_ram_init(const char *);
static int scheduler_ram_insert(struct scheduler_info *);
static size_t scheduler_ram_commit(uint32_t);
static size_t scheduler_ram_rollback(uint32_t);
static int scheduler_ram_update(struct scheduler_info *);
static int scheduler_ram_delete(uint64_t);
static int scheduler_ram_hold(uint64_t, uint64_t);
static int scheduler_ram_release(int, uint64_t, int);
static int scheduler_ram_batch(int, int *, size_t *, uint64_t *, int *);
static size_t scheduler_ram_messages(uint32_t, uint32_t *, size_t);
static size_t scheduler_ram_envelopes(uint64_t, struct evpstate *, size_t);
static int scheduler_ram_schedule(uint64_t);
static int scheduler_ram_remove(uint64_t);
static int scheduler_ram_suspend(uint64_t);
static int scheduler_ram_resume(uint64_t);

static void sorted_insert(struct rq_queue *, struct rq_envelope *);

static void rq_queue_init(struct rq_queue *);
static void rq_queue_merge(struct rq_queue *, struct rq_queue *);
static void rq_queue_dump(struct rq_queue *, const char *);
static void rq_queue_schedule(struct rq_queue *rq);
static struct evplist *rq_envelope_list(struct rq_queue *, struct rq_envelope *);
static void rq_envelope_schedule(struct rq_queue *, struct rq_envelope *);
static int rq_envelope_remove(struct rq_queue *, struct rq_envelope *);
static int rq_envelope_suspend(struct rq_queue *, struct rq_envelope *);
static int rq_envelope_resume(struct rq_queue *, struct rq_envelope *);
static void rq_envelope_delete(struct rq_queue *, struct rq_envelope *);
static const char *rq_envelope_to_text(struct rq_envelope *);

struct scheduler_backend scheduler_backend_ramqueue = {
	scheduler_ram_init,

	scheduler_ram_insert,
	scheduler_ram_commit,
	scheduler_ram_rollback,

	scheduler_ram_update,
	scheduler_ram_delete,
	scheduler_ram_hold,
	scheduler_ram_release,

	scheduler_ram_batch,

	scheduler_ram_messages,
	scheduler_ram_envelopes,
	scheduler_ram_schedule,
	scheduler_ram_remove,
	scheduler_ram_suspend,
	scheduler_ram_resume,
};

static struct rq_queue	ramqueue;
static struct tree	updates;
static struct tree	holdqs[3]; /* delivery type */

static time_t		currtime;

#define BACKOFF_TRANSFER	400
#define BACKOFF_DELIVERY	10
#define BACKOFF_OVERFLOW	3

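/*
 * Quadratic backoff: retry number "step" fires base * step * step
 * seconds after the envelope creation time t0.
 */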
static time_t
scheduler_backoff(time_t t0, time_t base, uint32_t step)
{
	return (t0 + base * step * step);
}

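/*
 * Find the first retry time strictly in the future, walking the
 * backoff sequence from the given step.
 */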
static time_t
scheduler_next(time_t t0, time_t base, uint32_t step)
{
	time_t t;

	/* XXX be more efficient */
	while ((t = scheduler_backoff(t0, base, step)) <= currtime)
		step++;

	return (t);
}

static int
scheduler_ram_init(const char *arg)
{
	rq_queue_init(&ramqueue);
	tree_init(&updates);
	tree_init(&holdqs[D_MDA]);
	tree_init(&holdqs[D_MTA]);
	tree_init(&holdqs[D_BOUNCE]);

	return (1);
}

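/*
 * Insert a new envelope.  Envelopes first accumulate in a per-message
 * "update" queue and only enter the live ramqueue when the message is
 * committed (they are discarded on rollback).
 */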
static int
scheduler_ram_insert(struct scheduler_info *si)
{
	struct rq_queue		*update;
	struct rq_message	*message;
	struct rq_envelope	*envelope;
	uint32_t		 msgid;

	currtime = time(NULL);

	msgid = evpid_to_msgid(si->evpid);

	/* find/prepare a ramqueue update */
	if ((update = tree_get(&updates, msgid)) == NULL) {
		update = xcalloc(1, sizeof *update, "scheduler_insert");
		stat_increment("scheduler.ramqueue.update", 1);
		rq_queue_init(update);
		tree_xset(&updates, msgid, update);
	}

	/* find/prepare the msgtree message in ramqueue update */
	if ((message = tree_get(&update->messages, msgid)) == NULL) {
		message = xcalloc(1, sizeof *message, "scheduler_insert");
		message->msgid = msgid;
		tree_init(&message->envelopes);
		tree_xset(&update->messages, msgid, message);
		stat_increment("scheduler.ramqueue.message", 1);
	}

	/* create envelope in ramqueue message */
	envelope = xcalloc(1, sizeof *envelope, "scheduler_insert");
	envelope->evpid = si->evpid;
	envelope->type = si->type;
	envelope->message = message;
	envelope->ctime = si->creation;
	envelope->expire = si->creation + si->expire;
	envelope->sched = scheduler_backoff(si->creation,
	    (si->type == D_MTA) ? BACKOFF_TRANSFER : BACKOFF_DELIVERY, si->retry);
	tree_xset(&message->envelopes, envelope->evpid, envelope);

	update->evpcount++;
	stat_increment("scheduler.ramqueue.envelope", 1);

	envelope->state = RQ_EVPSTATE_PENDING;
	TAILQ_INSERT_TAIL(&update->q_pending, envelope, entry);

	si->nexttry = envelope->sched;

	return (1);
}

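/*
 * Commit a message: merge its update queue into the live ramqueue and
 * run a scheduling pass.  Returns the number of envelopes committed.
 */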
static size_t
scheduler_ram_commit(uint32_t msgid)
{
	struct rq_queue	*update;
	size_t		 r;

	currtime = time(NULL);

	update = tree_xpop(&updates, msgid);
	r = update->evpcount;

	if (verbose & TRACE_SCHEDULER)
		rq_queue_dump(update, "update to commit");

	rq_queue_merge(&ramqueue, update);

	if (verbose & TRACE_SCHEDULER)
		rq_queue_dump(&ramqueue, "resulting queue");

	rq_queue_schedule(&ramqueue);

	free(update);
	stat_decrement("scheduler.ramqueue.update", 1);

	return (r);
}

static size_t
scheduler_ram_rollback(uint32_t msgid)
{
	struct rq_queue		*update;
	struct rq_envelope	*evp;
	size_t			 r;

	currtime = time(NULL);

	if ((update = tree_pop(&updates, msgid)) == NULL)
		return (0);
	r = update->evpcount;

	while ((evp = TAILQ_FIRST(&update->q_pending))) {
		TAILQ_REMOVE(&update->q_pending, evp, entry);
		rq_envelope_delete(update, evp);
	}

	free(update);
	stat_decrement("scheduler.ramqueue.update", 1);

	return (r);
}

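/*
 * Reschedule an in-flight envelope for a later retry: pull it out of
 * the in-flight queue and compute its next try from the backoff
 * sequence, unless it was marked for removal while in flight.
 */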
static int
scheduler_ram_update(struct scheduler_info *si)
{
	struct rq_message	*msg;
	struct rq_envelope	*evp;
	uint32_t		 msgid;

	currtime = time(NULL);

	msgid = evpid_to_msgid(si->evpid);
	msg = tree_xget(&ramqueue.messages, msgid);
	evp = tree_xget(&msg->envelopes, si->evpid);

	/* it *must* be in-flight */
	if (evp->state != RQ_EVPSTATE_INFLIGHT)
		errx(1, "evp:%016" PRIx64 " not in-flight", si->evpid);

	TAILQ_REMOVE(&ramqueue.q_inflight, evp, entry);

	/*
	 * If the envelope was removed while inflight, schedule it for
	 * removal immediately.
	 */
	if (evp->flags & RQ_ENVELOPE_REMOVED) {
		TAILQ_INSERT_TAIL(&ramqueue.q_removed, evp, entry);
		evp->state = RQ_EVPSTATE_SCHEDULED;
		evp->t_scheduled = currtime;
		return (1);
	}

	evp->sched = scheduler_next(evp->ctime,
	    (si->type == D_MTA) ? BACKOFF_TRANSFER : BACKOFF_DELIVERY, si->retry);

	evp->state = RQ_EVPSTATE_PENDING;
	if (!(evp->flags & RQ_ENVELOPE_SUSPEND))
		sorted_insert(&ramqueue, evp);

	si->nexttry = evp->sched;

	return (1);
}

static int
scheduler_ram_delete(uint64_t evpid)
{
	struct rq_message	*msg;
	struct rq_envelope	*evp;
	uint32_t		 msgid;

	currtime = time(NULL);

	msgid = evpid_to_msgid(evpid);
	msg = tree_xget(&ramqueue.messages, msgid);
	evp = tree_xget(&msg->envelopes, evpid);

	/* it *must* be in-flight */
	if (evp->state != RQ_EVPSTATE_INFLIGHT)
		errx(1, "evp:%016" PRIx64 " not in-flight", evpid);

	TAILQ_REMOVE(&ramqueue.q_inflight, evp, entry);

	rq_envelope_delete(&ramqueue, evp);

	return (1);
}

#define HOLDQ_MAXSIZE	1000

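/*
 * Park an in-flight envelope on the holdq identified by the given key
 * for its delivery type, until it is explicitly released.  If the
 * holdq is full, the envelope falls back to the pending queue with
 * the update/overflow flags set.
 */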
static int
scheduler_ram_hold(uint64_t evpid, uint64_t holdq)
{
	struct rq_holdq		*hq;
	struct rq_message	*msg;
	struct rq_envelope	*evp;
	uint32_t		 msgid;

	currtime = time(NULL);

	msgid = evpid_to_msgid(evpid);
	msg = tree_xget(&ramqueue.messages, msgid);
	evp = tree_xget(&msg->envelopes, evpid);

	/* it *must* be in-flight */
	if (evp->state != RQ_EVPSTATE_INFLIGHT)
		errx(1, "evp:%016" PRIx64 " not in-flight", evpid);

	TAILQ_REMOVE(&ramqueue.q_inflight, evp, entry);

	/* If the envelope is suspended, just mark it as pending */
	if (evp->flags & RQ_ENVELOPE_SUSPEND) {
		evp->state = RQ_EVPSTATE_PENDING;
		return (0);
	}

	hq = tree_get(&holdqs[evp->type], holdq);
	if (hq == NULL) {
		hq = xcalloc(1, sizeof(*hq), "scheduler_hold");
		TAILQ_INIT(&hq->q);
		tree_xset(&holdqs[evp->type], holdq, hq);
		stat_increment("scheduler.ramqueue.holdq", 1);
	}

	/* If the holdq is full, just "tempfail" the envelope */
	if (hq->count >= HOLDQ_MAXSIZE) {
		evp->state = RQ_EVPSTATE_PENDING;
		evp->flags |= RQ_ENVELOPE_UPDATE;
		evp->flags |= RQ_ENVELOPE_OVERFLOW;
		sorted_insert(&ramqueue, evp);
		stat_increment("scheduler.ramqueue.hold-overflow", 1);
		return (0);
	}

	evp->state = RQ_EVPSTATE_HELD;
	evp->holdq = holdq;
	/*
	 * This is an optimization: upon release, the envelopes will be
	 * inserted in the pending queue from the first element to the last.
	 * Since elements already in the queue were received first, they
	 * were scheduled first, so they will be reinserted before the
	 * current element.
	 */
	TAILQ_INSERT_HEAD(&hq->q, evp, entry);
	hq->count += 1;
	stat_increment("scheduler.ramqueue.hold", 1);

	return (1);
}

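/*
 * Release up to n envelopes from a holdq back into the pending queue;
 * n == -1 releases them all and flags each one RQ_ENVELOPE_UPDATE.
 * The holdq is freed once empty.  Returns the number released.
 */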
static int
scheduler_ram_release(int type, uint64_t holdq, int n)
{
	struct rq_holdq		*hq;
	struct rq_envelope	*evp;
	int			 i, update;

	currtime = time(NULL);

	hq = tree_get(&holdqs[type], holdq);
	if (hq == NULL)
		return (0);

	if (n == -1) {
		n = 0;
		update = 1;
	}
	else
		update = 0;

	for (i = 0; n == 0 || i < n; i++) {
		evp = TAILQ_FIRST(&hq->q);
		if (evp == NULL)
			break;

		TAILQ_REMOVE(&hq->q, evp, entry);
		hq->count -= 1;
		evp->holdq = 0;

		/*
		 * When released, all envelopes are put in the pending queue
		 * and will be rescheduled immediately.  As an optimization,
		 * we could just schedule them directly.
		 */
		evp->state = RQ_EVPSTATE_PENDING;
		if (update)
			evp->flags |= RQ_ENVELOPE_UPDATE;
		sorted_insert(&ramqueue, evp);
	}

	if (TAILQ_EMPTY(&hq->q)) {
		tree_xpop(&holdqs[type], holdq);
		free(hq);
		stat_decrement("scheduler.ramqueue.holdq", 1);
	}
	stat_decrement("scheduler.ramqueue.hold", i);

	return (i);
}

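/*
 * Fill the caller's arrays with up to *count schedulable envelopes,
 * draining the dispatch queues round-robin in decreasing order of
 * urgency: removals and expirations first, then updates, bounces,
 * local (MDA) and remote (MTA) deliveries.  Returns 1 with *count
 * updated if anything was produced; otherwise returns 0 and reports
 * through *delay how long the caller may sleep (-1 if nothing is
 * pending at all).
 */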
static int
scheduler_ram_batch(int mask, int *delay, size_t *count, uint64_t *evpids,
    int *types)
{
	struct rq_envelope	*evp;
	size_t			 i, n;
	time_t			 t;

	currtime = time(NULL);

	rq_queue_schedule(&ramqueue);
	if (verbose & TRACE_SCHEDULER)
		rq_queue_dump(&ramqueue, "scheduler_ram_batch()");

	i = 0;
	n = 0;

	for (;;) {

		if (mask & SCHED_REMOVE && (evp = TAILQ_FIRST(&ramqueue.q_removed))) {
			TAILQ_REMOVE(&ramqueue.q_removed, evp, entry);
			types[i] = SCHED_REMOVE;
			evpids[i] = evp->evpid;
			rq_envelope_delete(&ramqueue, evp);

			if (++i == *count)
				break;
		}

		if (mask & SCHED_EXPIRE && (evp = TAILQ_FIRST(&ramqueue.q_expired))) {
			TAILQ_REMOVE(&ramqueue.q_expired, evp, entry);
			types[i] = SCHED_EXPIRE;
			evpids[i] = evp->evpid;
			rq_envelope_delete(&ramqueue, evp);

			if (++i == *count)
				break;
		}

		if (mask & SCHED_UPDATE && (evp = TAILQ_FIRST(&ramqueue.q_update))) {
			TAILQ_REMOVE(&ramqueue.q_update, evp, entry);
			types[i] = SCHED_UPDATE;
			evpids[i] = evp->evpid;

			if (evp->flags & RQ_ENVELOPE_OVERFLOW)
				t = BACKOFF_OVERFLOW;
			else if (evp->type == D_MTA)
				t = BACKOFF_TRANSFER;
			else
				t = BACKOFF_DELIVERY;

			evp->sched = scheduler_next(evp->ctime, t, 0);
			evp->flags &= ~(RQ_ENVELOPE_UPDATE|RQ_ENVELOPE_OVERFLOW);
			evp->state = RQ_EVPSTATE_PENDING;
			if (!(evp->flags & RQ_ENVELOPE_SUSPEND))
				sorted_insert(&ramqueue, evp);

			if (++i == *count)
				break;
		}

		if (mask & SCHED_BOUNCE && (evp = TAILQ_FIRST(&ramqueue.q_bounce))) {
			TAILQ_REMOVE(&ramqueue.q_bounce, evp, entry);
			types[i] = SCHED_BOUNCE;
			evpids[i] = evp->evpid;

			TAILQ_INSERT_TAIL(&ramqueue.q_inflight, evp, entry);
			evp->state = RQ_EVPSTATE_INFLIGHT;
			evp->t_inflight = currtime;

			if (++i == *count)
				break;
		}

		if (mask & SCHED_MDA && (evp = TAILQ_FIRST(&ramqueue.q_mda))) {
			TAILQ_REMOVE(&ramqueue.q_mda, evp, entry);
			types[i] = SCHED_MDA;
			evpids[i] = evp->evpid;

			TAILQ_INSERT_TAIL(&ramqueue.q_inflight, evp, entry);
			evp->state = RQ_EVPSTATE_INFLIGHT;
			evp->t_inflight = currtime;

			if (++i == *count)
				break;
		}

		if (mask & SCHED_MTA && (evp = TAILQ_FIRST(&ramqueue.q_mta))) {
			TAILQ_REMOVE(&ramqueue.q_mta, evp, entry);
			types[i] = SCHED_MTA;
			evpids[i] = evp->evpid;

			TAILQ_INSERT_TAIL(&ramqueue.q_inflight, evp, entry);
			evp->state = RQ_EVPSTATE_INFLIGHT;
			evp->t_inflight = currtime;

			if (++i == *count)
				break;
		}

		/* nothing seen this round */
		if (i == n)
			break;

		n = i;
	}

	if (i) {
		*count = i;
		return (1);
	}

	if ((evp = TAILQ_FIRST(&ramqueue.q_pending))) {
		if (evp->sched < evp->expire)
			t = evp->sched;
		else
			t = evp->expire;
		*delay = (t < currtime) ? 0 : (t - currtime);
	}
	else
		*delay = -1;

	return (0);
}

static size_t
scheduler_ram_messages(uint32_t from, uint32_t *dst, size_t size)
{
	uint64_t id;
	size_t	 n;
	void	*i;

	for (n = 0, i = NULL; n < size; n++) {
		if (tree_iterfrom(&ramqueue.messages, &i, from, &id, NULL) == 0)
			break;
		dst[n] = id;
	}

	return (n);
}

static size_t
scheduler_ram_envelopes(uint64_t from, struct evpstate *dst, size_t size)
{
	struct rq_message	*msg;
	struct rq_envelope	*evp;
	void			*i;
	size_t			 n;

	if ((msg = tree_get(&ramqueue.messages, evpid_to_msgid(from))) == NULL)
		return (0);

	for (n = 0, i = NULL; n < size; ) {

		if (tree_iterfrom(&msg->envelopes, &i, from, NULL,
		    (void**)&evp) == 0)
			break;

		if (evp->flags & (RQ_ENVELOPE_REMOVED | RQ_ENVELOPE_EXPIRED))
			continue;

		dst[n].evpid = evp->evpid;
		dst[n].flags = 0;
		dst[n].retry = 0;
		dst[n].time = 0;

		if (evp->state == RQ_EVPSTATE_PENDING) {
			dst[n].time = evp->sched;
			dst[n].flags = EF_PENDING;
		}
		else if (evp->state == RQ_EVPSTATE_SCHEDULED) {
			dst[n].time = evp->t_scheduled;
			dst[n].flags = EF_PENDING;
		}
		else if (evp->state == RQ_EVPSTATE_INFLIGHT) {
			dst[n].time = evp->t_inflight;
			dst[n].flags = EF_INFLIGHT;
		}
		else if (evp->state == RQ_EVPSTATE_HELD) {
			/* same as scheduled */
			dst[n].time = evp->t_scheduled;
			dst[n].flags = EF_PENDING;
			dst[n].flags |= EF_HOLD;
		}
		if (evp->flags & RQ_ENVELOPE_SUSPEND)
			dst[n].flags |= EF_SUSPEND;

		n++;
	}

	return (n);
}

static int
scheduler_ram_schedule(uint64_t evpid)
{
	struct rq_message	*msg;
	struct rq_envelope	*evp;
	uint32_t		 msgid;
	void			*i;
	int			 r;

	currtime = time(NULL);

	if (evpid > 0xffffffff) {
		msgid = evpid_to_msgid(evpid);
		if ((msg = tree_get(&ramqueue.messages, msgid)) == NULL)
			return (0);
		if ((evp = tree_get(&msg->envelopes, evpid)) == NULL)
			return (0);
		if (evp->state == RQ_EVPSTATE_INFLIGHT)
			return (0);
		rq_envelope_schedule(&ramqueue, evp);
		return (1);
	}
	else {
		msgid = evpid;
		if ((msg = tree_get(&ramqueue.messages, msgid)) == NULL)
			return (0);
		i = NULL;
		r = 0;
		while (tree_iter(&msg->envelopes, &i, NULL, (void*)(&evp))) {
			if (evp->state == RQ_EVPSTATE_INFLIGHT)
				continue;
			rq_envelope_schedule(&ramqueue, evp);
			r++;
		}
		return (r);
	}
}

static int
scheduler_ram_remove(uint64_t evpid)
{
	struct rq_message	*msg;
	struct rq_envelope	*evp;
	uint32_t		 msgid;
	void			*i;
	int			 r;

	currtime = time(NULL);

	if (evpid > 0xffffffff) {
		msgid = evpid_to_msgid(evpid);
		if ((msg = tree_get(&ramqueue.messages, msgid)) == NULL)
			return (0);
		if ((evp = tree_get(&msg->envelopes, evpid)) == NULL)
			return (0);
		if (rq_envelope_remove(&ramqueue, evp))
			return (1);
		return (0);
	}
	else {
		msgid = evpid;
		if ((msg = tree_get(&ramqueue.messages, msgid)) == NULL)
			return (0);
		i = NULL;
		r = 0;
		while (tree_iter(&msg->envelopes, &i, NULL, (void*)(&evp)))
			if (rq_envelope_remove(&ramqueue, evp))
				r++;
		return (r);
	}
}

static int
scheduler_ram_suspend(uint64_t evpid)
{
	struct rq_message	*msg;
	struct rq_envelope	*evp;
	uint32_t		 msgid;
	void			*i;
	int			 r;

	currtime = time(NULL);

	if (evpid > 0xffffffff) {
		msgid = evpid_to_msgid(evpid);
		if ((msg = tree_get(&ramqueue.messages, msgid)) == NULL)
			return (0);
		if ((evp = tree_get(&msg->envelopes, evpid)) == NULL)
			return (0);
		if (rq_envelope_suspend(&ramqueue, evp))
			return (1);
		return (0);
	}
	else {
		msgid = evpid;
		if ((msg = tree_get(&ramqueue.messages, msgid)) == NULL)
			return (0);
		i = NULL;
		r = 0;
		while (tree_iter(&msg->envelopes, &i, NULL, (void*)(&evp)))
			if (rq_envelope_suspend(&ramqueue, evp))
				r++;
		return (r);
	}
}

static int
scheduler_ram_resume(uint64_t evpid)
{
	struct rq_message	*msg;
	struct rq_envelope	*evp;
	uint32_t		 msgid;
	void			*i;
	int			 r;

	currtime = time(NULL);

	if (evpid > 0xffffffff) {
		msgid = evpid_to_msgid(evpid);
		if ((msg = tree_get(&ramqueue.messages, msgid)) == NULL)
			return (0);
		if ((evp = tree_get(&msg->envelopes, evpid)) == NULL)
			return (0);
		if (rq_envelope_resume(&ramqueue, evp))
			return (1);
		return (0);
	}
	else {
		msgid = evpid;
		if ((msg = tree_get(&ramqueue.messages, msgid)) == NULL)
			return (0);
		i = NULL;
		r = 0;
		while (tree_iter(&msg->envelopes, &i, NULL, (void*)(&evp)))
			if (rq_envelope_resume(&ramqueue, evp))
				r++;
		return (r);
	}
}

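/*
 * Insert an envelope at its sorted position in the pending queue.
 * The priority tree yields the successor in O(log n), avoiding a
 * linear walk of the list.
 */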
static void
sorted_insert(struct rq_queue *rq, struct rq_envelope *evp)
{
	struct rq_envelope	*evp2;

	SPLAY_INSERT(prioqtree, &rq->q_priotree, evp);
	evp2 = SPLAY_NEXT(prioqtree, &rq->q_priotree, evp);
	if (evp2)
		TAILQ_INSERT_BEFORE(evp2, evp, entry);
	else
		TAILQ_INSERT_TAIL(&rq->q_pending, evp, entry);
}

static void
rq_queue_init(struct rq_queue *rq)
{
	memset(rq, 0, sizeof *rq);
	tree_init(&rq->messages);
	TAILQ_INIT(&rq->q_pending);
	TAILQ_INIT(&rq->q_inflight);
	TAILQ_INIT(&rq->q_mta);
	TAILQ_INIT(&rq->q_mda);
	TAILQ_INIT(&rq->q_bounce);
	TAILQ_INIT(&rq->q_update);
	TAILQ_INIT(&rq->q_expired);
	TAILQ_INIT(&rq->q_removed);
	SPLAY_INIT(&rq->q_priotree);
}

static void
rq_queue_merge(struct rq_queue *rq, struct rq_queue *update)
{
	struct rq_message	*message, *tomessage;
	struct rq_envelope	*envelope;
	uint64_t		 id;
	void			*i;

	while (tree_poproot(&update->messages, &id, (void*)&message)) {
		if ((tomessage = tree_get(&rq->messages, id)) == NULL) {
			/* message does not exist. re-use structure */
			tree_xset(&rq->messages, id, message);
			continue;
		}
		/* need to re-link all envelopes before merging them */
		i = NULL;
		while ((tree_iter(&message->envelopes, &i, &id,
		    (void*)&envelope)))
			envelope->message = tomessage;
		tree_merge(&tomessage->envelopes, &message->envelopes);
		free(message);
		stat_decrement("scheduler.ramqueue.message", 1);
	}

	/* Sorted insert in the pending queue */
	while ((envelope = TAILQ_FIRST(&update->q_pending))) {
		TAILQ_REMOVE(&update->q_pending, envelope, entry);
		sorted_insert(rq, envelope);
	}

	rq->evpcount += update->evpcount;
}

#define SCHEDULEMAX	1024

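/*
 * Walk the pending queue and dispatch every envelope that is due:
 * expired envelopes move to the expired queue, the others to their
 * dispatch queue, scheduling at most SCHEDULEMAX of them per call.
 */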
static void
rq_queue_schedule(struct rq_queue *rq)
{
	struct rq_envelope	*evp;
	size_t			 n;

	n = 0;
	while ((evp = TAILQ_FIRST(&rq->q_pending))) {
		if (evp->sched > currtime && evp->expire > currtime)
			break;

		if (n == SCHEDULEMAX)
			break;

		if (evp->state != RQ_EVPSTATE_PENDING)
			errx(1, "evp:%016" PRIx64 " flags=0x%x", evp->evpid,
			    evp->flags);

		if (evp->expire <= currtime) {
			TAILQ_REMOVE(&rq->q_pending, evp, entry);
			SPLAY_REMOVE(prioqtree, &rq->q_priotree, evp);
			TAILQ_INSERT_TAIL(&rq->q_expired, evp, entry);
			evp->state = RQ_EVPSTATE_SCHEDULED;
			evp->flags |= RQ_ENVELOPE_EXPIRED;
			evp->t_scheduled = currtime;
			continue;
		}
		rq_envelope_schedule(rq, evp);
		n += 1;
	}
}

static struct evplist *
rq_envelope_list(struct rq_queue *rq, struct rq_envelope *evp)
{
	switch (evp->state) {
	case RQ_EVPSTATE_PENDING:
		return &rq->q_pending;

	case RQ_EVPSTATE_SCHEDULED:
		if (evp->flags & RQ_ENVELOPE_EXPIRED)
			return &rq->q_expired;
		if (evp->flags & RQ_ENVELOPE_REMOVED)
			return &rq->q_removed;
		if (evp->flags & RQ_ENVELOPE_UPDATE)
			return &rq->q_update;
		if (evp->type == D_MTA)
			return &rq->q_mta;
		if (evp->type == D_MDA)
			return &rq->q_mda;
		if (evp->type == D_BOUNCE)
			return &rq->q_bounce;
		errx(1, "%016" PRIx64 " bad evp type %d", evp->evpid, evp->type);

	case RQ_EVPSTATE_INFLIGHT:
		return &rq->q_inflight;

	case RQ_EVPSTATE_HELD:
		return (NULL);
	}

	errx(1, "%016" PRIx64 " bad state %d", evp->evpid, evp->state);
	return (NULL);
}

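/*
 * Move an envelope to the dispatch queue matching its type (or the
 * update queue when an update is pending), unlinking it from its
 * holdq if it was held, or from the pending queue and priority tree
 * otherwise; suspended envelopes sit on no queue.
 */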
static void
rq_envelope_schedule(struct rq_queue *rq, struct rq_envelope *evp)
{
	struct rq_holdq	*hq;
	struct evplist	*q = NULL;

	switch (evp->type) {
	case D_MTA:
		q = &rq->q_mta;
		break;
	case D_MDA:
		q = &rq->q_mda;
		break;
	case D_BOUNCE:
		q = &rq->q_bounce;
		break;
	}

	if (evp->flags & RQ_ENVELOPE_UPDATE)
		q = &rq->q_update;

	if (evp->state == RQ_EVPSTATE_HELD) {
		hq = tree_xget(&holdqs[evp->type], evp->holdq);
		TAILQ_REMOVE(&hq->q, evp, entry);
		hq->count -= 1;
		if (TAILQ_EMPTY(&hq->q)) {
			tree_xpop(&holdqs[evp->type], evp->holdq);
			free(hq);
		}
		evp->holdq = 0;
		stat_decrement("scheduler.ramqueue.hold", 1);
	}
	else if (!(evp->flags & RQ_ENVELOPE_SUSPEND)) {
		TAILQ_REMOVE(&rq->q_pending, evp, entry);
		SPLAY_REMOVE(prioqtree, &rq->q_priotree, evp);
	}

	TAILQ_INSERT_TAIL(q, evp, entry);
	evp->state = RQ_EVPSTATE_SCHEDULED;
	evp->t_scheduled = currtime;
}

static int
rq_envelope_remove(struct rq_queue *rq, struct rq_envelope *evp)
{
	struct rq_holdq	*hq;
	struct evplist	*evl;

	if (evp->flags & (RQ_ENVELOPE_REMOVED | RQ_ENVELOPE_EXPIRED))
		return (0);
	/*
	 * If the envelope is in-flight, just mark it for removal.
	 */
	if (evp->state == RQ_EVPSTATE_INFLIGHT) {
		evp->flags |= RQ_ENVELOPE_REMOVED;
		return (1);
	}

	if (evp->state == RQ_EVPSTATE_HELD) {
		hq = tree_xget(&holdqs[evp->type], evp->holdq);
		TAILQ_REMOVE(&hq->q, evp, entry);
		hq->count -= 1;
		if (TAILQ_EMPTY(&hq->q)) {
			tree_xpop(&holdqs[evp->type], evp->holdq);
			free(hq);
		}
		evp->holdq = 0;
		stat_decrement("scheduler.ramqueue.hold", 1);
	}
	else if (!(evp->flags & RQ_ENVELOPE_SUSPEND)) {
		evl = rq_envelope_list(rq, evp);
		TAILQ_REMOVE(evl, evp, entry);
		if (evl == &rq->q_pending)
			SPLAY_REMOVE(prioqtree, &rq->q_priotree, evp);
	}

	TAILQ_INSERT_TAIL(&rq->q_removed, evp, entry);
	evp->state = RQ_EVPSTATE_SCHEDULED;
	evp->flags |= RQ_ENVELOPE_REMOVED;
	evp->t_scheduled = currtime;

	return (1);
}

static int
rq_envelope_suspend(struct rq_queue *rq, struct rq_envelope *evp)
{
	struct rq_holdq	*hq;
	struct evplist	*evl;

	if (evp->flags & RQ_ENVELOPE_SUSPEND)
		return (0);

	if (evp->state == RQ_EVPSTATE_HELD) {
		hq = tree_xget(&holdqs[evp->type], evp->holdq);
		TAILQ_REMOVE(&hq->q, evp, entry);
		hq->count -= 1;
		if (TAILQ_EMPTY(&hq->q)) {
			tree_xpop(&holdqs[evp->type], evp->holdq);
			free(hq);
		}
		evp->holdq = 0;
		evp->state = RQ_EVPSTATE_PENDING;
		stat_decrement("scheduler.ramqueue.hold", 1);
	}
	else if (evp->state != RQ_EVPSTATE_INFLIGHT) {
		evl = rq_envelope_list(rq, evp);
		TAILQ_REMOVE(evl, evp, entry);
		if (evl == &rq->q_pending)
			SPLAY_REMOVE(prioqtree, &rq->q_priotree, evp);
	}

	evp->flags |= RQ_ENVELOPE_SUSPEND;

	return (1);
}

static int
rq_envelope_resume(struct rq_queue *rq, struct rq_envelope *evp)
{
	struct evplist	*evl;

	if (!(evp->flags & RQ_ENVELOPE_SUSPEND))
		return (0);

	if (evp->state != RQ_EVPSTATE_INFLIGHT) {
		evl = rq_envelope_list(rq, evp);
		if (evl == &rq->q_pending)
			sorted_insert(rq, evp);
		else
			TAILQ_INSERT_TAIL(evl, evp, entry);
	}

	evp->flags &= ~RQ_ENVELOPE_SUSPEND;

	return (1);
}

static void
rq_envelope_delete(struct rq_queue *rq, struct rq_envelope *evp)
{
	tree_xpop(&evp->message->envelopes, evp->evpid);
	if (tree_empty(&evp->message->envelopes)) {
		tree_xpop(&rq->messages, evp->message->msgid);
		free(evp->message);
		stat_decrement("scheduler.ramqueue.message", 1);
	}

	free(evp);
	rq->evpcount--;
	stat_decrement("scheduler.ramqueue.envelope", 1);
}

static const char *
rq_envelope_to_text(struct rq_envelope *e)
{
	static char	buf[256];
	char		t[64];

	(void)snprintf(buf, sizeof buf, "evp:%016" PRIx64 " [", e->evpid);

	if (e->type == D_BOUNCE)
		(void)strlcat(buf, "bounce", sizeof buf);
	else if (e->type == D_MDA)
		(void)strlcat(buf, "mda", sizeof buf);
	else if (e->type == D_MTA)
		(void)strlcat(buf, "mta", sizeof buf);

	(void)snprintf(t, sizeof t, ",expire=%s",
	    duration_to_text(e->expire - currtime));
	(void)strlcat(buf, t, sizeof buf);

	switch (e->state) {
	case RQ_EVPSTATE_PENDING:
		(void)snprintf(t, sizeof t, ",pending=%s",
		    duration_to_text(e->sched - currtime));
		(void)strlcat(buf, t, sizeof buf);
		break;

	case RQ_EVPSTATE_SCHEDULED:
		(void)snprintf(t, sizeof t, ",scheduled=%s",
		    duration_to_text(currtime - e->t_scheduled));
		(void)strlcat(buf, t, sizeof buf);
		break;

	case RQ_EVPSTATE_INFLIGHT:
		(void)snprintf(t, sizeof t, ",inflight=%s",
		    duration_to_text(currtime - e->t_inflight));
		(void)strlcat(buf, t, sizeof buf);
		break;

	case RQ_EVPSTATE_HELD:
		(void)snprintf(t, sizeof t, ",held=%s",
		    duration_to_text(currtime - e->t_inflight));
		(void)strlcat(buf, t, sizeof buf);
		break;
	default:
		errx(1, "%016" PRIx64 " bad state %d", e->evpid, e->state);
	}

	if (e->flags & RQ_ENVELOPE_REMOVED)
		(void)strlcat(buf, ",removed", sizeof buf);
	if (e->flags & RQ_ENVELOPE_EXPIRED)
		(void)strlcat(buf, ",expired", sizeof buf);
	if (e->flags & RQ_ENVELOPE_SUSPEND)
		(void)strlcat(buf, ",suspended", sizeof buf);

	(void)strlcat(buf, "]", sizeof buf);

	return (buf);
}

static void
rq_queue_dump(struct rq_queue *rq, const char *name)
{
	struct rq_message	*message;
	struct rq_envelope	*envelope;
	void			*i, *j;
	uint64_t		 id;

	log_debug("debug: /--- ramqueue: %s", name);

	i = NULL;
	while ((tree_iter(&rq->messages, &i, &id, (void*)&message))) {
		log_debug("debug: | msg:%08" PRIx32, message->msgid);
		j = NULL;
		while ((tree_iter(&message->envelopes, &j, &id,
		    (void*)&envelope)))
			log_debug("debug: |   %s",
			    rq_envelope_to_text(envelope));
	}
	log_debug("debug: \\---");
}

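/*
 * Order envelopes by the earliest of their next-try and expiry times;
 * fall back to the evpid so the comparison is a total order, as the
 * splay tree requires.
 */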
static int
rq_envelope_cmp(struct rq_envelope *e1, struct rq_envelope *e2)
{
	time_t	ref1, ref2;

	ref1 = (e1->sched < e1->expire) ? e1->sched : e1->expire;
	ref2 = (e2->sched < e2->expire) ? e2->sched : e2->expire;
	if (ref1 != ref2)
		return (ref1 < ref2) ? -1 : 1;

	if (e1->evpid != e2->evpid)
		return (e1->evpid < e2->evpid) ? -1 : 1;

	return 0;
}

SPLAY_GENERATE(prioqtree, rq_envelope, t_entry, rq_envelope_cmp);