/*	$OpenBSD: scheduler_ramqueue.c,v 1.45 2018/05/31 21:06:12 gilles Exp $	*/

/*
 * Copyright (c) 2012 Gilles Chehade <gilles@poolp.org>
 * Copyright (c) 2012 Eric Faurot <eric@openbsd.org>
 *
 * Permission to use, copy, modify, and distribute this software for any
 * purpose with or without fee is hereby granted, provided that the above
 * copyright notice and this permission notice appear in all copies.
 *
 * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
 * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
 * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
 * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
 * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
 * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
 * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
 */

#include <sys/types.h>
#include <sys/queue.h>
#include <sys/tree.h>
#include <sys/socket.h>

#include <ctype.h>
#include <err.h>
#include <event.h>
#include <fcntl.h>
#include <imsg.h>
#include <inttypes.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <limits.h>
#include <time.h>

#include "smtpd.h"
#include "log.h"

TAILQ_HEAD(evplist, rq_envelope);

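/*
 * The ramqueue keeps the entire scheduling state in memory: a tree of
 * messages keyed by msgid, each holding a tree of its envelopes keyed
 * by evpid.  Envelopes waiting for their next attempt sit on the
 * q_pending list, kept sorted by due time with the help of a splay
 * tree (see sorted_insert() and rq_envelope_cmp()).  Held envelopes
 * live on global holdqs keyed by holdq id.
 */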
struct rq_message {
	uint32_t		 msgid;
	struct tree		 envelopes;
};

struct rq_envelope {
	TAILQ_ENTRY(rq_envelope) entry;
	SPLAY_ENTRY(rq_envelope) t_entry;

	uint64_t		 evpid;
	uint64_t		 holdq;
	enum delivery_type	 type;

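	/*
	 * An envelope is in exactly one state: PENDING (waiting on the
	 * sorted pending queue), SCHEDULED (queued for a batch),
	 * INFLIGHT (handed out to an agent) or HELD (parked on a
	 * holdq until released).
	 */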
#define	RQ_EVPSTATE_PENDING	 0
#define	RQ_EVPSTATE_SCHEDULED	 1
#define	RQ_EVPSTATE_INFLIGHT	 2
#define	RQ_EVPSTATE_HELD	 3
	uint8_t			 state;

#define	RQ_ENVELOPE_EXPIRED	 0x01
#define	RQ_ENVELOPE_REMOVED	 0x02
#define	RQ_ENVELOPE_SUSPEND	 0x04
#define	RQ_ENVELOPE_UPDATE	 0x08
#define	RQ_ENVELOPE_OVERFLOW	 0x10
	uint8_t			 flags;

	time_t			 ctime;
	time_t			 sched;
	time_t			 expire;

	struct rq_message	*message;

	time_t			 t_inflight;
	time_t			 t_scheduled;
};

struct rq_holdq {
	struct evplist		 q;
	size_t			 count;
};

struct rq_queue {
	size_t			 evpcount;
	struct tree		 messages;
	SPLAY_HEAD(prioqtree, rq_envelope)	q_priotree;

	struct evplist		 q_pending;
	struct evplist		 q_inflight;

	struct evplist		 q_mta;
	struct evplist		 q_mda;
	struct evplist		 q_bounce;
	struct evplist		 q_update;
	struct evplist		 q_expired;
	struct evplist		 q_removed;
};

static int rq_envelope_cmp(struct rq_envelope *, struct rq_envelope *);

SPLAY_PROTOTYPE(prioqtree, rq_envelope, t_entry, rq_envelope_cmp);
static int scheduler_ram_init(const char *);
static int scheduler_ram_insert(struct scheduler_info *);
static size_t scheduler_ram_commit(uint32_t);
static size_t scheduler_ram_rollback(uint32_t);
static int scheduler_ram_update(struct scheduler_info *);
static int scheduler_ram_delete(uint64_t);
static int scheduler_ram_hold(uint64_t, uint64_t);
static int scheduler_ram_release(int, uint64_t, int);
static int scheduler_ram_batch(int, int *, size_t *, uint64_t *, int *);
static size_t scheduler_ram_messages(uint32_t, uint32_t *, size_t);
static size_t scheduler_ram_envelopes(uint64_t, struct evpstate *, size_t);
static int scheduler_ram_schedule(uint64_t);
static int scheduler_ram_remove(uint64_t);
static int scheduler_ram_suspend(uint64_t);
static int scheduler_ram_resume(uint64_t);
static int scheduler_ram_query(uint64_t);

static void sorted_insert(struct rq_queue *, struct rq_envelope *);

static void rq_queue_init(struct rq_queue *);
static void rq_queue_merge(struct rq_queue *, struct rq_queue *);
static void rq_queue_dump(struct rq_queue *, const char *);
static void rq_queue_schedule(struct rq_queue *rq);
static struct evplist *rq_envelope_list(struct rq_queue *, struct rq_envelope *);
static void rq_envelope_schedule(struct rq_queue *, struct rq_envelope *);
static int rq_envelope_remove(struct rq_queue *, struct rq_envelope *);
static int rq_envelope_suspend(struct rq_queue *, struct rq_envelope *);
static int rq_envelope_resume(struct rq_queue *, struct rq_envelope *);
static void rq_envelope_delete(struct rq_queue *, struct rq_envelope *);
static const char *rq_envelope_to_text(struct rq_envelope *);

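/*
 * Entry points for this backend.  The initializers are positional and
 * must stay in the order the fields are declared in struct
 * scheduler_backend.
 */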
struct scheduler_backend scheduler_backend_ramqueue = {
	scheduler_ram_init,

	scheduler_ram_insert,
	scheduler_ram_commit,
	scheduler_ram_rollback,

	scheduler_ram_update,
	scheduler_ram_delete,
	scheduler_ram_hold,
	scheduler_ram_release,

	scheduler_ram_batch,

	scheduler_ram_messages,
	scheduler_ram_envelopes,
	scheduler_ram_schedule,
	scheduler_ram_remove,
	scheduler_ram_suspend,
	scheduler_ram_resume,
	scheduler_ram_query,
};

static struct rq_queue	ramqueue;
static struct tree	updates;
static struct tree	holdqs[3]; /* indexed by delivery type */

static time_t		currtime;

#define BACKOFF_TRANSFER	400
#define BACKOFF_DELIVERY	10
#define BACKOFF_OVERFLOW	3

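/*
 * Retries follow a quadratic backoff: attempt N of an envelope created
 * at t0 is due at t0 + base * N * N.  With the 400s BACKOFF_TRANSFER
 * base this gives t0, t0+400, t0+1600, t0+3600, ...
 */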
static time_t
scheduler_backoff(time_t t0, time_t base, uint32_t step)
{
	return (t0 + base * step * step);
}

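/*
 * Find the first backoff step that falls strictly after the current
 * time; steps landing at or before it are skipped.
 */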
static time_t
scheduler_next(time_t t0, time_t base, uint32_t step)
{
	time_t t;

	/* XXX be more efficient */
	while ((t = scheduler_backoff(t0, base, step)) <= currtime)
		step++;

	return (t);
}

static int
scheduler_ram_init(const char *arg)
{
	rq_queue_init(&ramqueue);
	tree_init(&updates);
	tree_init(&holdqs[D_MDA]);
	tree_init(&holdqs[D_MTA]);
	tree_init(&holdqs[D_BOUNCE]);

	return (1);
}

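/*
 * Envelopes are first staged in a per-message "update" queue: they
 * only enter the live ramqueue when the message is committed, and a
 * rollback simply throws the staged envelopes away.
 */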
static int
scheduler_ram_insert(struct scheduler_info *si)
{
	struct rq_queue		*update;
	struct rq_message	*message;
	struct rq_envelope	*envelope;
	uint32_t		 msgid;

	currtime = time(NULL);

	msgid = evpid_to_msgid(si->evpid);

	/* find/prepare a ramqueue update */
	if ((update = tree_get(&updates, msgid)) == NULL) {
		update = xcalloc(1, sizeof *update);
		stat_increment("scheduler.ramqueue.update", 1);
		rq_queue_init(update);
		tree_xset(&updates, msgid, update);
	}

	/* find/prepare the msgtree message in ramqueue update */
	if ((message = tree_get(&update->messages, msgid)) == NULL) {
		message = xcalloc(1, sizeof *message);
		message->msgid = msgid;
		tree_init(&message->envelopes);
		tree_xset(&update->messages, msgid, message);
		stat_increment("scheduler.ramqueue.message", 1);
	}

	/* create envelope in ramqueue message */
	envelope = xcalloc(1, sizeof *envelope);
	envelope->evpid = si->evpid;
	envelope->type = si->type;
	envelope->message = message;
	envelope->ctime = si->creation;
	envelope->expire = si->creation + si->ttl;
	envelope->sched = scheduler_backoff(si->creation,
	    (si->type == D_MTA) ? BACKOFF_TRANSFER : BACKOFF_DELIVERY, si->retry);
	tree_xset(&message->envelopes, envelope->evpid, envelope);

	update->evpcount++;
	stat_increment("scheduler.ramqueue.envelope", 1);

	envelope->state = RQ_EVPSTATE_PENDING;
	TAILQ_INSERT_TAIL(&update->q_pending, envelope, entry);

	si->nexttry = envelope->sched;

	return (1);
}

static size_t
scheduler_ram_commit(uint32_t msgid)
{
	struct rq_queue	*update;
	size_t		 r;

	currtime = time(NULL);

	update = tree_xpop(&updates, msgid);
	r = update->evpcount;

	if (tracing & TRACE_SCHEDULER)
		rq_queue_dump(update, "update to commit");

	rq_queue_merge(&ramqueue, update);

	if (tracing & TRACE_SCHEDULER)
		rq_queue_dump(&ramqueue, "resulting queue");

	rq_queue_schedule(&ramqueue);

	free(update);
	stat_decrement("scheduler.ramqueue.update", 1);

	return (r);
}

static size_t
scheduler_ram_rollback(uint32_t msgid)
{
	struct rq_queue		*update;
	struct rq_envelope	*evp;
	size_t			 r;

	currtime = time(NULL);

	if ((update = tree_pop(&updates, msgid)) == NULL)
		return (0);
	r = update->evpcount;

	while ((evp = TAILQ_FIRST(&update->q_pending))) {
		TAILQ_REMOVE(&update->q_pending, evp, entry);
		rq_envelope_delete(update, evp);
	}

	free(update);
	stat_decrement("scheduler.ramqueue.update", 1);

	return (r);
}

static int
scheduler_ram_update(struct scheduler_info *si)
{
	struct rq_message	*msg;
	struct rq_envelope	*evp;
	uint32_t		 msgid;

	currtime = time(NULL);

	msgid = evpid_to_msgid(si->evpid);
	msg = tree_xget(&ramqueue.messages, msgid);
	evp = tree_xget(&msg->envelopes, si->evpid);

	/* it *must* be in-flight */
	if (evp->state != RQ_EVPSTATE_INFLIGHT)
		errx(1, "evp:%016" PRIx64 " not in-flight", si->evpid);

	TAILQ_REMOVE(&ramqueue.q_inflight, evp, entry);

	/*
	 * If the envelope was removed while in-flight, schedule it for
	 * removal immediately.
	 */
	if (evp->flags & RQ_ENVELOPE_REMOVED) {
		TAILQ_INSERT_TAIL(&ramqueue.q_removed, evp, entry);
		evp->state = RQ_EVPSTATE_SCHEDULED;
		evp->t_scheduled = currtime;
		return (1);
	}

	evp->sched = scheduler_next(evp->ctime,
	    (si->type == D_MTA) ? BACKOFF_TRANSFER : BACKOFF_DELIVERY, si->retry);

	evp->state = RQ_EVPSTATE_PENDING;
	if (!(evp->flags & RQ_ENVELOPE_SUSPEND))
		sorted_insert(&ramqueue, evp);

	si->nexttry = evp->sched;

	return (1);
}

static int
scheduler_ram_delete(uint64_t evpid)
{
	struct rq_message	*msg;
	struct rq_envelope	*evp;
	uint32_t		 msgid;

	currtime = time(NULL);

	msgid = evpid_to_msgid(evpid);
	msg = tree_xget(&ramqueue.messages, msgid);
	evp = tree_xget(&msg->envelopes, evpid);

	/* it *must* be in-flight */
	if (evp->state != RQ_EVPSTATE_INFLIGHT)
		errx(1, "evp:%016" PRIx64 " not in-flight", evpid);

	TAILQ_REMOVE(&ramqueue.q_inflight, evp, entry);

	rq_envelope_delete(&ramqueue, evp);

	return (1);
}

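/* A single holdq accepts at most this many envelopes. */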
#define HOLDQ_MAXSIZE	1000

static int
scheduler_ram_hold(uint64_t evpid, uint64_t holdq)
{
	struct rq_holdq		*hq;
	struct rq_message	*msg;
	struct rq_envelope	*evp;
	uint32_t		 msgid;

	currtime = time(NULL);

	msgid = evpid_to_msgid(evpid);
	msg = tree_xget(&ramqueue.messages, msgid);
	evp = tree_xget(&msg->envelopes, evpid);

	/* it *must* be in-flight */
	if (evp->state != RQ_EVPSTATE_INFLIGHT)
		errx(1, "evp:%016" PRIx64 " not in-flight", evpid);

	TAILQ_REMOVE(&ramqueue.q_inflight, evp, entry);

	/* If the envelope is suspended, just mark it as pending */
	if (evp->flags & RQ_ENVELOPE_SUSPEND) {
		evp->state = RQ_EVPSTATE_PENDING;
		return (0);
	}

	hq = tree_get(&holdqs[evp->type], holdq);
	if (hq == NULL) {
		hq = xcalloc(1, sizeof(*hq));
		TAILQ_INIT(&hq->q);
		tree_xset(&holdqs[evp->type], holdq, hq);
		stat_increment("scheduler.ramqueue.holdq", 1);
	}

	/* If the holdq is full, just "tempfail" the envelope */
	if (hq->count >= HOLDQ_MAXSIZE) {
		evp->state = RQ_EVPSTATE_PENDING;
		evp->flags |= RQ_ENVELOPE_UPDATE;
		evp->flags |= RQ_ENVELOPE_OVERFLOW;
		sorted_insert(&ramqueue, evp);
		stat_increment("scheduler.ramqueue.hold-overflow", 1);
		return (0);
	}

	evp->state = RQ_EVPSTATE_HELD;
	evp->holdq = holdq;
	/*
	 * This is an optimization: upon release, the envelopes will be
	 * inserted in the pending queue from the first element to the last.
	 * Since elements already in the queue were received first, they
	 * were scheduled first, so they will be reinserted before the
	 * current element.
	 */
	TAILQ_INSERT_HEAD(&hq->q, evp, entry);
	hq->count += 1;
	stat_increment("scheduler.ramqueue.hold", 1);

	return (1);
}

static int
scheduler_ram_release(int type, uint64_t holdq, int n)
{
	struct rq_holdq		*hq;
	struct rq_envelope	*evp;
	int			 i, update;

	currtime = time(NULL);

	hq = tree_get(&holdqs[type], holdq);
	if (hq == NULL)
		return (0);

	if (n == -1) {
		n = 0;
		update = 1;
	}
	else
		update = 0;

	for (i = 0; n == 0 || i < n; i++) {
		evp = TAILQ_FIRST(&hq->q);
		if (evp == NULL)
			break;

		TAILQ_REMOVE(&hq->q, evp, entry);
		hq->count -= 1;
		evp->holdq = 0;

		/*
		 * When released, all envelopes are put in the pending queue
		 * and will be rescheduled immediately.  As an optimization,
		 * we could just schedule them directly.
		 */
		evp->state = RQ_EVPSTATE_PENDING;
		if (update)
			evp->flags |= RQ_ENVELOPE_UPDATE;
		sorted_insert(&ramqueue, evp);
	}

	if (TAILQ_EMPTY(&hq->q)) {
		tree_xpop(&holdqs[type], holdq);
		free(hq);
		stat_decrement("scheduler.ramqueue.holdq", 1);
	}
	stat_decrement("scheduler.ramqueue.hold", i);

	return (i);
}

static int
scheduler_ram_batch(int mask, int *delay, size_t *count, uint64_t *evpids, int *types)
{
	struct rq_envelope	*evp;
	size_t			 i, n;
	time_t			 t;

	currtime = time(NULL);

	rq_queue_schedule(&ramqueue);
	if (tracing & TRACE_SCHEDULER)
		rq_queue_dump(&ramqueue, "scheduler_ram_batch()");

	i = 0;
	n = 0;

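	/*
	 * Drain the scheduled queues round-robin, at most one envelope
	 * per queue per turn, until the output array is full or a full
	 * turn yields nothing new.
	 */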
	for (;;) {

		if (mask & SCHED_REMOVE && (evp = TAILQ_FIRST(&ramqueue.q_removed))) {
			TAILQ_REMOVE(&ramqueue.q_removed, evp, entry);
			types[i] = SCHED_REMOVE;
			evpids[i] = evp->evpid;
			rq_envelope_delete(&ramqueue, evp);

			if (++i == *count)
				break;
		}

		if (mask & SCHED_EXPIRE && (evp = TAILQ_FIRST(&ramqueue.q_expired))) {
			TAILQ_REMOVE(&ramqueue.q_expired, evp, entry);
			types[i] = SCHED_EXPIRE;
			evpids[i] = evp->evpid;
			rq_envelope_delete(&ramqueue, evp);

			if (++i == *count)
				break;
		}

		if (mask & SCHED_UPDATE && (evp = TAILQ_FIRST(&ramqueue.q_update))) {
			TAILQ_REMOVE(&ramqueue.q_update, evp, entry);
			types[i] = SCHED_UPDATE;
			evpids[i] = evp->evpid;

			if (evp->flags & RQ_ENVELOPE_OVERFLOW)
				t = BACKOFF_OVERFLOW;
			else if (evp->type == D_MTA)
				t = BACKOFF_TRANSFER;
			else
				t = BACKOFF_DELIVERY;

			evp->sched = scheduler_next(evp->ctime, t, 0);
			evp->flags &= ~(RQ_ENVELOPE_UPDATE|RQ_ENVELOPE_OVERFLOW);
			evp->state = RQ_EVPSTATE_PENDING;
			if (!(evp->flags & RQ_ENVELOPE_SUSPEND))
				sorted_insert(&ramqueue, evp);

			if (++i == *count)
				break;
		}

		if (mask & SCHED_BOUNCE && (evp = TAILQ_FIRST(&ramqueue.q_bounce))) {
			TAILQ_REMOVE(&ramqueue.q_bounce, evp, entry);
			types[i] = SCHED_BOUNCE;
			evpids[i] = evp->evpid;

			TAILQ_INSERT_TAIL(&ramqueue.q_inflight, evp, entry);
			evp->state = RQ_EVPSTATE_INFLIGHT;
			evp->t_inflight = currtime;

			if (++i == *count)
				break;
		}

		if (mask & SCHED_MDA && (evp = TAILQ_FIRST(&ramqueue.q_mda))) {
			TAILQ_REMOVE(&ramqueue.q_mda, evp, entry);
			types[i] = SCHED_MDA;
			evpids[i] = evp->evpid;

			TAILQ_INSERT_TAIL(&ramqueue.q_inflight, evp, entry);
			evp->state = RQ_EVPSTATE_INFLIGHT;
			evp->t_inflight = currtime;

			if (++i == *count)
				break;
		}

		if (mask & SCHED_MTA && (evp = TAILQ_FIRST(&ramqueue.q_mta))) {
			TAILQ_REMOVE(&ramqueue.q_mta, evp, entry);
			types[i] = SCHED_MTA;
			evpids[i] = evp->evpid;

			TAILQ_INSERT_TAIL(&ramqueue.q_inflight, evp, entry);
			evp->state = RQ_EVPSTATE_INFLIGHT;
			evp->t_inflight = currtime;

			if (++i == *count)
				break;
		}

		/* nothing seen this round */
		if (i == n)
			break;

		n = i;
	}

	if (i) {
		*count = i;
		return (1);
	}

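	/*
	 * Nothing was batched: report how long until the head of the
	 * pending queue is due, taking the earlier of its retry time
	 * and its expiry, or -1 if nothing is pending at all.
	 */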
	if ((evp = TAILQ_FIRST(&ramqueue.q_pending))) {
		if (evp->sched < evp->expire)
			t = evp->sched;
		else
			t = evp->expire;
		*delay = (t < currtime) ? 0 : (t - currtime);
	}
	else
		*delay = -1;

	return (0);
}

static size_t
scheduler_ram_messages(uint32_t from, uint32_t *dst, size_t size)
{
	uint64_t id;
	size_t	 n;
	void	*i;

	for (n = 0, i = NULL; n < size; n++) {
		if (tree_iterfrom(&ramqueue.messages, &i, from, &id, NULL) == 0)
			break;
		dst[n] = id;
	}

	return (n);
}

static size_t
scheduler_ram_envelopes(uint64_t from, struct evpstate *dst, size_t size)
{
	struct rq_message	*msg;
	struct rq_envelope	*evp;
	void			*i;
	size_t			 n;

	if ((msg = tree_get(&ramqueue.messages, evpid_to_msgid(from))) == NULL)
		return (0);

	for (n = 0, i = NULL; n < size; ) {

		if (tree_iterfrom(&msg->envelopes, &i, from, NULL,
		    (void**)&evp) == 0)
			break;

		if (evp->flags & (RQ_ENVELOPE_REMOVED | RQ_ENVELOPE_EXPIRED))
			continue;

		dst[n].evpid = evp->evpid;
		dst[n].flags = 0;
		dst[n].retry = 0;
		dst[n].time = 0;

		if (evp->state == RQ_EVPSTATE_PENDING) {
			dst[n].time = evp->sched;
			dst[n].flags = EF_PENDING;
		}
		else if (evp->state == RQ_EVPSTATE_SCHEDULED) {
			dst[n].time = evp->t_scheduled;
			dst[n].flags = EF_PENDING;
		}
		else if (evp->state == RQ_EVPSTATE_INFLIGHT) {
			dst[n].time = evp->t_inflight;
			dst[n].flags = EF_INFLIGHT;
		}
		else if (evp->state == RQ_EVPSTATE_HELD) {
			/* same as scheduled */
			dst[n].time = evp->t_scheduled;
			dst[n].flags = EF_PENDING;
			dst[n].flags |= EF_HOLD;
		}
		if (evp->flags & RQ_ENVELOPE_SUSPEND)
			dst[n].flags |= EF_SUSPEND;

		n++;
	}

	return (n);
}

static int
scheduler_ram_schedule(uint64_t evpid)
{
	struct rq_message	*msg;
	struct rq_envelope	*evp;
	uint32_t		 msgid;
	void			*i;
	int			 r;

	currtime = time(NULL);

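	/*
	 * An evpid carries the msgid in its upper 32 bits, so a value
	 * that fits in 32 bits is a bare msgid and the operation
	 * applies to every envelope of the message.
	 */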
	if (evpid > 0xffffffff) {
		msgid = evpid_to_msgid(evpid);
		if ((msg = tree_get(&ramqueue.messages, msgid)) == NULL)
			return (0);
		if ((evp = tree_get(&msg->envelopes, evpid)) == NULL)
			return (0);
		if (evp->state == RQ_EVPSTATE_INFLIGHT)
			return (0);
		rq_envelope_schedule(&ramqueue, evp);
		return (1);
	}
	else {
		msgid = evpid;
		if ((msg = tree_get(&ramqueue.messages, msgid)) == NULL)
			return (0);
		i = NULL;
		r = 0;
		while (tree_iter(&msg->envelopes, &i, NULL, (void*)(&evp))) {
			if (evp->state == RQ_EVPSTATE_INFLIGHT)
				continue;
			rq_envelope_schedule(&ramqueue, evp);
			r++;
		}
		return (r);
	}
}

static int
scheduler_ram_remove(uint64_t evpid)
{
	struct rq_message	*msg;
	struct rq_envelope	*evp;
	uint32_t		 msgid;
	void			*i;
	int			 r;

	currtime = time(NULL);

	if (evpid > 0xffffffff) {
		msgid = evpid_to_msgid(evpid);
		if ((msg = tree_get(&ramqueue.messages, msgid)) == NULL)
			return (0);
		if ((evp = tree_get(&msg->envelopes, evpid)) == NULL)
			return (0);
		if (rq_envelope_remove(&ramqueue, evp))
			return (1);
		return (0);
	}
	else {
		msgid = evpid;
		if ((msg = tree_get(&ramqueue.messages, msgid)) == NULL)
			return (0);
		i = NULL;
		r = 0;
		while (tree_iter(&msg->envelopes, &i, NULL, (void*)(&evp)))
			if (rq_envelope_remove(&ramqueue, evp))
				r++;
		return (r);
	}
}

static int
scheduler_ram_suspend(uint64_t evpid)
{
	struct rq_message	*msg;
	struct rq_envelope	*evp;
	uint32_t		 msgid;
	void			*i;
	int			 r;

	currtime = time(NULL);

	if (evpid > 0xffffffff) {
		msgid = evpid_to_msgid(evpid);
		if ((msg = tree_get(&ramqueue.messages, msgid)) == NULL)
			return (0);
		if ((evp = tree_get(&msg->envelopes, evpid)) == NULL)
			return (0);
		if (rq_envelope_suspend(&ramqueue, evp))
			return (1);
		return (0);
	}
	else {
		msgid = evpid;
		if ((msg = tree_get(&ramqueue.messages, msgid)) == NULL)
			return (0);
		i = NULL;
		r = 0;
		while (tree_iter(&msg->envelopes, &i, NULL, (void*)(&evp)))
			if (rq_envelope_suspend(&ramqueue, evp))
				r++;
		return (r);
	}
}

static int
scheduler_ram_resume(uint64_t evpid)
{
	struct rq_message	*msg;
	struct rq_envelope	*evp;
	uint32_t		 msgid;
	void			*i;
	int			 r;

	currtime = time(NULL);

	if (evpid > 0xffffffff) {
		msgid = evpid_to_msgid(evpid);
		if ((msg = tree_get(&ramqueue.messages, msgid)) == NULL)
			return (0);
		if ((evp = tree_get(&msg->envelopes, evpid)) == NULL)
			return (0);
		if (rq_envelope_resume(&ramqueue, evp))
			return (1);
		return (0);
	}
	else {
		msgid = evpid;
		if ((msg = tree_get(&ramqueue.messages, msgid)) == NULL)
			return (0);
		i = NULL;
		r = 0;
		while (tree_iter(&msg->envelopes, &i, NULL, (void*)(&evp)))
			if (rq_envelope_resume(&ramqueue, evp))
				r++;
		return (r);
	}
}

static int
scheduler_ram_query(uint64_t evpid)
{
	uint32_t msgid;

	if (evpid > 0xffffffff)
		msgid = evpid_to_msgid(evpid);
	else
		msgid = evpid;

	if (tree_get(&ramqueue.messages, msgid) == NULL)
		return (0);

	return (1);
}

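/*
 * Insert the envelope at its sorted position in the pending queue.
 * The splay tree locates the in-order successor in O(log n); the
 * envelope is then linked on the list right before it, or at the
 * tail if it sorts last.
 */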
static void
sorted_insert(struct rq_queue *rq, struct rq_envelope *evp)
{
	struct rq_envelope	*evp2;

	SPLAY_INSERT(prioqtree, &rq->q_priotree, evp);
	evp2 = SPLAY_NEXT(prioqtree, &rq->q_priotree, evp);
	if (evp2)
		TAILQ_INSERT_BEFORE(evp2, evp, entry);
	else
		TAILQ_INSERT_TAIL(&rq->q_pending, evp, entry);
}

static void
rq_queue_init(struct rq_queue *rq)
{
	memset(rq, 0, sizeof *rq);
	tree_init(&rq->messages);
	TAILQ_INIT(&rq->q_pending);
	TAILQ_INIT(&rq->q_inflight);
	TAILQ_INIT(&rq->q_mta);
	TAILQ_INIT(&rq->q_mda);
	TAILQ_INIT(&rq->q_bounce);
	TAILQ_INIT(&rq->q_update);
	TAILQ_INIT(&rq->q_expired);
	TAILQ_INIT(&rq->q_removed);
	SPLAY_INIT(&rq->q_priotree);
}

static void
rq_queue_merge(struct rq_queue *rq, struct rq_queue *update)
{
	struct rq_message	*message, *tomessage;
	struct rq_envelope	*envelope;
	uint64_t		 id;
	void			*i;

	while (tree_poproot(&update->messages, &id, (void*)&message)) {
		if ((tomessage = tree_get(&rq->messages, id)) == NULL) {
			/* message does not exist yet: re-use the structure */
			tree_xset(&rq->messages, id, message);
			continue;
		}
		/* need to re-link all envelopes before merging them */
		i = NULL;
		while ((tree_iter(&message->envelopes, &i, &id,
		    (void*)&envelope)))
			envelope->message = tomessage;
		tree_merge(&tomessage->envelopes, &message->envelopes);
		free(message);
		stat_decrement("scheduler.ramqueue.message", 1);
	}

	/* Sorted insert in the pending queue */
	while ((envelope = TAILQ_FIRST(&update->q_pending))) {
		TAILQ_REMOVE(&update->q_pending, envelope, entry);
		sorted_insert(rq, envelope);
	}

	rq->evpcount += update->evpcount;
}

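/* At most this many envelopes are scheduled per pass of rq_queue_schedule(). */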
#define SCHEDULEMAX	1024

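/*
 * Move due envelopes from the pending queue to their scheduled queue;
 * envelopes past their expiry are moved to q_expired instead.
 */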
static void
rq_queue_schedule(struct rq_queue *rq)
{
	struct rq_envelope	*evp;
	size_t			 n;

	n = 0;
	while ((evp = TAILQ_FIRST(&rq->q_pending))) {
		if (evp->sched > currtime && evp->expire > currtime)
			break;

		if (n == SCHEDULEMAX)
			break;

		if (evp->state != RQ_EVPSTATE_PENDING)
			errx(1, "evp:%016" PRIx64 " flags=0x%x", evp->evpid,
			    evp->flags);

		if (evp->expire <= currtime) {
			TAILQ_REMOVE(&rq->q_pending, evp, entry);
			SPLAY_REMOVE(prioqtree, &rq->q_priotree, evp);
			TAILQ_INSERT_TAIL(&rq->q_expired, evp, entry);
			evp->state = RQ_EVPSTATE_SCHEDULED;
			evp->flags |= RQ_ENVELOPE_EXPIRED;
			evp->t_scheduled = currtime;
			continue;
		}
		rq_envelope_schedule(rq, evp);
		n += 1;
	}
}

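/*
 * Return the list an envelope currently sits on, or NULL for held
 * envelopes, whose queues are global (keyed by holdq id) rather than
 * part of struct rq_queue.
 */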
static struct evplist *
rq_envelope_list(struct rq_queue *rq, struct rq_envelope *evp)
{
	switch (evp->state) {
	case RQ_EVPSTATE_PENDING:
		return &rq->q_pending;

	case RQ_EVPSTATE_SCHEDULED:
		if (evp->flags & RQ_ENVELOPE_EXPIRED)
			return &rq->q_expired;
		if (evp->flags & RQ_ENVELOPE_REMOVED)
			return &rq->q_removed;
		if (evp->flags & RQ_ENVELOPE_UPDATE)
			return &rq->q_update;
		if (evp->type == D_MTA)
			return &rq->q_mta;
		if (evp->type == D_MDA)
			return &rq->q_mda;
		if (evp->type == D_BOUNCE)
			return &rq->q_bounce;
		errx(1, "%016" PRIx64 " bad evp type %d", evp->evpid, evp->type);

	case RQ_EVPSTATE_INFLIGHT:
		return &rq->q_inflight;

	case RQ_EVPSTATE_HELD:
		return (NULL);
	}

	errx(1, "%016" PRIx64 " bad state %d", evp->evpid, evp->state);
	return (NULL);
}

static void
rq_envelope_schedule(struct rq_queue *rq, struct rq_envelope *evp)
{
	struct rq_holdq	*hq;
	struct evplist	*q = NULL;

	switch (evp->type) {
	case D_MTA:
		q = &rq->q_mta;
		break;
	case D_MDA:
		q = &rq->q_mda;
		break;
	case D_BOUNCE:
		q = &rq->q_bounce;
		break;
	}

	if (evp->flags & RQ_ENVELOPE_UPDATE)
		q = &rq->q_update;

	if (evp->state == RQ_EVPSTATE_HELD) {
		hq = tree_xget(&holdqs[evp->type], evp->holdq);
		TAILQ_REMOVE(&hq->q, evp, entry);
		hq->count -= 1;
		if (TAILQ_EMPTY(&hq->q)) {
			tree_xpop(&holdqs[evp->type], evp->holdq);
			free(hq);
		}
		evp->holdq = 0;
		stat_decrement("scheduler.ramqueue.hold", 1);
	}
	else if (!(evp->flags & RQ_ENVELOPE_SUSPEND)) {
		TAILQ_REMOVE(&rq->q_pending, evp, entry);
		SPLAY_REMOVE(prioqtree, &rq->q_priotree, evp);
	}

	TAILQ_INSERT_TAIL(q, evp, entry);
	evp->state = RQ_EVPSTATE_SCHEDULED;
	evp->t_scheduled = currtime;
}

static int
rq_envelope_remove(struct rq_queue *rq, struct rq_envelope *evp)
{
	struct rq_holdq	*hq;
	struct evplist	*evl;

	if (evp->flags & (RQ_ENVELOPE_REMOVED | RQ_ENVELOPE_EXPIRED))
		return (0);
	/*
	 * If the envelope is in-flight, just mark it for removal.
	 */
	if (evp->state == RQ_EVPSTATE_INFLIGHT) {
		evp->flags |= RQ_ENVELOPE_REMOVED;
		return (1);
	}

	if (evp->state == RQ_EVPSTATE_HELD) {
		hq = tree_xget(&holdqs[evp->type], evp->holdq);
		TAILQ_REMOVE(&hq->q, evp, entry);
		hq->count -= 1;
		if (TAILQ_EMPTY(&hq->q)) {
			tree_xpop(&holdqs[evp->type], evp->holdq);
			free(hq);
		}
		evp->holdq = 0;
		stat_decrement("scheduler.ramqueue.hold", 1);
	}
	else if (!(evp->flags & RQ_ENVELOPE_SUSPEND)) {
		evl = rq_envelope_list(rq, evp);
		TAILQ_REMOVE(evl, evp, entry);
		if (evl == &rq->q_pending)
			SPLAY_REMOVE(prioqtree, &rq->q_priotree, evp);
	}

	TAILQ_INSERT_TAIL(&rq->q_removed, evp, entry);
	evp->state = RQ_EVPSTATE_SCHEDULED;
	evp->flags |= RQ_ENVELOPE_REMOVED;
	evp->t_scheduled = currtime;

	return (1);
}

static int
rq_envelope_suspend(struct rq_queue *rq, struct rq_envelope *evp)
{
	struct rq_holdq	*hq;
	struct evplist	*evl;

	if (evp->flags & RQ_ENVELOPE_SUSPEND)
		return (0);

	if (evp->state == RQ_EVPSTATE_HELD) {
		hq = tree_xget(&holdqs[evp->type], evp->holdq);
		TAILQ_REMOVE(&hq->q, evp, entry);
		hq->count -= 1;
		if (TAILQ_EMPTY(&hq->q)) {
			tree_xpop(&holdqs[evp->type], evp->holdq);
			free(hq);
		}
		evp->holdq = 0;
		evp->state = RQ_EVPSTATE_PENDING;
		stat_decrement("scheduler.ramqueue.hold", 1);
	}
	else if (evp->state != RQ_EVPSTATE_INFLIGHT) {
		evl = rq_envelope_list(rq, evp);
		TAILQ_REMOVE(evl, evp, entry);
		if (evl == &rq->q_pending)
			SPLAY_REMOVE(prioqtree, &rq->q_priotree, evp);
	}

	evp->flags |= RQ_ENVELOPE_SUSPEND;

	return (1);
}

static int
rq_envelope_resume(struct rq_queue *rq, struct rq_envelope *evp)
{
	struct evplist	*evl;

	if (!(evp->flags & RQ_ENVELOPE_SUSPEND))
		return (0);

	if (evp->state != RQ_EVPSTATE_INFLIGHT) {
		evl = rq_envelope_list(rq, evp);
		if (evl == &rq->q_pending)
			sorted_insert(rq, evp);
		else
			TAILQ_INSERT_TAIL(evl, evp, entry);
	}

	evp->flags &= ~RQ_ENVELOPE_SUSPEND;

	return (1);
}

static void
rq_envelope_delete(struct rq_queue *rq, struct rq_envelope *evp)
{
	tree_xpop(&evp->message->envelopes, evp->evpid);
	if (tree_empty(&evp->message->envelopes)) {
		tree_xpop(&rq->messages, evp->message->msgid);
		free(evp->message);
		stat_decrement("scheduler.ramqueue.message", 1);
	}

	free(evp);
	rq->evpcount--;
	stat_decrement("scheduler.ramqueue.envelope", 1);
}

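/*
 * Format an envelope for the debug dump.  The result points to a
 * static buffer that is overwritten on each call.
 */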
static const char *
rq_envelope_to_text(struct rq_envelope *e)
{
	static char	buf[256];
	char		t[64];

	(void)snprintf(buf, sizeof buf, "evp:%016" PRIx64 " [", e->evpid);

	if (e->type == D_BOUNCE)
		(void)strlcat(buf, "bounce", sizeof buf);
	else if (e->type == D_MDA)
		(void)strlcat(buf, "mda", sizeof buf);
	else if (e->type == D_MTA)
		(void)strlcat(buf, "mta", sizeof buf);

	(void)snprintf(t, sizeof t, ",expire=%s",
	    duration_to_text(e->expire - currtime));
	(void)strlcat(buf, t, sizeof buf);

	switch (e->state) {
	case RQ_EVPSTATE_PENDING:
		(void)snprintf(t, sizeof t, ",pending=%s",
		    duration_to_text(e->sched - currtime));
		(void)strlcat(buf, t, sizeof buf);
		break;

	case RQ_EVPSTATE_SCHEDULED:
		(void)snprintf(t, sizeof t, ",scheduled=%s",
		    duration_to_text(currtime - e->t_scheduled));
		(void)strlcat(buf, t, sizeof buf);
		break;

	case RQ_EVPSTATE_INFLIGHT:
		(void)snprintf(t, sizeof t, ",inflight=%s",
		    duration_to_text(currtime - e->t_inflight));
		(void)strlcat(buf, t, sizeof buf);
		break;

	case RQ_EVPSTATE_HELD:
		(void)snprintf(t, sizeof t, ",held=%s",
		    duration_to_text(currtime - e->t_inflight));
		(void)strlcat(buf, t, sizeof buf);
		break;
	default:
		errx(1, "%016" PRIx64 " bad state %d", e->evpid, e->state);
	}

	if (e->flags & RQ_ENVELOPE_REMOVED)
		(void)strlcat(buf, ",removed", sizeof buf);
	if (e->flags & RQ_ENVELOPE_EXPIRED)
		(void)strlcat(buf, ",expired", sizeof buf);
	if (e->flags & RQ_ENVELOPE_SUSPEND)
		(void)strlcat(buf, ",suspended", sizeof buf);

	(void)strlcat(buf, "]", sizeof buf);

	return (buf);
}

static void
rq_queue_dump(struct rq_queue *rq, const char *name)
{
	struct rq_message	*message;
	struct rq_envelope	*envelope;
	void			*i, *j;
	uint64_t		 id;

	log_debug("debug: /--- ramqueue: %s", name);

	i = NULL;
	while ((tree_iter(&rq->messages, &i, &id, (void*)&message))) {
		log_debug("debug: | msg:%08" PRIx32, message->msgid);
		j = NULL;
		while ((tree_iter(&message->envelopes, &j, &id,
		    (void*)&envelope)))
			log_debug("debug: |   %s",
			    rq_envelope_to_text(envelope));
	}
	log_debug("debug: \\---");
}

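/*
 * Order envelopes by the earlier of their next-retry time and their
 * expiry time, falling back on the evpid so that distinct envelopes
 * never compare equal (the splay tree requires a total order).
 */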
static int
rq_envelope_cmp(struct rq_envelope *e1, struct rq_envelope *e2)
{
	time_t	ref1, ref2;

	ref1 = (e1->sched < e1->expire) ? e1->sched : e1->expire;
	ref2 = (e2->sched < e2->expire) ? e2->sched : e2->expire;
	if (ref1 != ref2)
		return (ref1 < ref2) ? -1 : 1;

	if (e1->evpid != e2->evpid)
		return (e1->evpid < e2->evpid) ? -1 : 1;

	return (0);
}

SPLAY_GENERATE(prioqtree, rq_envelope, t_entry, rq_envelope_cmp);