/*	$OpenBSD: scheduler_ramqueue.c,v 1.47 2021/06/14 17:58:16 eric Exp $	*/

/*
 * Copyright (c) 2012 Gilles Chehade <gilles@poolp.org>
 * Copyright (c) 2012 Eric Faurot <eric@openbsd.org>
 *
 * Permission to use, copy, modify, and distribute this software for any
 * purpose with or without fee is hereby granted, provided that the above
 * copyright notice and this permission notice appear in all copies.
 *
 * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
 * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
 * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
 * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
 * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
 * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
 * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
 */

#include <inttypes.h>
#include <stdlib.h>
#include <string.h>

#include "smtpd.h"
#include "log.h"

TAILQ_HEAD(evplist, rq_envelope);

struct rq_message {
	uint32_t		 msgid;
	struct tree		 envelopes;
};

struct rq_envelope {
	TAILQ_ENTRY(rq_envelope) entry;
	SPLAY_ENTRY(rq_envelope) t_entry;

	uint64_t		 evpid;
	uint64_t		 holdq;
	enum delivery_type	 type;

#define	RQ_EVPSTATE_PENDING	 0
#define	RQ_EVPSTATE_SCHEDULED	 1
#define	RQ_EVPSTATE_INFLIGHT	 2
#define	RQ_EVPSTATE_HELD	 3
	uint8_t			 state;

#define	RQ_ENVELOPE_EXPIRED	 0x01
#define	RQ_ENVELOPE_REMOVED	 0x02
#define	RQ_ENVELOPE_SUSPEND	 0x04
#define	RQ_ENVELOPE_UPDATE	 0x08
#define	RQ_ENVELOPE_OVERFLOW	 0x10
	uint8_t			 flags;

	time_t			 ctime;
	time_t			 sched;
	time_t			 expire;

	struct rq_message	*message;

	time_t			 t_inflight;
	time_t			 t_scheduled;
};

struct rq_holdq {
	struct evplist		 q;
	size_t			 count;
};

struct rq_queue {
	size_t			 evpcount;
	struct tree		 messages;
	SPLAY_HEAD(prioqtree, rq_envelope) q_priotree;

	struct evplist		 q_pending;
	struct evplist		 q_inflight;

	struct evplist		 q_mta;
	struct evplist		 q_mda;
	struct evplist		 q_bounce;
	struct evplist		 q_update;
	struct evplist		 q_expired;
	struct evplist		 q_removed;
};

static int rq_envelope_cmp(struct rq_envelope *, struct rq_envelope *);

SPLAY_PROTOTYPE(prioqtree, rq_envelope, t_entry, rq_envelope_cmp);
static int scheduler_ram_init(const char *);
static int scheduler_ram_insert(struct scheduler_info *);
static size_t scheduler_ram_commit(uint32_t);
static size_t scheduler_ram_rollback(uint32_t);
static int scheduler_ram_update(struct scheduler_info *);
static int scheduler_ram_delete(uint64_t);
static int scheduler_ram_hold(uint64_t, uint64_t);
static int scheduler_ram_release(int, uint64_t, int);
static int scheduler_ram_batch(int, int *, size_t *, uint64_t *, int *);
static size_t scheduler_ram_messages(uint32_t, uint32_t *, size_t);
static size_t scheduler_ram_envelopes(uint64_t, struct evpstate *, size_t);
static int scheduler_ram_schedule(uint64_t);
static int scheduler_ram_remove(uint64_t);
static int scheduler_ram_suspend(uint64_t);
static int scheduler_ram_resume(uint64_t);
static int scheduler_ram_query(uint64_t);

static void sorted_insert(struct rq_queue *, struct rq_envelope *);

static void rq_queue_init(struct rq_queue *);
static void rq_queue_merge(struct rq_queue *, struct rq_queue *);
static void rq_queue_dump(struct rq_queue *, const char *);
static void rq_queue_schedule(struct rq_queue *rq);
static struct evplist *rq_envelope_list(struct rq_queue *, struct rq_envelope *);
static void rq_envelope_schedule(struct rq_queue *, struct rq_envelope *);
static int rq_envelope_remove(struct rq_queue *, struct rq_envelope *);
static int rq_envelope_suspend(struct rq_queue *, struct rq_envelope *);
static int rq_envelope_resume(struct rq_queue *, struct rq_envelope *);
static void rq_envelope_delete(struct rq_queue *, struct rq_envelope *);
static const char *rq_envelope_to_text(struct rq_envelope *);

struct scheduler_backend scheduler_backend_ramqueue = {
	scheduler_ram_init,

	scheduler_ram_insert,
	scheduler_ram_commit,
	scheduler_ram_rollback,

	scheduler_ram_update,
	scheduler_ram_delete,
	scheduler_ram_hold,
	scheduler_ram_release,

	scheduler_ram_batch,

	scheduler_ram_messages,
	scheduler_ram_envelopes,
	scheduler_ram_schedule,
	scheduler_ram_remove,
	scheduler_ram_suspend,
	scheduler_ram_resume,
	scheduler_ram_query,
};

static struct rq_queue	ramqueue;
static struct tree	updates;
static struct tree	holdqs[3]; /* delivery type */

static time_t		currtime;

#define BACKOFF_TRANSFER	400
#define BACKOFF_DELIVERY	10
#define BACKOFF_OVERFLOW	3

static time_t
scheduler_backoff(time_t t0, time_t base, uint32_t step)
{
	return (t0 + base * step * step);
}

static time_t
scheduler_next(time_t t0, time_t base, uint32_t step)
{
	time_t t;

	/* XXX be more efficient */
	while ((t = scheduler_backoff(t0, base, step)) <= currtime)
		step++;

	return (t);
}
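
/*
 * Worked example of the backoff curve above: with BACKOFF_TRANSFER
 * (400s) and creation time t0, scheduler_backoff() yields t0 for
 * step 0, t0 + 400s for step 1, t0 + 1600s for step 2, t0 + 3600s
 * for step 3, and so on quadratically.  scheduler_next() simply
 * walks the steps until it finds the first retry time strictly in
 * the future.
 */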

static int
scheduler_ram_init(const char *arg)
{
	rq_queue_init(&ramqueue);
	tree_init(&updates);
	tree_init(&holdqs[D_MDA]);
	tree_init(&holdqs[D_MTA]);
	tree_init(&holdqs[D_BOUNCE]);

	return (1);
}

static int
scheduler_ram_insert(struct scheduler_info *si)
{
	struct rq_queue		*update;
	struct rq_message	*message;
	struct rq_envelope	*envelope;
	uint32_t		 msgid;

	currtime = time(NULL);

	msgid = evpid_to_msgid(si->evpid);

	/* find/prepare a ramqueue update */
	if ((update = tree_get(&updates, msgid)) == NULL) {
		update = xcalloc(1, sizeof *update);
		stat_increment("scheduler.ramqueue.update", 1);
		rq_queue_init(update);
		tree_xset(&updates, msgid, update);
	}

	/* find/prepare the msgtree message in ramqueue update */
	if ((message = tree_get(&update->messages, msgid)) == NULL) {
		message = xcalloc(1, sizeof *message);
		message->msgid = msgid;
		tree_init(&message->envelopes);
		tree_xset(&update->messages, msgid, message);
		stat_increment("scheduler.ramqueue.message", 1);
	}

	/* create envelope in ramqueue message */
	envelope = xcalloc(1, sizeof *envelope);
	envelope->evpid = si->evpid;
	envelope->type = si->type;
	envelope->message = message;
	envelope->ctime = si->creation;
	envelope->expire = si->creation + si->ttl;
	envelope->sched = scheduler_backoff(si->creation,
	    (si->type == D_MTA) ? BACKOFF_TRANSFER : BACKOFF_DELIVERY, si->retry);
	tree_xset(&message->envelopes, envelope->evpid, envelope);

	update->evpcount++;
	stat_increment("scheduler.ramqueue.envelope", 1);

	envelope->state = RQ_EVPSTATE_PENDING;
	TAILQ_INSERT_TAIL(&update->q_pending, envelope, entry);

	si->nexttry = envelope->sched;

	return (1);
}
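
/*
 * Note: envelopes are staged in a per-message update queue (the
 * "updates" tree above) and only enter the live ramqueue once the
 * whole message is committed; a rollback frees the staged envelopes
 * instead.  The expected calling sequence is therefore one insert
 * per envelope, then a single commit or rollback with the msgid.
 */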

static size_t
scheduler_ram_commit(uint32_t msgid)
{
	struct rq_queue	*update;
	size_t		 r;

	currtime = time(NULL);

	update = tree_xpop(&updates, msgid);
	r = update->evpcount;

	if (tracing & TRACE_SCHEDULER)
		rq_queue_dump(update, "update to commit");

	rq_queue_merge(&ramqueue, update);

	if (tracing & TRACE_SCHEDULER)
		rq_queue_dump(&ramqueue, "resulting queue");

	rq_queue_schedule(&ramqueue);

	free(update);
	stat_decrement("scheduler.ramqueue.update", 1);

	return (r);
}

static size_t
scheduler_ram_rollback(uint32_t msgid)
{
	struct rq_queue		*update;
	struct rq_envelope	*evp;
	size_t			 r;

	currtime = time(NULL);

	if ((update = tree_pop(&updates, msgid)) == NULL)
		return (0);
	r = update->evpcount;

	while ((evp = TAILQ_FIRST(&update->q_pending))) {
		TAILQ_REMOVE(&update->q_pending, evp, entry);
		rq_envelope_delete(update, evp);
	}

	free(update);
	stat_decrement("scheduler.ramqueue.update", 1);

	return (r);
}

static int
scheduler_ram_update(struct scheduler_info *si)
{
	struct rq_message	*msg;
	struct rq_envelope	*evp;
	uint32_t		 msgid;

	currtime = time(NULL);

	msgid = evpid_to_msgid(si->evpid);
	msg = tree_xget(&ramqueue.messages, msgid);
	evp = tree_xget(&msg->envelopes, si->evpid);

	/* it *must* be in-flight */
	if (evp->state != RQ_EVPSTATE_INFLIGHT)
		fatalx("evp:%016" PRIx64 " not in-flight", si->evpid);

	TAILQ_REMOVE(&ramqueue.q_inflight, evp, entry);

	/*
	 * If the envelope was removed while inflight, schedule it for
	 * removal immediately.
	 */
	if (evp->flags & RQ_ENVELOPE_REMOVED) {
		TAILQ_INSERT_TAIL(&ramqueue.q_removed, evp, entry);
		evp->state = RQ_EVPSTATE_SCHEDULED;
		evp->t_scheduled = currtime;
		return (1);
	}

	evp->sched = scheduler_next(evp->ctime,
	    (si->type == D_MTA) ? BACKOFF_TRANSFER : BACKOFF_DELIVERY, si->retry);

	evp->state = RQ_EVPSTATE_PENDING;
	if (!(evp->flags & RQ_ENVELOPE_SUSPEND))
		sorted_insert(&ramqueue, evp);

	si->nexttry = evp->sched;

	return (1);
}

static int
scheduler_ram_delete(uint64_t evpid)
{
	struct rq_message	*msg;
	struct rq_envelope	*evp;
	uint32_t		 msgid;

	currtime = time(NULL);

	msgid = evpid_to_msgid(evpid);
	msg = tree_xget(&ramqueue.messages, msgid);
	evp = tree_xget(&msg->envelopes, evpid);

	/* it *must* be in-flight */
	if (evp->state != RQ_EVPSTATE_INFLIGHT)
		fatalx("evp:%016" PRIx64 " not in-flight", evpid);

	TAILQ_REMOVE(&ramqueue.q_inflight, evp, entry);

	rq_envelope_delete(&ramqueue, evp);

	return (1);
}

#define HOLDQ_MAXSIZE	1000

static int
scheduler_ram_hold(uint64_t evpid, uint64_t holdq)
{
	struct rq_holdq		*hq;
	struct rq_message	*msg;
	struct rq_envelope	*evp;
	uint32_t		 msgid;

	currtime = time(NULL);

	msgid = evpid_to_msgid(evpid);
	msg = tree_xget(&ramqueue.messages, msgid);
	evp = tree_xget(&msg->envelopes, evpid);

	/* it *must* be in-flight */
	if (evp->state != RQ_EVPSTATE_INFLIGHT)
		fatalx("evp:%016" PRIx64 " not in-flight", evpid);

	TAILQ_REMOVE(&ramqueue.q_inflight, evp, entry);

	/* If the envelope is suspended, just mark it as pending */
	if (evp->flags & RQ_ENVELOPE_SUSPEND) {
		evp->state = RQ_EVPSTATE_PENDING;
		return (0);
	}

	hq = tree_get(&holdqs[evp->type], holdq);
	if (hq == NULL) {
		hq = xcalloc(1, sizeof(*hq));
		TAILQ_INIT(&hq->q);
		tree_xset(&holdqs[evp->type], holdq, hq);
		stat_increment("scheduler.ramqueue.holdq", 1);
	}

	/* If the holdq is full, just "tempfail" the envelope */
	if (hq->count >= HOLDQ_MAXSIZE) {
		evp->state = RQ_EVPSTATE_PENDING;
		evp->flags |= RQ_ENVELOPE_UPDATE;
		evp->flags |= RQ_ENVELOPE_OVERFLOW;
		sorted_insert(&ramqueue, evp);
		stat_increment("scheduler.ramqueue.hold-overflow", 1);
		return (0);
	}

	evp->state = RQ_EVPSTATE_HELD;
	evp->holdq = holdq;
	/*
	 * This is an optimization: upon release, the envelopes will be
	 * inserted in the pending queue from the first element to the last.
	 * Since elements already in the queue were received first, they
	 * were scheduled first, so they will be reinserted before the
	 * current element.
	 */
	TAILQ_INSERT_HEAD(&hq->q, evp, entry);
	hq->count += 1;
	stat_increment("scheduler.ramqueue.hold", 1);

	return (1);
}
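
/*
 * A holdq parks envelopes under an opaque 64-bit identifier chosen
 * by the caller; in smtpd this is how the MTA holds mail for routes
 * it cannot serve right now.  A held envelope sits outside the
 * pending queue until it is released below, or explicitly
 * rescheduled, suspended or removed.
 */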

static int
scheduler_ram_release(int type, uint64_t holdq, int n)
{
	struct rq_holdq		*hq;
	struct rq_envelope	*evp;
	int			 i, update;

	currtime = time(NULL);

	hq = tree_get(&holdqs[type], holdq);
	if (hq == NULL)
		return (0);

	if (n == -1) {
		n = 0;
		update = 1;
	}
	else
		update = 0;

	for (i = 0; n == 0 || i < n; i++) {
		evp = TAILQ_FIRST(&hq->q);
		if (evp == NULL)
			break;

		TAILQ_REMOVE(&hq->q, evp, entry);
		hq->count -= 1;
		evp->holdq = 0;

		/*
		 * When released, all envelopes are put in the pending queue
		 * and will be rescheduled immediately.  As an optimization,
		 * we could just schedule them directly.
		 */
		evp->state = RQ_EVPSTATE_PENDING;
		if (update)
			evp->flags |= RQ_ENVELOPE_UPDATE;
		sorted_insert(&ramqueue, evp);
	}

	if (TAILQ_EMPTY(&hq->q)) {
		tree_xpop(&holdqs[type], holdq);
		free(hq);
		stat_decrement("scheduler.ramqueue.holdq", 1);
	}
	stat_decrement("scheduler.ramqueue.hold", i);

	return (i);
}

static int
scheduler_ram_batch(int mask, int *delay, size_t *count, uint64_t *evpids,
    int *types)
{
	struct rq_envelope	*evp;
	size_t			 i, n;
	time_t			 t;

	currtime = time(NULL);

	rq_queue_schedule(&ramqueue);
	if (tracing & TRACE_SCHEDULER)
		rq_queue_dump(&ramqueue, "scheduler_ram_batch()");

	i = 0;
	n = 0;

	for (;;) {

		if (mask & SCHED_REMOVE && (evp = TAILQ_FIRST(&ramqueue.q_removed))) {
			TAILQ_REMOVE(&ramqueue.q_removed, evp, entry);
			types[i] = SCHED_REMOVE;
			evpids[i] = evp->evpid;
			rq_envelope_delete(&ramqueue, evp);

			if (++i == *count)
				break;
		}

		if (mask & SCHED_EXPIRE && (evp = TAILQ_FIRST(&ramqueue.q_expired))) {
			TAILQ_REMOVE(&ramqueue.q_expired, evp, entry);
			types[i] = SCHED_EXPIRE;
			evpids[i] = evp->evpid;
			rq_envelope_delete(&ramqueue, evp);

			if (++i == *count)
				break;
		}

		if (mask & SCHED_UPDATE && (evp = TAILQ_FIRST(&ramqueue.q_update))) {
			TAILQ_REMOVE(&ramqueue.q_update, evp, entry);
			types[i] = SCHED_UPDATE;
			evpids[i] = evp->evpid;

			if (evp->flags & RQ_ENVELOPE_OVERFLOW)
				t = BACKOFF_OVERFLOW;
			else if (evp->type == D_MTA)
				t = BACKOFF_TRANSFER;
			else
				t = BACKOFF_DELIVERY;

			evp->sched = scheduler_next(evp->ctime, t, 0);
			evp->flags &= ~(RQ_ENVELOPE_UPDATE|RQ_ENVELOPE_OVERFLOW);
			evp->state = RQ_EVPSTATE_PENDING;
			if (!(evp->flags & RQ_ENVELOPE_SUSPEND))
				sorted_insert(&ramqueue, evp);

			if (++i == *count)
				break;
		}

		if (mask & SCHED_BOUNCE && (evp = TAILQ_FIRST(&ramqueue.q_bounce))) {
			TAILQ_REMOVE(&ramqueue.q_bounce, evp, entry);
			types[i] = SCHED_BOUNCE;
			evpids[i] = evp->evpid;

			TAILQ_INSERT_TAIL(&ramqueue.q_inflight, evp, entry);
			evp->state = RQ_EVPSTATE_INFLIGHT;
			evp->t_inflight = currtime;

			if (++i == *count)
				break;
		}

		if (mask & SCHED_MDA && (evp = TAILQ_FIRST(&ramqueue.q_mda))) {
			TAILQ_REMOVE(&ramqueue.q_mda, evp, entry);
			types[i] = SCHED_MDA;
			evpids[i] = evp->evpid;

			TAILQ_INSERT_TAIL(&ramqueue.q_inflight, evp, entry);
			evp->state = RQ_EVPSTATE_INFLIGHT;
			evp->t_inflight = currtime;

			if (++i == *count)
				break;
		}

		if (mask & SCHED_MTA && (evp = TAILQ_FIRST(&ramqueue.q_mta))) {
			TAILQ_REMOVE(&ramqueue.q_mta, evp, entry);
			types[i] = SCHED_MTA;
			evpids[i] = evp->evpid;

			TAILQ_INSERT_TAIL(&ramqueue.q_inflight, evp, entry);
			evp->state = RQ_EVPSTATE_INFLIGHT;
			evp->t_inflight = currtime;

			if (++i == *count)
				break;
		}

		/* nothing seen this round */
		if (i == n)
			break;

		n = i;
	}

	if (i) {
		*count = i;
		return (1);
	}

	if ((evp = TAILQ_FIRST(&ramqueue.q_pending))) {
		if (evp->sched < evp->expire)
			t = evp->sched;
		else
			t = evp->expire;
		*delay = (t < currtime) ? 0 : (t - currtime);
	}
	else
		*delay = -1;

	return (0);
}
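
/*
 * Note on scheduler_ram_batch() above: every round of the loop takes
 * at most one envelope from each requested queue, in fixed priority
 * order (remove, expire, update, bounce, mda, mta), so no queue can
 * starve the others.  The loop ends when the output array is full or
 * when a whole round (i == n) yields nothing new; if the batch is
 * empty, *delay tells the caller how long until the next pending
 * envelope is due.
 */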

static size_t
scheduler_ram_messages(uint32_t from, uint32_t *dst, size_t size)
{
	uint64_t	 id;
	size_t		 n;
	void		*i;

	for (n = 0, i = NULL; n < size; n++) {
		if (tree_iterfrom(&ramqueue.messages, &i, from, &id, NULL) == 0)
			break;
		/* the messages tree is keyed by msgid, so id fits in 32 bits */
		dst[n] = id;
	}

	return (n);
}

static size_t
scheduler_ram_envelopes(uint64_t from, struct evpstate *dst, size_t size)
{
	struct rq_message	*msg;
	struct rq_envelope	*evp;
	void			*i;
	size_t			 n;

	if ((msg = tree_get(&ramqueue.messages, evpid_to_msgid(from))) == NULL)
		return (0);

	for (n = 0, i = NULL; n < size; ) {

		if (tree_iterfrom(&msg->envelopes, &i, from, NULL,
		    (void**)&evp) == 0)
			break;

		if (evp->flags & (RQ_ENVELOPE_REMOVED | RQ_ENVELOPE_EXPIRED))
			continue;

		dst[n].evpid = evp->evpid;
		dst[n].flags = 0;
		dst[n].retry = 0;
		dst[n].time = 0;

		if (evp->state == RQ_EVPSTATE_PENDING) {
			dst[n].time = evp->sched;
			dst[n].flags = EF_PENDING;
		}
		else if (evp->state == RQ_EVPSTATE_SCHEDULED) {
			dst[n].time = evp->t_scheduled;
			dst[n].flags = EF_PENDING;
		}
		else if (evp->state == RQ_EVPSTATE_INFLIGHT) {
			dst[n].time = evp->t_inflight;
			dst[n].flags = EF_INFLIGHT;
		}
		else if (evp->state == RQ_EVPSTATE_HELD) {
			/* same as scheduled */
			dst[n].time = evp->t_scheduled;
			dst[n].flags = EF_PENDING;
			dst[n].flags |= EF_HOLD;
		}
		if (evp->flags & RQ_ENVELOPE_SUSPEND)
			dst[n].flags |= EF_SUSPEND;

		n++;
	}

	return (n);
}
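
/*
 * The control functions below accept either a full 64-bit evpid or
 * a bare 32-bit msgid: an evpid carries its msgid in the upper 32
 * bits (see evpid_to_msgid()), so any value above 0xffffffff names
 * a single envelope, while smaller values address every envelope of
 * a message.
 */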

static int
scheduler_ram_schedule(uint64_t evpid)
{
	struct rq_message	*msg;
	struct rq_envelope	*evp;
	uint32_t		 msgid;
	void			*i;
	int			 r;

	currtime = time(NULL);

	if (evpid > 0xffffffff) {
		msgid = evpid_to_msgid(evpid);
		if ((msg = tree_get(&ramqueue.messages, msgid)) == NULL)
			return (0);
		if ((evp = tree_get(&msg->envelopes, evpid)) == NULL)
			return (0);
		if (evp->state == RQ_EVPSTATE_INFLIGHT)
			return (0);
		rq_envelope_schedule(&ramqueue, evp);
		return (1);
	}
	else {
		msgid = evpid;
		if ((msg = tree_get(&ramqueue.messages, msgid)) == NULL)
			return (0);
		i = NULL;
		r = 0;
		while (tree_iter(&msg->envelopes, &i, NULL, (void*)(&evp))) {
			if (evp->state == RQ_EVPSTATE_INFLIGHT)
				continue;
			rq_envelope_schedule(&ramqueue, evp);
			r++;
		}
		return (r);
	}
}

static int
scheduler_ram_remove(uint64_t evpid)
{
	struct rq_message	*msg;
	struct rq_envelope	*evp;
	uint32_t		 msgid;
	void			*i;
	int			 r;

	currtime = time(NULL);

	if (evpid > 0xffffffff) {
		msgid = evpid_to_msgid(evpid);
		if ((msg = tree_get(&ramqueue.messages, msgid)) == NULL)
			return (0);
		if ((evp = tree_get(&msg->envelopes, evpid)) == NULL)
			return (0);
		if (rq_envelope_remove(&ramqueue, evp))
			return (1);
		return (0);
	}
	else {
		msgid = evpid;
		if ((msg = tree_get(&ramqueue.messages, msgid)) == NULL)
			return (0);
		i = NULL;
		r = 0;
		while (tree_iter(&msg->envelopes, &i, NULL, (void*)(&evp)))
			if (rq_envelope_remove(&ramqueue, evp))
				r++;
		return (r);
	}
}

static int
scheduler_ram_suspend(uint64_t evpid)
{
	struct rq_message	*msg;
	struct rq_envelope	*evp;
	uint32_t		 msgid;
	void			*i;
	int			 r;

	currtime = time(NULL);

	if (evpid > 0xffffffff) {
		msgid = evpid_to_msgid(evpid);
		if ((msg = tree_get(&ramqueue.messages, msgid)) == NULL)
			return (0);
		if ((evp = tree_get(&msg->envelopes, evpid)) == NULL)
			return (0);
		if (rq_envelope_suspend(&ramqueue, evp))
			return (1);
		return (0);
	}
	else {
		msgid = evpid;
		if ((msg = tree_get(&ramqueue.messages, msgid)) == NULL)
			return (0);
		i = NULL;
		r = 0;
		while (tree_iter(&msg->envelopes, &i, NULL, (void*)(&evp)))
			if (rq_envelope_suspend(&ramqueue, evp))
				r++;
		return (r);
	}
}

static int
scheduler_ram_resume(uint64_t evpid)
{
	struct rq_message	*msg;
	struct rq_envelope	*evp;
	uint32_t		 msgid;
	void			*i;
	int			 r;

	currtime = time(NULL);

	if (evpid > 0xffffffff) {
		msgid = evpid_to_msgid(evpid);
		if ((msg = tree_get(&ramqueue.messages, msgid)) == NULL)
			return (0);
		if ((evp = tree_get(&msg->envelopes, evpid)) == NULL)
			return (0);
		if (rq_envelope_resume(&ramqueue, evp))
			return (1);
		return (0);
	}
	else {
		msgid = evpid;
		if ((msg = tree_get(&ramqueue.messages, msgid)) == NULL)
			return (0);
		i = NULL;
		r = 0;
		while (tree_iter(&msg->envelopes, &i, NULL, (void*)(&evp)))
			if (rq_envelope_resume(&ramqueue, evp))
				r++;
		return (r);
	}
}

static int
scheduler_ram_query(uint64_t evpid)
{
	uint32_t	msgid;

	if (evpid > 0xffffffff)
		msgid = evpid_to_msgid(evpid);
	else
		msgid = evpid;

	if (tree_get(&ramqueue.messages, msgid) == NULL)
		return (0);

	return (1);
}
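
/*
 * The pending queue is kept in two structures at once: a splay tree
 * ordered by rq_envelope_cmp() to locate the insertion point in
 * O(log n) amortized time, and a tail queue so rq_queue_schedule()
 * can walk envelopes in scheduling order without touching the tree.
 * sorted_insert() below keeps the two in sync.
 */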

static void
sorted_insert(struct rq_queue *rq, struct rq_envelope *evp)
{
	struct rq_envelope	*evp2;

	SPLAY_INSERT(prioqtree, &rq->q_priotree, evp);
	evp2 = SPLAY_NEXT(prioqtree, &rq->q_priotree, evp);
	if (evp2)
		TAILQ_INSERT_BEFORE(evp2, evp, entry);
	else
		TAILQ_INSERT_TAIL(&rq->q_pending, evp, entry);
}

static void
rq_queue_init(struct rq_queue *rq)
{
	memset(rq, 0, sizeof *rq);
	tree_init(&rq->messages);
	TAILQ_INIT(&rq->q_pending);
	TAILQ_INIT(&rq->q_inflight);
	TAILQ_INIT(&rq->q_mta);
	TAILQ_INIT(&rq->q_mda);
	TAILQ_INIT(&rq->q_bounce);
	TAILQ_INIT(&rq->q_update);
	TAILQ_INIT(&rq->q_expired);
	TAILQ_INIT(&rq->q_removed);
	SPLAY_INIT(&rq->q_priotree);
}

static void
rq_queue_merge(struct rq_queue *rq, struct rq_queue *update)
{
	struct rq_message	*message, *tomessage;
	struct rq_envelope	*envelope;
	uint64_t		 id;
	void			*i;

	while (tree_poproot(&update->messages, &id, (void*)&message)) {
		if ((tomessage = tree_get(&rq->messages, id)) == NULL) {
			/* message does not exist. re-use structure */
			tree_xset(&rq->messages, id, message);
			continue;
		}
		/* need to re-link all envelopes before merging them */
		i = NULL;
		while ((tree_iter(&message->envelopes, &i, &id,
		    (void*)&envelope)))
			envelope->message = tomessage;
		tree_merge(&tomessage->envelopes, &message->envelopes);
		free(message);
		stat_decrement("scheduler.ramqueue.message", 1);
	}

	/* Sorted insert in the pending queue */
	while ((envelope = TAILQ_FIRST(&update->q_pending))) {
		TAILQ_REMOVE(&update->q_pending, envelope, entry);
		sorted_insert(rq, envelope);
	}

	rq->evpcount += update->evpcount;
}

#define SCHEDULEMAX	1024

static void
rq_queue_schedule(struct rq_queue *rq)
{
	struct rq_envelope	*evp;
	size_t			 n;

	n = 0;
	while ((evp = TAILQ_FIRST(&rq->q_pending))) {
		if (evp->sched > currtime && evp->expire > currtime)
			break;

		if (n == SCHEDULEMAX)
			break;

		if (evp->state != RQ_EVPSTATE_PENDING)
			fatalx("evp:%016" PRIx64 " flags=0x%x", evp->evpid,
			    evp->flags);

		if (evp->expire <= currtime) {
			TAILQ_REMOVE(&rq->q_pending, evp, entry);
			SPLAY_REMOVE(prioqtree, &rq->q_priotree, evp);
			TAILQ_INSERT_TAIL(&rq->q_expired, evp, entry);
			evp->state = RQ_EVPSTATE_SCHEDULED;
			evp->flags |= RQ_ENVELOPE_EXPIRED;
			evp->t_scheduled = currtime;
			continue;
		}
		rq_envelope_schedule(rq, evp);
		n += 1;
	}
}

static struct evplist *
rq_envelope_list(struct rq_queue *rq, struct rq_envelope *evp)
{
	switch (evp->state) {
	case RQ_EVPSTATE_PENDING:
		return &rq->q_pending;

	case RQ_EVPSTATE_SCHEDULED:
		if (evp->flags & RQ_ENVELOPE_EXPIRED)
			return &rq->q_expired;
		if (evp->flags & RQ_ENVELOPE_REMOVED)
			return &rq->q_removed;
		if (evp->flags & RQ_ENVELOPE_UPDATE)
			return &rq->q_update;
		if (evp->type == D_MTA)
			return &rq->q_mta;
		if (evp->type == D_MDA)
			return &rq->q_mda;
		if (evp->type == D_BOUNCE)
			return &rq->q_bounce;
		fatalx("%016" PRIx64 " bad evp type %d", evp->evpid, evp->type);

	case RQ_EVPSTATE_INFLIGHT:
		return &rq->q_inflight;

	case RQ_EVPSTATE_HELD:
		return (NULL);
	}

	fatalx("%016" PRIx64 " bad state %d", evp->evpid, evp->state);
	return (NULL);
}

static void
rq_envelope_schedule(struct rq_queue *rq, struct rq_envelope *evp)
{
	struct rq_holdq	*hq;
	struct evplist	*q = NULL;

	switch (evp->type) {
	case D_MTA:
		q = &rq->q_mta;
		break;
	case D_MDA:
		q = &rq->q_mda;
		break;
	case D_BOUNCE:
		q = &rq->q_bounce;
		break;
	}

	if (evp->flags & RQ_ENVELOPE_UPDATE)
		q = &rq->q_update;

	if (evp->state == RQ_EVPSTATE_HELD) {
		hq = tree_xget(&holdqs[evp->type], evp->holdq);
		TAILQ_REMOVE(&hq->q, evp, entry);
		hq->count -= 1;
		if (TAILQ_EMPTY(&hq->q)) {
			tree_xpop(&holdqs[evp->type], evp->holdq);
			free(hq);
		}
		evp->holdq = 0;
		stat_decrement("scheduler.ramqueue.hold", 1);
	}
	else if (!(evp->flags & RQ_ENVELOPE_SUSPEND)) {
		TAILQ_REMOVE(&rq->q_pending, evp, entry);
		SPLAY_REMOVE(prioqtree, &rq->q_priotree, evp);
	}

	TAILQ_INSERT_TAIL(q, evp, entry);
	evp->state = RQ_EVPSTATE_SCHEDULED;
	evp->t_scheduled = currtime;
}
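
/*
 * An in-flight envelope is owned by a delivery agent, so removal
 * cannot happen immediately: rq_envelope_remove() below only sets
 * RQ_ENVELOPE_REMOVED in that case, and scheduler_ram_update()
 * finishes the job by moving the envelope to q_removed when the
 * delivery attempt comes back.
 */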

static int
rq_envelope_remove(struct rq_queue *rq, struct rq_envelope *evp)
{
	struct rq_holdq	*hq;
	struct evplist	*evl;

	if (evp->flags & (RQ_ENVELOPE_REMOVED | RQ_ENVELOPE_EXPIRED))
		return (0);
	/*
	 * If the envelope is in-flight, just mark it for removal.
	 */
	if (evp->state == RQ_EVPSTATE_INFLIGHT) {
		evp->flags |= RQ_ENVELOPE_REMOVED;
		return (1);
	}

	if (evp->state == RQ_EVPSTATE_HELD) {
		hq = tree_xget(&holdqs[evp->type], evp->holdq);
		TAILQ_REMOVE(&hq->q, evp, entry);
		hq->count -= 1;
		if (TAILQ_EMPTY(&hq->q)) {
			tree_xpop(&holdqs[evp->type], evp->holdq);
			free(hq);
		}
		evp->holdq = 0;
		stat_decrement("scheduler.ramqueue.hold", 1);
	}
	else if (!(evp->flags & RQ_ENVELOPE_SUSPEND)) {
		evl = rq_envelope_list(rq, evp);
		TAILQ_REMOVE(evl, evp, entry);
		if (evl == &rq->q_pending)
			SPLAY_REMOVE(prioqtree, &rq->q_priotree, evp);
	}

	TAILQ_INSERT_TAIL(&rq->q_removed, evp, entry);
	evp->state = RQ_EVPSTATE_SCHEDULED;
	evp->flags |= RQ_ENVELOPE_REMOVED;
	evp->t_scheduled = currtime;

	return (1);
}

static int
rq_envelope_suspend(struct rq_queue *rq, struct rq_envelope *evp)
{
	struct rq_holdq	*hq;
	struct evplist	*evl;

	if (evp->flags & RQ_ENVELOPE_SUSPEND)
		return (0);

	if (evp->state == RQ_EVPSTATE_HELD) {
		hq = tree_xget(&holdqs[evp->type], evp->holdq);
		TAILQ_REMOVE(&hq->q, evp, entry);
		hq->count -= 1;
		if (TAILQ_EMPTY(&hq->q)) {
			tree_xpop(&holdqs[evp->type], evp->holdq);
			free(hq);
		}
		evp->holdq = 0;
		evp->state = RQ_EVPSTATE_PENDING;
		stat_decrement("scheduler.ramqueue.hold", 1);
	}
	else if (evp->state != RQ_EVPSTATE_INFLIGHT) {
		evl = rq_envelope_list(rq, evp);
		TAILQ_REMOVE(evl, evp, entry);
		if (evl == &rq->q_pending)
			SPLAY_REMOVE(prioqtree, &rq->q_priotree, evp);
	}

	evp->flags |= RQ_ENVELOPE_SUSPEND;

	return (1);
}

static int
rq_envelope_resume(struct rq_queue *rq, struct rq_envelope *evp)
{
	struct evplist	*evl;

	if (!(evp->flags & RQ_ENVELOPE_SUSPEND))
		return (0);

	if (evp->state != RQ_EVPSTATE_INFLIGHT) {
		evl = rq_envelope_list(rq, evp);
		if (evl == &rq->q_pending)
			sorted_insert(rq, evp);
		else
			TAILQ_INSERT_TAIL(evl, evp, entry);
	}

	evp->flags &= ~RQ_ENVELOPE_SUSPEND;

	return (1);
}

static void
rq_envelope_delete(struct rq_queue *rq, struct rq_envelope *evp)
{
	tree_xpop(&evp->message->envelopes, evp->evpid);
	if (tree_empty(&evp->message->envelopes)) {
		tree_xpop(&rq->messages, evp->message->msgid);
		free(evp->message);
		stat_decrement("scheduler.ramqueue.message", 1);
	}

	free(evp);
	rq->evpcount--;
	stat_decrement("scheduler.ramqueue.envelope", 1);
}

static const char *
rq_envelope_to_text(struct rq_envelope *e)
{
	static char	buf[256];
	char		t[64];

	(void)snprintf(buf, sizeof buf, "evp:%016" PRIx64 " [", e->evpid);

	if (e->type == D_BOUNCE)
		(void)strlcat(buf, "bounce", sizeof buf);
	else if (e->type == D_MDA)
		(void)strlcat(buf, "mda", sizeof buf);
	else if (e->type == D_MTA)
		(void)strlcat(buf, "mta", sizeof buf);

	(void)snprintf(t, sizeof t, ",expire=%s",
	    duration_to_text(e->expire - currtime));
	(void)strlcat(buf, t, sizeof buf);

	switch (e->state) {
	case RQ_EVPSTATE_PENDING:
		(void)snprintf(t, sizeof t, ",pending=%s",
		    duration_to_text(e->sched - currtime));
		(void)strlcat(buf, t, sizeof buf);
		break;

	case RQ_EVPSTATE_SCHEDULED:
		(void)snprintf(t, sizeof t, ",scheduled=%s",
		    duration_to_text(currtime - e->t_scheduled));
		(void)strlcat(buf, t, sizeof buf);
		break;

	case RQ_EVPSTATE_INFLIGHT:
		(void)snprintf(t, sizeof t, ",inflight=%s",
		    duration_to_text(currtime - e->t_inflight));
		(void)strlcat(buf, t, sizeof buf);
		break;

	case RQ_EVPSTATE_HELD:
		(void)snprintf(t, sizeof t, ",held=%s",
		    duration_to_text(currtime - e->t_inflight));
		(void)strlcat(buf, t, sizeof buf);
		break;
	default:
		fatalx("%016" PRIx64 " bad state %d", e->evpid, e->state);
	}

	if (e->flags & RQ_ENVELOPE_REMOVED)
		(void)strlcat(buf, ",removed", sizeof buf);
	if (e->flags & RQ_ENVELOPE_EXPIRED)
		(void)strlcat(buf, ",expired", sizeof buf);
	if (e->flags & RQ_ENVELOPE_SUSPEND)
		(void)strlcat(buf, ",suspended", sizeof buf);

	(void)strlcat(buf, "]", sizeof buf);

	return (buf);
}

static void
rq_queue_dump(struct rq_queue *rq, const char *name)
{
	struct rq_message	*message;
	struct rq_envelope	*envelope;
	void			*i, *j;
	uint64_t		 id;

	log_debug("debug: /--- ramqueue: %s", name);

	i = NULL;
	while ((tree_iter(&rq->messages, &i, &id, (void*)&message))) {
		log_debug("debug: | msg:%08" PRIx32, message->msgid);
		j = NULL;
		while ((tree_iter(&message->envelopes, &j, &id,
		    (void*)&envelope)))
			log_debug("debug: | %s",
			    rq_envelope_to_text(envelope));
	}
	log_debug("debug: \\---");
}
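
/*
 * Envelopes are ordered on whichever of their next-try and expiry
 * times comes first, so an envelope that would expire before its
 * next retry is still examined in time; the evpid serves as a final
 * tie-breaker to keep the ordering strict, which the splay tree
 * needs to tell elements apart.
 */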

static int
rq_envelope_cmp(struct rq_envelope *e1, struct rq_envelope *e2)
{
	time_t	ref1, ref2;

	ref1 = (e1->sched < e1->expire) ? e1->sched : e1->expire;
	ref2 = (e2->sched < e2->expire) ? e2->sched : e2->expire;
	if (ref1 != ref2)
		return (ref1 < ref2) ? -1 : 1;

	if (e1->evpid != e2->evpid)
		return (e1->evpid < e2->evpid) ? -1 : 1;

	return 0;
}

SPLAY_GENERATE(prioqtree, rq_envelope, t_entry, rq_envelope_cmp);