/*	$OpenBSD: scheduler_ramqueue.c,v 1.47 2021/06/14 17:58:16 eric Exp $	*/

/*
 * Copyright (c) 2012 Gilles Chehade <gilles@poolp.org>
 * Copyright (c) 2012 Eric Faurot <eric@openbsd.org>
 *
 * Permission to use, copy, modify, and distribute this software for any
 * purpose with or without fee is hereby granted, provided that the above
 * copyright notice and this permission notice appear in all copies.
 *
 * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
 * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
 * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
 * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
 * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
 * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
 * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
 */

#include <inttypes.h>
#include <stdlib.h>
#include <string.h>

#include "smtpd.h"
#include "log.h"

TAILQ_HEAD(evplist, rq_envelope);

struct rq_message {
	uint32_t		 msgid;
	struct tree		 envelopes;
};

struct rq_envelope {
	TAILQ_ENTRY(rq_envelope) entry;
	SPLAY_ENTRY(rq_envelope) t_entry;

	uint64_t		 evpid;
	uint64_t		 holdq;
	enum delivery_type	 type;

#define	RQ_EVPSTATE_PENDING	 0
#define	RQ_EVPSTATE_SCHEDULED	 1
#define	RQ_EVPSTATE_INFLIGHT	 2
#define	RQ_EVPSTATE_HELD	 3
	uint8_t			 state;

#define	RQ_ENVELOPE_EXPIRED	 0x01
#define	RQ_ENVELOPE_REMOVED	 0x02
#define	RQ_ENVELOPE_SUSPEND	 0x04
#define	RQ_ENVELOPE_UPDATE	 0x08
#define	RQ_ENVELOPE_OVERFLOW	 0x10
	uint8_t			 flags;

	time_t			 ctime;
	time_t			 sched;
	time_t			 expire;

	struct rq_message	*message;

	time_t			 t_inflight;
	time_t			 t_scheduled;
};

struct rq_holdq {
	struct evplist		 q;
	size_t			 count;
};

struct rq_queue {
	size_t			 evpcount;
	struct tree		 messages;
	SPLAY_HEAD(prioqtree, rq_envelope)	q_priotree;

	struct evplist		 q_pending;
	struct evplist		 q_inflight;

	struct evplist		 q_mta;
	struct evplist		 q_mda;
	struct evplist		 q_bounce;
	struct evplist		 q_update;
	struct evplist		 q_expired;
	struct evplist		 q_removed;
};

static int rq_envelope_cmp(struct rq_envelope *, struct rq_envelope *);

SPLAY_PROTOTYPE(prioqtree, rq_envelope, t_entry, rq_envelope_cmp);
static int scheduler_ram_init(const char *);
static int scheduler_ram_insert(struct scheduler_info *);
static size_t scheduler_ram_commit(uint32_t);
static size_t scheduler_ram_rollback(uint32_t);
static int scheduler_ram_update(struct scheduler_info *);
static int scheduler_ram_delete(uint64_t);
static int scheduler_ram_hold(uint64_t, uint64_t);
static int scheduler_ram_release(int, uint64_t, int);
static int scheduler_ram_batch(int, int *, size_t *, uint64_t *, int *);
static size_t scheduler_ram_messages(uint32_t, uint32_t *, size_t);
static size_t scheduler_ram_envelopes(uint64_t, struct evpstate *, size_t);
static int scheduler_ram_schedule(uint64_t);
static int scheduler_ram_remove(uint64_t);
static int scheduler_ram_suspend(uint64_t);
static int scheduler_ram_resume(uint64_t);
static int scheduler_ram_query(uint64_t);

static void sorted_insert(struct rq_queue *, struct rq_envelope *);

static void rq_queue_init(struct rq_queue *);
static void rq_queue_merge(struct rq_queue *, struct rq_queue *);
static void rq_queue_dump(struct rq_queue *, const char *);
static void rq_queue_schedule(struct rq_queue *rq);
static struct evplist *rq_envelope_list(struct rq_queue *, struct rq_envelope *);
static void rq_envelope_schedule(struct rq_queue *, struct rq_envelope *);
static int rq_envelope_remove(struct rq_queue *, struct rq_envelope *);
static int rq_envelope_suspend(struct rq_queue *, struct rq_envelope *);
static int rq_envelope_resume(struct rq_queue *, struct rq_envelope *);
static void rq_envelope_delete(struct rq_queue *, struct rq_envelope *);
static const char *rq_envelope_to_text(struct rq_envelope *);

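/*
 * Operation table for this backend; the scheduler process dispatches
 * its requests through these function pointers.
 */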
struct scheduler_backend scheduler_backend_ramqueue = {
	scheduler_ram_init,

	scheduler_ram_insert,
	scheduler_ram_commit,
	scheduler_ram_rollback,

	scheduler_ram_update,
	scheduler_ram_delete,
	scheduler_ram_hold,
	scheduler_ram_release,

	scheduler_ram_batch,

	scheduler_ram_messages,
	scheduler_ram_envelopes,
	scheduler_ram_schedule,
	scheduler_ram_remove,
	scheduler_ram_suspend,
	scheduler_ram_resume,
	scheduler_ram_query,
};

static struct rq_queue	ramqueue;
static struct tree	updates;
static struct tree	holdqs[3]; /* indexed by delivery type */

static time_t		currtime;

#define BACKOFF_TRANSFER	400
#define BACKOFF_DELIVERY	10
#define BACKOFF_OVERFLOW	3

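/*
 * Retries back off quadratically: the n-th attempt is scheduled at
 * t0 + base * n^2.  With BACKOFF_TRANSFER (400s) this gives retries
 * at +400s, +1600s, +3600s, ... after creation; scheduler_next()
 * simply skips ahead to the first step that lands in the future.
 */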
static time_t
scheduler_backoff(time_t t0, time_t base, uint32_t step)
{
	return (t0 + base * step * step);
}

static time_t
scheduler_next(time_t t0, time_t base, uint32_t step)
{
	time_t t;

	/* XXX be more efficient */
	while ((t = scheduler_backoff(t0, base, step)) <= currtime)
		step++;

	return (t);
}

static int
scheduler_ram_init(const char *arg)
{
	rq_queue_init(&ramqueue);
	tree_init(&updates);
	tree_init(&holdqs[D_MDA]);
	tree_init(&holdqs[D_MTA]);
	tree_init(&holdqs[D_BOUNCE]);

	return (1);
}

static int
scheduler_ram_insert(struct scheduler_info *si)
{
	struct rq_queue		*update;
	struct rq_message	*message;
	struct rq_envelope	*envelope;
	uint32_t		 msgid;

	currtime = time(NULL);

	msgid = evpid_to_msgid(si->evpid);

	/* find/prepare a ramqueue update */
	if ((update = tree_get(&updates, msgid)) == NULL) {
		update = xcalloc(1, sizeof *update);
		stat_increment("scheduler.ramqueue.update", 1);
		rq_queue_init(update);
		tree_xset(&updates, msgid, update);
	}

	/* find/prepare the msgtree message in ramqueue update */
	if ((message = tree_get(&update->messages, msgid)) == NULL) {
		message = xcalloc(1, sizeof *message);
		message->msgid = msgid;
		tree_init(&message->envelopes);
		tree_xset(&update->messages, msgid, message);
		stat_increment("scheduler.ramqueue.message", 1);
	}

	/* create envelope in ramqueue message */
	envelope = xcalloc(1, sizeof *envelope);
	envelope->evpid = si->evpid;
	envelope->type = si->type;
	envelope->message = message;
	envelope->ctime = si->creation;
	envelope->expire = si->creation + si->ttl;
	envelope->sched = scheduler_backoff(si->creation,
	    (si->type == D_MTA) ? BACKOFF_TRANSFER : BACKOFF_DELIVERY, si->retry);
	tree_xset(&message->envelopes, envelope->evpid, envelope);

	update->evpcount++;
	stat_increment("scheduler.ramqueue.envelope", 1);

	envelope->state = RQ_EVPSTATE_PENDING;
	TAILQ_INSERT_TAIL(&update->q_pending, envelope, entry);

	si->nexttry = envelope->sched;

	return (1);
}

static size_t
scheduler_ram_commit(uint32_t msgid)
{
	struct rq_queue	*update;
	size_t		 r;

	currtime = time(NULL);

	update = tree_xpop(&updates, msgid);
	r = update->evpcount;

	if (tracing & TRACE_SCHEDULER)
		rq_queue_dump(update, "update to commit");

	rq_queue_merge(&ramqueue, update);

	if (tracing & TRACE_SCHEDULER)
		rq_queue_dump(&ramqueue, "resulting queue");

	rq_queue_schedule(&ramqueue);

	free(update);
	stat_decrement("scheduler.ramqueue.update", 1);

	return (r);
}

static size_t
scheduler_ram_rollback(uint32_t msgid)
{
	struct rq_queue		*update;
	struct rq_envelope	*evp;
	size_t			 r;

	currtime = time(NULL);

	if ((update = tree_pop(&updates, msgid)) == NULL)
		return (0);
	r = update->evpcount;

	while ((evp = TAILQ_FIRST(&update->q_pending))) {
		TAILQ_REMOVE(&update->q_pending, evp, entry);
		rq_envelope_delete(update, evp);
	}

	free(update);
	stat_decrement("scheduler.ramqueue.update", 1);

	return (r);
}

static int
scheduler_ram_update(struct scheduler_info *si)
{
	struct rq_message	*msg;
	struct rq_envelope	*evp;
	uint32_t		 msgid;

	currtime = time(NULL);

	msgid = evpid_to_msgid(si->evpid);
	msg = tree_xget(&ramqueue.messages, msgid);
	evp = tree_xget(&msg->envelopes, si->evpid);

	/* it *must* be in-flight */
	if (evp->state != RQ_EVPSTATE_INFLIGHT)
		fatalx("evp:%016" PRIx64 " not in-flight", si->evpid);

	TAILQ_REMOVE(&ramqueue.q_inflight, evp, entry);

	/*
	 * If the envelope was removed while in-flight, schedule it for
	 * removal immediately.
	 */
	if (evp->flags & RQ_ENVELOPE_REMOVED) {
		TAILQ_INSERT_TAIL(&ramqueue.q_removed, evp, entry);
		evp->state = RQ_EVPSTATE_SCHEDULED;
		evp->t_scheduled = currtime;
		return (1);
	}

	evp->sched = scheduler_next(evp->ctime,
	    (si->type == D_MTA) ? BACKOFF_TRANSFER : BACKOFF_DELIVERY, si->retry);

	evp->state = RQ_EVPSTATE_PENDING;
	if (!(evp->flags & RQ_ENVELOPE_SUSPEND))
		sorted_insert(&ramqueue, evp);

	si->nexttry = evp->sched;

	return (1);
}

static int
scheduler_ram_delete(uint64_t evpid)
{
	struct rq_message	*msg;
	struct rq_envelope	*evp;
	uint32_t		 msgid;

	currtime = time(NULL);

	msgid = evpid_to_msgid(evpid);
	msg = tree_xget(&ramqueue.messages, msgid);
	evp = tree_xget(&msg->envelopes, evpid);

	/* it *must* be in-flight */
	if (evp->state != RQ_EVPSTATE_INFLIGHT)
		fatalx("evp:%016" PRIx64 " not in-flight", evpid);

	TAILQ_REMOVE(&ramqueue.q_inflight, evp, entry);

	rq_envelope_delete(&ramqueue, evp);

	return (1);
}

#define HOLDQ_MAXSIZE	1000

static int
scheduler_ram_hold(uint64_t evpid, uint64_t holdq)
{
	struct rq_holdq		*hq;
	struct rq_message	*msg;
	struct rq_envelope	*evp;
	uint32_t		 msgid;

	currtime = time(NULL);

	msgid = evpid_to_msgid(evpid);
	msg = tree_xget(&ramqueue.messages, msgid);
	evp = tree_xget(&msg->envelopes, evpid);

	/* it *must* be in-flight */
	if (evp->state != RQ_EVPSTATE_INFLIGHT)
		fatalx("evp:%016" PRIx64 " not in-flight", evpid);

	TAILQ_REMOVE(&ramqueue.q_inflight, evp, entry);

	/* If the envelope is suspended, just mark it as pending */
	if (evp->flags & RQ_ENVELOPE_SUSPEND) {
		evp->state = RQ_EVPSTATE_PENDING;
		return (0);
	}

	hq = tree_get(&holdqs[evp->type], holdq);
	if (hq == NULL) {
		hq = xcalloc(1, sizeof(*hq));
		TAILQ_INIT(&hq->q);
		tree_xset(&holdqs[evp->type], holdq, hq);
		stat_increment("scheduler.ramqueue.holdq", 1);
	}

	/* If the holdq is full, just "tempfail" the envelope */
	if (hq->count >= HOLDQ_MAXSIZE) {
		evp->state = RQ_EVPSTATE_PENDING;
		evp->flags |= RQ_ENVELOPE_UPDATE;
		evp->flags |= RQ_ENVELOPE_OVERFLOW;
		sorted_insert(&ramqueue, evp);
		stat_increment("scheduler.ramqueue.hold-overflow", 1);
		return (0);
	}

	evp->state = RQ_EVPSTATE_HELD;
	evp->holdq = holdq;
	/*
	 * This is an optimization: upon release, the envelopes will be
	 * inserted in the pending queue from the first element to the last.
	 * Since elements already in the queue were received first, they
	 * were scheduled first, so they will be reinserted before the
	 * current element.
	 */
	TAILQ_INSERT_HEAD(&hq->q, evp, entry);
	hq->count += 1;
	stat_increment("scheduler.ramqueue.hold", 1);

	return (1);
}

static int
scheduler_ram_release(int type, uint64_t holdq, int n)
{
	struct rq_holdq		*hq;
	struct rq_envelope	*evp;
	int			 i, update;

	currtime = time(NULL);

	hq = tree_get(&holdqs[type], holdq);
	if (hq == NULL)
		return (0);

	if (n == -1) {
		n = 0;
		update = 1;
	}
	else
		update = 0;

	for (i = 0; n == 0 || i < n; i++) {
		evp = TAILQ_FIRST(&hq->q);
		if (evp == NULL)
			break;

		TAILQ_REMOVE(&hq->q, evp, entry);
		hq->count -= 1;
		evp->holdq = 0;

		/*
		 * When released, all envelopes are put in the pending queue
		 * and will be rescheduled immediately.  As an optimization,
		 * we could just schedule them directly.
		 */
		evp->state = RQ_EVPSTATE_PENDING;
		if (update)
			evp->flags |= RQ_ENVELOPE_UPDATE;
		sorted_insert(&ramqueue, evp);
	}

	if (TAILQ_EMPTY(&hq->q)) {
		tree_xpop(&holdqs[type], holdq);
		free(hq);
		stat_decrement("scheduler.ramqueue.holdq", 1);
	}
	stat_decrement("scheduler.ramqueue.hold", i);

	return (i);
}

static int
scheduler_ram_batch(int mask, int *delay, size_t *count, uint64_t *evpids, int *types)
{
	struct rq_envelope	*evp;
	size_t			 i, n;
	time_t			 t;

	currtime = time(NULL);

	rq_queue_schedule(&ramqueue);
	if (tracing & TRACE_SCHEDULER)
		rq_queue_dump(&ramqueue, "scheduler_ram_batch()");

	i = 0;
	n = 0;
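	/*
	 * Drain the scheduled queues round-robin, one envelope from each
	 * per iteration, so that no single queue can starve the others.
	 * Stop when the output array is full or when a full round yields
	 * nothing new.
	 */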
	for (;;) {

		if (mask & SCHED_REMOVE && (evp = TAILQ_FIRST(&ramqueue.q_removed))) {
			TAILQ_REMOVE(&ramqueue.q_removed, evp, entry);
			types[i] = SCHED_REMOVE;
			evpids[i] = evp->evpid;
			rq_envelope_delete(&ramqueue, evp);

			if (++i == *count)
				break;
		}

		if (mask & SCHED_EXPIRE && (evp = TAILQ_FIRST(&ramqueue.q_expired))) {
			TAILQ_REMOVE(&ramqueue.q_expired, evp, entry);
			types[i] = SCHED_EXPIRE;
			evpids[i] = evp->evpid;
			rq_envelope_delete(&ramqueue, evp);

			if (++i == *count)
				break;
		}

		if (mask & SCHED_UPDATE && (evp = TAILQ_FIRST(&ramqueue.q_update))) {
			TAILQ_REMOVE(&ramqueue.q_update, evp, entry);
			types[i] = SCHED_UPDATE;
			evpids[i] = evp->evpid;

			if (evp->flags & RQ_ENVELOPE_OVERFLOW)
				t = BACKOFF_OVERFLOW;
			else if (evp->type == D_MTA)
				t = BACKOFF_TRANSFER;
			else
				t = BACKOFF_DELIVERY;

			evp->sched = scheduler_next(evp->ctime, t, 0);
			evp->flags &= ~(RQ_ENVELOPE_UPDATE|RQ_ENVELOPE_OVERFLOW);
			evp->state = RQ_EVPSTATE_PENDING;
			if (!(evp->flags & RQ_ENVELOPE_SUSPEND))
				sorted_insert(&ramqueue, evp);

			if (++i == *count)
				break;
		}

		if (mask & SCHED_BOUNCE && (evp = TAILQ_FIRST(&ramqueue.q_bounce))) {
			TAILQ_REMOVE(&ramqueue.q_bounce, evp, entry);
			types[i] = SCHED_BOUNCE;
			evpids[i] = evp->evpid;

			TAILQ_INSERT_TAIL(&ramqueue.q_inflight, evp, entry);
			evp->state = RQ_EVPSTATE_INFLIGHT;
			evp->t_inflight = currtime;

			if (++i == *count)
				break;
		}

		if (mask & SCHED_MDA && (evp = TAILQ_FIRST(&ramqueue.q_mda))) {
			TAILQ_REMOVE(&ramqueue.q_mda, evp, entry);
			types[i] = SCHED_MDA;
			evpids[i] = evp->evpid;

			TAILQ_INSERT_TAIL(&ramqueue.q_inflight, evp, entry);
			evp->state = RQ_EVPSTATE_INFLIGHT;
			evp->t_inflight = currtime;

			if (++i == *count)
				break;
		}

		if (mask & SCHED_MTA && (evp = TAILQ_FIRST(&ramqueue.q_mta))) {
			TAILQ_REMOVE(&ramqueue.q_mta, evp, entry);
			types[i] = SCHED_MTA;
			evpids[i] = evp->evpid;

			TAILQ_INSERT_TAIL(&ramqueue.q_inflight, evp, entry);
			evp->state = RQ_EVPSTATE_INFLIGHT;
			evp->t_inflight = currtime;

			if (++i == *count)
				break;
		}

		/* nothing seen this round */
		if (i == n)
			break;

		n = i;
	}

	if (i) {
		*count = i;
		return (1);
	}

	if ((evp = TAILQ_FIRST(&ramqueue.q_pending))) {
		if (evp->sched < evp->expire)
			t = evp->sched;
		else
			t = evp->expire;
		*delay = (t < currtime) ? 0 : (t - currtime);
	}
	else
		*delay = -1;

	return (0);
}

static size_t
scheduler_ram_messages(uint32_t from, uint32_t *dst, size_t size)
{
	uint64_t id;
	size_t	 n;
	void	*i;

	for (n = 0, i = NULL; n < size; n++) {
		if (tree_iterfrom(&ramqueue.messages, &i, from, &id, NULL) == 0)
			break;
		dst[n] = id;
	}

	return (n);
}

static size_t
scheduler_ram_envelopes(uint64_t from, struct evpstate *dst, size_t size)
{
	struct rq_message	*msg;
	struct rq_envelope	*evp;
	void			*i;
	size_t			 n;

	if ((msg = tree_get(&ramqueue.messages, evpid_to_msgid(from))) == NULL)
		return (0);

	for (n = 0, i = NULL; n < size; ) {

		if (tree_iterfrom(&msg->envelopes, &i, from, NULL,
		    (void**)&evp) == 0)
			break;

		if (evp->flags & (RQ_ENVELOPE_REMOVED | RQ_ENVELOPE_EXPIRED))
			continue;

		dst[n].evpid = evp->evpid;
		dst[n].flags = 0;
		dst[n].retry = 0;
		dst[n].time = 0;

		if (evp->state == RQ_EVPSTATE_PENDING) {
			dst[n].time = evp->sched;
			dst[n].flags = EF_PENDING;
		}
		else if (evp->state == RQ_EVPSTATE_SCHEDULED) {
			dst[n].time = evp->t_scheduled;
			dst[n].flags = EF_PENDING;
		}
		else if (evp->state == RQ_EVPSTATE_INFLIGHT) {
			dst[n].time = evp->t_inflight;
			dst[n].flags = EF_INFLIGHT;
		}
		else if (evp->state == RQ_EVPSTATE_HELD) {
			/* same as scheduled */
			dst[n].time = evp->t_scheduled;
			dst[n].flags = EF_PENDING;
			dst[n].flags |= EF_HOLD;
		}
		if (evp->flags & RQ_ENVELOPE_SUSPEND)
			dst[n].flags |= EF_SUSPEND;

		n++;
	}

	return (n);
}

static int
scheduler_ram_schedule(uint64_t evpid)
{
	struct rq_message	*msg;
	struct rq_envelope	*evp;
	uint32_t		 msgid;
	void			*i;
	int			 r;

	currtime = time(NULL);

	if (evpid > 0xffffffff) {
		msgid = evpid_to_msgid(evpid);
		if ((msg = tree_get(&ramqueue.messages, msgid)) == NULL)
			return (0);
		if ((evp = tree_get(&msg->envelopes, evpid)) == NULL)
			return (0);
		if (evp->state == RQ_EVPSTATE_INFLIGHT)
			return (0);
		rq_envelope_schedule(&ramqueue, evp);
		return (1);
	}
	else {
		msgid = evpid;
		if ((msg = tree_get(&ramqueue.messages, msgid)) == NULL)
			return (0);
		i = NULL;
		r = 0;
		while (tree_iter(&msg->envelopes, &i, NULL, (void*)(&evp))) {
			if (evp->state == RQ_EVPSTATE_INFLIGHT)
				continue;
			rq_envelope_schedule(&ramqueue, evp);
			r++;
		}
		return (r);
	}
}

static int
scheduler_ram_remove(uint64_t evpid)
{
	struct rq_message	*msg;
	struct rq_envelope	*evp;
	uint32_t		 msgid;
	void			*i;
	int			 r;

	currtime = time(NULL);

	if (evpid > 0xffffffff) {
		msgid = evpid_to_msgid(evpid);
		if ((msg = tree_get(&ramqueue.messages, msgid)) == NULL)
			return (0);
		if ((evp = tree_get(&msg->envelopes, evpid)) == NULL)
			return (0);
		if (rq_envelope_remove(&ramqueue, evp))
			return (1);
		return (0);
	}
	else {
		msgid = evpid;
		if ((msg = tree_get(&ramqueue.messages, msgid)) == NULL)
			return (0);
		i = NULL;
		r = 0;
		while (tree_iter(&msg->envelopes, &i, NULL, (void*)(&evp)))
			if (rq_envelope_remove(&ramqueue, evp))
				r++;
		return (r);
	}
}

static int
scheduler_ram_suspend(uint64_t evpid)
{
	struct rq_message	*msg;
	struct rq_envelope	*evp;
	uint32_t		 msgid;
	void			*i;
	int			 r;

	currtime = time(NULL);

	if (evpid > 0xffffffff) {
		msgid = evpid_to_msgid(evpid);
		if ((msg = tree_get(&ramqueue.messages, msgid)) == NULL)
			return (0);
		if ((evp = tree_get(&msg->envelopes, evpid)) == NULL)
			return (0);
		if (rq_envelope_suspend(&ramqueue, evp))
			return (1);
		return (0);
	}
	else {
		msgid = evpid;
		if ((msg = tree_get(&ramqueue.messages, msgid)) == NULL)
			return (0);
		i = NULL;
		r = 0;
		while (tree_iter(&msg->envelopes, &i, NULL, (void*)(&evp)))
			if (rq_envelope_suspend(&ramqueue, evp))
				r++;
		return (r);
	}
}

static int
scheduler_ram_resume(uint64_t evpid)
{
	struct rq_message	*msg;
	struct rq_envelope	*evp;
	uint32_t		 msgid;
	void			*i;
	int			 r;

	currtime = time(NULL);

	if (evpid > 0xffffffff) {
		msgid = evpid_to_msgid(evpid);
		if ((msg = tree_get(&ramqueue.messages, msgid)) == NULL)
			return (0);
		if ((evp = tree_get(&msg->envelopes, evpid)) == NULL)
			return (0);
		if (rq_envelope_resume(&ramqueue, evp))
			return (1);
		return (0);
	}
	else {
		msgid = evpid;
		if ((msg = tree_get(&ramqueue.messages, msgid)) == NULL)
			return (0);
		i = NULL;
		r = 0;
		while (tree_iter(&msg->envelopes, &i, NULL, (void*)(&evp)))
			if (rq_envelope_resume(&ramqueue, evp))
				r++;
		return (r);
	}
}

static int
scheduler_ram_query(uint64_t evpid)
{
	uint32_t msgid;

	if (evpid > 0xffffffff)
		msgid = evpid_to_msgid(evpid);
	else
		msgid = evpid;

	if (tree_get(&ramqueue.messages, msgid) == NULL)
		return (0);

	return (1);
}

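/*
 * Insert the envelope into q_pending at its sorted position: the splay
 * tree (ordered by rq_envelope_cmp) finds the in-order successor in
 * O(log n), and the envelope is linked into the list just before it.
 */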
static void
sorted_insert(struct rq_queue *rq, struct rq_envelope *evp)
{
	struct rq_envelope	*evp2;

	SPLAY_INSERT(prioqtree, &rq->q_priotree, evp);
	evp2 = SPLAY_NEXT(prioqtree, &rq->q_priotree, evp);
	if (evp2)
		TAILQ_INSERT_BEFORE(evp2, evp, entry);
	else
		TAILQ_INSERT_TAIL(&rq->q_pending, evp, entry);
}

static void
rq_queue_init(struct rq_queue *rq)
{
	memset(rq, 0, sizeof *rq);
	tree_init(&rq->messages);
	TAILQ_INIT(&rq->q_pending);
	TAILQ_INIT(&rq->q_inflight);
	TAILQ_INIT(&rq->q_mta);
	TAILQ_INIT(&rq->q_mda);
	TAILQ_INIT(&rq->q_bounce);
	TAILQ_INIT(&rq->q_update);
	TAILQ_INIT(&rq->q_expired);
	TAILQ_INIT(&rq->q_removed);
	SPLAY_INIT(&rq->q_priotree);
}

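/*
 * Fold a pending update queue into the live ramqueue: message structures
 * are reused when the message is not yet known, otherwise their envelopes
 * are re-parented and merged, and the update's pending envelopes are
 * re-inserted in sorted order.
 */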
static void
rq_queue_merge(struct rq_queue *rq, struct rq_queue *update)
{
	struct rq_message	*message, *tomessage;
	struct rq_envelope	*envelope;
	uint64_t		 id;
	void			*i;

	while (tree_poproot(&update->messages, &id, (void*)&message)) {
		if ((tomessage = tree_get(&rq->messages, id)) == NULL) {
			/* message does not exist. re-use structure */
			tree_xset(&rq->messages, id, message);
			continue;
		}
		/* need to re-link all envelopes before merging them */
		i = NULL;
		while ((tree_iter(&message->envelopes, &i, &id,
		    (void*)&envelope)))
			envelope->message = tomessage;
		tree_merge(&tomessage->envelopes, &message->envelopes);
		free(message);
		stat_decrement("scheduler.ramqueue.message", 1);
	}

	/* Sorted insert in the pending queue */
	while ((envelope = TAILQ_FIRST(&update->q_pending))) {
		TAILQ_REMOVE(&update->q_pending, envelope, entry);
		sorted_insert(rq, envelope);
	}

	rq->evpcount += update->evpcount;
}

#define SCHEDULEMAX	1024

static void
rq_queue_schedule(struct rq_queue *rq)
{
	struct rq_envelope	*evp;
	size_t			 n;

	n = 0;
	while ((evp = TAILQ_FIRST(&rq->q_pending))) {
		if (evp->sched > currtime && evp->expire > currtime)
			break;

		if (n == SCHEDULEMAX)
			break;

		if (evp->state != RQ_EVPSTATE_PENDING)
			fatalx("evp:%016" PRIx64 " flags=0x%x", evp->evpid,
			    evp->flags);

		if (evp->expire <= currtime) {
			TAILQ_REMOVE(&rq->q_pending, evp, entry);
			SPLAY_REMOVE(prioqtree, &rq->q_priotree, evp);
			TAILQ_INSERT_TAIL(&rq->q_expired, evp, entry);
			evp->state = RQ_EVPSTATE_SCHEDULED;
			evp->flags |= RQ_ENVELOPE_EXPIRED;
			evp->t_scheduled = currtime;
			continue;
		}
		rq_envelope_schedule(rq, evp);
		n += 1;
	}
}

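/*
 * Map an envelope to the list currently holding it.  Held envelopes
 * live in their holdq rather than in one of the rq_queue lists, so
 * NULL is returned for them.
 */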
static struct evplist *
rq_envelope_list(struct rq_queue *rq, struct rq_envelope *evp)
{
	switch (evp->state) {
	case RQ_EVPSTATE_PENDING:
		return &rq->q_pending;

	case RQ_EVPSTATE_SCHEDULED:
		if (evp->flags & RQ_ENVELOPE_EXPIRED)
			return &rq->q_expired;
		if (evp->flags & RQ_ENVELOPE_REMOVED)
			return &rq->q_removed;
		if (evp->flags & RQ_ENVELOPE_UPDATE)
			return &rq->q_update;
		if (evp->type == D_MTA)
			return &rq->q_mta;
		if (evp->type == D_MDA)
			return &rq->q_mda;
		if (evp->type == D_BOUNCE)
			return &rq->q_bounce;
		fatalx("%016" PRIx64 " bad evp type %d", evp->evpid, evp->type);

	case RQ_EVPSTATE_INFLIGHT:
		return &rq->q_inflight;

	case RQ_EVPSTATE_HELD:
		return (NULL);
	}

	fatalx("%016" PRIx64 " bad state %d", evp->evpid, evp->state);
	return (NULL);
}

static void
rq_envelope_schedule(struct rq_queue *rq, struct rq_envelope *evp)
{
	struct rq_holdq	*hq;
	struct evplist	*q = NULL;

	switch (evp->type) {
	case D_MTA:
		q = &rq->q_mta;
		break;
	case D_MDA:
		q = &rq->q_mda;
		break;
	case D_BOUNCE:
		q = &rq->q_bounce;
		break;
	}

	if (evp->flags & RQ_ENVELOPE_UPDATE)
		q = &rq->q_update;

	if (evp->state == RQ_EVPSTATE_HELD) {
		hq = tree_xget(&holdqs[evp->type], evp->holdq);
		TAILQ_REMOVE(&hq->q, evp, entry);
		hq->count -= 1;
		if (TAILQ_EMPTY(&hq->q)) {
			tree_xpop(&holdqs[evp->type], evp->holdq);
			free(hq);
		}
		evp->holdq = 0;
		stat_decrement("scheduler.ramqueue.hold", 1);
	}
	else if (!(evp->flags & RQ_ENVELOPE_SUSPEND)) {
		TAILQ_REMOVE(&rq->q_pending, evp, entry);
		SPLAY_REMOVE(prioqtree, &rq->q_priotree, evp);
	}

	TAILQ_INSERT_TAIL(q, evp, entry);
	evp->state = RQ_EVPSTATE_SCHEDULED;
	evp->t_scheduled = currtime;
}

static int
rq_envelope_remove(struct rq_queue *rq, struct rq_envelope *evp)
{
	struct rq_holdq	*hq;
	struct evplist	*evl;

	if (evp->flags & (RQ_ENVELOPE_REMOVED | RQ_ENVELOPE_EXPIRED))
		return (0);
	/*
	 * If the envelope is in-flight, just mark it for removal.
	 */
	if (evp->state == RQ_EVPSTATE_INFLIGHT) {
		evp->flags |= RQ_ENVELOPE_REMOVED;
		return (1);
	}

	if (evp->state == RQ_EVPSTATE_HELD) {
		hq = tree_xget(&holdqs[evp->type], evp->holdq);
		TAILQ_REMOVE(&hq->q, evp, entry);
		hq->count -= 1;
		if (TAILQ_EMPTY(&hq->q)) {
			tree_xpop(&holdqs[evp->type], evp->holdq);
			free(hq);
		}
		evp->holdq = 0;
		stat_decrement("scheduler.ramqueue.hold", 1);
	}
	else if (!(evp->flags & RQ_ENVELOPE_SUSPEND)) {
		evl = rq_envelope_list(rq, evp);
		TAILQ_REMOVE(evl, evp, entry);
		if (evl == &rq->q_pending)
			SPLAY_REMOVE(prioqtree, &rq->q_priotree, evp);
	}

	TAILQ_INSERT_TAIL(&rq->q_removed, evp, entry);
	evp->state = RQ_EVPSTATE_SCHEDULED;
	evp->flags |= RQ_ENVELOPE_REMOVED;
	evp->t_scheduled = currtime;

	return (1);
}

static int
rq_envelope_suspend(struct rq_queue *rq, struct rq_envelope *evp)
{
	struct rq_holdq	*hq;
	struct evplist	*evl;

	if (evp->flags & RQ_ENVELOPE_SUSPEND)
		return (0);

	if (evp->state == RQ_EVPSTATE_HELD) {
		hq = tree_xget(&holdqs[evp->type], evp->holdq);
		TAILQ_REMOVE(&hq->q, evp, entry);
		hq->count -= 1;
		if (TAILQ_EMPTY(&hq->q)) {
			tree_xpop(&holdqs[evp->type], evp->holdq);
			free(hq);
		}
		evp->holdq = 0;
		evp->state = RQ_EVPSTATE_PENDING;
		stat_decrement("scheduler.ramqueue.hold", 1);
	}
	else if (evp->state != RQ_EVPSTATE_INFLIGHT) {
		evl = rq_envelope_list(rq, evp);
		TAILQ_REMOVE(evl, evp, entry);
		if (evl == &rq->q_pending)
			SPLAY_REMOVE(prioqtree, &rq->q_priotree, evp);
	}

	evp->flags |= RQ_ENVELOPE_SUSPEND;

	return (1);
}

static int
rq_envelope_resume(struct rq_queue *rq, struct rq_envelope *evp)
{
	struct evplist	*evl;

	if (!(evp->flags & RQ_ENVELOPE_SUSPEND))
		return (0);

	if (evp->state != RQ_EVPSTATE_INFLIGHT) {
		evl = rq_envelope_list(rq, evp);
		if (evl == &rq->q_pending)
			sorted_insert(rq, evp);
		else
			TAILQ_INSERT_TAIL(evl, evp, entry);
	}

	evp->flags &= ~RQ_ENVELOPE_SUSPEND;

	return (1);
}

static void
rq_envelope_delete(struct rq_queue *rq, struct rq_envelope *evp)
{
	tree_xpop(&evp->message->envelopes, evp->evpid);
	if (tree_empty(&evp->message->envelopes)) {
		tree_xpop(&rq->messages, evp->message->msgid);
		free(evp->message);
		stat_decrement("scheduler.ramqueue.message", 1);
	}

	free(evp);
	rq->evpcount--;
	stat_decrement("scheduler.ramqueue.envelope", 1);
}

static const char *
rq_envelope_to_text(struct rq_envelope *e)
{
	static char	buf[256];
	char		t[64];

	(void)snprintf(buf, sizeof buf, "evp:%016" PRIx64 " [", e->evpid);

	if (e->type == D_BOUNCE)
		(void)strlcat(buf, "bounce", sizeof buf);
	else if (e->type == D_MDA)
		(void)strlcat(buf, "mda", sizeof buf);
	else if (e->type == D_MTA)
		(void)strlcat(buf, "mta", sizeof buf);

	(void)snprintf(t, sizeof t, ",expire=%s",
	    duration_to_text(e->expire - currtime));
	(void)strlcat(buf, t, sizeof buf);

	switch (e->state) {
	case RQ_EVPSTATE_PENDING:
		(void)snprintf(t, sizeof t, ",pending=%s",
		    duration_to_text(e->sched - currtime));
		(void)strlcat(buf, t, sizeof buf);
		break;

	case RQ_EVPSTATE_SCHEDULED:
		(void)snprintf(t, sizeof t, ",scheduled=%s",
		    duration_to_text(currtime - e->t_scheduled));
		(void)strlcat(buf, t, sizeof buf);
		break;

	case RQ_EVPSTATE_INFLIGHT:
		(void)snprintf(t, sizeof t, ",inflight=%s",
		    duration_to_text(currtime - e->t_inflight));
		(void)strlcat(buf, t, sizeof buf);
		break;

	case RQ_EVPSTATE_HELD:
		(void)snprintf(t, sizeof t, ",held=%s",
		    duration_to_text(currtime - e->t_inflight));
		(void)strlcat(buf, t, sizeof buf);
		break;
	default:
		fatalx("%016" PRIx64 " bad state %d", e->evpid, e->state);
	}

	if (e->flags & RQ_ENVELOPE_REMOVED)
		(void)strlcat(buf, ",removed", sizeof buf);
	if (e->flags & RQ_ENVELOPE_EXPIRED)
		(void)strlcat(buf, ",expired", sizeof buf);
	if (e->flags & RQ_ENVELOPE_SUSPEND)
		(void)strlcat(buf, ",suspended", sizeof buf);

	(void)strlcat(buf, "]", sizeof buf);

	return (buf);
}

static void
rq_queue_dump(struct rq_queue *rq, const char *name)
{
	struct rq_message	*message;
	struct rq_envelope	*envelope;
	void			*i, *j;
	uint64_t		 id;

	log_debug("debug: /--- ramqueue: %s", name);

	i = NULL;
	while ((tree_iter(&rq->messages, &i, &id, (void*)&message))) {
		log_debug("debug: | msg:%08" PRIx32, message->msgid);
		j = NULL;
		while ((tree_iter(&message->envelopes, &j, &id,
		    (void*)&envelope)))
			log_debug("debug: |   %s",
			    rq_envelope_to_text(envelope));
	}
	log_debug("debug: \\---");
}
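/*
 * Order envelopes by the earlier of their next-try time and their
 * expiry time, so whichever event comes first sorts first; ties are
 * broken on evpid to keep keys unique in the splay tree.
 */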
static int
rq_envelope_cmp(struct rq_envelope *e1, struct rq_envelope *e2)
{
	time_t	ref1, ref2;

	ref1 = (e1->sched < e1->expire) ? e1->sched : e1->expire;
	ref2 = (e2->sched < e2->expire) ? e2->sched : e2->expire;
	if (ref1 != ref2)
		return (ref1 < ref2) ? -1 : 1;

	if (e1->evpid != e2->evpid)
		return (e1->evpid < e2->evpid) ? -1 : 1;

	return 0;
}

SPLAY_GENERATE(prioqtree, rq_envelope, t_entry, rq_envelope_cmp);