/*	$NetBSD: qmgr_job.c,v 1.2 2017/02/14 01:16:47 christos Exp $	*/

/*++
/* NAME
/*	qmgr_job 3
/* SUMMARY
/*	per-transport jobs
/* SYNOPSIS
/*	#include "qmgr.h"
/*
/*	QMGR_JOB *qmgr_job_obtain(message, transport)
/*	QMGR_MESSAGE *message;
/*	QMGR_TRANSPORT *transport;
/*
/*	void qmgr_job_free(job)
/*	QMGR_JOB *job;
/*
/*	void qmgr_job_move_limits(job)
/*	QMGR_JOB *job;
/*
/*	QMGR_ENTRY *qmgr_job_entry_select(transport)
/*	QMGR_TRANSPORT *transport;
/*
/*	void qmgr_job_blocker_update(queue)
/*	QMGR_QUEUE *queue;
/* DESCRIPTION
/*	These routines add/delete/manipulate per-transport jobs.
/*	Each job corresponds to a specific transport and message.
/*	Each job has a peer list containing all pending delivery
/*	requests for that message.
/*
/*	qmgr_job_obtain() finds an existing job for the named message
/*	and transport combination. A new empty job is created if none
/*	can be found. In either case, the job is prepared for assignment
/*	of (more) message recipients.
/*
/*	qmgr_job_free() disposes of a per-transport job after all
/*	its entries have been taken care of. It is an error to dispose
/*	of a job that is still in use.
/*
/*	qmgr_job_entry_select() attempts to find the next entry suitable
/*	for delivery. The job preempting algorithm is also exercised.
/*	If necessary, an attempt to read more recipients into core is made.
/*	This can result in the creation of more job, queue and entry
/*	structures.
/*
/*	qmgr_job_blocker_update() updates the status of blocked
/*	jobs after a decrease in the queue's concurrency level,
/*	after the queue is throttled, or after the queue is resumed
/*	from suspension.
/*
/*	qmgr_job_move_limits() takes care of proper distribution of the
/*	per-transport recipient limit among the per-transport jobs.
/*	It should be called whenever a job's recipient slot becomes
/*	available.
/* DIAGNOSTICS
/*	Panic: consistency check failure.
/* LICENSE
/* .ad
/* .fi
/*	The Secure Mailer license must be distributed with this software.
/* AUTHOR(S)
/*	Patrik Rak
/*	patrik@raxoft.cz
/*--*/

/* System library. */

#include <sys_defs.h>

/* Utility library. */

#include <msg.h>
#include <htable.h>
#include <mymalloc.h>
#include <sane_time.h>

/* Application-specific. */

#include "qmgr.h"

/* Forward declarations */

static void qmgr_job_pop(QMGR_JOB *);

/* Helper macros */

#define HAS_ENTRIES(job) ((job)->selected_entries < (job)->read_entries)

/*
 * The MIN_ENTRIES macro may underestimate a lot but we can't use
 * message->rcpt_unread because we don't know if all those unread
 * recipients go to our transport yet.
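 *
 * For example, if this job has read 10 recipients so far and its message
 * still has 100 unread ones, MIN_ENTRIES() yields 10 and MAX_ENTRIES()
 * yields 110, even though some of those 100 recipients may eventually be
 * assigned to jobs of other transports.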
 */

#define MIN_ENTRIES(job) ((job)->read_entries)
#define MAX_ENTRIES(job) ((job)->read_entries + (job)->message->rcpt_unread)

#define RESET_CANDIDATE_CACHE(transport) ((transport)->candidate_cache_current = 0)

#define IS_BLOCKER(job,transport) ((job)->blocker_tag == (transport)->blocker_tag)

/* qmgr_job_create - create and initialize message job structure */

static QMGR_JOB *qmgr_job_create(QMGR_MESSAGE *message, QMGR_TRANSPORT *transport)
{
    QMGR_JOB *job;

    job = (QMGR_JOB *) mymalloc(sizeof(QMGR_JOB));
    job->message = message;
    QMGR_LIST_APPEND(message->job_list, job, message_peers);
    htable_enter(transport->job_byname, message->queue_id, (void *) job);
    job->transport = transport;
    QMGR_LIST_INIT(job->transport_peers);
    QMGR_LIST_INIT(job->time_peers);
    job->stack_parent = 0;
    QMGR_LIST_INIT(job->stack_children);
    QMGR_LIST_INIT(job->stack_siblings);
    job->stack_level = -1;
    job->blocker_tag = 0;
    job->peer_byname = htable_create(0);
    QMGR_LIST_INIT(job->peer_list);
    job->slots_used = 0;
    job->slots_available = 0;
    job->selected_entries = 0;
    job->read_entries = 0;
    job->rcpt_count = 0;
    job->rcpt_limit = 0;
    return (job);
}

/* qmgr_job_link - append the job to the job lists based on the time it was queued */

static void qmgr_job_link(QMGR_JOB *job)
{
    QMGR_TRANSPORT *transport = job->transport;
    QMGR_MESSAGE *message = job->message;
    QMGR_JOB *prev, *next, *list_prev, *list_next, *unread, *current;
    int     delay;

    /*
     * Sanity checks.
     */
    if (job->stack_level >= 0)
	msg_panic("qmgr_job_link: already on the job lists (%d)", job->stack_level);

    /*
     * Traverse the time list and the scheduler list from the end and stop
     * when we find a job older than the one being linked.
     *
     * During the traversals, keep track of whether we have come across
     * either the current job or the first unread job on the job list. If
     * this is the case, these pointers will be adjusted below as required.
     *
     * Although both lists are exactly the same when only jobs on stack
     * level zero are considered, it's easier to traverse them separately.
     * Otherwise it's impossible to keep track of the current job pointer
     * effectively.
     *
     * This may look inefficient but under normal operation it is expected
     * that the loops will stop right away, resulting in normal list appends
     * below. However, this code is necessary for reviving retired jobs and
     * for jobs which are created long after the first chunk of recipients
     * was read in-core (either of these can happen only for multi-transport
     * messages).
     *
     * XXX Note that we test stack_parent rather than stack_level below. This
     * subtle difference allows us to enqueue the job in correct time order
     * with respect to orphaned children even after their original parent on
     * level zero is gone. Consequently, the early loop stop in candidate
     * selection works reliably, too. These are the reasons why we care to
     * bother with children adoption at all.
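     *
     * Concretely: if level-zero job A is preempted by job B and A finishes
     * while B is still being delivered, qmgr_job_parent_gone() resets B's
     * stack_parent to zero while B keeps its non-zero stack_level. Testing
     * stack_parent therefore still orders newly linked jobs with respect to
     * B, whereas testing stack_level would silently skip it.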
     */
    current = transport->job_current;
    for (next = 0, prev = transport->job_list.prev; prev;
	 next = prev, prev = prev->transport_peers.prev) {
	if (prev->stack_parent == 0) {
	    delay = message->queued_time - prev->message->queued_time;
	    if (delay >= 0)
		break;
	}
	if (current == prev)
	    current = 0;
    }
    list_prev = prev;
    list_next = next;

    unread = transport->job_next_unread;
    for (next = 0, prev = transport->job_bytime.prev; prev;
	 next = prev, prev = prev->time_peers.prev) {
	delay = message->queued_time - prev->message->queued_time;
	if (delay >= 0)
	    break;
	if (unread == prev)
	    unread = 0;
    }

    /*
     * Link the job into the proper place on the job lists and mark it so we
     * know it has been linked.
     */
    job->stack_level = 0;
    QMGR_LIST_LINK(transport->job_list, list_prev, job, list_next, transport_peers);
    QMGR_LIST_LINK(transport->job_bytime, prev, job, next, time_peers);

    /*
     * Update the current job pointer if necessary.
     */
    if (current == 0)
	transport->job_current = job;

    /*
     * Update the pointer to the first unread job on the job list and steal
     * the unused recipient slots from the old one.
     */
    if (unread == 0) {
	unread = transport->job_next_unread;
	transport->job_next_unread = job;
	if (unread != 0)
	    qmgr_job_move_limits(unread);
    }

    /*
     * Get as many recipient slots as possible. The excess will be returned
     * to the transport pool as soon as the exact amount required is known
     * (which is usually after all recipients have been read in core).
     */
    if (transport->rcpt_unused > 0) {
	job->rcpt_limit += transport->rcpt_unused;
	message->rcpt_limit += transport->rcpt_unused;
	transport->rcpt_unused = 0;
    }
}

/* qmgr_job_find - lookup job associated with named message and transport */

static QMGR_JOB *qmgr_job_find(QMGR_MESSAGE *message, QMGR_TRANSPORT *transport)
{

    /*
     * Instead of traversing the message job list, we use a single
     * per-transport hash table. This is better (at least with respect to
     * memory usage) than having a single hash table (usually almost empty)
     * for each message.
     */
    return ((QMGR_JOB *) htable_find(transport->job_byname, message->queue_id));
}

/* qmgr_job_obtain - find/create the appropriate job and make it ready for new recipients */

QMGR_JOB *qmgr_job_obtain(QMGR_MESSAGE *message, QMGR_TRANSPORT *transport)
{
    QMGR_JOB *job;

    /*
     * Try finding an existing job, reviving it if it was already retired.
     * Create a new job for this transport/message combination otherwise. In
     * either case, the job ends up linked on the job lists.
     */
    if ((job = qmgr_job_find(message, transport)) == 0)
	job = qmgr_job_create(message, transport);
    if (job->stack_level < 0)
	qmgr_job_link(job);

    /*
     * Reset the candidate cache because of the new expected recipients. Make
     * sure the job is not marked as a blocker for the same reason. Note that
     * this can result in having a non-blocker followed by more blockers.
     * Consequently, we can't just update the current job pointer, we have to
     * reset it. Fortunately qmgr_job_entry_select() will easily deal with
     * this and will look up the real current job for us.
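     *
     * For example, if the job list begins with blockers B1, B2, B3 and B2 is
     * the one obtaining new recipients, unmarking B2 alone leaves the
     * still-blocking B1 ahead of it, so the only safe choice is to point
     * job_current back at the head of the list.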
     */
    RESET_CANDIDATE_CACHE(transport);
    if (IS_BLOCKER(job, transport)) {
	job->blocker_tag = 0;
	transport->job_current = transport->job_list.next;
    }
    return (job);
}

/* qmgr_job_move_limits - move unused recipient slots to the next unread job */

void    qmgr_job_move_limits(QMGR_JOB *job)
{
    QMGR_TRANSPORT *transport = job->transport;
    QMGR_MESSAGE *message = job->message;
    QMGR_JOB *next = transport->job_next_unread;
    int     rcpt_unused, msg_rcpt_unused;

    /*
     * Find the next unread job on the job list if necessary. Cache it for
     * later. This makes the amortized efficiency of this routine O(1) per
     * job. Note that we use the time list, whose ordering doesn't change
     * over time.
     */
    if (job == next) {
	for (next = next->time_peers.next; next; next = next->time_peers.next)
	    if (next->message->rcpt_offset != 0)
		break;
	transport->job_next_unread = next;
    }

    /*
     * Calculate the number of available unused slots.
     */
    rcpt_unused = job->rcpt_limit - job->rcpt_count;
    msg_rcpt_unused = message->rcpt_limit - message->rcpt_count;
    if (msg_rcpt_unused < rcpt_unused)
	rcpt_unused = msg_rcpt_unused;

    /*
     * Transfer the unused recipient slots back to the transport pool and to
     * the next not-fully-read job. The job's and message's recipient limits
     * are adjusted accordingly. Note that the transport pool can be negative
     * if we used some of the rcpt_per_stack slots.
     */
    if (rcpt_unused > 0) {
	job->rcpt_limit -= rcpt_unused;
	message->rcpt_limit -= rcpt_unused;
	transport->rcpt_unused += rcpt_unused;
	if (next != 0 && (rcpt_unused = transport->rcpt_unused) > 0) {
	    next->rcpt_limit += rcpt_unused;
	    next->message->rcpt_limit += rcpt_unused;
	    transport->rcpt_unused = 0;
	}
    }
}

/* qmgr_job_parent_gone - take care of orphaned stack children */

static void qmgr_job_parent_gone(QMGR_JOB *job, QMGR_JOB *parent)
{
    QMGR_JOB *child;

    while ((child = job->stack_children.next) != 0) {
	QMGR_LIST_UNLINK(job->stack_children, QMGR_JOB *, child, stack_siblings);
	if (parent != 0)
	    QMGR_LIST_APPEND(parent->stack_children, child, stack_siblings);
	child->stack_parent = parent;
    }
}

/* qmgr_job_unlink - unlink the job from the job lists */

static void qmgr_job_unlink(QMGR_JOB *job)
{
    const char *myname = "qmgr_job_unlink";
    QMGR_TRANSPORT *transport = job->transport;

    /*
     * Sanity checks.
     */
    if (job->stack_level != 0)
	msg_panic("%s: non-zero stack level (%d)", myname, job->stack_level);
    if (job->stack_parent != 0)
	msg_panic("%s: parent present", myname);
    if (job->stack_siblings.next != 0)
	msg_panic("%s: siblings present", myname);

    /*
     * Make sure that children of a job on stack level zero are informed
     * that their parent is gone too.
     */
    qmgr_job_parent_gone(job, 0);

    /*
     * Update the current job pointer if necessary.
     */
    if (transport->job_current == job)
	transport->job_current = job->transport_peers.next;

    /*
     * Invalidate the candidate selection cache if necessary.
     */
    if (job == transport->candidate_cache
	|| job == transport->candidate_cache_current)
	RESET_CANDIDATE_CACHE(transport);

    /*
     * Remove the job from the job lists and mark it as unlinked.
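     *
     * The stack level of -1 set below is the "unlinked" marker that
     * qmgr_job_obtain() checks when deciding whether a revived job has to be
     * re-linked via qmgr_job_link().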
     */
    QMGR_LIST_UNLINK(transport->job_list, QMGR_JOB *, job, transport_peers);
    QMGR_LIST_UNLINK(transport->job_bytime, QMGR_JOB *, job, time_peers);
    job->stack_level = -1;
}

/* qmgr_job_retire - remove the job from the job lists while waiting for recipients to deliver */

static void qmgr_job_retire(QMGR_JOB *job)
{
    if (msg_verbose)
	msg_info("qmgr_job_retire: %s", job->message->queue_id);

    /*
     * Pop the job from the job stack if necessary.
     */
    if (job->stack_level > 0)
	qmgr_job_pop(job);

    /*
     * Make sure this job is not cached as the next unread job for this
     * transport. qmgr_entry_done() will make sure that the slots donated by
     * this job are moved back to the transport pool as soon as possible.
     */
    qmgr_job_move_limits(job);

    /*
     * Remove the job from the job lists. Note that it remains on the message
     * job list, though, and that it can be revived by using
     * qmgr_job_obtain(). Also note that the available slot counter is left
     * intact.
     */
    qmgr_job_unlink(job);
}

/* qmgr_job_free - release the job structure */

void    qmgr_job_free(QMGR_JOB *job)
{
    const char *myname = "qmgr_job_free";
    QMGR_MESSAGE *message = job->message;
    QMGR_TRANSPORT *transport = job->transport;

    if (msg_verbose)
	msg_info("%s: %s %s", myname, message->queue_id, transport->name);

    /*
     * Sanity checks.
     */
    if (job->rcpt_count)
	msg_panic("%s: non-zero recipient count (%d)", myname, job->rcpt_count);

    /*
     * Pop the job from the job stack if necessary.
     */
    if (job->stack_level > 0)
	qmgr_job_pop(job);

    /*
     * Return any remaining recipient slots back to the recipient slots pool.
     */
    qmgr_job_move_limits(job);
    if (job->rcpt_limit)
	msg_panic("%s: recipient slots leak (%d)", myname, job->rcpt_limit);

    /*
     * Unlink and discard the structure. Check if the job is still linked on
     * the job lists or if it was already retired before unlinking it.
     */
    if (job->stack_level >= 0)
	qmgr_job_unlink(job);
    QMGR_LIST_UNLINK(message->job_list, QMGR_JOB *, job, message_peers);
    htable_delete(transport->job_byname, message->queue_id, (void (*) (void *)) 0);
    htable_free(job->peer_byname, (void (*) (void *)) 0);
    myfree((void *) job);
}

/* qmgr_job_count_slots - maintain the delivery slot counters */

static void qmgr_job_count_slots(QMGR_JOB *job)
{

    /*
     * Count the number of delivery slots used during the delivery of the
     * selected job. Also count the number of delivery slots available for
     * its preemption.
     *
     * Despite its trivial look, this is one of the key parts of the theory
     * behind this preempting scheduler.
     */
    job->slots_available++;
    job->slots_used++;

    /*
     * If the selected job is not the original current job, reset the
     * candidate cache because the change above has slightly increased the
     * chance of this job becoming a candidate next time.
     *
     * Don't expect that the change of the current job this turn will render
     * the candidate cache invalid the next turn - it can happen that the
     * next turn the original current job will be selected again and the
     * cache would be considered valid in such case.
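     *
     * In other words: when the cache was filled while job A was current and
     * a preemptor B gets selected this turn, the cache is reset here; but if
     * A itself gets selected again next turn, candidate_cache_current still
     * matches and the cached result stays usable.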
     */
    if (job != job->transport->candidate_cache_current)
	RESET_CANDIDATE_CACHE(job->transport);
}

/* qmgr_job_candidate - find best job candidate for preempting given job */

static QMGR_JOB *qmgr_job_candidate(QMGR_JOB *current)
{
    QMGR_TRANSPORT *transport = current->transport;
    QMGR_JOB *job, *best_job = 0;
    double  score, best_score = 0.0;
    int     max_slots, max_needed_entries, max_total_entries;
    int     delay;
    time_t  now = sane_time();

    /*
     * Fetch the result directly from the cache if the cache is still valid.
     *
     * Note that we cache negative results too, so the cache must be
     * invalidated by resetting the cached current job pointer, not the
     * candidate pointer itself.
     *
     * In case the cache is valid and contains no candidate, we can ignore
     * the time change, as it affects only which candidate is the best, not
     * whether one exists. However, this feature requires that we no longer
     * relax the cache resetting rules, depending on the automatic cache
     * timeout.
     */
    if (transport->candidate_cache_current == current
	&& (transport->candidate_cache_time == now
	    || transport->candidate_cache == 0))
	return (transport->candidate_cache);

    /*
     * Estimate the minimum amount of delivery slots that can ever be
     * accumulated for the given job. All jobs that won't fit into these
     * slots are excluded from the candidate selection.
     */
    max_slots = (MIN_ENTRIES(current) - current->selected_entries
		 + current->slots_available) / transport->slot_cost;

    /*
     * Select the candidate with the best time_since_queued/total_recipients
     * score. In addition to jobs which don't meet the max_slots limit, also
     * skip jobs which don't have any selectable entries at the moment.
     *
     * Instead of traversing the whole job list, we traverse it just from the
     * current job forward. This has several advantages. First, we skip some
     * of the blocker jobs and the current job itself right away. But the
     * really important advantage is that we are sure that we don't consider
     * any jobs that are already stack children of the current job. Thanks to
     * this we can easily include all encountered jobs which are leaf
     * children of some of the preempting stacks as valid candidates. All we
     * need to do is to make sure we do not include any of the stack parents.
     * And, because the leaf children are not ordered by the time since
     * queued, we have to exclude them from the early loop end test.
     *
     * However, don't bother searching if we can't find anything suitable
     * anyway.
     */
    if (max_slots > 0) {
	for (job = current->transport_peers.next; job; job = job->transport_peers.next) {
	    if (job->stack_children.next != 0 || IS_BLOCKER(job, transport))
		continue;
	    max_total_entries = MAX_ENTRIES(job);
	    max_needed_entries = max_total_entries - job->selected_entries;
	    delay = now - job->message->queued_time + 1;
	    if (max_needed_entries > 0 && max_needed_entries <= max_slots) {
		score = (double) delay / max_total_entries;
		if (score > best_score) {
		    best_score = score;
		    best_job = job;
		}
	    }

	    /*
	     * Stop early if the best score is as good as it can get.
	     */
	    if (delay <= best_score && job->stack_level == 0)
		break;
	}
    }

    /*
     * Cache the result for later use.
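     *
     * To make the scoring above concrete: a job queued 60 seconds ago with
     * 30 total recipients scores (60 + 1) / 30, roughly 2.0, and wins over a
     * job queued 120 seconds ago with 600 total recipients, which scores
     * only (120 + 1) / 600, roughly 0.2.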
     */
    transport->candidate_cache = best_job;
    transport->candidate_cache_current = current;
    transport->candidate_cache_time = now;

    return (best_job);
}

/* qmgr_job_preempt - preempt large message with smaller one */

static QMGR_JOB *qmgr_job_preempt(QMGR_JOB *current)
{
    const char *myname = "qmgr_job_preempt";
    QMGR_TRANSPORT *transport = current->transport;
    QMGR_JOB *job, *prev;
    int     expected_slots;
    int     rcpt_slots;

    /*
     * Suppress preempting completely if the current job is not big enough to
     * accumulate even the minimal number of slots required.
     *
     * Also, don't look for a better job candidate if there are no available
     * slots yet (the count can get negative due to the slot loans below).
     */
    if (current->slots_available <= 0
	|| MAX_ENTRIES(current) < transport->min_slots * transport->slot_cost)
	return (current);

    /*
     * Find the best candidate for preempting the current job.
     *
     * Note that the function also takes care that the candidate fits within
     * the number of delivery slots which the current job is still able to
     * accumulate.
     */
    if ((job = qmgr_job_candidate(current)) == 0)
	return (current);

    /*
     * Sanity checks.
     */
    if (job == current)
	msg_panic("%s: attempt to preempt itself", myname);
    if (job->stack_children.next != 0)
	msg_panic("%s: already on the job stack (%d)", myname, job->stack_level);
    if (job->stack_level < 0)
	msg_panic("%s: not on the job list (%d)", myname, job->stack_level);

    /*
     * Check if enough available delivery slots have accumulated to preempt
     * the current job.
     *
     * The slot loaning scheme improves the average message response time.
     * Note that the loan only allows the preemption to happen earlier,
     * though. It doesn't affect how many slots have to be "paid" - in either
     * case the full number of slots required has to be accumulated later
     * before the current job can be preempted again.
     */
    expected_slots = MAX_ENTRIES(job) - job->selected_entries;
    if (current->slots_available / transport->slot_cost + transport->slot_loan
	< expected_slots * transport->slot_loan_factor / 100.0)
	return (current);

    /*
     * Preempt the current job.
     *
     * This involves placing the selected candidate in front of the current
     * job on the job list and updating the stack parent/child/sibling
     * pointers appropriately. But first we need to make sure that the
     * candidate is taken from its previous job stack, which it might be on
     * top of.
     */
    if (job->stack_level > 0)
	qmgr_job_pop(job);
    QMGR_LIST_UNLINK(transport->job_list, QMGR_JOB *, job, transport_peers);
    prev = current->transport_peers.prev;
    QMGR_LIST_LINK(transport->job_list, prev, job, current, transport_peers);
    job->stack_parent = current;
    QMGR_LIST_APPEND(current->stack_children, job, stack_siblings);
    job->stack_level = current->stack_level + 1;

    /*
     * Update the current job pointer and explicitly reset the candidate
     * cache.
     */
    transport->job_current = job;
    RESET_CANDIDATE_CACHE(transport);

    /*
     * Since a single job can be preempted by several jobs at the same time,
     * we have to adjust the available slot count now to prevent using the
     * same slots multiple times. To do that we subtract the number of slots
     * the preempting job will supposedly use. This number will be corrected
     * later when that job is popped from the stack to reflect the number of
     * slots really used.
     *
     * As long as we don't need to keep track of how many slots were really
     * used, we can (ab)use the slots_used counter for counting the
     * difference between the real and expected amounts instead of the
     * absolute amount.
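     *
     * A sketch with slot_cost 2 and expected_slots 10: the current job gives
     * up 20 of its accumulated slots right away, and the preemptor starts at
     * slots_used = -10. If the preemptor eventually delivers 12 entries, it
     * ends at slots_used = +2, and qmgr_job_pop() charges its parent 4 more
     * slots; if it delivers only 7, it ends at -3 and the parent gets 6
     * slots back.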
     */
    current->slots_available -= expected_slots * transport->slot_cost;
    job->slots_used = -expected_slots;

    /*
     * Add part of the extra recipient slots reserved for preempting jobs to
     * the new current job if necessary.
     *
     * Note that transport->rcpt_unused is within <-rcpt_per_stack, 0> in
     * such case.
     */
    if (job->message->rcpt_offset != 0) {
	rcpt_slots = (transport->rcpt_per_stack + transport->rcpt_unused + 1) / 2;
	job->rcpt_limit += rcpt_slots;
	job->message->rcpt_limit += rcpt_slots;
	transport->rcpt_unused -= rcpt_slots;
    }
    if (msg_verbose)
	msg_info("%s: %s by %s, level %d", myname, current->message->queue_id,
		 job->message->queue_id, job->stack_level);

    return (job);
}

/* qmgr_job_pop - remove the job from its job preemption stack */

static void qmgr_job_pop(QMGR_JOB *job)
{
    const char *myname = "qmgr_job_pop";
    QMGR_TRANSPORT *transport = job->transport;
    QMGR_JOB *parent;

    if (msg_verbose)
	msg_info("%s: %s", myname, job->message->queue_id);

    /*
     * Sanity checks.
     */
    if (job->stack_level <= 0)
	msg_panic("%s: not on the job stack (%d)", myname, job->stack_level);

    /*
     * Adjust the number of delivery slots available to preempt the job's
     * parent. Note that the -= actually adds back any unused slots, as we
     * have already subtracted the expected amount of slots from both
     * counters when we did the preemption.
     *
     * Note that we intentionally do not adjust slots_used of the parent.
     * Doing so would decrease the maximum per-message inflation factor if
     * the preemption appeared near the end of the parent's delivery.
     *
     * For the same reason we do not adjust the parent's slots_available if
     * the parent is not the original parent that was preempted by this job
     * (i.e., the original parent job has already completed).
     *
     * This is another key part of the theory behind this preempting
     * scheduler.
     */
    if ((parent = job->stack_parent) != 0
	&& job->stack_level == parent->stack_level + 1)
	parent->slots_available -= job->slots_used * transport->slot_cost;

    /*
     * Remove the job from its parent's children list.
     */
    if (parent != 0) {
	QMGR_LIST_UNLINK(parent->stack_children, QMGR_JOB *, job, stack_siblings);
	job->stack_parent = 0;
    }

    /*
     * If there is a parent, let it adopt all those orphaned children.
     * Otherwise at least notify the children that their parent is gone.
     */
    qmgr_job_parent_gone(job, parent);

    /*
     * Put the job back on stack level zero.
     */
    job->stack_level = 0;

    /*
     * Explicitly reset the candidate cache. It's not worth trying to skip
     * this under some complicated conditions - in most cases the popped job
     * is the current job, so we would have to reset it anyway.
     */
    RESET_CANDIDATE_CACHE(transport);

    /*
     * Here we leave the remaining work involving the proper placement on the
     * job list to the caller. The most important reason for this is that it
     * allows us not to look up where exactly to place the job.
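     *
     * (In practice the callers differ: qmgr_job_preempt() immediately
     * re-links the popped candidate in front of the current job, while
     * qmgr_job_retire() and qmgr_job_free() unlink it from the job lists
     * altogether.)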
     *
     * The caller is also made responsible for invalidating the current job
     * cache if necessary.
     */
#if 0
    QMGR_LIST_UNLINK(transport->job_list, QMGR_JOB *, job, transport_peers);
    QMGR_LIST_LINK(transport->job_list, some_prev, job, some_next, transport_peers);

    if (transport->job_current == job)
	transport->job_current = job->transport_peers.next;
#endif
}

/* qmgr_job_peer_select - select next peer suitable for delivery */

static QMGR_PEER *qmgr_job_peer_select(QMGR_JOB *job)
{
    QMGR_PEER *peer;
    QMGR_MESSAGE *message = job->message;

    /*
     * Try reading in more recipients. We do that as soon as possible
     * (almost, see below), to make sure there is enough new blood pouring
     * in. Otherwise a single recipient for a slow destination might starve
     * the entire message delivery, leaving a lot of fast destination
     * recipients sitting idle in the queue file.
     *
     * Ideally we would like to read in recipients whenever there is space,
     * but to prevent excessive I/O, we read them only when enough time has
     * passed or we can read enough of them at once.
     *
     * Note that even if we read the recipients a few at a time, the message
     * loading code tries to put them into existing recipient entries
     * whenever possible, so the per-destination recipient grouping is not
     * grossly affected.
     *
     * XXX Workaround for logic mismatch. The message->refcount test needs
     * explanation. If the refcount is zero, it means that qmgr_active_done()
     * is being completed asynchronously. In such case, we can't read in more
     * recipients as bad things would happen after qmgr_active_done()
     * continues processing. Note that this results in the given job being
     * stalled for some time, but fortunately this particular situation is so
     * rare that it is not critical. Still, we seek a better solution.
     */
    if (message->rcpt_offset != 0
	&& message->refcount > 0
	&& (message->rcpt_limit - message->rcpt_count >= job->transport->refill_limit
	    || (message->rcpt_limit > message->rcpt_count
		&& sane_time() - message->refill_time >= job->transport->refill_delay)))
	qmgr_message_realloc(message);

    /*
     * Get the next suitable peer, if there is any.
     */
    if (HAS_ENTRIES(job) && (peer = qmgr_peer_select(job)) != 0)
	return (peer);

    /*
     * There is no suitable peer in-core, so try reading in more recipients
     * if possible. This is our last chance to get a suitable peer before
     * giving up on this job for now.
     *
     * XXX For message->refcount, see above.
     */
    if (message->rcpt_offset != 0
	&& message->refcount > 0
	&& message->rcpt_limit > message->rcpt_count) {
	qmgr_message_realloc(message);
	if (HAS_ENTRIES(job))
	    return (qmgr_peer_select(job));
    }
    return (0);
}

/* qmgr_job_entry_select - select next entry suitable for delivery */

QMGR_ENTRY *qmgr_job_entry_select(QMGR_TRANSPORT *transport)
{
    QMGR_JOB *job, *next;
    QMGR_PEER *peer;
    QMGR_ENTRY *entry;

    /*
     * Get the current job if there is one.
     */
    if ((job = transport->job_current) == 0)
	return (0);

    /*
     * Exercise the preempting algorithm if enabled.
     *
     * A slot_cost of 1 would cause the algorithm to degenerate and is
     * therefore treated as disabled too.
     */
    if (transport->slot_cost >= 2)
	job = qmgr_job_preempt(job);

    /*
     * Select the next entry suitable for delivery. In case the current job
     * can't provide one because of the per-destination concurrency limits,
     * we mark it as a "blocker" job and continue with the next job on the
     * job list.
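     *
     * For example, if the current job's remaining recipients all go to a
     * destination whose delivery window is already full, the job gets the
     * blocker tag and the loop below moves on; qmgr_job_blocker_update()
     * makes such jobs eligible again once that destination's concurrency
     * drops.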
     *
     * Note that the loop also takes care of getting the "stall" jobs (jobs
     * with no entries currently available) out of the way if necessary.
     * Stall jobs can appear in case of multi-transport messages whose
     * recipients don't fit in-core at once. Some jobs created by such a
     * message may have only a few recipients and would stay on the job list
     * until all other jobs of that message are delivered, blocking precious
     * recipient slots available to this transport. Or it can happen that the
     * job has some more entries but suddenly they all get deferred. Whatever
     * the reason, we retire such jobs below if we happen to come across
     * some.
     */
    for ( /* empty */ ; job; job = next) {
	next = job->transport_peers.next;

	/*
	 * Don't bother if the job is known to have no available entries
	 * because of the per-destination concurrency limits.
	 */
	if (IS_BLOCKER(job, transport))
	    continue;

	if ((peer = qmgr_job_peer_select(job)) != 0) {

	    /*
	     * We have found a suitable peer. Select one of its entries and
	     * adjust the delivery slot counters.
	     */
	    entry = qmgr_entry_select(peer);
	    qmgr_job_count_slots(job);

	    /*
	     * Remember the current job for the next time so we don't have
	     * to crawl over all those blockers again. They will be
	     * reconsidered when the concurrency limit permits.
	     */
	    transport->job_current = job;

	    /*
	     * In case we selected the very last job entry, remove the job
	     * from the job lists right now.
	     *
	     * This action uses the assumption that once the job entry has
	     * been selected, it can be unselected only before the message
	     * itself is deferred. Thus the job with all entries selected
	     * can't re-appear with more entries available for selection
	     * again (without reading in more entries from the queue file,
	     * which in turn invokes qmgr_job_obtain() which re-links the
	     * job back on the lists if necessary).
	     *
	     * Note that qmgr_job_move_limits() transfers the recipient
	     * slots correctly even if the job is unlinked from the job
	     * list, thanks to the job_next_unread caching.
	     */
	    if (!HAS_ENTRIES(job) && job->message->rcpt_offset == 0)
		qmgr_job_retire(job);

	    /*
	     * Finally. Hand back the fruit of our tedious effort.
	     */
	    return (entry);
	} else if (HAS_ENTRIES(job)) {

	    /*
	     * The job can't be selected due to the concurrency limits. Mark
	     * it together with its queues so we know they are blocking the
	     * job list and they get the appropriate treatment. In
	     * particular, all blockers will be reconsidered when one of the
	     * problematic queues accepts more deliveries. And the job
	     * itself will be reconsidered if it is assigned some more
	     * entries.
	     */
	    job->blocker_tag = transport->blocker_tag;
	    for (peer = job->peer_list.next; peer; peer = peer->peers.next)
		if (peer->entry_list.next != 0)
		    peer->queue->blocker_tag = transport->blocker_tag;
	} else {

	    /*
	     * The job is "stalled". Retire it until it either gets freed or
	     * gets more entries later.
	     */
	    qmgr_job_retire(job);
	}
    }

    /*
     * We have not found any entry we could use for delivery. Well, things
     * must have changed since this transport was selected for asynchronous
     * allocation. Never mind. Clear the current job pointer and reluctantly
     * report back that we have failed in our task.
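     *
     * (This is harmless: the next qmgr_job_obtain() call re-links jobs as
     * needed, and qmgr_job_link() restores the current job pointer when it
     * finds it cleared.)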
     */
    transport->job_current = 0;
    return (0);
}

/* qmgr_job_blocker_update - update "blocked job" status */

void    qmgr_job_blocker_update(QMGR_QUEUE *queue)
{
    QMGR_TRANSPORT *transport = queue->transport;

    /*
     * If the queue was blocking some of the jobs on the job list, check if
     * the concurrency limit has been lifted. If there are still some pending
     * deliveries, give it a try and unmark all transport blockers at once.
     * qmgr_job_entry_select() will do the rest. In either case make sure the
     * queue is not marked as a blocker anymore, with extra handling of
     * queues which were declared dead.
     *
     * Note that changing the blocker status also affects the candidate
     * cache. Most of the cases would be automatically recognized by the
     * current job change, but we play safe and reset the cache explicitly
     * below.
     *
     * Keeping the transport blocker tag odd is an easy way to make sure the
     * tag never matches jobs that are not explicitly marked as blockers.
     */
    if (queue->blocker_tag == transport->blocker_tag) {
	if (queue->window > queue->busy_refcount && queue->todo.next != 0) {
	    transport->blocker_tag += 2;
	    transport->job_current = transport->job_list.next;
	    transport->candidate_cache_current = 0;
	}
	if (queue->window > queue->busy_refcount || QMGR_QUEUE_THROTTLED(queue))
	    queue->blocker_tag = 0;
    }
}