/*	$OpenBSD: scheduler_ramqueue.c,v 1.11 2012/07/10 11:13:40 gilles Exp $	*/

/*
 * Copyright (c) 2012 Gilles Chehade <gilles@openbsd.org>
 *
 * Permission to use, copy, modify, and distribute this software for any
 * purpose with or without fee is hereby granted, provided that the above
 * copyright notice and this permission notice appear in all copies.
 *
 * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
 * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
 * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
 * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
 * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
 * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
 * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
 */

#include <sys/types.h>
#include <sys/queue.h>
#include <sys/tree.h>
#include <sys/param.h>
#include <sys/socket.h>

#include <ctype.h>
#include <err.h>
#include <event.h>
#include <fcntl.h>
#include <imsg.h>
#include <inttypes.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>

#include "smtpd.h"
#include "log.h"

struct ramqueue_host {
	RB_ENTRY(ramqueue_host)		hosttree_entry;
	TAILQ_HEAD(,ramqueue_batch)	batch_queue;
	char				hostname[MAXHOSTNAMELEN];
};

struct ramqueue_batch {
	enum delivery_type		type;
	TAILQ_ENTRY(ramqueue_batch)	batch_entry;
	TAILQ_HEAD(,ramqueue_envelope)	envelope_queue;
	struct ramqueue_host	       *rq_host;
	u_int64_t			b_id;
	u_int32_t			msgid;
	u_int32_t			evpcnt;
};

struct ramqueue_envelope {
	TAILQ_ENTRY(ramqueue_envelope)	 queue_entry;
	TAILQ_ENTRY(ramqueue_envelope)	 batchqueue_entry;
	RB_ENTRY(ramqueue_envelope)	 evptree_entry;
	struct ramqueue_batch		*rq_batch;
	struct ramqueue_message		*rq_msg;
	struct ramqueue_host		*rq_host;
	u_int64_t			 evpid;
	time_t				 sched;
};

struct ramqueue_message {
	RB_ENTRY(ramqueue_message)		msgtree_entry;
	RB_HEAD(evptree, ramqueue_envelope)	evptree;
	u_int32_t				msgid;
	u_int32_t				evpcnt;
};

struct ramqueue {
	RB_HEAD(hosttree, ramqueue_host)	hosttree;
	RB_HEAD(msgtree, ramqueue_message)	msgtree;
	RB_HEAD(offloadtree, ramqueue_envelope)	offloadtree;
	TAILQ_HEAD(,ramqueue_envelope)		queue;
};

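/*
 * The ramqueue keeps several views of the same envelopes: a pending
 * envelope is linked into its message's evptree, its batch's
 * envelope_queue and the global queue, which is kept sorted by schedule
 * time.  Once handed off for delivery, an envelope lives only in the
 * offloadtree until it is either removed or re-inserted for a retry.
 */
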
RB_PROTOTYPE(hosttree,    ramqueue_host, hosttree_entry, ramqueue_host_cmp);
RB_PROTOTYPE(msgtree,     ramqueue_message, msgtree_entry, ramqueue_msg_cmp);
RB_PROTOTYPE(evptree,     ramqueue_envelope, evptree_entry, ramqueue_evp_cmp);
RB_PROTOTYPE(offloadtree, ramqueue_envelope, evptree_entry, ramqueue_evp_cmp);

enum ramqueue_iter_type {
	RAMQUEUE_ITER_HOST,
	RAMQUEUE_ITER_BATCH,
	RAMQUEUE_ITER_MESSAGE,
	RAMQUEUE_ITER_QUEUE
};

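/*
 * Opaque iteration handle returned to the backend's host, batch,
 * message and queue entry points: fetch() interprets the union member
 * according to type; RAMQUEUE_ITER_QUEUE walks the global queue and
 * needs no cursor of its own.
 */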
struct ramqueue_iter {
	enum ramqueue_iter_type		type;
	union {
		struct ramqueue_host		*host;
		struct ramqueue_batch		*batch;
		struct ramqueue_message		*message;
	} u;
};

static int ramqueue_host_cmp(struct ramqueue_host *, struct ramqueue_host *);
static int ramqueue_msg_cmp(struct ramqueue_message *,
    struct ramqueue_message *);
static int ramqueue_evp_cmp(struct ramqueue_envelope *,
    struct ramqueue_envelope *);
static struct ramqueue_host *ramqueue_lookup_host(char *);
static struct ramqueue_host *ramqueue_insert_host(char *);
static void ramqueue_remove_host(struct ramqueue_host *);
static struct ramqueue_batch *ramqueue_lookup_batch(struct ramqueue_host *,
    u_int32_t);
static struct ramqueue_batch *ramqueue_insert_batch(struct ramqueue_host *,
    u_int32_t);
static void ramqueue_remove_batch(struct ramqueue_host *,
    struct ramqueue_batch *);
static struct ramqueue_message *ramqueue_lookup_message(u_int32_t);
static struct ramqueue_message *ramqueue_insert_message(u_int32_t);
static void ramqueue_remove_message(struct ramqueue_message *);

static struct ramqueue_envelope *ramqueue_lookup_envelope(u_int64_t);
static struct ramqueue_envelope *ramqueue_lookup_offload(u_int64_t);

/*NEEDSFIX*/
static int ramqueue_expire(struct envelope *);
static time_t ramqueue_next_schedule(struct scheduler_info *, time_t);

static void  scheduler_ramqueue_init(void);
static int   scheduler_ramqueue_setup(void);
static int   scheduler_ramqueue_next(u_int64_t *, time_t *);
static void  scheduler_ramqueue_insert(struct scheduler_info *);
static void  scheduler_ramqueue_schedule(u_int64_t);
static void  scheduler_ramqueue_remove(u_int64_t);
static void *scheduler_ramqueue_host(char *);
static void *scheduler_ramqueue_message(u_int32_t);
static void *scheduler_ramqueue_batch(u_int64_t);
static void *scheduler_ramqueue_queue(void);
static void  scheduler_ramqueue_close(void *);
static int   scheduler_ramqueue_fetch(void *, u_int64_t *);
static int   scheduler_ramqueue_force(u_int64_t);
static void  scheduler_ramqueue_display(void);

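/*
 * Hook table wiring this implementation into smtpd's pluggable
 * scheduler_backend interface (declared in smtpd.h); the scheduler
 * process drives the ramqueue exclusively through these entry points.
 */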
struct scheduler_backend scheduler_backend_ramqueue = {
	scheduler_ramqueue_init,
	scheduler_ramqueue_setup,
	scheduler_ramqueue_next,
	scheduler_ramqueue_insert,
	scheduler_ramqueue_schedule,
	scheduler_ramqueue_remove,
	scheduler_ramqueue_host,
	scheduler_ramqueue_message,
	scheduler_ramqueue_batch,
	scheduler_ramqueue_queue,
	scheduler_ramqueue_close,
	scheduler_ramqueue_fetch,
	scheduler_ramqueue_force,
	scheduler_ramqueue_display
};

static struct ramqueue	ramqueue;

static void
scheduler_ramqueue_display_hosttree(void)
{
	struct ramqueue_host		*rq_host;
	struct ramqueue_batch		*rq_batch;
	struct ramqueue_envelope	*rq_evp;

	log_debug("\tscheduler_ramqueue: hosttree display");
	RB_FOREACH(rq_host, hosttree, &ramqueue.hosttree) {
		log_debug("\t\thost: [%p] %s", rq_host, rq_host->hostname);
		TAILQ_FOREACH(rq_batch, &rq_host->batch_queue, batch_entry) {
			log_debug("\t\t\tbatch: [%p] %08x",
			    rq_batch, rq_batch->msgid);
			TAILQ_FOREACH(rq_evp, &rq_batch->envelope_queue,
			    batchqueue_entry) {
				log_debug("\t\t\t\tevpid: [%p] %016"PRIx64,
				    rq_evp, rq_evp->evpid);
			}
		}
	}
}

static void
scheduler_ramqueue_display_msgtree(void)
{
	struct ramqueue_message		*rq_msg;
	struct ramqueue_envelope	*rq_evp;

	log_debug("\tscheduler_ramqueue: msgtree display");
	RB_FOREACH(rq_msg, msgtree, &ramqueue.msgtree) {
		log_debug("\t\tmsg: [%p] %08x", rq_msg, rq_msg->msgid);
		RB_FOREACH(rq_evp, evptree, &rq_msg->evptree) {
			log_debug("\t\t\tevp: [%p] %016"PRIx64,
			    rq_evp, rq_evp->evpid);
		}
	}
}

static void
scheduler_ramqueue_display_offloadtree(void)
{
	struct ramqueue_envelope	*rq_evp;

	log_debug("\tscheduler_ramqueue: offloadtree display");
	RB_FOREACH(rq_evp, offloadtree, &ramqueue.offloadtree) {
		log_debug("\t\t\tevp: [%p] %016"PRIx64,
		    rq_evp, rq_evp->evpid);
	}
}

static void
scheduler_ramqueue_display_queue(void)
{
	struct ramqueue_envelope *rq_evp;

	log_debug("\tscheduler_ramqueue: queue display");
	TAILQ_FOREACH(rq_evp, &ramqueue.queue, queue_entry) {
		log_debug("\t\tevpid: [%p] [batch: %p], %016"PRIx64,
		    rq_evp, rq_evp->rq_batch, rq_evp->evpid);
	}
}

static void
scheduler_ramqueue_display(void)
{
	log_debug("scheduler_ramqueue: display");
	scheduler_ramqueue_display_hosttree();
	scheduler_ramqueue_display_msgtree();
	scheduler_ramqueue_display_offloadtree();
	scheduler_ramqueue_display_queue();
}

static void
scheduler_ramqueue_init(void)
{
	log_debug("scheduler_ramqueue: init");
	bzero(&ramqueue, sizeof (ramqueue));
	TAILQ_INIT(&ramqueue.queue);
	RB_INIT(&ramqueue.hosttree);
	RB_INIT(&ramqueue.msgtree);
	RB_INIT(&ramqueue.offloadtree);
}

static int
scheduler_ramqueue_setup(void)
{
	struct envelope		 envelope;
	static struct qwalk	*q = NULL;
	u_int64_t		 evpid;
	struct scheduler_info	 si;

	log_debug("scheduler_ramqueue: load");

	log_info("scheduler_ramqueue: queue loading in progress");
	if (q == NULL)
		q = qwalk_new(0);

	while (qwalk(q, &evpid)) {
		/* the envelope is already in ramqueue, skip */
		if (ramqueue_lookup_envelope(evpid) ||
		    ramqueue_lookup_offload(evpid))
			continue;

		if (! queue_envelope_load(evpid, &envelope)) {
			log_debug("scheduler_ramqueue: evp -> /corrupt");
			queue_message_corrupt(evpid_to_msgid(evpid));
			continue;
		}
		if (ramqueue_expire(&envelope))
			continue;

		scheduler_info(&si, &envelope);
		scheduler_ramqueue_insert(&si);

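		/*
		 * Loading is incremental: after each inserted envelope we
		 * return 0 so the caller can process pending events and
		 * call setup() again.  The static qwalk handle keeps the
		 * walk position, and envelopes already in the ramqueue
		 * (or offloaded) are skipped above, so the walk
		 * eventually completes and returns 1 below.
		 */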
		log_debug("ramqueue: loading interrupted");
		return (0);
	}
	qwalk_close(q);
	q = NULL;
	log_debug("ramqueue: loading over");
	return (1);
}

static int
scheduler_ramqueue_next(u_int64_t *evpid, time_t *sched)
{
	struct ramqueue_envelope *rq_evp = NULL;

	log_debug("scheduler_ramqueue: next");
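	/*
	 * The queue is kept sorted by ascending schedule time, so the
	 * first envelope whose delivery type is not paused or busy is
	 * the next one due; its schedule time is reported back through
	 * *sched.
	 */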
	TAILQ_FOREACH(rq_evp, &ramqueue.queue, queue_entry) {
		if (rq_evp->rq_batch->type == D_MDA)
			if (env->sc_flags & (SMTPD_MDA_PAUSED|SMTPD_MDA_BUSY))
				continue;
		if (rq_evp->rq_batch->type == D_MTA)
			if (env->sc_flags & (SMTPD_MTA_PAUSED|SMTPD_MTA_BUSY))
				continue;
		if (evpid)
			*evpid = rq_evp->evpid;
		if (sched)
			*sched = rq_evp->sched;
		log_debug("scheduler_ramqueue: next: found");
		return 1;
	}

	log_debug("scheduler_ramqueue: next: nothing schedulable");
	return 0;
}

static void
scheduler_ramqueue_insert(struct scheduler_info *si)
{
	struct ramqueue_host		*rq_host;
	struct ramqueue_message		*rq_msg;
	struct ramqueue_batch		*rq_batch;
	struct ramqueue_envelope	*rq_evp, *evp;
	u_int32_t			 msgid;
	time_t				 curtm = time(NULL);

	log_debug("scheduler_ramqueue: insert");

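	/*
	 * An envelope found in the offload tree is coming back after a
	 * delivery attempt: it keeps its existing host, batch and
	 * message links and only needs a new schedule time.  Otherwise
	 * this is a new envelope and the host/message/batch chain is
	 * created on demand.
	 */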
	rq_evp = ramqueue_lookup_offload(si->evpid);
	if (rq_evp) {
		rq_msg = rq_evp->rq_msg;
		rq_batch = rq_evp->rq_batch;
		rq_host = rq_evp->rq_host;
		RB_REMOVE(offloadtree, &ramqueue.offloadtree, rq_evp);
	} else {
		msgid = evpid_to_msgid(si->evpid);
		rq_msg = ramqueue_lookup_message(msgid);
		if (rq_msg == NULL)
			rq_msg = ramqueue_insert_message(msgid);

		rq_host = ramqueue_lookup_host(si->destination);
		if (rq_host == NULL)
			rq_host = ramqueue_insert_host(si->destination);

		rq_batch = ramqueue_lookup_batch(rq_host, msgid);
		if (rq_batch == NULL)
			rq_batch = ramqueue_insert_batch(rq_host, msgid);
		/* the batch inherits the delivery type next() filters on */
		rq_batch->type = si->type;

		rq_evp = calloc(1, sizeof (*rq_evp));
		if (rq_evp == NULL)
			fatal("calloc");
		rq_evp->evpid = si->evpid;
		rq_batch->evpcnt++;
		rq_msg->evpcnt++;
	}

	rq_evp->sched = ramqueue_next_schedule(si, curtm);
	rq_evp->rq_host = rq_host;
	rq_evp->rq_batch = rq_batch;
	rq_evp->rq_msg = rq_msg;
	RB_INSERT(evptree, &rq_msg->evptree, rq_evp);
	TAILQ_INSERT_TAIL(&rq_batch->envelope_queue, rq_evp,
	    batchqueue_entry);

	/* sorted insert */
	TAILQ_FOREACH(evp, &ramqueue.queue, queue_entry) {
		if (evp->sched >= rq_evp->sched) {
			TAILQ_INSERT_BEFORE(evp, rq_evp, queue_entry);
			break;
		}
	}
	if (evp == NULL)
		TAILQ_INSERT_TAIL(&ramqueue.queue, rq_evp, queue_entry);

	stat_increment(STATS_RAMQUEUE_ENVELOPE);
}

static void
scheduler_ramqueue_schedule(u_int64_t evpid)
{
	struct ramqueue_envelope	*rq_evp;
	struct ramqueue_message		*rq_msg;
	struct ramqueue_batch		*rq_batch;

	log_debug("scheduler_ramqueue: schedule");

	rq_evp = ramqueue_lookup_envelope(evpid);
	rq_msg = rq_evp->rq_msg;
	rq_batch = rq_evp->rq_batch;

	/* remove from msg tree, batch queue and linear queue */
	RB_REMOVE(evptree, &rq_msg->evptree, rq_evp);
	TAILQ_REMOVE(&rq_batch->envelope_queue, rq_evp, batchqueue_entry);
	TAILQ_REMOVE(&ramqueue.queue, rq_evp, queue_entry);

	/* insert into offload tree */
	RB_INSERT(offloadtree, &ramqueue.offloadtree, rq_evp);

	/* that's one less envelope to process in the ramqueue */
	stat_decrement(STATS_RAMQUEUE_ENVELOPE);
}

static void
scheduler_ramqueue_remove(u_int64_t evpid)
{
	struct ramqueue_batch		*rq_batch;
	struct ramqueue_message		*rq_msg;
	struct ramqueue_envelope	*rq_evp;
	struct ramqueue_host		*rq_host;

	log_debug("scheduler_ramqueue: remove");

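	/*
	 * The envelope may be removed either while offloaded (a delivery
	 * attempt is in flight) or while still pending in the ramqueue.
	 * In the offloaded case the STATS_RAMQUEUE_ENVELOPE counter was
	 * already decremented when schedule() moved it out.
	 */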
	rq_evp = ramqueue_lookup_offload(evpid);
	if (rq_evp) {
		RB_REMOVE(offloadtree, &ramqueue.offloadtree, rq_evp);
		rq_msg = rq_evp->rq_msg;
		rq_batch = rq_evp->rq_batch;
		rq_host = rq_evp->rq_host;
	} else {
		rq_evp = ramqueue_lookup_envelope(evpid);
		rq_msg = rq_evp->rq_msg;
		rq_batch = rq_evp->rq_batch;
		rq_host = rq_evp->rq_host;

		RB_REMOVE(evptree, &rq_msg->evptree, rq_evp);
		TAILQ_REMOVE(&rq_batch->envelope_queue, rq_evp,
		    batchqueue_entry);
		TAILQ_REMOVE(&ramqueue.queue, rq_evp, queue_entry);
		stat_decrement(STATS_RAMQUEUE_ENVELOPE);
	}

	rq_batch->evpcnt--;
	rq_msg->evpcnt--;

	/* check if we are the last of a message */
	if (rq_msg->evpcnt == 0)
		ramqueue_remove_message(rq_msg);

	/* check if we are the last of a batch */
	if (rq_batch->evpcnt == 0)
		ramqueue_remove_batch(rq_host, rq_batch);

	/* check if we are the last of a host */
	if (TAILQ_FIRST(&rq_host->batch_queue) == NULL)
		ramqueue_remove_host(rq_host);

	free(rq_evp);
}

static void *
scheduler_ramqueue_host(char *host)
{
	struct ramqueue_iter *iter;
	struct ramqueue_host *rq_host;

	rq_host = ramqueue_lookup_host(host);
	if (rq_host == NULL)
		return NULL;

	iter = calloc(1, sizeof *iter);
	if (iter == NULL)
		err(1, "calloc");

	iter->type = RAMQUEUE_ITER_HOST;
	iter->u.host = rq_host;

	return iter;
}

static void *
scheduler_ramqueue_batch(u_int64_t evpid)
{
	struct ramqueue_iter *iter;
	struct ramqueue_envelope *rq_evp;

	rq_evp = ramqueue_lookup_envelope(evpid);
	if (rq_evp == NULL)
		return NULL;

	iter = calloc(1, sizeof *iter);
	if (iter == NULL)
		err(1, "calloc");

	iter->type = RAMQUEUE_ITER_BATCH;
	iter->u.batch = rq_evp->rq_batch;

	return iter;
}

static void *
scheduler_ramqueue_message(u_int32_t msgid)
{
	struct ramqueue_iter *iter;
	struct ramqueue_message *rq_msg;

	rq_msg = ramqueue_lookup_message(msgid);
	if (rq_msg == NULL)
		return NULL;

	iter = calloc(1, sizeof *iter);
	if (iter == NULL)
		err(1, "calloc");

	iter->type = RAMQUEUE_ITER_MESSAGE;
	iter->u.message = rq_msg;

	return iter;
}

static void *
scheduler_ramqueue_queue(void)
{
	struct ramqueue_iter *iter;

	iter = calloc(1, sizeof *iter);
	if (iter == NULL)
		err(1, "calloc");

	iter->type = RAMQUEUE_ITER_QUEUE;

	return iter;
}

static void
scheduler_ramqueue_close(void *hdl)
{
	free(hdl);
}

static int
scheduler_ramqueue_fetch(void *hdl, u_int64_t *evpid)
{
	struct ramqueue_iter		*iter = hdl;
	struct ramqueue_envelope	*rq_evp;
	struct ramqueue_batch		*rq_batch;

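	/*
	 * Each call returns the evpid at the head of the iterated set
	 * without advancing a cursor: the caller is expected to schedule
	 * or remove that envelope before fetching again, otherwise the
	 * same id is returned.
	 */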
	switch (iter->type) {
	case RAMQUEUE_ITER_HOST:
		rq_batch = TAILQ_FIRST(&iter->u.host->batch_queue);
		if (rq_batch == NULL)
			break;
		rq_evp = TAILQ_FIRST(&rq_batch->envelope_queue);
		if (rq_evp == NULL)
			break;
		*evpid = rq_evp->evpid;
		return 1;

	case RAMQUEUE_ITER_BATCH:
		rq_evp = TAILQ_FIRST(&iter->u.batch->envelope_queue);
		if (rq_evp == NULL)
			break;
		*evpid = rq_evp->evpid;
		return 1;

	case RAMQUEUE_ITER_MESSAGE:
		rq_evp = RB_ROOT(&iter->u.message->evptree);
		if (rq_evp == NULL)
			break;
		*evpid = rq_evp->evpid;
		return 1;

	case RAMQUEUE_ITER_QUEUE:
		rq_evp = TAILQ_FIRST(&ramqueue.queue);
		if (rq_evp == NULL)
			break;
		*evpid = rq_evp->evpid;
		return 1;
	}

	return 0;
}

static int
scheduler_ramqueue_force(u_int64_t id)
{
	struct ramqueue_envelope	*rq_evp;
	struct ramqueue_message		*rq_msg;
	int				 ret;

	/* schedule *all* */
	if (id == 0) {
		ret = 0;
		TAILQ_FOREACH(rq_evp, &ramqueue.queue, queue_entry) {
			rq_evp->sched = 0;
			ret++;
		}
		return ret;
	}

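	/*
	 * A msgid fits in 32 bits while an evpid embeds its msgid in the
	 * high-order 32 bits, so an id above 0xffffffff can only be an
	 * evpid: msgid 0x1234abcd, for instance, yields evpids of the
	 * form 0x1234abcdxxxxxxxx.
	 */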
	/* scheduling by evpid */
	if (id > 0xffffffffL) {
		rq_evp = ramqueue_lookup_envelope(id);
		if (rq_evp == NULL)
			return 0;

		rq_evp->sched = 0;
		TAILQ_REMOVE(&ramqueue.queue, rq_evp, queue_entry);
		TAILQ_INSERT_HEAD(&ramqueue.queue, rq_evp, queue_entry);
		return 1;
	}

	rq_msg = ramqueue_lookup_message(id);
	if (rq_msg == NULL)
		return 0;

	/* scheduling by msgid */
	ret = 0;
	RB_FOREACH(rq_evp, evptree, &rq_msg->evptree) {
		rq_evp->sched = 0;
		TAILQ_REMOVE(&ramqueue.queue, rq_evp, queue_entry);
		TAILQ_INSERT_HEAD(&ramqueue.queue, rq_evp, queue_entry);
		ret++;
	}
	return ret;
}

static struct ramqueue_host *
ramqueue_lookup_host(char *host)
{
	struct ramqueue_host hostkey;

	strlcpy(hostkey.hostname, host, sizeof(hostkey.hostname));
	return RB_FIND(hosttree, &ramqueue.hosttree, &hostkey);
}

static struct ramqueue_message *
ramqueue_lookup_message(u_int32_t msgid)
{
	struct ramqueue_message msgkey;

	msgkey.msgid = msgid;
	return RB_FIND(msgtree, &ramqueue.msgtree, &msgkey);
}

static struct ramqueue_envelope *
ramqueue_lookup_offload(u_int64_t evpid)
{
	struct ramqueue_envelope evpkey;

	evpkey.evpid = evpid;
	return RB_FIND(offloadtree, &ramqueue.offloadtree, &evpkey);
}

static struct ramqueue_envelope *
ramqueue_lookup_envelope(u_int64_t evpid)
{
	struct ramqueue_message *rq_msg;
	struct ramqueue_envelope evpkey;

	rq_msg = ramqueue_lookup_message(evpid_to_msgid(evpid));
	if (rq_msg == NULL)
		return NULL;

	evpkey.evpid = evpid;
	return RB_FIND(evptree, &rq_msg->evptree, &evpkey);
}

static struct ramqueue_batch *
ramqueue_lookup_batch(struct ramqueue_host *rq_host, u_int32_t msgid)
{
	struct ramqueue_batch *rq_batch;

	TAILQ_FOREACH(rq_batch, &rq_host->batch_queue, batch_entry) {
		if (rq_batch->msgid == msgid)
			return rq_batch;
	}

	return NULL;
}

static int
ramqueue_expire(struct envelope *envelope)
{
	struct envelope		bounce;
	struct scheduler_info	si;
	time_t			curtm;

	curtm = time(NULL);
	if (curtm - envelope->creation >= envelope->expire) {
		envelope_set_errormsg(envelope,
		    "message expired after sitting in queue for %d days",
		    envelope->expire / 60 / 60 / 24);
		bounce_record_message(envelope, &bounce);

		scheduler_info(&si, &bounce);
		scheduler_ramqueue_insert(&si);

		log_debug("#### %s: queue_envelope_delete: %016" PRIx64,
		    __func__, envelope->id);
		queue_envelope_delete(envelope);
		return 1;
	}
	return 0;
}

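/*
 * Compute the next schedule time for an envelope.  First attempts go
 * out immediately.  Local deliveries and bounces are retried
 * immediately for the first few attempts, then at roughly retry-count
 * minutes plus up to a minute of jitter, and eventually fall back to
 * the cap.  Remote (MTA) deliveries back off exponentially; assuming
 * the conventional smtpd.h values of a 15-minute SMTPD_QUEUE_INTERVAL
 * and a 4-hour SMTPD_QUEUE_MAXINTERVAL, retries 1-3 wait 15 minutes,
 * then 30, 60, 120 and 240 minutes for retries 4-7, with the 4-hour
 * cap from then on.
 */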
static time_t
ramqueue_next_schedule(struct scheduler_info *si, time_t curtm)
{
	time_t delay;

	if (si->lasttry == 0)
		return curtm;

	delay = SMTPD_QUEUE_MAXINTERVAL;

	if (si->type == D_MDA ||
	    si->type == D_BOUNCE) {
		if (si->retry < 5)
			return curtm;

		if (si->retry < 15)
			delay = (si->retry * 60) + arc4random_uniform(60);
	}

	if (si->type == D_MTA) {
		if (si->retry < 3)
			delay = SMTPD_QUEUE_INTERVAL;
		else if (si->retry <= 7) {
			delay = SMTPD_QUEUE_INTERVAL * (1 << (si->retry - 3));
			if (delay > SMTPD_QUEUE_MAXINTERVAL)
				delay = SMTPD_QUEUE_MAXINTERVAL;
		}
	}

	if (curtm >= si->lasttry + delay)
		return curtm;

	return curtm + delay;
}

static struct ramqueue_message *
ramqueue_insert_message(u_int32_t msgid)
{
	struct ramqueue_message *rq_msg;

	rq_msg = calloc(1, sizeof (*rq_msg));
	if (rq_msg == NULL)
		fatal("calloc");
	rq_msg->msgid = msgid;
	RB_INIT(&rq_msg->evptree);
	RB_INSERT(msgtree, &ramqueue.msgtree, rq_msg);
	stat_increment(STATS_RAMQUEUE_MESSAGE);

	return rq_msg;
}

static struct ramqueue_host *
ramqueue_insert_host(char *host)
{
	struct ramqueue_host *rq_host;

	rq_host = calloc(1, sizeof (*rq_host));
	if (rq_host == NULL)
		fatal("calloc");
	strlcpy(rq_host->hostname, host, sizeof(rq_host->hostname));
	TAILQ_INIT(&rq_host->batch_queue);
	RB_INSERT(hosttree, &ramqueue.hosttree, rq_host);
	stat_increment(STATS_RAMQUEUE_HOST);

	return rq_host;
}

static struct ramqueue_batch *
ramqueue_insert_batch(struct ramqueue_host *rq_host, u_int32_t msgid)
{
	struct ramqueue_batch *rq_batch;

	rq_batch = calloc(1, sizeof (*rq_batch));
	if (rq_batch == NULL)
		fatal("calloc");
	rq_batch->b_id = generate_uid();
	rq_batch->rq_host = rq_host;
	rq_batch->msgid = msgid;

	TAILQ_INIT(&rq_batch->envelope_queue);
	TAILQ_INSERT_TAIL(&rq_host->batch_queue, rq_batch, batch_entry);

	stat_increment(STATS_RAMQUEUE_BATCH);

	return rq_batch;
}

static void
ramqueue_remove_host(struct ramqueue_host *rq_host)
{
	RB_REMOVE(hosttree, &ramqueue.hosttree, rq_host);
	free(rq_host);
	stat_decrement(STATS_RAMQUEUE_HOST);
}

static void
ramqueue_remove_message(struct ramqueue_message *rq_msg)
{
	RB_REMOVE(msgtree, &ramqueue.msgtree, rq_msg);
	free(rq_msg);
	stat_decrement(STATS_RAMQUEUE_MESSAGE);
}

static void
ramqueue_remove_batch(struct ramqueue_host *rq_host,
    struct ramqueue_batch *rq_batch)
{
	TAILQ_REMOVE(&rq_host->batch_queue, rq_batch, batch_entry);
	free(rq_batch);
	stat_decrement(STATS_RAMQUEUE_BATCH);
}

static int
ramqueue_host_cmp(struct ramqueue_host *h1, struct ramqueue_host *h2)
{
	return strcmp(h1->hostname, h2->hostname);
}

static int
ramqueue_msg_cmp(struct ramqueue_message *m1, struct ramqueue_message *m2)
{
	return (m1->msgid < m2->msgid ? -1 : m1->msgid > m2->msgid);
}

static int
ramqueue_evp_cmp(struct ramqueue_envelope *e1, struct ramqueue_envelope *e2)
{
	return (e1->evpid < e2->evpid ? -1 : e1->evpid > e2->evpid);
}

RB_GENERATE(hosttree,    ramqueue_host, hosttree_entry, ramqueue_host_cmp);
RB_GENERATE(msgtree,     ramqueue_message, msgtree_entry, ramqueue_msg_cmp);
RB_GENERATE(evptree,     ramqueue_envelope, evptree_entry, ramqueue_evp_cmp);
RB_GENERATE(offloadtree, ramqueue_envelope, evptree_entry, ramqueue_evp_cmp);