/*	$OpenBSD: scheduler_ramqueue.c,v 1.45 2018/05/31 21:06:12 gilles Exp $	*/

/*
 * Copyright (c) 2012 Gilles Chehade <gilles@poolp.org>
 * Copyright (c) 2012 Eric Faurot <eric@openbsd.org>
 *
 * Permission to use, copy, modify, and distribute this software for any
 * purpose with or without fee is hereby granted, provided that the above
 * copyright notice and this permission notice appear in all copies.
 *
 * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
 * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
 * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
 * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
 * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
 * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
 * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
 */

#include <sys/types.h>
#include <sys/queue.h>
#include <sys/tree.h>
#include <sys/socket.h>

#include <ctype.h>
#include <err.h>
#include <event.h>
#include <fcntl.h>
#include <imsg.h>
#include <inttypes.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <limits.h>
#include <time.h>

#include "smtpd.h"
#include "log.h"

TAILQ_HEAD(evplist, rq_envelope);

struct rq_message {
	uint32_t		 msgid;
	struct tree		 envelopes;
};

struct rq_envelope {
	TAILQ_ENTRY(rq_envelope) entry;
	SPLAY_ENTRY(rq_envelope) t_entry;

	uint64_t		 evpid;
	uint64_t		 holdq;
	enum delivery_type	 type;

#define	RQ_EVPSTATE_PENDING	0
#define	RQ_EVPSTATE_SCHEDULED	1
#define	RQ_EVPSTATE_INFLIGHT	2
#define	RQ_EVPSTATE_HELD	3
	uint8_t			 state;

#define	RQ_ENVELOPE_EXPIRED	0x01
#define	RQ_ENVELOPE_REMOVED	0x02
#define	RQ_ENVELOPE_SUSPEND	0x04
#define	RQ_ENVELOPE_UPDATE	0x08
#define	RQ_ENVELOPE_OVERFLOW	0x10
	uint8_t			 flags;

	time_t			 ctime;
	time_t			 sched;
	time_t			 expire;

	struct rq_message	*message;

	time_t			 t_inflight;
	time_t			 t_scheduled;
};
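
/*
 * Envelope lifecycle, roughly, as implemented below: an envelope starts
 * PENDING in q_pending (mirrored by the priority tree), becomes
 * SCHEDULED when its time comes (parked in a per-type queue until a
 * batch picks it up), then INFLIGHT while a delivery attempt runs.
 * A failed attempt moves it back to PENDING with a new sched time; a
 * hold request parks it HELD in a holdq until released.
 */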

struct rq_holdq {
	struct evplist		 q;
	size_t			 count;
};

struct rq_queue {
	size_t			 evpcount;
	struct tree		 messages;
	SPLAY_HEAD(prioqtree, rq_envelope)	q_priotree;

	struct evplist		 q_pending;
	struct evplist		 q_inflight;

	struct evplist		 q_mta;
	struct evplist		 q_mda;
	struct evplist		 q_bounce;
	struct evplist		 q_update;
	struct evplist		 q_expired;
	struct evplist		 q_removed;
};

static int rq_envelope_cmp(struct rq_envelope *, struct rq_envelope *);

SPLAY_PROTOTYPE(prioqtree, rq_envelope, t_entry, rq_envelope_cmp);

static int scheduler_ram_init(const char *);
static int scheduler_ram_insert(struct scheduler_info *);
static size_t scheduler_ram_commit(uint32_t);
static size_t scheduler_ram_rollback(uint32_t);
static int scheduler_ram_update(struct scheduler_info *);
static int scheduler_ram_delete(uint64_t);
static int scheduler_ram_hold(uint64_t, uint64_t);
static int scheduler_ram_release(int, uint64_t, int);
static int scheduler_ram_batch(int, int *, size_t *, uint64_t *, int *);
static size_t scheduler_ram_messages(uint32_t, uint32_t *, size_t);
static size_t scheduler_ram_envelopes(uint64_t, struct evpstate *, size_t);
static int scheduler_ram_schedule(uint64_t);
static int scheduler_ram_remove(uint64_t);
static int scheduler_ram_suspend(uint64_t);
static int scheduler_ram_resume(uint64_t);
static int scheduler_ram_query(uint64_t);

static void sorted_insert(struct rq_queue *, struct rq_envelope *);

static void rq_queue_init(struct rq_queue *);
static void rq_queue_merge(struct rq_queue *, struct rq_queue *);
static void rq_queue_dump(struct rq_queue *, const char *);
static void rq_queue_schedule(struct rq_queue *rq);
static struct evplist *rq_envelope_list(struct rq_queue *, struct rq_envelope *);
static void rq_envelope_schedule(struct rq_queue *, struct rq_envelope *);
static int rq_envelope_remove(struct rq_queue *, struct rq_envelope *);
static int rq_envelope_suspend(struct rq_queue *, struct rq_envelope *);
static int rq_envelope_resume(struct rq_queue *, struct rq_envelope *);
static void rq_envelope_delete(struct rq_queue *, struct rq_envelope *);
static const char *rq_envelope_to_text(struct rq_envelope *);

struct scheduler_backend scheduler_backend_ramqueue = {
	scheduler_ram_init,

	scheduler_ram_insert,
	scheduler_ram_commit,
	scheduler_ram_rollback,

	scheduler_ram_update,
	scheduler_ram_delete,
	scheduler_ram_hold,
	scheduler_ram_release,

	scheduler_ram_batch,

	scheduler_ram_messages,
	scheduler_ram_envelopes,
	scheduler_ram_schedule,
	scheduler_ram_remove,
	scheduler_ram_suspend,
	scheduler_ram_resume,
	scheduler_ram_query,
};

static struct rq_queue	ramqueue;
static struct tree	updates;
static struct tree	holdqs[3]; /* delivery type */

static time_t		currtime;

#define BACKOFF_TRANSFER	400
#define BACKOFF_DELIVERY	10
#define BACKOFF_OVERFLOW	3

static time_t
scheduler_backoff(time_t t0, time_t base, uint32_t step)
{
	return (t0 + base * step * step);
}

static time_t
scheduler_next(time_t t0, time_t base, uint32_t step)
{
	time_t	t;

	/* XXX be more efficient */
	while ((t = scheduler_backoff(t0, base, step)) <= currtime)
		step++;

	return (t);
}
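
/*
 * A quick illustration of the quadratic backoff above: for an MTA
 * envelope created at t0 (base = BACKOFF_TRANSFER = 400 seconds),
 * retry attempts are aimed at t0 + 400 * step^2, i.e. t0, t0+400s,
 * t0+1600s, t0+3600s, ...  scheduler_next() simply walks that series
 * until it finds the first attempt time still in the future.
 */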

static int
scheduler_ram_init(const char *arg)
{
	rq_queue_init(&ramqueue);
	tree_init(&updates);
	tree_init(&holdqs[D_MDA]);
	tree_init(&holdqs[D_MTA]);
	tree_init(&holdqs[D_BOUNCE]);

	return (1);
}

static int
scheduler_ram_insert(struct scheduler_info *si)
{
	struct rq_queue		*update;
	struct rq_message	*message;
	struct rq_envelope	*envelope;
	uint32_t		 msgid;

	currtime = time(NULL);

	msgid = evpid_to_msgid(si->evpid);

	/* find/prepare a ramqueue update */
	if ((update = tree_get(&updates, msgid)) == NULL) {
		update = xcalloc(1, sizeof *update);
		stat_increment("scheduler.ramqueue.update", 1);
		rq_queue_init(update);
		tree_xset(&updates, msgid, update);
	}

	/* find/prepare the msgtree message in ramqueue update */
	if ((message = tree_get(&update->messages, msgid)) == NULL) {
		message = xcalloc(1, sizeof *message);
		message->msgid = msgid;
		tree_init(&message->envelopes);
		tree_xset(&update->messages, msgid, message);
		stat_increment("scheduler.ramqueue.message", 1);
	}

	/* create envelope in ramqueue message */
	envelope = xcalloc(1, sizeof *envelope);
	envelope->evpid = si->evpid;
	envelope->type = si->type;
	envelope->message = message;
	envelope->ctime = si->creation;
	envelope->expire = si->creation + si->ttl;
	envelope->sched = scheduler_backoff(si->creation,
	    (si->type == D_MTA) ? BACKOFF_TRANSFER : BACKOFF_DELIVERY,
	    si->retry);
	tree_xset(&message->envelopes, envelope->evpid, envelope);

	update->evpcount++;
	stat_increment("scheduler.ramqueue.envelope", 1);

	envelope->state = RQ_EVPSTATE_PENDING;
	TAILQ_INSERT_TAIL(&update->q_pending, envelope, entry);

	si->nexttry = envelope->sched;

	return (1);
}

static size_t
scheduler_ram_commit(uint32_t msgid)
{
	struct rq_queue	*update;
	size_t		 r;

	currtime = time(NULL);

	update = tree_xpop(&updates, msgid);
	r = update->evpcount;

	if (tracing & TRACE_SCHEDULER)
		rq_queue_dump(update, "update to commit");

	rq_queue_merge(&ramqueue, update);

	if (tracing & TRACE_SCHEDULER)
		rq_queue_dump(&ramqueue, "resulting queue");

	rq_queue_schedule(&ramqueue);

	free(update);
	stat_decrement("scheduler.ramqueue.update", 1);

	return (r);
}

static size_t
scheduler_ram_rollback(uint32_t msgid)
{
	struct rq_queue		*update;
	struct rq_envelope	*evp;
	size_t			 r;

	currtime = time(NULL);

	if ((update = tree_pop(&updates, msgid)) == NULL)
		return (0);
	r = update->evpcount;

	while ((evp = TAILQ_FIRST(&update->q_pending))) {
		TAILQ_REMOVE(&update->q_pending, evp, entry);
		rq_envelope_delete(update, evp);
	}

	free(update);
	stat_decrement("scheduler.ramqueue.update", 1);

	return (r);
}
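
/*
 * In effect, envelope insertion is transactional: scheduler_ram_insert()
 * stages envelopes in a per-message "update" queue (keyed by msgid in
 * the updates tree), and nothing becomes schedulable until
 * scheduler_ram_commit() merges that staging queue into the live
 * ramqueue.  scheduler_ram_rollback() discards the staging queue
 * instead.
 */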

static int
scheduler_ram_update(struct scheduler_info *si)
{
	struct rq_message	*msg;
	struct rq_envelope	*evp;
	uint32_t		 msgid;

	currtime = time(NULL);

	msgid = evpid_to_msgid(si->evpid);
	msg = tree_xget(&ramqueue.messages, msgid);
	evp = tree_xget(&msg->envelopes, si->evpid);

	/* it *must* be in-flight */
	if (evp->state != RQ_EVPSTATE_INFLIGHT)
		errx(1, "evp:%016" PRIx64 " not in-flight", si->evpid);

	TAILQ_REMOVE(&ramqueue.q_inflight, evp, entry);

	/*
	 * If the envelope was removed while inflight, schedule it for
	 * removal immediately.
	 */
	if (evp->flags & RQ_ENVELOPE_REMOVED) {
		TAILQ_INSERT_TAIL(&ramqueue.q_removed, evp, entry);
		evp->state = RQ_EVPSTATE_SCHEDULED;
		evp->t_scheduled = currtime;
		return (1);
	}

	evp->sched = scheduler_next(evp->ctime,
	    (si->type == D_MTA) ? BACKOFF_TRANSFER : BACKOFF_DELIVERY,
	    si->retry);

	evp->state = RQ_EVPSTATE_PENDING;
	if (!(evp->flags & RQ_ENVELOPE_SUSPEND))
		sorted_insert(&ramqueue, evp);

	si->nexttry = evp->sched;

	return (1);
}

static int
scheduler_ram_delete(uint64_t evpid)
{
	struct rq_message	*msg;
	struct rq_envelope	*evp;
	uint32_t		 msgid;

	currtime = time(NULL);

	msgid = evpid_to_msgid(evpid);
	msg = tree_xget(&ramqueue.messages, msgid);
	evp = tree_xget(&msg->envelopes, evpid);

	/* it *must* be in-flight */
	if (evp->state != RQ_EVPSTATE_INFLIGHT)
		errx(1, "evp:%016" PRIx64 " not in-flight", evpid);

	TAILQ_REMOVE(&ramqueue.q_inflight, evp, entry);

	rq_envelope_delete(&ramqueue, evp);

	return (1);
}

#define HOLDQ_MAXSIZE	1000

static int
scheduler_ram_hold(uint64_t evpid, uint64_t holdq)
{
	struct rq_holdq		*hq;
	struct rq_message	*msg;
	struct rq_envelope	*evp;
	uint32_t		 msgid;

	currtime = time(NULL);

	msgid = evpid_to_msgid(evpid);
	msg = tree_xget(&ramqueue.messages, msgid);
	evp = tree_xget(&msg->envelopes, evpid);

	/* it *must* be in-flight */
	if (evp->state != RQ_EVPSTATE_INFLIGHT)
		errx(1, "evp:%016" PRIx64 " not in-flight", evpid);

	TAILQ_REMOVE(&ramqueue.q_inflight, evp, entry);

	/* If the envelope is suspended, just mark it as pending */
	if (evp->flags & RQ_ENVELOPE_SUSPEND) {
		evp->state = RQ_EVPSTATE_PENDING;
		return (0);
	}

	hq = tree_get(&holdqs[evp->type], holdq);
	if (hq == NULL) {
		hq = xcalloc(1, sizeof(*hq));
		TAILQ_INIT(&hq->q);
		tree_xset(&holdqs[evp->type], holdq, hq);
		stat_increment("scheduler.ramqueue.holdq", 1);
	}

	/* If the holdq is full, just "tempfail" the envelope */
	if (hq->count >= HOLDQ_MAXSIZE) {
		evp->state = RQ_EVPSTATE_PENDING;
		evp->flags |= RQ_ENVELOPE_UPDATE;
		evp->flags |= RQ_ENVELOPE_OVERFLOW;
		sorted_insert(&ramqueue, evp);
		stat_increment("scheduler.ramqueue.hold-overflow", 1);
		return (0);
	}

	evp->state = RQ_EVPSTATE_HELD;
	evp->holdq = holdq;
	/*
	 * This is an optimization: upon release, the envelopes will be
	 * inserted in the pending queue from the first element to the last.
	 * Since elements already in the queue were received first, they
	 * were scheduled first, so they will be reinserted before the
	 * current element.
	 */
	TAILQ_INSERT_HEAD(&hq->q, evp, entry);
	hq->count += 1;
	stat_increment("scheduler.ramqueue.hold", 1);

	return (1);
}

static int
scheduler_ram_release(int type, uint64_t holdq, int n)
{
	struct rq_holdq		*hq;
	struct rq_envelope	*evp;
	int			 i, update;

	currtime = time(NULL);

	hq = tree_get(&holdqs[type], holdq);
	if (hq == NULL)
		return (0);

	if (n == -1) {
		n = 0;
		update = 1;
	}
	else
		update = 0;

	for (i = 0; n == 0 || i < n; i++) {
		evp = TAILQ_FIRST(&hq->q);
		if (evp == NULL)
			break;

		TAILQ_REMOVE(&hq->q, evp, entry);
		hq->count -= 1;
		evp->holdq = 0;

		/*
		 * When released, all envelopes are put in the pending queue
		 * and will be rescheduled immediately.  As an optimization,
		 * we could just schedule them directly.
		 */
		evp->state = RQ_EVPSTATE_PENDING;
		if (update)
			evp->flags |= RQ_ENVELOPE_UPDATE;
		sorted_insert(&ramqueue, evp);
	}

	if (TAILQ_EMPTY(&hq->q)) {
		tree_xpop(&holdqs[type], holdq);
		free(hq);
		stat_decrement("scheduler.ramqueue.holdq", 1);
	}
	stat_decrement("scheduler.ramqueue.hold", i);

	return (i);
}

static int
scheduler_ram_batch(int mask, int *delay, size_t *count, uint64_t *evpids, int *types)
{
	struct rq_envelope	*evp;
	size_t			 i, n;
	time_t			 t;

	currtime = time(NULL);

	rq_queue_schedule(&ramqueue);
	if (tracing & TRACE_SCHEDULER)
		rq_queue_dump(&ramqueue, "scheduler_ram_batch()");

	i = 0;
	n = 0;

	for (;;) {

		if (mask & SCHED_REMOVE && (evp = TAILQ_FIRST(&ramqueue.q_removed))) {
			TAILQ_REMOVE(&ramqueue.q_removed, evp, entry);
			types[i] = SCHED_REMOVE;
			evpids[i] = evp->evpid;
			rq_envelope_delete(&ramqueue, evp);

			if (++i == *count)
				break;
		}

		if (mask & SCHED_EXPIRE && (evp = TAILQ_FIRST(&ramqueue.q_expired))) {
			TAILQ_REMOVE(&ramqueue.q_expired, evp, entry);
			types[i] = SCHED_EXPIRE;
			evpids[i] = evp->evpid;
			rq_envelope_delete(&ramqueue, evp);

			if (++i == *count)
				break;
		}

		if (mask & SCHED_UPDATE && (evp = TAILQ_FIRST(&ramqueue.q_update))) {
			TAILQ_REMOVE(&ramqueue.q_update, evp, entry);
			types[i] = SCHED_UPDATE;
			evpids[i] = evp->evpid;

			if (evp->flags & RQ_ENVELOPE_OVERFLOW)
				t = BACKOFF_OVERFLOW;
			else if (evp->type == D_MTA)
				t = BACKOFF_TRANSFER;
			else
				t = BACKOFF_DELIVERY;

			evp->sched = scheduler_next(evp->ctime, t, 0);
			evp->flags &= ~(RQ_ENVELOPE_UPDATE|RQ_ENVELOPE_OVERFLOW);
			evp->state = RQ_EVPSTATE_PENDING;
			if (!(evp->flags & RQ_ENVELOPE_SUSPEND))
				sorted_insert(&ramqueue, evp);

			if (++i == *count)
				break;
		}

		if (mask & SCHED_BOUNCE && (evp = TAILQ_FIRST(&ramqueue.q_bounce))) {
			TAILQ_REMOVE(&ramqueue.q_bounce, evp, entry);
			types[i] = SCHED_BOUNCE;
			evpids[i] = evp->evpid;

			TAILQ_INSERT_TAIL(&ramqueue.q_inflight, evp, entry);
			evp->state = RQ_EVPSTATE_INFLIGHT;
			evp->t_inflight = currtime;

			if (++i == *count)
				break;
		}

		if (mask & SCHED_MDA && (evp = TAILQ_FIRST(&ramqueue.q_mda))) {
			TAILQ_REMOVE(&ramqueue.q_mda, evp, entry);
			types[i] = SCHED_MDA;
			evpids[i] = evp->evpid;

			TAILQ_INSERT_TAIL(&ramqueue.q_inflight, evp, entry);
			evp->state = RQ_EVPSTATE_INFLIGHT;
			evp->t_inflight = currtime;

			if (++i == *count)
				break;
		}

		if (mask & SCHED_MTA && (evp = TAILQ_FIRST(&ramqueue.q_mta))) {
			TAILQ_REMOVE(&ramqueue.q_mta, evp, entry);
			types[i] = SCHED_MTA;
			evpids[i] = evp->evpid;

			TAILQ_INSERT_TAIL(&ramqueue.q_inflight, evp, entry);
			evp->state = RQ_EVPSTATE_INFLIGHT;
			evp->t_inflight = currtime;

			if (++i == *count)
				break;
		}

		/* nothing seen this round */
		if (i == n)
			break;

		n = i;
	}

	if (i) {
		*count = i;
		return (1);
	}

	if ((evp = TAILQ_FIRST(&ramqueue.q_pending))) {
		if (evp->sched < evp->expire)
			t = evp->sched;
		else
			t = evp->expire;
		*delay = (t < currtime) ? 0 : (t - currtime);
	}
	else
		*delay = -1;

	return (0);
}
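
/*
 * Note that scheduler_ram_batch() above drains at most one envelope per
 * queue per round (removed, expired, update, bounce, mda, mta), so no
 * single class can starve the others, and it stops once the caller's
 * array is full or a whole round makes no progress.  When it returns
 * empty, *delay tells the caller how long to wait before the next
 * pending envelope is due (-1 if there is none).
 */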

static size_t
scheduler_ram_messages(uint32_t from, uint32_t *dst, size_t size)
{
	uint64_t	 id;
	size_t		 n;
	void		*i;

	for (n = 0, i = NULL; n < size; n++) {
		if (tree_iterfrom(&ramqueue.messages, &i, from, &id, NULL) == 0)
			break;
		dst[n] = id;
	}

	return (n);
}

static size_t
scheduler_ram_envelopes(uint64_t from, struct evpstate *dst, size_t size)
{
	struct rq_message	*msg;
	struct rq_envelope	*evp;
	void			*i;
	size_t			 n;

	if ((msg = tree_get(&ramqueue.messages, evpid_to_msgid(from))) == NULL)
		return (0);

	for (n = 0, i = NULL; n < size; ) {

		if (tree_iterfrom(&msg->envelopes, &i, from, NULL,
		    (void**)&evp) == 0)
			break;

		if (evp->flags & (RQ_ENVELOPE_REMOVED | RQ_ENVELOPE_EXPIRED))
			continue;

		dst[n].evpid = evp->evpid;
		dst[n].flags = 0;
		dst[n].retry = 0;
		dst[n].time = 0;

		if (evp->state == RQ_EVPSTATE_PENDING) {
			dst[n].time = evp->sched;
			dst[n].flags = EF_PENDING;
		}
		else if (evp->state == RQ_EVPSTATE_SCHEDULED) {
			dst[n].time = evp->t_scheduled;
			dst[n].flags = EF_PENDING;
		}
		else if (evp->state == RQ_EVPSTATE_INFLIGHT) {
			dst[n].time = evp->t_inflight;
			dst[n].flags = EF_INFLIGHT;
		}
		else if (evp->state == RQ_EVPSTATE_HELD) {
			/* same as scheduled */
			dst[n].time = evp->t_scheduled;
			dst[n].flags = EF_PENDING;
			dst[n].flags |= EF_HOLD;
		}
		if (evp->flags & RQ_ENVELOPE_SUSPEND)
			dst[n].flags |= EF_SUSPEND;

		n++;
	}

	return (n);
}

static int
scheduler_ram_schedule(uint64_t evpid)
{
	struct rq_message	*msg;
	struct rq_envelope	*evp;
	uint32_t		 msgid;
	void			*i;
	int			 r;

	currtime = time(NULL);

	if (evpid > 0xffffffff) {
		msgid = evpid_to_msgid(evpid);
		if ((msg = tree_get(&ramqueue.messages, msgid)) == NULL)
			return (0);
		if ((evp = tree_get(&msg->envelopes, evpid)) == NULL)
			return (0);
		if (evp->state == RQ_EVPSTATE_INFLIGHT)
			return (0);
		rq_envelope_schedule(&ramqueue, evp);
		return (1);
	}
	else {
		msgid = evpid;
		if ((msg = tree_get(&ramqueue.messages, msgid)) == NULL)
			return (0);
		i = NULL;
		r = 0;
		while (tree_iter(&msg->envelopes, &i, NULL, (void*)(&evp))) {
			if (evp->state == RQ_EVPSTATE_INFLIGHT)
				continue;
			rq_envelope_schedule(&ramqueue, evp);
			r++;
		}
		return (r);
	}
}

static int
scheduler_ram_remove(uint64_t evpid)
{
	struct rq_message	*msg;
	struct rq_envelope	*evp;
	uint32_t		 msgid;
	void			*i;
	int			 r;

	currtime = time(NULL);

	if (evpid > 0xffffffff) {
		msgid = evpid_to_msgid(evpid);
		if ((msg = tree_get(&ramqueue.messages, msgid)) == NULL)
			return (0);
		if ((evp = tree_get(&msg->envelopes, evpid)) == NULL)
			return (0);
		if (rq_envelope_remove(&ramqueue, evp))
			return (1);
		return (0);
	}
	else {
		msgid = evpid;
		if ((msg = tree_get(&ramqueue.messages, msgid)) == NULL)
			return (0);
		i = NULL;
		r = 0;
		while (tree_iter(&msg->envelopes, &i, NULL, (void*)(&evp)))
			if (rq_envelope_remove(&ramqueue, evp))
				r++;
		return (r);
	}
}

static int
scheduler_ram_suspend(uint64_t evpid)
{
	struct rq_message	*msg;
	struct rq_envelope	*evp;
	uint32_t		 msgid;
	void			*i;
	int			 r;

	currtime = time(NULL);

	if (evpid > 0xffffffff) {
		msgid = evpid_to_msgid(evpid);
		if ((msg = tree_get(&ramqueue.messages, msgid)) == NULL)
			return (0);
		if ((evp = tree_get(&msg->envelopes, evpid)) == NULL)
			return (0);
		if (rq_envelope_suspend(&ramqueue, evp))
			return (1);
		return (0);
	}
	else {
		msgid = evpid;
		if ((msg = tree_get(&ramqueue.messages, msgid)) == NULL)
			return (0);
		i = NULL;
		r = 0;
		while (tree_iter(&msg->envelopes, &i, NULL, (void*)(&evp)))
			if (rq_envelope_suspend(&ramqueue, evp))
				r++;
		return (r);
	}
}

static int
scheduler_ram_resume(uint64_t evpid)
{
	struct rq_message	*msg;
	struct rq_envelope	*evp;
	uint32_t		 msgid;
	void			*i;
	int			 r;

	currtime = time(NULL);

	if (evpid > 0xffffffff) {
		msgid = evpid_to_msgid(evpid);
		if ((msg = tree_get(&ramqueue.messages, msgid)) == NULL)
			return (0);
		if ((evp = tree_get(&msg->envelopes, evpid)) == NULL)
			return (0);
		if (rq_envelope_resume(&ramqueue, evp))
			return (1);
		return (0);
	}
	else {
		msgid = evpid;
		if ((msg = tree_get(&ramqueue.messages, msgid)) == NULL)
			return (0);
		i = NULL;
		r = 0;
		while (tree_iter(&msg->envelopes, &i, NULL, (void*)(&evp)))
			if (rq_envelope_resume(&ramqueue, evp))
				r++;
		return (r);
	}
}

static int
scheduler_ram_query(uint64_t evpid)
{
	uint32_t	msgid;

	if (evpid > 0xffffffff)
		msgid = evpid_to_msgid(evpid);
	else
		msgid = evpid;

	if (tree_get(&ramqueue.messages, msgid) == NULL)
		return (0);

	return (1);
}

static void
sorted_insert(struct rq_queue *rq, struct rq_envelope *evp)
{
	struct rq_envelope	*evp2;

	SPLAY_INSERT(prioqtree, &rq->q_priotree, evp);
	evp2 = SPLAY_NEXT(prioqtree, &rq->q_priotree, evp);
	if (evp2)
		TAILQ_INSERT_BEFORE(evp2, evp, entry);
	else
		TAILQ_INSERT_TAIL(&rq->q_pending, evp, entry);
}
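
/*
 * Note on sorted_insert(): q_pending and q_priotree hold the same
 * pending envelopes in the same order (rq_envelope_cmp: earliest
 * min(sched, expire) first, evpid as tie-breaker).  The splay tree is
 * only there to find the insertion point in logarithmic time;
 * SPLAY_NEXT() yields the list successor, so the TAILQ stays sorted
 * without a linear scan.
 */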

static void
rq_queue_init(struct rq_queue *rq)
{
	memset(rq, 0, sizeof *rq);
	tree_init(&rq->messages);
	TAILQ_INIT(&rq->q_pending);
	TAILQ_INIT(&rq->q_inflight);
	TAILQ_INIT(&rq->q_mta);
	TAILQ_INIT(&rq->q_mda);
	TAILQ_INIT(&rq->q_bounce);
	TAILQ_INIT(&rq->q_update);
	TAILQ_INIT(&rq->q_expired);
	TAILQ_INIT(&rq->q_removed);
	SPLAY_INIT(&rq->q_priotree);
}

static void
rq_queue_merge(struct rq_queue *rq, struct rq_queue *update)
{
	struct rq_message	*message, *tomessage;
	struct rq_envelope	*envelope;
	uint64_t		 id;
	void			*i;

	while (tree_poproot(&update->messages, &id, (void*)&message)) {
		if ((tomessage = tree_get(&rq->messages, id)) == NULL) {
			/* message does not exist. re-use structure */
			tree_xset(&rq->messages, id, message);
			continue;
		}
		/* need to re-link all envelopes before merging them */
		i = NULL;
		while ((tree_iter(&message->envelopes, &i, &id,
		    (void*)&envelope)))
			envelope->message = tomessage;
		tree_merge(&tomessage->envelopes, &message->envelopes);
		free(message);
		stat_decrement("scheduler.ramqueue.message", 1);
	}

	/* Sorted insert in the pending queue */
	while ((envelope = TAILQ_FIRST(&update->q_pending))) {
		TAILQ_REMOVE(&update->q_pending, envelope, entry);
		sorted_insert(rq, envelope);
	}

	rq->evpcount += update->evpcount;
}

#define SCHEDULEMAX	1024

static void
rq_queue_schedule(struct rq_queue *rq)
{
	struct rq_envelope	*evp;
	size_t			 n;

	n = 0;
	while ((evp = TAILQ_FIRST(&rq->q_pending))) {
		if (evp->sched > currtime && evp->expire > currtime)
			break;

		if (n == SCHEDULEMAX)
			break;

		if (evp->state != RQ_EVPSTATE_PENDING)
			errx(1, "evp:%016" PRIx64 " flags=0x%x", evp->evpid,
			    evp->flags);

		if (evp->expire <= currtime) {
			TAILQ_REMOVE(&rq->q_pending, evp, entry);
			SPLAY_REMOVE(prioqtree, &rq->q_priotree, evp);
			TAILQ_INSERT_TAIL(&rq->q_expired, evp, entry);
			evp->state = RQ_EVPSTATE_SCHEDULED;
			evp->flags |= RQ_ENVELOPE_EXPIRED;
			evp->t_scheduled = currtime;
			continue;
		}
		rq_envelope_schedule(rq, evp);
		n += 1;
	}
}

static struct evplist *
rq_envelope_list(struct rq_queue *rq, struct rq_envelope *evp)
{
	switch (evp->state) {
	case RQ_EVPSTATE_PENDING:
		return &rq->q_pending;

	case RQ_EVPSTATE_SCHEDULED:
		if (evp->flags & RQ_ENVELOPE_EXPIRED)
			return &rq->q_expired;
		if (evp->flags & RQ_ENVELOPE_REMOVED)
			return &rq->q_removed;
		if (evp->flags & RQ_ENVELOPE_UPDATE)
			return &rq->q_update;
		if (evp->type == D_MTA)
			return &rq->q_mta;
		if (evp->type == D_MDA)
			return &rq->q_mda;
		if (evp->type == D_BOUNCE)
			return &rq->q_bounce;
		errx(1, "%016" PRIx64 " bad evp type %d", evp->evpid, evp->type);

	case RQ_EVPSTATE_INFLIGHT:
		return &rq->q_inflight;

	case RQ_EVPSTATE_HELD:
		return (NULL);
	}

	errx(1, "%016" PRIx64 " bad state %d", evp->evpid, evp->state);
	return (NULL);
}

static void
rq_envelope_schedule(struct rq_queue *rq, struct rq_envelope *evp)
{
	struct rq_holdq		*hq;
	struct evplist		*q = NULL;

	switch (evp->type) {
	case D_MTA:
		q = &rq->q_mta;
		break;
	case D_MDA:
		q = &rq->q_mda;
		break;
	case D_BOUNCE:
		q = &rq->q_bounce;
		break;
	}

	if (evp->flags & RQ_ENVELOPE_UPDATE)
		q = &rq->q_update;

	if (evp->state == RQ_EVPSTATE_HELD) {
		hq = tree_xget(&holdqs[evp->type], evp->holdq);
		TAILQ_REMOVE(&hq->q, evp, entry);
		hq->count -= 1;
		if (TAILQ_EMPTY(&hq->q)) {
			tree_xpop(&holdqs[evp->type], evp->holdq);
			free(hq);
		}
		evp->holdq = 0;
		stat_decrement("scheduler.ramqueue.hold", 1);
	}
	else if (!(evp->flags & RQ_ENVELOPE_SUSPEND)) {
		TAILQ_REMOVE(&rq->q_pending, evp, entry);
		SPLAY_REMOVE(prioqtree, &rq->q_priotree, evp);
	}

	TAILQ_INSERT_TAIL(q, evp, entry);
	evp->state = RQ_EVPSTATE_SCHEDULED;
	evp->t_scheduled = currtime;
}

static int
rq_envelope_remove(struct rq_queue *rq, struct rq_envelope *evp)
{
	struct rq_holdq		*hq;
	struct evplist		*evl;

	if (evp->flags & (RQ_ENVELOPE_REMOVED | RQ_ENVELOPE_EXPIRED))
		return (0);
	/*
	 * If the envelope is in-flight, just mark it for removal.
	 */
	if (evp->state == RQ_EVPSTATE_INFLIGHT) {
		evp->flags |= RQ_ENVELOPE_REMOVED;
		return (1);
	}

	if (evp->state == RQ_EVPSTATE_HELD) {
		hq = tree_xget(&holdqs[evp->type], evp->holdq);
		TAILQ_REMOVE(&hq->q, evp, entry);
		hq->count -= 1;
		if (TAILQ_EMPTY(&hq->q)) {
			tree_xpop(&holdqs[evp->type], evp->holdq);
			free(hq);
		}
		evp->holdq = 0;
		stat_decrement("scheduler.ramqueue.hold", 1);
	}
	else if (!(evp->flags & RQ_ENVELOPE_SUSPEND)) {
		evl = rq_envelope_list(rq, evp);
		TAILQ_REMOVE(evl, evp, entry);
		if (evl == &rq->q_pending)
			SPLAY_REMOVE(prioqtree, &rq->q_priotree, evp);
	}

	TAILQ_INSERT_TAIL(&rq->q_removed, evp, entry);
	evp->state = RQ_EVPSTATE_SCHEDULED;
	evp->flags |= RQ_ENVELOPE_REMOVED;
	evp->t_scheduled = currtime;

	return (1);
}

static int
rq_envelope_suspend(struct rq_queue *rq, struct rq_envelope *evp)
{
	struct rq_holdq		*hq;
	struct evplist		*evl;

	if (evp->flags & RQ_ENVELOPE_SUSPEND)
		return (0);

	if (evp->state == RQ_EVPSTATE_HELD) {
		hq = tree_xget(&holdqs[evp->type], evp->holdq);
		TAILQ_REMOVE(&hq->q, evp, entry);
		hq->count -= 1;
		if (TAILQ_EMPTY(&hq->q)) {
			tree_xpop(&holdqs[evp->type], evp->holdq);
			free(hq);
		}
		evp->holdq = 0;
		evp->state = RQ_EVPSTATE_PENDING;
		stat_decrement("scheduler.ramqueue.hold", 1);
	}
	else if (evp->state != RQ_EVPSTATE_INFLIGHT) {
		evl = rq_envelope_list(rq, evp);
		TAILQ_REMOVE(evl, evp, entry);
		if (evl == &rq->q_pending)
			SPLAY_REMOVE(prioqtree, &rq->q_priotree, evp);
	}

	evp->flags |= RQ_ENVELOPE_SUSPEND;

	return (1);
}

static int
rq_envelope_resume(struct rq_queue *rq, struct rq_envelope *evp)
{
	struct evplist	*evl;

	if (!(evp->flags & RQ_ENVELOPE_SUSPEND))
		return (0);

	if (evp->state != RQ_EVPSTATE_INFLIGHT) {
		evl = rq_envelope_list(rq, evp);
		if (evl == &rq->q_pending)
			sorted_insert(rq, evp);
		else
			TAILQ_INSERT_TAIL(evl, evp, entry);
	}

	evp->flags &= ~RQ_ENVELOPE_SUSPEND;

	return (1);
}

static void
rq_envelope_delete(struct rq_queue *rq, struct rq_envelope *evp)
{
	tree_xpop(&evp->message->envelopes, evp->evpid);
	if (tree_empty(&evp->message->envelopes)) {
		tree_xpop(&rq->messages, evp->message->msgid);
		free(evp->message);
		stat_decrement("scheduler.ramqueue.message", 1);
	}

	free(evp);
	rq->evpcount--;
	stat_decrement("scheduler.ramqueue.envelope", 1);
}

static const char *
rq_envelope_to_text(struct rq_envelope *e)
{
	static char	buf[256];
	char		t[64];

	(void)snprintf(buf, sizeof buf, "evp:%016" PRIx64 " [", e->evpid);

	if (e->type == D_BOUNCE)
		(void)strlcat(buf, "bounce", sizeof buf);
	else if (e->type == D_MDA)
		(void)strlcat(buf, "mda", sizeof buf);
	else if (e->type == D_MTA)
		(void)strlcat(buf, "mta", sizeof buf);

	(void)snprintf(t, sizeof t, ",expire=%s",
	    duration_to_text(e->expire - currtime));
	(void)strlcat(buf, t, sizeof buf);

	switch (e->state) {
	case RQ_EVPSTATE_PENDING:
		(void)snprintf(t, sizeof t, ",pending=%s",
		    duration_to_text(e->sched - currtime));
		(void)strlcat(buf, t, sizeof buf);
		break;

	case RQ_EVPSTATE_SCHEDULED:
		(void)snprintf(t, sizeof t, ",scheduled=%s",
		    duration_to_text(currtime - e->t_scheduled));
		(void)strlcat(buf, t, sizeof buf);
		break;

	case RQ_EVPSTATE_INFLIGHT:
		(void)snprintf(t, sizeof t, ",inflight=%s",
		    duration_to_text(currtime - e->t_inflight));
		(void)strlcat(buf, t, sizeof buf);
		break;

	case RQ_EVPSTATE_HELD:
		(void)snprintf(t, sizeof t, ",held=%s",
		    duration_to_text(currtime - e->t_inflight));
		(void)strlcat(buf, t, sizeof buf);
		break;
	default:
		errx(1, "%016" PRIx64 " bad state %d", e->evpid, e->state);
	}

	if (e->flags & RQ_ENVELOPE_REMOVED)
		(void)strlcat(buf, ",removed", sizeof buf);
	if (e->flags & RQ_ENVELOPE_EXPIRED)
		(void)strlcat(buf, ",expired", sizeof buf);
	if (e->flags & RQ_ENVELOPE_SUSPEND)
		(void)strlcat(buf, ",suspended", sizeof buf);

	(void)strlcat(buf, "]", sizeof buf);

	return (buf);
}

static void
rq_queue_dump(struct rq_queue *rq, const char *name)
{
	struct rq_message	*message;
	struct rq_envelope	*envelope;
	void			*i, *j;
	uint64_t		 id;

	log_debug("debug: /--- ramqueue: %s", name);

	i = NULL;
	while ((tree_iter(&rq->messages, &i, &id, (void*)&message))) {
		log_debug("debug: | msg:%08" PRIx32, message->msgid);
		j = NULL;
		while ((tree_iter(&message->envelopes, &j, &id,
		    (void*)&envelope)))
			log_debug("debug: | %s",
			    rq_envelope_to_text(envelope));
	}
	log_debug("debug: \\---");
}

static int
rq_envelope_cmp(struct rq_envelope *e1, struct rq_envelope *e2)
{
	time_t	ref1, ref2;

	ref1 = (e1->sched < e1->expire) ? e1->sched : e1->expire;
	ref2 = (e2->sched < e2->expire) ? e2->sched : e2->expire;
	if (ref1 != ref2)
		return (ref1 < ref2) ? -1 : 1;

	if (e1->evpid != e2->evpid)
		return (e1->evpid < e2->evpid) ? -1 : 1;

	return 0;
}

SPLAY_GENERATE(prioqtree, rq_envelope, t_entry, rq_envelope_cmp);