1 /* $OpenBSD: scheduler_ramqueue.c,v 1.11 2012/07/10 11:13:40 gilles Exp $ */ 2 3 /* 4 * Copyright (c) 2012 Gilles Chehade <gilles@openbsd.org> 5 * 6 * Permission to use, copy, modify, and distribute this software for any 7 * purpose with or without fee is hereby granted, provided that the above 8 * copyright notice and this permission notice appear in all copies. 9 * 10 * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES 11 * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF 12 * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR 13 * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES 14 * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN 15 * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF 16 * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. 17 */ 18 19 #include <sys/types.h> 20 #include <sys/queue.h> 21 #include <sys/tree.h> 22 #include <sys/param.h> 23 #include <sys/socket.h> 24 25 #include <ctype.h> 26 #include <err.h> 27 #include <event.h> 28 #include <fcntl.h> 29 #include <imsg.h> 30 #include <inttypes.h> 31 #include <stdio.h> 32 #include <stdlib.h> 33 #include <string.h> 34 #include <time.h> 35 36 #include "smtpd.h" 37 #include "log.h" 38 39 40 struct ramqueue_host { 41 RB_ENTRY(ramqueue_host) hosttree_entry; 42 TAILQ_HEAD(,ramqueue_batch) batch_queue; 43 char hostname[MAXHOSTNAMELEN]; 44 }; 45 struct ramqueue_batch { 46 enum delivery_type type; 47 TAILQ_ENTRY(ramqueue_batch) batch_entry; 48 TAILQ_HEAD(,ramqueue_envelope) envelope_queue; 49 struct ramqueue_host *rq_host; 50 u_int64_t b_id; 51 u_int32_t msgid; 52 u_int32_t evpcnt; 53 }; 54 struct ramqueue_envelope { 55 TAILQ_ENTRY(ramqueue_envelope) queue_entry; 56 TAILQ_ENTRY(ramqueue_envelope) batchqueue_entry; 57 RB_ENTRY(ramqueue_envelope) evptree_entry; 58 struct ramqueue_batch *rq_batch; 59 struct ramqueue_message *rq_msg; 60 struct ramqueue_host *rq_host; 61 u_int64_t evpid; 62 time_t sched; 63 }; 64 struct ramqueue_message { 65 RB_ENTRY(ramqueue_message) msgtree_entry; 66 RB_HEAD(evptree, ramqueue_envelope) evptree; 67 u_int32_t msgid; 68 u_int32_t evpcnt; 69 }; 70 struct ramqueue { 71 RB_HEAD(hosttree, ramqueue_host) hosttree; 72 RB_HEAD(msgtree, ramqueue_message) msgtree; 73 RB_HEAD(offloadtree, ramqueue_envelope) offloadtree; 74 TAILQ_HEAD(,ramqueue_envelope) queue; 75 }; 76 77 RB_PROTOTYPE(hosttree, ramqueue_host, hosttree_entry, ramqueue_host_cmp); 78 RB_PROTOTYPE(msgtree, ramqueue_message, msg_entry, ramqueue_msg_cmp); 79 RB_PROTOTYPE(evptree, ramqueue_envelope, evp_entry, ramqueue_evp_cmp); 80 RB_PROTOTYPE(offloadtree, ramqueue_envelope, evp_entry, ramqueue_evp_cmp); 81 82 enum ramqueue_iter_type { 83 RAMQUEUE_ITER_HOST, 84 RAMQUEUE_ITER_BATCH, 85 RAMQUEUE_ITER_MESSAGE, 86 RAMQUEUE_ITER_QUEUE 87 }; 88 89 struct ramqueue_iter { 90 enum ramqueue_iter_type type; 91 union { 92 struct ramqueue_host *host; 93 struct ramqueue_batch *batch; 94 struct ramqueue_message *message; 95 } u; 96 }; 97 98 99 static int ramqueue_host_cmp(struct ramqueue_host *, struct ramqueue_host *); 100 static int ramqueue_msg_cmp(struct ramqueue_message *, struct ramqueue_message *); 101 static int ramqueue_evp_cmp(struct ramqueue_envelope *, struct ramqueue_envelope *); 102 static struct ramqueue_host *ramqueue_lookup_host(char *); 103 static struct ramqueue_host *ramqueue_insert_host(char *); 104 static void ramqueue_remove_host(struct ramqueue_host *); 105 static struct ramqueue_batch *ramqueue_lookup_batch(struct ramqueue_host *, 106 u_int32_t); 107 static struct ramqueue_batch *ramqueue_insert_batch(struct ramqueue_host *, 108 u_int32_t); 109 static void ramqueue_remove_batch(struct ramqueue_host *, struct ramqueue_batch *); 110 static struct ramqueue_message *ramqueue_lookup_message(u_int32_t); 111 static struct ramqueue_message *ramqueue_insert_message(u_int32_t); 112 static void ramqueue_remove_message(struct ramqueue_message *); 113 114 static struct ramqueue_envelope *ramqueue_lookup_envelope(u_int64_t); 115 static struct ramqueue_envelope *ramqueue_lookup_offload(u_int64_t); 116 117 118 /*NEEDSFIX*/ 119 static int ramqueue_expire(struct envelope *); 120 static time_t ramqueue_next_schedule(struct scheduler_info *, time_t); 121 122 static void scheduler_ramqueue_init(void); 123 static int scheduler_ramqueue_setup(void); 124 static int scheduler_ramqueue_next(u_int64_t *, time_t *); 125 static void scheduler_ramqueue_insert(struct scheduler_info *); 126 static void scheduler_ramqueue_schedule(u_int64_t); 127 static void scheduler_ramqueue_remove(u_int64_t); 128 static void *scheduler_ramqueue_host(char *); 129 static void *scheduler_ramqueue_message(u_int32_t); 130 static void *scheduler_ramqueue_batch(u_int64_t); 131 static void *scheduler_ramqueue_queue(void); 132 static void scheduler_ramqueue_close(void *); 133 static int scheduler_ramqueue_fetch(void *, u_int64_t *); 134 static int scheduler_ramqueue_force(u_int64_t); 135 static void scheduler_ramqueue_display(void); 136 137 struct scheduler_backend scheduler_backend_ramqueue = { 138 scheduler_ramqueue_init, 139 scheduler_ramqueue_setup, 140 scheduler_ramqueue_next, 141 scheduler_ramqueue_insert, 142 scheduler_ramqueue_schedule, 143 scheduler_ramqueue_remove, 144 scheduler_ramqueue_host, 145 scheduler_ramqueue_message, 146 scheduler_ramqueue_batch, 147 scheduler_ramqueue_queue, 148 scheduler_ramqueue_close, 149 scheduler_ramqueue_fetch, 150 scheduler_ramqueue_force, 151 scheduler_ramqueue_display 152 }; 153 static struct ramqueue ramqueue; 154 155 static void 156 scheduler_ramqueue_display_hosttree(void) 157 { 158 struct ramqueue_host *rq_host; 159 struct ramqueue_batch *rq_batch; 160 struct ramqueue_envelope *rq_evp; 161 162 log_debug("\tscheduler_ramqueue: hosttree display"); 163 RB_FOREACH(rq_host, hosttree, &ramqueue.hosttree) { 164 log_debug("\t\thost: [%p] %s", rq_host, rq_host->hostname); 165 TAILQ_FOREACH(rq_batch, &rq_host->batch_queue, batch_entry) { 166 log_debug("\t\t\tbatch: [%p] %016x", 167 rq_batch, rq_batch->msgid); 168 TAILQ_FOREACH(rq_evp, &rq_batch->envelope_queue, 169 batchqueue_entry) { 170 log_debug("\t\t\t\tevpid: [%p] %016"PRIx64, 171 rq_evp, rq_evp->evpid); 172 } 173 } 174 } 175 } 176 177 static void 178 scheduler_ramqueue_display_msgtree(void) 179 { 180 struct ramqueue_message *rq_msg; 181 struct ramqueue_envelope *rq_evp; 182 183 log_debug("\tscheduler_ramqueue: msgtree display"); 184 RB_FOREACH(rq_msg, msgtree, &ramqueue.msgtree) { 185 log_debug("\t\tmsg: [%p] %016x", rq_msg, rq_msg->msgid); 186 RB_FOREACH(rq_evp, evptree, &rq_msg->evptree) { 187 log_debug("\t\t\tevp: [%p] %016"PRIx64, 188 rq_evp, rq_evp->evpid); 189 } 190 } 191 } 192 193 static void 194 scheduler_ramqueue_display_offloadtree(void) 195 { 196 struct ramqueue_envelope *rq_evp; 197 198 log_debug("\tscheduler_ramqueue: offloadtree display"); 199 RB_FOREACH(rq_evp, offloadtree, &ramqueue.offloadtree) { 200 log_debug("\t\t\tevp: [%p] %016"PRIx64, 201 rq_evp, rq_evp->evpid); 202 } 203 } 204 205 static void 206 scheduler_ramqueue_display_queue(void) 207 { 208 struct ramqueue_envelope *rq_evp; 209 210 log_debug("\tscheduler_ramqueue: queue display"); 211 TAILQ_FOREACH(rq_evp, &ramqueue.queue, queue_entry) { 212 log_debug("\t\tevpid: [%p] [batch: %p], %016"PRIx64, 213 rq_evp, rq_evp->rq_batch, rq_evp->evpid); 214 } 215 } 216 217 static void 218 scheduler_ramqueue_display(void) 219 { 220 log_debug("scheduler_ramqueue: display"); 221 scheduler_ramqueue_display_hosttree(); 222 scheduler_ramqueue_display_msgtree(); 223 scheduler_ramqueue_display_offloadtree(); 224 scheduler_ramqueue_display_queue(); 225 } 226 227 static void 228 scheduler_ramqueue_init(void) 229 { 230 log_debug("scheduler_ramqueue: init"); 231 bzero(&ramqueue, sizeof (ramqueue)); 232 TAILQ_INIT(&ramqueue.queue); 233 RB_INIT(&ramqueue.hosttree); 234 RB_INIT(&ramqueue.msgtree); 235 RB_INIT(&ramqueue.offloadtree); 236 } 237 238 static int 239 scheduler_ramqueue_setup(void) 240 { 241 struct envelope envelope; 242 static struct qwalk *q = NULL; 243 u_int64_t evpid; 244 struct scheduler_info si; 245 246 log_debug("scheduler_ramqueue: load"); 247 248 log_info("scheduler_ramqueue: queue loading in progress"); 249 if (q == NULL) 250 q = qwalk_new(0); 251 252 while (qwalk(q, &evpid)) { 253 /* the envelope is already in ramqueue, skip */ 254 if (ramqueue_lookup_envelope(evpid) || 255 ramqueue_lookup_offload(evpid)) 256 continue; 257 258 if (! queue_envelope_load(evpid, &envelope)) { 259 log_debug("scheduler_ramqueue: evp -> /corrupt"); 260 queue_message_corrupt(evpid_to_msgid(evpid)); 261 continue; 262 } 263 if (ramqueue_expire(&envelope)) 264 continue; 265 266 scheduler_info(&si, &envelope); 267 scheduler_ramqueue_insert(&si); 268 269 log_debug("ramqueue: loading interrupted"); 270 return (0); 271 } 272 qwalk_close(q); 273 q = NULL; 274 log_debug("ramqueue: loading over"); 275 return (1); 276 } 277 278 static int 279 scheduler_ramqueue_next(u_int64_t *evpid, time_t *sched) 280 { 281 struct ramqueue_envelope *rq_evp = NULL; 282 283 log_debug("scheduler_ramqueue: next"); 284 TAILQ_FOREACH(rq_evp, &ramqueue.queue, queue_entry) { 285 if (rq_evp->rq_batch->type == D_MDA) 286 if (env->sc_flags & (SMTPD_MDA_PAUSED|SMTPD_MDA_BUSY)) 287 continue; 288 if (rq_evp->rq_batch->type == D_MTA) 289 if (env->sc_flags & (SMTPD_MTA_PAUSED|SMTPD_MTA_BUSY)) 290 continue; 291 if (evpid) 292 *evpid = rq_evp->evpid; 293 if (sched) 294 *sched = rq_evp->sched; 295 log_debug("scheduler_ramqueue: next: found"); 296 return 1; 297 } 298 299 log_debug("scheduler_ramqueue: next: nothing schedulable"); 300 return 0; 301 } 302 303 static void 304 scheduler_ramqueue_insert(struct scheduler_info *si) 305 { 306 struct ramqueue_host *rq_host; 307 struct ramqueue_message *rq_msg; 308 struct ramqueue_batch *rq_batch; 309 struct ramqueue_envelope *rq_evp, *evp; 310 u_int32_t msgid; 311 time_t curtm = time(NULL); 312 313 log_debug("scheduler_ramqueue: insert"); 314 315 rq_evp = ramqueue_lookup_offload(si->evpid); 316 if (rq_evp) { 317 rq_msg = rq_evp->rq_msg; 318 rq_batch = rq_evp->rq_batch; 319 rq_host = rq_evp->rq_host; 320 RB_REMOVE(offloadtree, &ramqueue.offloadtree, rq_evp); 321 } 322 else { 323 msgid = evpid_to_msgid(si->evpid); 324 rq_msg = ramqueue_lookup_message(msgid); 325 if (rq_msg == NULL) 326 rq_msg = ramqueue_insert_message(msgid); 327 328 rq_host = ramqueue_lookup_host(si->destination); 329 if (rq_host == NULL) 330 rq_host = ramqueue_insert_host(si->destination); 331 332 rq_batch = ramqueue_lookup_batch(rq_host, msgid); 333 if (rq_batch == NULL) 334 rq_batch = ramqueue_insert_batch(rq_host, msgid); 335 336 rq_evp = calloc(1, sizeof (*rq_evp)); 337 if (rq_evp == NULL) 338 fatal("calloc"); 339 rq_evp->evpid = si->evpid; 340 rq_batch->evpcnt++; 341 rq_msg->evpcnt++; 342 } 343 344 rq_evp->sched = ramqueue_next_schedule(si, curtm); 345 rq_evp->rq_host = rq_host; 346 rq_evp->rq_batch = rq_batch; 347 rq_evp->rq_msg = rq_msg; 348 RB_INSERT(evptree, &rq_msg->evptree, rq_evp); 349 TAILQ_INSERT_TAIL(&rq_batch->envelope_queue, rq_evp, 350 batchqueue_entry); 351 352 /* sorted insert */ 353 TAILQ_FOREACH(evp, &ramqueue.queue, queue_entry) { 354 if (evp->sched >= rq_evp->sched) { 355 TAILQ_INSERT_BEFORE(evp, rq_evp, queue_entry); 356 break; 357 } 358 } 359 if (evp == NULL) 360 TAILQ_INSERT_TAIL(&ramqueue.queue, rq_evp, queue_entry); 361 362 stat_increment(STATS_RAMQUEUE_ENVELOPE); 363 } 364 365 static void 366 scheduler_ramqueue_schedule(u_int64_t evpid) 367 { 368 struct ramqueue_envelope *rq_evp; 369 struct ramqueue_message *rq_msg; 370 struct ramqueue_batch *rq_batch; 371 372 log_debug("scheduler_ramqueue: schedule"); 373 374 rq_evp = ramqueue_lookup_envelope(evpid); 375 rq_msg = rq_evp->rq_msg; 376 rq_batch = rq_evp->rq_batch; 377 378 /* remove from msg tree, batch queue and linear queue */ 379 RB_REMOVE(evptree, &rq_msg->evptree, rq_evp); 380 TAILQ_REMOVE(&rq_batch->envelope_queue, rq_evp, batchqueue_entry); 381 TAILQ_REMOVE(&ramqueue.queue, rq_evp, queue_entry); 382 383 /* insert into offload tree*/ 384 RB_INSERT(offloadtree, &ramqueue.offloadtree, rq_evp); 385 386 /* that's one less envelope to process in the ramqueue */ 387 stat_decrement(STATS_RAMQUEUE_ENVELOPE); 388 } 389 390 static void 391 scheduler_ramqueue_remove(u_int64_t evpid) 392 { 393 struct ramqueue_batch *rq_batch; 394 struct ramqueue_message *rq_msg; 395 struct ramqueue_envelope *rq_evp; 396 struct ramqueue_host *rq_host; 397 398 log_debug("scheduler_ramqueue: remove"); 399 400 rq_evp = ramqueue_lookup_offload(evpid); 401 if (rq_evp) { 402 RB_REMOVE(offloadtree, &ramqueue.offloadtree, rq_evp); 403 rq_msg = rq_evp->rq_msg; 404 rq_batch = rq_evp->rq_batch; 405 rq_host = rq_evp->rq_host; 406 } 407 else { 408 rq_evp = ramqueue_lookup_envelope(evpid); 409 rq_msg = rq_evp->rq_msg; 410 rq_batch = rq_evp->rq_batch; 411 rq_host = rq_evp->rq_host; 412 413 RB_REMOVE(evptree, &rq_msg->evptree, rq_evp); 414 TAILQ_REMOVE(&rq_batch->envelope_queue, rq_evp, batchqueue_entry); 415 TAILQ_REMOVE(&ramqueue.queue, rq_evp, queue_entry); 416 stat_decrement(STATS_RAMQUEUE_ENVELOPE); 417 } 418 419 rq_batch->evpcnt--; 420 rq_msg->evpcnt--; 421 422 /* check if we are the last of a message */ 423 if (rq_msg->evpcnt == 0) { 424 ramqueue_remove_message(rq_msg); 425 } 426 427 /* check if we are the last of a batch */ 428 if (rq_batch->evpcnt == 0) { 429 ramqueue_remove_batch(rq_host, rq_batch); 430 } 431 432 /* check if we are the last of a host */ 433 if (TAILQ_FIRST(&rq_host->batch_queue) == NULL) { 434 ramqueue_remove_host(rq_host); 435 } 436 437 free(rq_evp); 438 } 439 440 static void * 441 scheduler_ramqueue_host(char *host) 442 { 443 struct ramqueue_iter *iter; 444 struct ramqueue_host *rq_host; 445 446 rq_host = ramqueue_lookup_host(host); 447 if (rq_host == NULL) 448 return NULL; 449 450 iter = calloc(1, sizeof *iter); 451 if (iter == NULL) 452 err(1, "calloc"); 453 454 iter->type = RAMQUEUE_ITER_HOST; 455 iter->u.host = rq_host; 456 457 return iter; 458 } 459 460 static void * 461 scheduler_ramqueue_batch(u_int64_t evpid) 462 { 463 struct ramqueue_iter *iter; 464 struct ramqueue_envelope *rq_evp; 465 466 rq_evp = ramqueue_lookup_envelope(evpid); 467 if (rq_evp == NULL) 468 return NULL; 469 470 iter = calloc(1, sizeof *iter); 471 if (iter == NULL) 472 err(1, "calloc"); 473 474 iter->type = RAMQUEUE_ITER_BATCH; 475 iter->u.batch = rq_evp->rq_batch; 476 477 return iter; 478 } 479 480 static void * 481 scheduler_ramqueue_message(u_int32_t msgid) 482 { 483 struct ramqueue_iter *iter; 484 struct ramqueue_message *rq_msg; 485 486 rq_msg = ramqueue_lookup_message(msgid); 487 if (rq_msg == NULL) 488 return NULL; 489 490 iter = calloc(1, sizeof *iter); 491 if (iter == NULL) 492 err(1, "calloc"); 493 494 iter->type = RAMQUEUE_ITER_MESSAGE; 495 iter->u.message = rq_msg; 496 497 return iter; 498 499 } 500 501 static void * 502 scheduler_ramqueue_queue(void) 503 { 504 struct ramqueue_iter *iter; 505 506 iter = calloc(1, sizeof *iter); 507 if (iter == NULL) 508 err(1, "calloc"); 509 510 iter->type = RAMQUEUE_ITER_QUEUE; 511 512 return iter; 513 } 514 515 static void 516 scheduler_ramqueue_close(void *hdl) 517 { 518 free(hdl); 519 } 520 521 int 522 scheduler_ramqueue_fetch(void *hdl, u_int64_t *evpid) 523 { 524 struct ramqueue_iter *iter = hdl; 525 struct ramqueue_envelope *rq_evp; 526 struct ramqueue_batch *rq_batch; 527 528 switch (iter->type) { 529 case RAMQUEUE_ITER_HOST: 530 rq_batch = TAILQ_FIRST(&iter->u.host->batch_queue); 531 if (rq_batch == NULL) 532 break; 533 rq_evp = TAILQ_FIRST(&rq_batch->envelope_queue); 534 if (rq_evp == NULL) 535 break; 536 *evpid = rq_evp->evpid; 537 return 1; 538 539 case RAMQUEUE_ITER_BATCH: 540 rq_evp = TAILQ_FIRST(&iter->u.batch->envelope_queue); 541 if (rq_evp == NULL) 542 break; 543 *evpid = rq_evp->evpid; 544 return 1; 545 546 case RAMQUEUE_ITER_MESSAGE: 547 rq_evp = RB_ROOT(&iter->u.message->evptree); 548 if (rq_evp == NULL) 549 break; 550 *evpid = rq_evp->evpid; 551 return 1; 552 553 case RAMQUEUE_ITER_QUEUE: 554 rq_evp = TAILQ_FIRST(&ramqueue.queue); 555 if (rq_evp == NULL) 556 break; 557 *evpid = rq_evp->evpid; 558 return 1; 559 } 560 561 return 0; 562 } 563 564 static int 565 scheduler_ramqueue_force(u_int64_t id) 566 { 567 struct ramqueue_envelope *rq_evp; 568 struct ramqueue_message *rq_msg; 569 int ret; 570 571 /* schedule *all* */ 572 if (id == 0) { 573 ret = 0; 574 TAILQ_FOREACH(rq_evp, &ramqueue.queue, queue_entry) { 575 rq_evp->sched = 0; 576 ret++; 577 } 578 return ret; 579 } 580 581 /* scheduling by evpid */ 582 if (id > 0xffffffffL) { 583 rq_evp = ramqueue_lookup_envelope(id); 584 if (rq_evp == NULL) 585 return 0; 586 587 rq_evp->sched = 0; 588 TAILQ_REMOVE(&ramqueue.queue, rq_evp, queue_entry); 589 TAILQ_INSERT_HEAD(&ramqueue.queue, rq_evp, queue_entry); 590 return 1; 591 } 592 593 rq_msg = ramqueue_lookup_message(id); 594 if (rq_msg == NULL) 595 return 0; 596 597 /* scheduling by msgid */ 598 ret = 0; 599 RB_FOREACH(rq_evp, evptree, &rq_msg->evptree) { 600 rq_evp->sched = 0; 601 TAILQ_REMOVE(&ramqueue.queue, rq_evp, queue_entry); 602 TAILQ_INSERT_HEAD(&ramqueue.queue, rq_evp, queue_entry); 603 ret++; 604 } 605 return ret; 606 } 607 608 static struct ramqueue_host * 609 ramqueue_lookup_host(char *host) 610 { 611 struct ramqueue_host hostkey; 612 613 strlcpy(hostkey.hostname, host, sizeof(hostkey.hostname)); 614 return RB_FIND(hosttree, &ramqueue.hosttree, &hostkey); 615 } 616 617 static struct ramqueue_message * 618 ramqueue_lookup_message(u_int32_t msgid) 619 { 620 struct ramqueue_message msgkey; 621 622 msgkey.msgid = msgid; 623 return RB_FIND(msgtree, &ramqueue.msgtree, &msgkey); 624 } 625 626 static struct ramqueue_envelope * 627 ramqueue_lookup_offload(u_int64_t evpid) 628 { 629 struct ramqueue_envelope evpkey; 630 631 evpkey.evpid = evpid; 632 return RB_FIND(offloadtree, &ramqueue.offloadtree, &evpkey); 633 } 634 635 static struct ramqueue_envelope * 636 ramqueue_lookup_envelope(u_int64_t evpid) 637 { 638 struct ramqueue_message *rq_msg; 639 struct ramqueue_envelope evpkey; 640 641 rq_msg = ramqueue_lookup_message(evpid_to_msgid(evpid)); 642 if (rq_msg == NULL) 643 return NULL; 644 645 evpkey.evpid = evpid; 646 return RB_FIND(evptree, &rq_msg->evptree, &evpkey); 647 } 648 649 static struct ramqueue_batch * 650 ramqueue_lookup_batch(struct ramqueue_host *rq_host, u_int32_t msgid) 651 { 652 struct ramqueue_batch *rq_batch; 653 654 TAILQ_FOREACH(rq_batch, &rq_host->batch_queue, batch_entry) { 655 if (rq_batch->msgid == msgid) 656 return rq_batch; 657 } 658 659 return NULL; 660 } 661 662 static int 663 ramqueue_expire(struct envelope *envelope) 664 { 665 struct envelope bounce; 666 struct scheduler_info si; 667 time_t curtm; 668 669 curtm = time(NULL); 670 if (curtm - envelope->creation >= envelope->expire) { 671 envelope_set_errormsg(envelope, 672 "message expired after sitting in queue for %d days", 673 envelope->expire / 60 / 60 / 24); 674 bounce_record_message(envelope, &bounce); 675 676 scheduler_info(&si, &bounce); 677 scheduler_ramqueue_insert(&si); 678 679 log_debug("#### %s: queue_envelope_delete: %016" PRIx64, 680 __func__, envelope->id); 681 queue_envelope_delete(envelope); 682 return 1; 683 } 684 return 0; 685 } 686 687 static time_t 688 ramqueue_next_schedule(struct scheduler_info *si, time_t curtm) 689 { 690 time_t delay; 691 692 if (si->lasttry == 0) 693 return curtm; 694 695 delay = SMTPD_QUEUE_MAXINTERVAL; 696 697 if (si->type == D_MDA || 698 si->type == D_BOUNCE) { 699 if (si->retry < 5) 700 return curtm; 701 702 if (si->retry < 15) 703 delay = (si->retry * 60) + arc4random_uniform(60); 704 } 705 706 if (si->type == D_MTA) { 707 if (si->retry < 3) 708 delay = SMTPD_QUEUE_INTERVAL; 709 else if (si->retry <= 7) { 710 delay = SMTPD_QUEUE_INTERVAL * (1 << (si->retry - 3)); 711 if (delay > SMTPD_QUEUE_MAXINTERVAL) 712 delay = SMTPD_QUEUE_MAXINTERVAL; 713 } 714 } 715 716 if (curtm >= si->lasttry + delay) 717 return curtm; 718 719 return curtm + delay; 720 } 721 722 static struct ramqueue_message * 723 ramqueue_insert_message(u_int32_t msgid) 724 { 725 struct ramqueue_message *rq_msg; 726 727 rq_msg = calloc(1, sizeof (*rq_msg)); 728 if (rq_msg == NULL) 729 fatal("calloc"); 730 rq_msg->msgid = msgid; 731 RB_INSERT(msgtree, &ramqueue.msgtree, rq_msg); 732 RB_INIT(&rq_msg->evptree); 733 stat_increment(STATS_RAMQUEUE_MESSAGE); 734 735 return rq_msg; 736 } 737 738 static struct ramqueue_host * 739 ramqueue_insert_host(char *host) 740 { 741 struct ramqueue_host *rq_host; 742 743 rq_host = calloc(1, sizeof (*rq_host)); 744 if (rq_host == NULL) 745 fatal("calloc"); 746 strlcpy(rq_host->hostname, host, sizeof(rq_host->hostname)); 747 TAILQ_INIT(&rq_host->batch_queue); 748 RB_INSERT(hosttree, &ramqueue.hosttree, rq_host); 749 stat_increment(STATS_RAMQUEUE_HOST); 750 751 return rq_host; 752 } 753 754 static struct ramqueue_batch * 755 ramqueue_insert_batch(struct ramqueue_host *rq_host, u_int32_t msgid) 756 { 757 struct ramqueue_batch *rq_batch; 758 759 rq_batch = calloc(1, sizeof (*rq_batch)); 760 if (rq_batch == NULL) 761 fatal("calloc"); 762 rq_batch->b_id = generate_uid(); 763 rq_batch->rq_host = rq_host; 764 rq_batch->msgid = msgid; 765 766 TAILQ_INIT(&rq_batch->envelope_queue); 767 TAILQ_INSERT_TAIL(&rq_host->batch_queue, rq_batch, batch_entry); 768 769 stat_increment(STATS_RAMQUEUE_BATCH); 770 771 return rq_batch; 772 } 773 774 static void 775 ramqueue_remove_host(struct ramqueue_host *rq_host) 776 { 777 RB_REMOVE(hosttree, &ramqueue.hosttree, rq_host); 778 free(rq_host); 779 stat_decrement(STATS_RAMQUEUE_HOST); 780 } 781 782 static void 783 ramqueue_remove_message(struct ramqueue_message *rq_msg) 784 { 785 RB_REMOVE(msgtree, &ramqueue.msgtree, rq_msg); 786 free(rq_msg); 787 stat_decrement(STATS_RAMQUEUE_MESSAGE); 788 } 789 790 791 static void 792 ramqueue_remove_batch(struct ramqueue_host *rq_host, 793 struct ramqueue_batch *rq_batch) 794 { 795 TAILQ_REMOVE(&rq_host->batch_queue, rq_batch, batch_entry); 796 free(rq_batch); 797 stat_decrement(STATS_RAMQUEUE_BATCH); 798 } 799 800 static int 801 ramqueue_host_cmp(struct ramqueue_host *h1, struct ramqueue_host *h2) 802 { 803 return strcmp(h1->hostname, h2->hostname); 804 } 805 806 807 static int 808 ramqueue_msg_cmp(struct ramqueue_message *m1, struct ramqueue_message *m2) 809 { 810 return (m1->msgid < m2->msgid ? -1 : m1->msgid > m2->msgid); 811 } 812 813 static int 814 ramqueue_evp_cmp(struct ramqueue_envelope *e1, struct ramqueue_envelope *e2) 815 { 816 return (e1->evpid < e2->evpid ? -1 : e1->evpid > e2->evpid); 817 } 818 819 RB_GENERATE(hosttree, ramqueue_host, hosttree_entry, ramqueue_host_cmp); 820 RB_GENERATE(msgtree, ramqueue_message, msgtree_entry, ramqueue_msg_cmp); 821 RB_GENERATE(evptree, ramqueue_envelope, evptree_entry, ramqueue_evp_cmp); 822 RB_GENERATE(offloadtree, ramqueue_envelope, evptree_entry, ramqueue_evp_cmp); 823