1 /* $NetBSD: qmgr_queue.c,v 1.2 2017/02/14 01:16:46 christos Exp $ */ 2 3 /*++ 4 /* NAME 5 /* qmgr_queue 3 6 /* SUMMARY 7 /* per-destination queues 8 /* SYNOPSIS 9 /* #include "qmgr.h" 10 /* 11 /* int qmgr_queue_count; 12 /* 13 /* QMGR_QUEUE *qmgr_queue_create(transport, name, nexthop) 14 /* QMGR_TRANSPORT *transport; 15 /* const char *name; 16 /* const char *nexthop; 17 /* 18 /* void qmgr_queue_done(queue) 19 /* QMGR_QUEUE *queue; 20 /* 21 /* QMGR_QUEUE *qmgr_queue_find(transport, name) 22 /* QMGR_TRANSPORT *transport; 23 /* const char *name; 24 /* 25 /* QMGR_QUEUE *qmgr_queue_select(transport) 26 /* QMGR_TRANSPORT *transport; 27 /* 28 /* void qmgr_queue_throttle(queue, dsn) 29 /* QMGR_QUEUE *queue; 30 /* DSN *dsn; 31 /* 32 /* void qmgr_queue_unthrottle(queue) 33 /* QMGR_QUEUE *queue; 34 /* 35 /* void qmgr_queue_suspend(queue, delay) 36 /* QMGR_QUEUE *queue; 37 /* int delay; 38 /* DESCRIPTION 39 /* These routines add/delete/manipulate per-destination queues. 40 /* Each queue corresponds to a specific transport and destination. 41 /* Each queue has a `todo' list of delivery requests for that 42 /* destination, and a `busy' list of delivery requests in progress. 43 /* 44 /* qmgr_queue_count is a global counter for the total number 45 /* of in-core queue structures. 46 /* 47 /* qmgr_queue_create() creates an empty named queue for the named 48 /* transport and destination. The queue is given an initial 49 /* concurrency limit as specified with the 50 /* \fIinitial_destination_concurrency\fR configuration parameter, 51 /* provided that it does not exceed the transport-specific 52 /* concurrency limit. 53 /* 54 /* qmgr_queue_done() disposes of a per-destination queue after all 55 /* its entries have been taken care of. It is an error to dispose 56 /* of a dead queue. 57 /* 58 /* qmgr_queue_find() looks up the named queue for the named 59 /* transport. A null result means that the queue was not found. 60 /* 61 /* qmgr_queue_select() uses a round-robin strategy to select 62 /* from the named transport one per-destination queue with a 63 /* non-empty `todo' list. 64 /* 65 /* qmgr_queue_throttle() handles a delivery error, and decrements the 66 /* concurrency limit for the destination, with a lower bound of 1. 67 /* When the cohort failure bound is reached, qmgr_queue_throttle() 68 /* sets the concurrency limit to zero and starts a timer 69 /* to re-enable delivery to the destination after a configurable delay. 70 /* 71 /* qmgr_queue_unthrottle() undoes qmgr_queue_throttle()'s effects. 72 /* The concurrency limit for the destination is incremented, 73 /* provided that it does not exceed the destination concurrency 74 /* limit specified for the transport. This routine implements 75 /* "slow open" mode, and eliminates the "thundering herd" problem. 76 /* 77 /* qmgr_queue_suspend() suspends delivery for this destination 78 /* briefly. 79 /* DIAGNOSTICS 80 /* Panic: consistency check failure. 81 /* LICENSE 82 /* .ad 83 /* .fi 84 /* The Secure Mailer license must be distributed with this software. 85 /* AUTHOR(S) 86 /* Wietse Venema 87 /* IBM T.J. Watson Research 88 /* P.O. Box 704 89 /* Yorktown Heights, NY 10598, USA 90 /*--*/ 91 92 /* System library. */ 93 94 #include <sys_defs.h> 95 #include <time.h> 96 97 /* Utility library. */ 98 99 #include <msg.h> 100 #include <mymalloc.h> 101 #include <events.h> 102 #include <htable.h> 103 104 /* Global library. */ 105 106 #include <mail_params.h> 107 #include <recipient_list.h> 108 #include <mail_proto.h> /* QMGR_LOG_WINDOW */ 109 110 /* Application-specific. */ 111 112 #include "qmgr.h" 113 114 int qmgr_queue_count; 115 116 #define QMGR_ERROR_OR_RETRY_QUEUE(queue) \ 117 (strcmp(queue->transport->name, MAIL_SERVICE_RETRY) == 0 \ 118 || strcmp(queue->transport->name, MAIL_SERVICE_ERROR) == 0) 119 120 #define QMGR_LOG_FEEDBACK(feedback) \ 121 if (var_conc_feedback_debug && !QMGR_ERROR_OR_RETRY_QUEUE(queue)) \ 122 msg_info("%s: feedback %g", myname, feedback); 123 124 #define QMGR_LOG_WINDOW(queue) \ 125 if (var_conc_feedback_debug && !QMGR_ERROR_OR_RETRY_QUEUE(queue)) \ 126 msg_info("%s: queue %s: limit %d window %d success %g failure %g fail_cohorts %g", \ 127 myname, queue->name, queue->transport->dest_concurrency_limit, \ 128 queue->window, queue->success, queue->failure, queue->fail_cohorts); 129 130 /* qmgr_queue_resume - resume delivery to destination */ 131 132 static void qmgr_queue_resume(int event, void *context) 133 { 134 QMGR_QUEUE *queue = (QMGR_QUEUE *) context; 135 const char *myname = "qmgr_queue_resume"; 136 137 /* 138 * Sanity checks. 139 */ 140 if (!QMGR_QUEUE_SUSPENDED(queue)) 141 msg_panic("%s: bad queue status: %s", myname, QMGR_QUEUE_STATUS(queue)); 142 143 /* 144 * We can't simply force delivery on this queue: the transport's pending 145 * count may already be maxed out, and there may be other constraints 146 * that definitely should be none of our business. The best we can do is 147 * to play by the same rules as everyone else: let qmgr_active_drain() 148 * and round-robin selection take care of message selection. 149 */ 150 queue->window = 1; 151 152 /* 153 * Every event handler that leaves a queue in the "ready" state should 154 * remove the queue when it is empty. 155 */ 156 if (QMGR_QUEUE_READY(queue) && queue->todo.next == 0 && queue->busy.next == 0) 157 qmgr_queue_done(queue); 158 } 159 160 /* qmgr_queue_suspend - briefly suspend a destination */ 161 162 void qmgr_queue_suspend(QMGR_QUEUE *queue, int delay) 163 { 164 const char *myname = "qmgr_queue_suspend"; 165 166 /* 167 * Sanity checks. 168 */ 169 if (!QMGR_QUEUE_READY(queue)) 170 msg_panic("%s: bad queue status: %s", myname, QMGR_QUEUE_STATUS(queue)); 171 if (queue->busy_refcount > 0) 172 msg_panic("%s: queue is busy", myname); 173 174 /* 175 * Set the queue status to "suspended". No-one is supposed to remove a 176 * queue in suspended state. 177 */ 178 queue->window = QMGR_QUEUE_STAT_SUSPENDED; 179 event_request_timer(qmgr_queue_resume, (void *) queue, delay); 180 } 181 182 /* qmgr_queue_unthrottle_wrapper - in case (char *) != (struct *) */ 183 184 static void qmgr_queue_unthrottle_wrapper(int unused_event, void *context) 185 { 186 QMGR_QUEUE *queue = (QMGR_QUEUE *) context; 187 188 /* 189 * This routine runs when a wakeup timer goes off; it does not run in the 190 * context of some queue manipulation. Therefore, it is safe to discard 191 * this in-core queue when it is empty and when this site is not dead. 192 */ 193 qmgr_queue_unthrottle(queue); 194 if (QMGR_QUEUE_READY(queue) && queue->todo.next == 0 && queue->busy.next == 0) 195 qmgr_queue_done(queue); 196 } 197 198 /* qmgr_queue_unthrottle - give this destination another chance */ 199 200 void qmgr_queue_unthrottle(QMGR_QUEUE *queue) 201 { 202 const char *myname = "qmgr_queue_unthrottle"; 203 QMGR_TRANSPORT *transport = queue->transport; 204 double feedback; 205 206 if (msg_verbose) 207 msg_info("%s: queue %s", myname, queue->name); 208 209 /* 210 * Sanity checks. 211 */ 212 if (!QMGR_QUEUE_THROTTLED(queue) && !QMGR_QUEUE_READY(queue)) 213 msg_panic("%s: bad queue status: %s", myname, QMGR_QUEUE_STATUS(queue)); 214 215 /* 216 * Don't restart the negative feedback hysteresis cycle with every 217 * positive feedback. Restart it only when we make a positive concurrency 218 * adjustment (i.e. at the end of a positive feedback hysteresis cycle). 219 * Otherwise negative feedback would be too aggressive: negative feedback 220 * takes effect immediately at the start of its hysteresis cycle. 221 */ 222 queue->fail_cohorts = 0; 223 224 /* 225 * Special case when this site was dead. 226 */ 227 if (QMGR_QUEUE_THROTTLED(queue)) { 228 event_cancel_timer(qmgr_queue_unthrottle_wrapper, (void *) queue); 229 if (queue->dsn == 0) 230 msg_panic("%s: queue %s: window 0 status 0", myname, queue->name); 231 dsn_free(queue->dsn); 232 queue->dsn = 0; 233 /* Back from the almost grave, best concurrency is anyone's guess. */ 234 if (queue->busy_refcount > 0) 235 queue->window = queue->busy_refcount; 236 else 237 queue->window = transport->init_dest_concurrency; 238 queue->success = queue->failure = 0; 239 QMGR_LOG_WINDOW(queue); 240 return; 241 } 242 243 /* 244 * Increase the destination's concurrency limit until we reach the 245 * transport's concurrency limit. Allow for a margin the size of the 246 * initial destination concurrency, so that we're not too gentle. 247 * 248 * Why is the concurrency increment based on preferred concurrency and not 249 * on the number of outstanding delivery requests? The latter fluctuates 250 * wildly when deliveries complete in bursts (artificial benchmark 251 * measurements), and does not account for cached connections. 252 * 253 * Keep the window within reasonable distance from actual concurrency 254 * otherwise negative feedback will be ineffective. This expression 255 * assumes that busy_refcount changes gradually. This is invalid when 256 * deliveries complete in bursts (artificial benchmark measurements). 257 */ 258 if (transport->dest_concurrency_limit == 0 259 || transport->dest_concurrency_limit > queue->window) 260 if (queue->window < queue->busy_refcount + transport->init_dest_concurrency) { 261 feedback = QMGR_FEEDBACK_VAL(transport->pos_feedback, queue->window); 262 QMGR_LOG_FEEDBACK(feedback); 263 queue->success += feedback; 264 /* Prepare for overshoot (feedback > hysteresis, rounding error). */ 265 while (queue->success + feedback / 2 >= transport->pos_feedback.hysteresis) { 266 queue->window += transport->pos_feedback.hysteresis; 267 queue->success -= transport->pos_feedback.hysteresis; 268 queue->failure = 0; 269 } 270 /* Prepare for overshoot. */ 271 if (transport->dest_concurrency_limit > 0 272 && queue->window > transport->dest_concurrency_limit) 273 queue->window = transport->dest_concurrency_limit; 274 } 275 QMGR_LOG_WINDOW(queue); 276 } 277 278 /* qmgr_queue_throttle - handle destination delivery failure */ 279 280 void qmgr_queue_throttle(QMGR_QUEUE *queue, DSN *dsn) 281 { 282 const char *myname = "qmgr_queue_throttle"; 283 QMGR_TRANSPORT *transport = queue->transport; 284 double feedback; 285 286 /* 287 * Sanity checks. 288 */ 289 if (!QMGR_QUEUE_READY(queue)) 290 msg_panic("%s: bad queue status: %s", myname, QMGR_QUEUE_STATUS(queue)); 291 if (queue->dsn) 292 msg_panic("%s: queue %s: spurious reason %s", 293 myname, queue->name, queue->dsn->reason); 294 if (msg_verbose) 295 msg_info("%s: queue %s: %s %s", 296 myname, queue->name, dsn->status, dsn->reason); 297 298 /* 299 * Don't restart the positive feedback hysteresis cycle with every 300 * negative feedback. Restart it only when we make a negative concurrency 301 * adjustment (i.e. at the start of a negative feedback hysteresis 302 * cycle). Otherwise positive feedback would be too weak (positive 303 * feedback does not take effect until the end of its hysteresis cycle). 304 */ 305 306 /* 307 * This queue is declared dead after a configurable number of 308 * pseudo-cohort failures. 309 */ 310 if (QMGR_QUEUE_READY(queue)) { 311 queue->fail_cohorts += 1.0 / queue->window; 312 if (transport->fail_cohort_limit > 0 313 && queue->fail_cohorts >= transport->fail_cohort_limit) 314 queue->window = QMGR_QUEUE_STAT_THROTTLED; 315 } 316 317 /* 318 * Decrease the destination's concurrency limit until we reach 1. Base 319 * adjustments on the concurrency limit itself, instead of using the 320 * actual concurrency. The latter fluctuates wildly when deliveries 321 * complete in bursts (artificial benchmark measurements). 322 * 323 * Even after reaching 1, we maintain the negative hysteresis cycle so that 324 * negative feedback can cancel out positive feedback. 325 */ 326 if (QMGR_QUEUE_READY(queue)) { 327 feedback = QMGR_FEEDBACK_VAL(transport->neg_feedback, queue->window); 328 QMGR_LOG_FEEDBACK(feedback); 329 queue->failure -= feedback; 330 /* Prepare for overshoot (feedback > hysteresis, rounding error). */ 331 while (queue->failure - feedback / 2 < 0) { 332 queue->window -= transport->neg_feedback.hysteresis; 333 queue->success = 0; 334 queue->failure += transport->neg_feedback.hysteresis; 335 } 336 /* Prepare for overshoot. */ 337 if (queue->window < 1) 338 queue->window = 1; 339 } 340 341 /* 342 * Special case for a site that just was declared dead. 343 */ 344 if (QMGR_QUEUE_THROTTLED(queue)) { 345 queue->dsn = DSN_COPY(dsn); 346 event_request_timer(qmgr_queue_unthrottle_wrapper, 347 (void *) queue, var_min_backoff_time); 348 queue->dflags = 0; 349 } 350 QMGR_LOG_WINDOW(queue); 351 } 352 353 /* qmgr_queue_select - select in-core queue for delivery */ 354 355 QMGR_QUEUE *qmgr_queue_select(QMGR_TRANSPORT *transport) 356 { 357 QMGR_QUEUE *queue; 358 359 /* 360 * If we find a suitable site, rotate the list to enforce round-robin 361 * selection. See similar selection code in qmgr_transport_select(). 362 */ 363 for (queue = transport->queue_list.next; queue; queue = queue->peers.next) { 364 if (queue->window > queue->busy_refcount && queue->todo.next != 0) { 365 QMGR_LIST_ROTATE(transport->queue_list, queue); 366 if (msg_verbose) 367 msg_info("qmgr_queue_select: %s", queue->name); 368 return (queue); 369 } 370 } 371 return (0); 372 } 373 374 /* qmgr_queue_done - delete in-core queue for site */ 375 376 void qmgr_queue_done(QMGR_QUEUE *queue) 377 { 378 const char *myname = "qmgr_queue_done"; 379 QMGR_TRANSPORT *transport = queue->transport; 380 381 /* 382 * Sanity checks. It is an error to delete an in-core queue with pending 383 * messages or timers. 384 */ 385 if (queue->busy_refcount != 0 || queue->todo_refcount != 0) 386 msg_panic("%s: refcount: %d", myname, 387 queue->busy_refcount + queue->todo_refcount); 388 if (queue->todo.next || queue->busy.next) 389 msg_panic("%s: queue not empty: %s", myname, queue->name); 390 if (!QMGR_QUEUE_READY(queue)) 391 msg_panic("%s: bad queue status: %s", myname, QMGR_QUEUE_STATUS(queue)); 392 if (queue->dsn) 393 msg_panic("%s: queue %s: spurious reason %s", 394 myname, queue->name, queue->dsn->reason); 395 396 /* 397 * Clean up this in-core queue. 398 */ 399 QMGR_LIST_UNLINK(transport->queue_list, QMGR_QUEUE *, queue); 400 htable_delete(transport->queue_byname, queue->name, (void (*) (void *)) 0); 401 myfree(queue->name); 402 myfree(queue->nexthop); 403 qmgr_queue_count--; 404 myfree((void *) queue); 405 } 406 407 /* qmgr_queue_create - create in-core queue for site */ 408 409 QMGR_QUEUE *qmgr_queue_create(QMGR_TRANSPORT *transport, const char *name, 410 const char *nexthop) 411 { 412 QMGR_QUEUE *queue; 413 414 /* 415 * If possible, choose an initial concurrency of > 1 so that one bad 416 * message or one bad network won't slow us down unnecessarily. 417 */ 418 419 queue = (QMGR_QUEUE *) mymalloc(sizeof(QMGR_QUEUE)); 420 qmgr_queue_count++; 421 queue->dflags = 0; 422 queue->last_done = 0; 423 queue->name = mystrdup(name); 424 queue->nexthop = mystrdup(nexthop); 425 queue->todo_refcount = 0; 426 queue->busy_refcount = 0; 427 queue->transport = transport; 428 queue->window = transport->init_dest_concurrency; 429 queue->success = queue->failure = queue->fail_cohorts = 0; 430 QMGR_LIST_INIT(queue->todo); 431 QMGR_LIST_INIT(queue->busy); 432 queue->dsn = 0; 433 queue->clog_time_to_warn = 0; 434 QMGR_LIST_PREPEND(transport->queue_list, queue); 435 htable_enter(transport->queue_byname, name, (void *) queue); 436 return (queue); 437 } 438 439 /* qmgr_queue_find - find in-core named queue */ 440 441 QMGR_QUEUE *qmgr_queue_find(QMGR_TRANSPORT *transport, const char *name) 442 { 443 return ((QMGR_QUEUE *) htable_find(transport->queue_byname, name)); 444 } 445