/* $OpenBSD: scheduler_ramqueue.c,v 1.40 2014/07/10 14:45:02 eric Exp $ */

/*
 * Copyright (c) 2012 Gilles Chehade <gilles@poolp.org>
 * Copyright (c) 2012 Eric Faurot <eric@openbsd.org>
 *
 * Permission to use, copy, modify, and distribute this software for any
 * purpose with or without fee is hereby granted, provided that the above
 * copyright notice and this permission notice appear in all copies.
 *
 * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
 * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
 * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
 * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
 * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
 * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
 * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
 */

#include <sys/types.h>
#include <sys/queue.h>
#include <sys/tree.h>
#include <sys/socket.h>

#include <ctype.h>
#include <err.h>
#include <event.h>
#include <fcntl.h>
#include <imsg.h>
#include <inttypes.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>

#include "smtpd.h"
#include "log.h"

TAILQ_HEAD(evplist, rq_envelope);

struct rq_message {
	uint32_t		 msgid;
	struct tree		 envelopes;
};

struct rq_envelope {
	TAILQ_ENTRY(rq_envelope) entry;
	SPLAY_ENTRY(rq_envelope) t_entry;

	uint64_t		 evpid;
	uint64_t		 holdq;
	enum delivery_type	 type;

#define	RQ_EVPSTATE_PENDING	 0
#define	RQ_EVPSTATE_SCHEDULED	 1
#define	RQ_EVPSTATE_INFLIGHT	 2
#define	RQ_EVPSTATE_HELD	 3
	uint8_t			 state;

#define	RQ_ENVELOPE_EXPIRED	 0x01
#define	RQ_ENVELOPE_REMOVED	 0x02
#define	RQ_ENVELOPE_SUSPEND	 0x04
#define	RQ_ENVELOPE_UPDATE	 0x08
#define	RQ_ENVELOPE_OVERFLOW	 0x10
	uint8_t			 flags;

	time_t			 ctime;
	time_t			 sched;
	time_t			 expire;

	struct rq_message	*message;

	time_t			 t_inflight;
	time_t			 t_scheduled;
};

struct rq_holdq {
	struct evplist		 q;
	size_t			 count;
};

struct rq_queue {
	size_t			 evpcount;
	struct tree		 messages;
	SPLAY_HEAD(prioqtree, rq_envelope) q_priotree;

	struct evplist		 q_pending;
	struct evplist		 q_inflight;

	struct evplist		 q_mta;
	struct evplist		 q_mda;
	struct evplist		 q_bounce;
	struct evplist		 q_update;
	struct evplist		 q_expired;
	struct evplist		 q_removed;
};
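/*
 * Envelope life cycle: a committed envelope starts PENDING, sorted
 * into q_pending through the priority tree.  When it becomes due,
 * rq_queue_schedule() turns it SCHEDULED and moves it to a work queue
 * (q_mta, q_mda or q_bounce, or q_update/q_expired/q_removed for
 * administrative events).  Once handed out in a batch it is INFLIGHT
 * until scheduler_ram_update(), scheduler_ram_delete() or
 * scheduler_ram_hold() is called for it.  A held envelope sits on its
 * holdq (HELD) until it is released back to PENDING.
 */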
static int rq_envelope_cmp(struct rq_envelope *, struct rq_envelope *);

SPLAY_PROTOTYPE(prioqtree, rq_envelope, t_entry, rq_envelope_cmp);

static int scheduler_ram_init(const char *);
static int scheduler_ram_insert(struct scheduler_info *);
static size_t scheduler_ram_commit(uint32_t);
static size_t scheduler_ram_rollback(uint32_t);
static int scheduler_ram_update(struct scheduler_info *);
static int scheduler_ram_delete(uint64_t);
static int scheduler_ram_hold(uint64_t, uint64_t);
static int scheduler_ram_release(int, uint64_t, int);
static int scheduler_ram_batch(int, int *, size_t *, uint64_t *, int *);
static size_t scheduler_ram_messages(uint32_t, uint32_t *, size_t);
static size_t scheduler_ram_envelopes(uint64_t, struct evpstate *, size_t);
static int scheduler_ram_schedule(uint64_t);
static int scheduler_ram_remove(uint64_t);
static int scheduler_ram_suspend(uint64_t);
static int scheduler_ram_resume(uint64_t);

static void sorted_insert(struct rq_queue *, struct rq_envelope *);

static void rq_queue_init(struct rq_queue *);
static void rq_queue_merge(struct rq_queue *, struct rq_queue *);
static void rq_queue_dump(struct rq_queue *, const char *);
static void rq_queue_schedule(struct rq_queue *rq);
static struct evplist *rq_envelope_list(struct rq_queue *, struct rq_envelope *);
static void rq_envelope_schedule(struct rq_queue *, struct rq_envelope *);
static int rq_envelope_remove(struct rq_queue *, struct rq_envelope *);
static int rq_envelope_suspend(struct rq_queue *, struct rq_envelope *);
static int rq_envelope_resume(struct rq_queue *, struct rq_envelope *);
static void rq_envelope_delete(struct rq_queue *, struct rq_envelope *);
static const char *rq_envelope_to_text(struct rq_envelope *);

struct scheduler_backend scheduler_backend_ramqueue = {
	scheduler_ram_init,

	scheduler_ram_insert,
	scheduler_ram_commit,
	scheduler_ram_rollback,

	scheduler_ram_update,
	scheduler_ram_delete,
	scheduler_ram_hold,
	scheduler_ram_release,

	scheduler_ram_batch,

	scheduler_ram_messages,
	scheduler_ram_envelopes,
	scheduler_ram_schedule,
	scheduler_ram_remove,
	scheduler_ram_suspend,
	scheduler_ram_resume,
};

static struct rq_queue	ramqueue;
static struct tree	updates;
static struct tree	holdqs[3]; /* delivery type */

static time_t		currtime;

#define BACKOFF_TRANSFER	400
#define BACKOFF_DELIVERY	10
#define BACKOFF_OVERFLOW	3
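/*
 * Backoff is quadratic: retry "step" is scheduled base * step * step
 * seconds after the envelope's creation time.  With BACKOFF_TRANSFER
 * (400) that puts retries roughly 400s, 1600s and 3600s after ctime
 * for steps 1-3; with BACKOFF_DELIVERY (10), 10s, 40s and 90s.
 * scheduler_next() walks the same curve to find the first attempt
 * that still lies in the future.
 */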
static time_t
scheduler_backoff(time_t t0, time_t base, uint32_t step)
{
	return (t0 + base * step * step);
}

static time_t
scheduler_next(time_t t0, time_t base, uint32_t step)
{
	time_t	t;

	/* XXX be more efficient */
	while ((t = scheduler_backoff(t0, base, step)) <= currtime)
		step++;

	return (t);
}

static int
scheduler_ram_init(const char *arg)
{
	rq_queue_init(&ramqueue);
	tree_init(&updates);
	tree_init(&holdqs[D_MDA]);
	tree_init(&holdqs[D_MTA]);
	tree_init(&holdqs[D_BOUNCE]);

	return (1);
}
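/*
 * Envelopes of a message being enqueued are staged in a private
 * rq_queue, one per msgid, kept in the "updates" tree.  They are only
 * merged into the live ramqueue by scheduler_ram_commit(); a rollback
 * simply throws the staged queue away.
 */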
static int
scheduler_ram_insert(struct scheduler_info *si)
{
	struct rq_queue		*update;
	struct rq_message	*message;
	struct rq_envelope	*envelope;
	uint32_t		 msgid;

	currtime = time(NULL);

	msgid = evpid_to_msgid(si->evpid);

	/* find/prepare a ramqueue update */
	if ((update = tree_get(&updates, msgid)) == NULL) {
		update = xcalloc(1, sizeof *update, "scheduler_insert");
		stat_increment("scheduler.ramqueue.update", 1);
		rq_queue_init(update);
		tree_xset(&updates, msgid, update);
	}

	/* find/prepare the msgtree message in ramqueue update */
	if ((message = tree_get(&update->messages, msgid)) == NULL) {
		message = xcalloc(1, sizeof *message, "scheduler_insert");
		message->msgid = msgid;
		tree_init(&message->envelopes);
		tree_xset(&update->messages, msgid, message);
		stat_increment("scheduler.ramqueue.message", 1);
	}

	/* create envelope in ramqueue message */
	envelope = xcalloc(1, sizeof *envelope, "scheduler_insert");
	envelope->evpid = si->evpid;
	envelope->type = si->type;
	envelope->message = message;
	envelope->ctime = si->creation;
	envelope->expire = si->creation + si->expire;
	envelope->sched = scheduler_backoff(si->creation,
	    (si->type == D_MTA) ? BACKOFF_TRANSFER : BACKOFF_DELIVERY,
	    si->retry);
	tree_xset(&message->envelopes, envelope->evpid, envelope);

	update->evpcount++;
	stat_increment("scheduler.ramqueue.envelope", 1);

	envelope->state = RQ_EVPSTATE_PENDING;
	TAILQ_INSERT_TAIL(&update->q_pending, envelope, entry);

	si->nexttry = envelope->sched;

	return (1);
}

static size_t
scheduler_ram_commit(uint32_t msgid)
{
	struct rq_queue	*update;
	size_t		 r;

	currtime = time(NULL);

	update = tree_xpop(&updates, msgid);
	r = update->evpcount;

	if (verbose & TRACE_SCHEDULER)
		rq_queue_dump(update, "update to commit");

	rq_queue_merge(&ramqueue, update);

	if (verbose & TRACE_SCHEDULER)
		rq_queue_dump(&ramqueue, "resulting queue");

	rq_queue_schedule(&ramqueue);

	free(update);
	stat_decrement("scheduler.ramqueue.update", 1);

	return (r);
}

static size_t
scheduler_ram_rollback(uint32_t msgid)
{
	struct rq_queue		*update;
	struct rq_envelope	*evp;
	size_t			 r;

	currtime = time(NULL);

	if ((update = tree_pop(&updates, msgid)) == NULL)
		return (0);
	r = update->evpcount;

	while ((evp = TAILQ_FIRST(&update->q_pending))) {
		TAILQ_REMOVE(&update->q_pending, evp, entry);
		rq_envelope_delete(update, evp);
	}

	free(update);
	stat_decrement("scheduler.ramqueue.update", 1);

	return (r);
}

static int
scheduler_ram_update(struct scheduler_info *si)
{
	struct rq_message	*msg;
	struct rq_envelope	*evp;
	uint32_t		 msgid;

	currtime = time(NULL);

	msgid = evpid_to_msgid(si->evpid);
	msg = tree_xget(&ramqueue.messages, msgid);
	evp = tree_xget(&msg->envelopes, si->evpid);

	/* it *must* be in-flight */
	if (evp->state != RQ_EVPSTATE_INFLIGHT)
		errx(1, "evp:%016" PRIx64 " not in-flight", si->evpid);

	TAILQ_REMOVE(&ramqueue.q_inflight, evp, entry);

	/*
	 * If the envelope was removed while inflight, schedule it for
	 * removal immediately.
	 */
	if (evp->flags & RQ_ENVELOPE_REMOVED) {
		TAILQ_INSERT_TAIL(&ramqueue.q_removed, evp, entry);
		evp->state = RQ_EVPSTATE_SCHEDULED;
		evp->t_scheduled = currtime;
		return (1);
	}

	evp->sched = scheduler_next(evp->ctime,
	    (si->type == D_MTA) ? BACKOFF_TRANSFER : BACKOFF_DELIVERY,
	    si->retry);

	evp->state = RQ_EVPSTATE_PENDING;
	if (!(evp->flags & RQ_ENVELOPE_SUSPEND))
		sorted_insert(&ramqueue, evp);

	si->nexttry = evp->sched;

	return (1);
}

static int
scheduler_ram_delete(uint64_t evpid)
{
	struct rq_message	*msg;
	struct rq_envelope	*evp;
	uint32_t		 msgid;

	currtime = time(NULL);

	msgid = evpid_to_msgid(evpid);
	msg = tree_xget(&ramqueue.messages, msgid);
	evp = tree_xget(&msg->envelopes, evpid);

	/* it *must* be in-flight */
	if (evp->state != RQ_EVPSTATE_INFLIGHT)
		errx(1, "evp:%016" PRIx64 " not in-flight", evpid);

	TAILQ_REMOVE(&ramqueue.q_inflight, evp, entry);

	rq_envelope_delete(&ramqueue, evp);

	return (1);
}
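/*
 * An in-flight envelope can be parked on a holdq, keyed by an opaque
 * 64-bit id per delivery type, until it is explicitly released.  Each
 * holdq is capped at HOLDQ_MAXSIZE envelopes; past that, new arrivals
 * are pushed back to the pending queue flagged UPDATE|OVERFLOW, which
 * reschedules them on the short BACKOFF_OVERFLOW curve.
 */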
#define HOLDQ_MAXSIZE	1000

static int
scheduler_ram_hold(uint64_t evpid, uint64_t holdq)
{
	struct rq_holdq		*hq;
	struct rq_message	*msg;
	struct rq_envelope	*evp;
	uint32_t		 msgid;

	currtime = time(NULL);

	msgid = evpid_to_msgid(evpid);
	msg = tree_xget(&ramqueue.messages, msgid);
	evp = tree_xget(&msg->envelopes, evpid);

	/* it *must* be in-flight */
	if (evp->state != RQ_EVPSTATE_INFLIGHT)
		errx(1, "evp:%016" PRIx64 " not in-flight", evpid);

	TAILQ_REMOVE(&ramqueue.q_inflight, evp, entry);

	/* If the envelope is suspended, just mark it as pending */
	if (evp->flags & RQ_ENVELOPE_SUSPEND) {
		evp->state = RQ_EVPSTATE_PENDING;
		return (0);
	}

	hq = tree_get(&holdqs[evp->type], holdq);
	if (hq == NULL) {
		hq = xcalloc(1, sizeof(*hq), "scheduler_hold");
		TAILQ_INIT(&hq->q);
		tree_xset(&holdqs[evp->type], holdq, hq);
		stat_increment("scheduler.ramqueue.holdq", 1);
	}

	/* If the holdq is full, just "tempfail" the envelope */
	if (hq->count >= HOLDQ_MAXSIZE) {
		evp->state = RQ_EVPSTATE_PENDING;
		evp->flags |= RQ_ENVELOPE_UPDATE;
		evp->flags |= RQ_ENVELOPE_OVERFLOW;
		sorted_insert(&ramqueue, evp);
		stat_increment("scheduler.ramqueue.hold-overflow", 1);
		return (0);
	}

	evp->state = RQ_EVPSTATE_HELD;
	evp->holdq = holdq;
	/*
	 * This is an optimization: upon release, the envelopes will be
	 * inserted in the pending queue from the first element to the
	 * last.  Since elements already in the queue were received
	 * first, they were scheduled first, so they will be reinserted
	 * before the current element.
	 */
	TAILQ_INSERT_HEAD(&hq->q, evp, entry);
	hq->count += 1;
	stat_increment("scheduler.ramqueue.hold", 1);

	return (1);
}

static int
scheduler_ram_release(int type, uint64_t holdq, int n)
{
	struct rq_holdq		*hq;
	struct rq_envelope	*evp;
	int			 i, update;

	currtime = time(NULL);

	hq = tree_get(&holdqs[type], holdq);
	if (hq == NULL)
		return (0);

	if (n == -1) {
		n = 0;
		update = 1;
	}
	else
		update = 0;

	for (i = 0; n == 0 || i < n; i++) {
		evp = TAILQ_FIRST(&hq->q);
		if (evp == NULL)
			break;

		TAILQ_REMOVE(&hq->q, evp, entry);
		hq->count -= 1;
		evp->holdq = 0;

		/*
		 * When released, all envelopes are put in the pending
		 * queue and will be rescheduled immediately.  As an
		 * optimization, we could just schedule them directly.
		 */
		evp->state = RQ_EVPSTATE_PENDING;
		if (update)
			evp->flags |= RQ_ENVELOPE_UPDATE;
		sorted_insert(&ramqueue, evp);
	}

	if (TAILQ_EMPTY(&hq->q)) {
		tree_xpop(&holdqs[type], holdq);
		free(hq);
		stat_decrement("scheduler.ramqueue.holdq", 1);
	}
	stat_decrement("scheduler.ramqueue.hold", i);

	return (i);
}
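/*
 * Build a batch for the scheduler: the scheduled queues are drained
 * round-robin in fixed priority order (removed, expired, update,
 * bounce, mda, mta), one envelope per queue per round, until *count
 * slots are filled or a full round yields nothing.  If the batch is
 * empty, *delay is set to the number of seconds until the next
 * pending envelope is due (-1 if there is none).
 */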
static int
scheduler_ram_batch(int mask, int *delay, size_t *count, uint64_t *evpids,
    int *types)
{
	struct rq_envelope	*evp;
	size_t			 i, n;
	time_t			 t;

	currtime = time(NULL);

	rq_queue_schedule(&ramqueue);
	if (verbose & TRACE_SCHEDULER)
		rq_queue_dump(&ramqueue, "scheduler_ram_batch()");

	i = 0;
	n = 0;

	for (;;) {

		if (mask & SCHED_REMOVE &&
		    (evp = TAILQ_FIRST(&ramqueue.q_removed))) {
			TAILQ_REMOVE(&ramqueue.q_removed, evp, entry);
			types[i] = SCHED_REMOVE;
			evpids[i] = evp->evpid;
			rq_envelope_delete(&ramqueue, evp);

			if (++i == *count)
				break;
		}

		if (mask & SCHED_EXPIRE &&
		    (evp = TAILQ_FIRST(&ramqueue.q_expired))) {
			TAILQ_REMOVE(&ramqueue.q_expired, evp, entry);
			types[i] = SCHED_EXPIRE;
			evpids[i] = evp->evpid;
			rq_envelope_delete(&ramqueue, evp);

			if (++i == *count)
				break;
		}

		if (mask & SCHED_UPDATE &&
		    (evp = TAILQ_FIRST(&ramqueue.q_update))) {
			TAILQ_REMOVE(&ramqueue.q_update, evp, entry);
			types[i] = SCHED_UPDATE;
			evpids[i] = evp->evpid;

			if (evp->flags & RQ_ENVELOPE_OVERFLOW)
				t = BACKOFF_OVERFLOW;
			else if (evp->type == D_MTA)
				t = BACKOFF_TRANSFER;
			else
				t = BACKOFF_DELIVERY;

			evp->sched = scheduler_next(evp->ctime, t, 0);
			evp->flags &= ~(RQ_ENVELOPE_UPDATE|RQ_ENVELOPE_OVERFLOW);
			evp->state = RQ_EVPSTATE_PENDING;
			if (!(evp->flags & RQ_ENVELOPE_SUSPEND))
				sorted_insert(&ramqueue, evp);

			if (++i == *count)
				break;
		}

		if (mask & SCHED_BOUNCE &&
		    (evp = TAILQ_FIRST(&ramqueue.q_bounce))) {
			TAILQ_REMOVE(&ramqueue.q_bounce, evp, entry);
			types[i] = SCHED_BOUNCE;
			evpids[i] = evp->evpid;

			TAILQ_INSERT_TAIL(&ramqueue.q_inflight, evp, entry);
			evp->state = RQ_EVPSTATE_INFLIGHT;
			evp->t_inflight = currtime;

			if (++i == *count)
				break;
		}

		if (mask & SCHED_MDA &&
		    (evp = TAILQ_FIRST(&ramqueue.q_mda))) {
			TAILQ_REMOVE(&ramqueue.q_mda, evp, entry);
			types[i] = SCHED_MDA;
			evpids[i] = evp->evpid;

			TAILQ_INSERT_TAIL(&ramqueue.q_inflight, evp, entry);
			evp->state = RQ_EVPSTATE_INFLIGHT;
			evp->t_inflight = currtime;

			if (++i == *count)
				break;
		}

		if (mask & SCHED_MTA &&
		    (evp = TAILQ_FIRST(&ramqueue.q_mta))) {
			TAILQ_REMOVE(&ramqueue.q_mta, evp, entry);
			types[i] = SCHED_MTA;
			evpids[i] = evp->evpid;

			TAILQ_INSERT_TAIL(&ramqueue.q_inflight, evp, entry);
			evp->state = RQ_EVPSTATE_INFLIGHT;
			evp->t_inflight = currtime;

			if (++i == *count)
				break;
		}

		/* nothing seen this round */
		if (i == n)
			break;

		n = i;
	}

	if (i) {
		*count = i;
		return (1);
	}

	if ((evp = TAILQ_FIRST(&ramqueue.q_pending))) {
		if (evp->sched < evp->expire)
			t = evp->sched;
		else
			t = evp->expire;
		*delay = (t < currtime) ? 0 : (t - currtime);
	}
	else
		*delay = -1;

	return (0);
}

static size_t
scheduler_ram_messages(uint32_t from, uint32_t *dst, size_t size)
{
	uint64_t	 id;
	size_t		 n;
	void		*i;

	for (n = 0, i = NULL; n < size; n++) {
		if (tree_iterfrom(&ramqueue.messages, &i, from, &id, NULL) == 0)
			break;
		dst[n] = id;
	}

	return (n);
}

static size_t
scheduler_ram_envelopes(uint64_t from, struct evpstate *dst, size_t size)
{
	struct rq_message	*msg;
	struct rq_envelope	*evp;
	void			*i;
	size_t			 n;

	if ((msg = tree_get(&ramqueue.messages, evpid_to_msgid(from))) == NULL)
		return (0);

	for (n = 0, i = NULL; n < size; ) {

		if (tree_iterfrom(&msg->envelopes, &i, from, NULL,
		    (void**)&evp) == 0)
			break;

		if (evp->flags & (RQ_ENVELOPE_REMOVED | RQ_ENVELOPE_EXPIRED))
			continue;

		dst[n].evpid = evp->evpid;
		dst[n].flags = 0;
		dst[n].retry = 0;
		dst[n].time = 0;

		if (evp->state == RQ_EVPSTATE_PENDING) {
			dst[n].time = evp->sched;
			dst[n].flags = EF_PENDING;
		}
		else if (evp->state == RQ_EVPSTATE_SCHEDULED) {
			dst[n].time = evp->t_scheduled;
			dst[n].flags = EF_PENDING;
		}
		else if (evp->state == RQ_EVPSTATE_INFLIGHT) {
			dst[n].time = evp->t_inflight;
			dst[n].flags = EF_INFLIGHT;
		}
		else if (evp->state == RQ_EVPSTATE_HELD) {
			/* same as scheduled */
			dst[n].time = evp->t_scheduled;
			dst[n].flags = EF_PENDING;
			dst[n].flags |= EF_HOLD;
		}
		if (evp->flags & RQ_ENVELOPE_SUSPEND)
			dst[n].flags |= EF_SUSPEND;

		n++;
	}

	return (n);
}
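/*
 * The control functions below accept either a full 64-bit evpid or a
 * bare 32-bit msgid: a value above 0xffffffff identifies a single
 * envelope (evpid_to_msgid() extracts the msgid from its upper bits),
 * anything else is treated as a msgid and the operation is applied to
 * every envelope of that message.
 */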
static int
scheduler_ram_schedule(uint64_t evpid)
{
	struct rq_message	*msg;
	struct rq_envelope	*evp;
	uint32_t		 msgid;
	void			*i;
	int			 r;

	currtime = time(NULL);

	if (evpid > 0xffffffff) {
		msgid = evpid_to_msgid(evpid);
		if ((msg = tree_get(&ramqueue.messages, msgid)) == NULL)
			return (0);
		if ((evp = tree_get(&msg->envelopes, evpid)) == NULL)
			return (0);
		if (evp->state == RQ_EVPSTATE_INFLIGHT)
			return (0);
		rq_envelope_schedule(&ramqueue, evp);
		return (1);
	}
	else {
		msgid = evpid;
		if ((msg = tree_get(&ramqueue.messages, msgid)) == NULL)
			return (0);
		i = NULL;
		r = 0;
		while (tree_iter(&msg->envelopes, &i, NULL, (void*)(&evp))) {
			if (evp->state == RQ_EVPSTATE_INFLIGHT)
				continue;
			rq_envelope_schedule(&ramqueue, evp);
			r++;
		}
		return (r);
	}
}

static int
scheduler_ram_remove(uint64_t evpid)
{
	struct rq_message	*msg;
	struct rq_envelope	*evp;
	uint32_t		 msgid;
	void			*i;
	int			 r;

	currtime = time(NULL);

	if (evpid > 0xffffffff) {
		msgid = evpid_to_msgid(evpid);
		if ((msg = tree_get(&ramqueue.messages, msgid)) == NULL)
			return (0);
		if ((evp = tree_get(&msg->envelopes, evpid)) == NULL)
			return (0);
		if (rq_envelope_remove(&ramqueue, evp))
			return (1);
		return (0);
	}
	else {
		msgid = evpid;
		if ((msg = tree_get(&ramqueue.messages, msgid)) == NULL)
			return (0);
		i = NULL;
		r = 0;
		while (tree_iter(&msg->envelopes, &i, NULL, (void*)(&evp)))
			if (rq_envelope_remove(&ramqueue, evp))
				r++;
		return (r);
	}
}

static int
scheduler_ram_suspend(uint64_t evpid)
{
	struct rq_message	*msg;
	struct rq_envelope	*evp;
	uint32_t		 msgid;
	void			*i;
	int			 r;

	currtime = time(NULL);

	if (evpid > 0xffffffff) {
		msgid = evpid_to_msgid(evpid);
		if ((msg = tree_get(&ramqueue.messages, msgid)) == NULL)
			return (0);
		if ((evp = tree_get(&msg->envelopes, evpid)) == NULL)
			return (0);
		if (rq_envelope_suspend(&ramqueue, evp))
			return (1);
		return (0);
	}
	else {
		msgid = evpid;
		if ((msg = tree_get(&ramqueue.messages, msgid)) == NULL)
			return (0);
		i = NULL;
		r = 0;
		while (tree_iter(&msg->envelopes, &i, NULL, (void*)(&evp)))
			if (rq_envelope_suspend(&ramqueue, evp))
				r++;
		return (r);
	}
}

static int
scheduler_ram_resume(uint64_t evpid)
{
	struct rq_message	*msg;
	struct rq_envelope	*evp;
	uint32_t		 msgid;
	void			*i;
	int			 r;

	currtime = time(NULL);

	if (evpid > 0xffffffff) {
		msgid = evpid_to_msgid(evpid);
		if ((msg = tree_get(&ramqueue.messages, msgid)) == NULL)
			return (0);
		if ((evp = tree_get(&msg->envelopes, evpid)) == NULL)
			return (0);
		if (rq_envelope_resume(&ramqueue, evp))
			return (1);
		return (0);
	}
	else {
		msgid = evpid;
		if ((msg = tree_get(&ramqueue.messages, msgid)) == NULL)
			return (0);
		i = NULL;
		r = 0;
		while (tree_iter(&msg->envelopes, &i, NULL, (void*)(&evp)))
			if (rq_envelope_resume(&ramqueue, evp))
				r++;
		return (r);
	}
}
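/*
 * Insert an envelope into the pending queue while keeping the queue
 * sorted by priority: the envelope is first inserted into the splay
 * tree (ordered by rq_envelope_cmp()), and its in-order successor
 * then gives the exact TAILQ position, avoiding a linear scan of
 * q_pending.
 */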
static void
sorted_insert(struct rq_queue *rq, struct rq_envelope *evp)
{
	struct rq_envelope	*evp2;

	SPLAY_INSERT(prioqtree, &rq->q_priotree, evp);
	evp2 = SPLAY_NEXT(prioqtree, &rq->q_priotree, evp);
	if (evp2)
		TAILQ_INSERT_BEFORE(evp2, evp, entry);
	else
		TAILQ_INSERT_TAIL(&rq->q_pending, evp, entry);
}

static void
rq_queue_init(struct rq_queue *rq)
{
	memset(rq, 0, sizeof *rq);
	tree_init(&rq->messages);
	TAILQ_INIT(&rq->q_pending);
	TAILQ_INIT(&rq->q_inflight);
	TAILQ_INIT(&rq->q_mta);
	TAILQ_INIT(&rq->q_mda);
	TAILQ_INIT(&rq->q_bounce);
	TAILQ_INIT(&rq->q_update);
	TAILQ_INIT(&rq->q_expired);
	TAILQ_INIT(&rq->q_removed);
	SPLAY_INIT(&rq->q_priotree);
}

static void
rq_queue_merge(struct rq_queue *rq, struct rq_queue *update)
{
	struct rq_message	*message, *tomessage;
	struct rq_envelope	*envelope;
	uint64_t		 id;
	void			*i;

	while (tree_poproot(&update->messages, &id, (void*)&message)) {
		if ((tomessage = tree_get(&rq->messages, id)) == NULL) {
			/* message does not exist. re-use structure */
			tree_xset(&rq->messages, id, message);
			continue;
		}
		/* need to re-link all envelopes before merging them */
		i = NULL;
		while ((tree_iter(&message->envelopes, &i, &id,
		    (void*)&envelope)))
			envelope->message = tomessage;
		tree_merge(&tomessage->envelopes, &message->envelopes);
		free(message);
		stat_decrement("scheduler.ramqueue.message", 1);
	}

	/* Sorted insert in the pending queue */
	while ((envelope = TAILQ_FIRST(&update->q_pending))) {
		TAILQ_REMOVE(&update->q_pending, envelope, entry);
		sorted_insert(rq, envelope);
	}

	rq->evpcount += update->evpcount;
}

#define SCHEDULEMAX	1024

static void
rq_queue_schedule(struct rq_queue *rq)
{
	struct rq_envelope	*evp;
	size_t			 n;

	n = 0;
	while ((evp = TAILQ_FIRST(&rq->q_pending))) {
		if (evp->sched > currtime && evp->expire > currtime)
			break;

		if (n == SCHEDULEMAX)
			break;

		if (evp->state != RQ_EVPSTATE_PENDING)
			errx(1, "evp:%016" PRIx64 " flags=0x%x", evp->evpid,
			    evp->flags);

		if (evp->expire <= currtime) {
			TAILQ_REMOVE(&rq->q_pending, evp, entry);
			SPLAY_REMOVE(prioqtree, &rq->q_priotree, evp);
			TAILQ_INSERT_TAIL(&rq->q_expired, evp, entry);
			evp->state = RQ_EVPSTATE_SCHEDULED;
			evp->flags |= RQ_ENVELOPE_EXPIRED;
			evp->t_scheduled = currtime;
			continue;
		}
		rq_envelope_schedule(rq, evp);
		n += 1;
	}
}

static struct evplist *
rq_envelope_list(struct rq_queue *rq, struct rq_envelope *evp)
{
	switch (evp->state) {
	case RQ_EVPSTATE_PENDING:
		return &rq->q_pending;

	case RQ_EVPSTATE_SCHEDULED:
		if (evp->flags & RQ_ENVELOPE_EXPIRED)
			return &rq->q_expired;
		if (evp->flags & RQ_ENVELOPE_REMOVED)
			return &rq->q_removed;
		if (evp->flags & RQ_ENVELOPE_UPDATE)
			return &rq->q_update;
		if (evp->type == D_MTA)
			return &rq->q_mta;
		if (evp->type == D_MDA)
			return &rq->q_mda;
		if (evp->type == D_BOUNCE)
			return &rq->q_bounce;
		errx(1, "%016" PRIx64 " bad evp type %d", evp->evpid,
		    evp->type);

	case RQ_EVPSTATE_INFLIGHT:
		return &rq->q_inflight;

	case RQ_EVPSTATE_HELD:
		return (NULL);
	}

	errx(1, "%016" PRIx64 " bad state %d", evp->evpid, evp->state);
	return (NULL);
}

static void
rq_envelope_schedule(struct rq_queue *rq, struct rq_envelope *evp)
{
	struct rq_holdq	*hq;
	struct evplist	*q = NULL;

	switch (evp->type) {
	case D_MTA:
		q = &rq->q_mta;
		break;
	case D_MDA:
		q = &rq->q_mda;
		break;
	case D_BOUNCE:
		q = &rq->q_bounce;
		break;
	}

	if (evp->flags & RQ_ENVELOPE_UPDATE)
		q = &rq->q_update;

	if (evp->state == RQ_EVPSTATE_HELD) {
		hq = tree_xget(&holdqs[evp->type], evp->holdq);
		TAILQ_REMOVE(&hq->q, evp, entry);
		hq->count -= 1;
		if (TAILQ_EMPTY(&hq->q)) {
			tree_xpop(&holdqs[evp->type], evp->holdq);
			free(hq);
		}
		evp->holdq = 0;
		stat_decrement("scheduler.ramqueue.hold", 1);
	}
	else if (!(evp->flags & RQ_ENVELOPE_SUSPEND)) {
		TAILQ_REMOVE(&rq->q_pending, evp, entry);
		SPLAY_REMOVE(prioqtree, &rq->q_priotree, evp);
	}

	TAILQ_INSERT_TAIL(q, evp, entry);
	evp->state = RQ_EVPSTATE_SCHEDULED;
	evp->t_scheduled = currtime;
}
static int
rq_envelope_remove(struct rq_queue *rq, struct rq_envelope *evp)
{
	struct rq_holdq	*hq;
	struct evplist	*evl;

	if (evp->flags & (RQ_ENVELOPE_REMOVED | RQ_ENVELOPE_EXPIRED))
		return (0);

	/*
	 * If the envelope is in-flight, just mark it for removal.
	 */
	if (evp->state == RQ_EVPSTATE_INFLIGHT) {
		evp->flags |= RQ_ENVELOPE_REMOVED;
		return (1);
	}

	if (evp->state == RQ_EVPSTATE_HELD) {
		hq = tree_xget(&holdqs[evp->type], evp->holdq);
		TAILQ_REMOVE(&hq->q, evp, entry);
		hq->count -= 1;
		if (TAILQ_EMPTY(&hq->q)) {
			tree_xpop(&holdqs[evp->type], evp->holdq);
			free(hq);
		}
		evp->holdq = 0;
		stat_decrement("scheduler.ramqueue.hold", 1);
	}
	else if (!(evp->flags & RQ_ENVELOPE_SUSPEND)) {
		evl = rq_envelope_list(rq, evp);
		TAILQ_REMOVE(evl, evp, entry);
		if (evl == &rq->q_pending)
			SPLAY_REMOVE(prioqtree, &rq->q_priotree, evp);
	}

	TAILQ_INSERT_TAIL(&rq->q_removed, evp, entry);
	evp->state = RQ_EVPSTATE_SCHEDULED;
	evp->flags |= RQ_ENVELOPE_REMOVED;
	evp->t_scheduled = currtime;

	return (1);
}

static int
rq_envelope_suspend(struct rq_queue *rq, struct rq_envelope *evp)
{
	struct rq_holdq	*hq;
	struct evplist	*evl;

	if (evp->flags & RQ_ENVELOPE_SUSPEND)
		return (0);

	if (evp->state == RQ_EVPSTATE_HELD) {
		hq = tree_xget(&holdqs[evp->type], evp->holdq);
		TAILQ_REMOVE(&hq->q, evp, entry);
		hq->count -= 1;
		if (TAILQ_EMPTY(&hq->q)) {
			tree_xpop(&holdqs[evp->type], evp->holdq);
			free(hq);
		}
		evp->holdq = 0;
		evp->state = RQ_EVPSTATE_PENDING;
		stat_decrement("scheduler.ramqueue.hold", 1);
	}
	else if (evp->state != RQ_EVPSTATE_INFLIGHT) {
		evl = rq_envelope_list(rq, evp);
		TAILQ_REMOVE(evl, evp, entry);
		if (evl == &rq->q_pending)
			SPLAY_REMOVE(prioqtree, &rq->q_priotree, evp);
	}

	evp->flags |= RQ_ENVELOPE_SUSPEND;

	return (1);
}

static int
rq_envelope_resume(struct rq_queue *rq, struct rq_envelope *evp)
{
	struct evplist	*evl;

	if (!(evp->flags & RQ_ENVELOPE_SUSPEND))
		return (0);

	if (evp->state != RQ_EVPSTATE_INFLIGHT) {
		evl = rq_envelope_list(rq, evp);
		if (evl == &rq->q_pending)
			sorted_insert(rq, evp);
		else
			TAILQ_INSERT_TAIL(evl, evp, entry);
	}

	evp->flags &= ~RQ_ENVELOPE_SUSPEND;

	return (1);
}

static void
rq_envelope_delete(struct rq_queue *rq, struct rq_envelope *evp)
{
	tree_xpop(&evp->message->envelopes, evp->evpid);
	if (tree_empty(&evp->message->envelopes)) {
		tree_xpop(&rq->messages, evp->message->msgid);
		free(evp->message);
		stat_decrement("scheduler.ramqueue.message", 1);
	}

	free(evp);
	rq->evpcount--;
	stat_decrement("scheduler.ramqueue.envelope", 1);
}

static const char *
rq_envelope_to_text(struct rq_envelope *e)
{
	static char	buf[256];
	char		t[64];

	(void)snprintf(buf, sizeof buf, "evp:%016" PRIx64 " [", e->evpid);

	if (e->type == D_BOUNCE)
		(void)strlcat(buf, "bounce", sizeof buf);
	else if (e->type == D_MDA)
		(void)strlcat(buf, "mda", sizeof buf);
	else if (e->type == D_MTA)
		(void)strlcat(buf, "mta", sizeof buf);

	(void)snprintf(t, sizeof t, ",expire=%s",
	    duration_to_text(e->expire - currtime));
	(void)strlcat(buf, t, sizeof buf);

	switch (e->state) {
	case RQ_EVPSTATE_PENDING:
		(void)snprintf(t, sizeof t, ",pending=%s",
		    duration_to_text(e->sched - currtime));
		(void)strlcat(buf, t, sizeof buf);
		break;

	case RQ_EVPSTATE_SCHEDULED:
		(void)snprintf(t, sizeof t, ",scheduled=%s",
		    duration_to_text(currtime - e->t_scheduled));
		(void)strlcat(buf, t, sizeof buf);
		break;

	case RQ_EVPSTATE_INFLIGHT:
		(void)snprintf(t, sizeof t, ",inflight=%s",
		    duration_to_text(currtime - e->t_inflight));
		(void)strlcat(buf, t, sizeof buf);
		break;

	case RQ_EVPSTATE_HELD:
		(void)snprintf(t, sizeof t, ",held=%s",
		    duration_to_text(currtime - e->t_inflight));
		(void)strlcat(buf, t, sizeof buf);
		break;

	default:
		errx(1, "%016" PRIx64 " bad state %d", e->evpid, e->state);
	}

	if (e->flags & RQ_ENVELOPE_REMOVED)
		(void)strlcat(buf, ",removed", sizeof buf);
	if (e->flags & RQ_ENVELOPE_EXPIRED)
		(void)strlcat(buf, ",expired", sizeof buf);
	if (e->flags & RQ_ENVELOPE_SUSPEND)
		(void)strlcat(buf, ",suspended", sizeof buf);

	(void)strlcat(buf, "]", sizeof buf);

	return (buf);
}

static void
rq_queue_dump(struct rq_queue *rq, const char *name)
{
	struct rq_message	*message;
	struct rq_envelope	*envelope;
	void			*i, *j;
	uint64_t		 id;

	log_debug("debug: /--- ramqueue: %s", name);

	i = NULL;
	while ((tree_iter(&rq->messages, &i, &id, (void*)&message))) {
		log_debug("debug: | msg:%08" PRIx32, message->msgid);
		j = NULL;
		while ((tree_iter(&message->envelopes, &j, &id,
		    (void*)&envelope)))
			log_debug("debug: | %s",
			    rq_envelope_to_text(envelope));
	}
	log_debug("debug: \\---");
}
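/*
 * Priority order for pending envelopes: the effective deadline is
 * whichever comes first of the next scheduling time and the expiry
 * time; ties are broken by evpid so that the ordering is total.
 */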
static int
rq_envelope_cmp(struct rq_envelope *e1, struct rq_envelope *e2)
{
	time_t	ref1, ref2;

	ref1 = (e1->sched < e1->expire) ? e1->sched : e1->expire;
	ref2 = (e2->sched < e2->expire) ? e2->sched : e2->expire;
	if (ref1 != ref2)
		return (ref1 < ref2) ? -1 : 1;

	if (e1->evpid != e2->evpid)
		return (e1->evpid < e2->evpid) ? -1 : 1;

	return 0;
}

SPLAY_GENERATE(prioqtree, rq_envelope, t_entry, rq_envelope_cmp);