Lines Matching full:queue
18 /* void qmgr_queue_done(queue)
19 /* QMGR_QUEUE *queue;
25 /* void qmgr_queue_throttle(queue, dsn)
26 /* QMGR_QUEUE *queue;
29 /* void qmgr_queue_unthrottle(queue)
30 /* QMGR_QUEUE *queue;
32 /* void qmgr_queue_suspend(queue, delay)
33 /* QMGR_QUEUE *queue;
37 /* Each queue corresponds to a specific transport and destination.
38 /* Each queue has a `todo' list of delivery requests for that
42 /* of in-core queue structures.
44 /* qmgr_queue_create() creates an empty named queue for the named
45 /* transport and destination. The queue is given an initial
51 /* qmgr_queue_done() disposes of a per-destination queue after all
53 /* of a dead queue.
55 /* qmgr_queue_find() looks up the named queue for the named
56 /* transport. A null result means that the queue was not found.
72 /* that are based on the present queue's concurrency window.
74 /* status of blocker jobs is re-evaluated after the queue is
122 #define QMGR_ERROR_OR_RETRY_QUEUE(queue) \ argument
123 (strcmp(queue->transport->name, MAIL_SERVICE_RETRY) == 0 \
124 || strcmp(queue->transport->name, MAIL_SERVICE_ERROR) == 0)
127 if (var_conc_feedback_debug && !QMGR_ERROR_OR_RETRY_QUEUE(queue)) \
130 #define QMGR_LOG_WINDOW(queue) \ argument
131 if (var_conc_feedback_debug && !QMGR_ERROR_OR_RETRY_QUEUE(queue)) \
132 msg_info("%s: queue %s: limit %d window %d success %g failure %g fail_cohorts %g", \
133 myname, queue->name, queue->transport->dest_concurrency_limit, \
134 queue->window, queue->success, queue->failure, queue->fail_cohorts);
140 QMGR_QUEUE *queue = (QMGR_QUEUE *) context; in qmgr_queue_resume() local
146 if (!QMGR_QUEUE_SUSPENDED(queue)) in qmgr_queue_resume()
147 msg_panic("%s: bad queue status: %s", myname, QMGR_QUEUE_STATUS(queue)); in qmgr_queue_resume()
150 * We can't simply force delivery on this queue: the transport's pending in qmgr_queue_resume()
156 queue->window = 1; in qmgr_queue_resume()
159 * Every event handler that leaves a queue in the "ready" state should in qmgr_queue_resume()
160 * remove the queue when it is empty. in qmgr_queue_resume()
171 if (QMGR_QUEUE_READY(queue) && queue->todo.next == 0 && queue->busy.next == 0) in qmgr_queue_resume()
172 qmgr_queue_done(queue); in qmgr_queue_resume()
174 qmgr_job_blocker_update(queue); in qmgr_queue_resume()
179 void qmgr_queue_suspend(QMGR_QUEUE *queue, int delay) in qmgr_queue_suspend() argument
186 if (!QMGR_QUEUE_READY(queue)) in qmgr_queue_suspend()
187 msg_panic("%s: bad queue status: %s", myname, QMGR_QUEUE_STATUS(queue)); in qmgr_queue_suspend()
188 if (queue->busy_refcount > 0) in qmgr_queue_suspend()
189 msg_panic("%s: queue is busy", myname); in qmgr_queue_suspend()
192 * Set the queue status to "suspended". No-one is supposed to remove a in qmgr_queue_suspend()
193 * queue in suspended state. in qmgr_queue_suspend()
195 queue->window = QMGR_QUEUE_STAT_SUSPENDED; in qmgr_queue_suspend()
196 event_request_timer(qmgr_queue_resume, (void *) queue, delay); in qmgr_queue_suspend()
203 QMGR_QUEUE *queue = (QMGR_QUEUE *) context; in qmgr_queue_unthrottle_wrapper() local
207 * context of some queue manipulation. Therefore, it is safe to discard in qmgr_queue_unthrottle_wrapper()
208 * this in-core queue when it is empty and when this site is not dead. in qmgr_queue_unthrottle_wrapper()
210 qmgr_queue_unthrottle(queue); in qmgr_queue_unthrottle_wrapper()
211 if (QMGR_QUEUE_READY(queue) && queue->todo.next == 0 && queue->busy.next == 0) in qmgr_queue_unthrottle_wrapper()
212 qmgr_queue_done(queue); in qmgr_queue_unthrottle_wrapper()
217 void qmgr_queue_unthrottle(QMGR_QUEUE *queue) in qmgr_queue_unthrottle() argument
220 QMGR_TRANSPORT *transport = queue->transport; in qmgr_queue_unthrottle()
224 msg_info("%s: queue %s", myname, queue->name); in qmgr_queue_unthrottle()
229 if (!QMGR_QUEUE_READY(queue) && !QMGR_QUEUE_THROTTLED(queue)) in qmgr_queue_unthrottle()
230 msg_panic("%s: bad queue status: %s", myname, QMGR_QUEUE_STATUS(queue)); in qmgr_queue_unthrottle()
239 queue->fail_cohorts = 0; in qmgr_queue_unthrottle()
244 if (QMGR_QUEUE_THROTTLED(queue)) { in qmgr_queue_unthrottle()
245 event_cancel_timer(qmgr_queue_unthrottle_wrapper, (void *) queue); in qmgr_queue_unthrottle()
246 if (queue->dsn == 0) in qmgr_queue_unthrottle()
247 msg_panic("%s: queue %s: window 0 status 0", myname, queue->name); in qmgr_queue_unthrottle()
248 dsn_free(queue->dsn); in qmgr_queue_unthrottle()
249 queue->dsn = 0; in qmgr_queue_unthrottle()
251 if (queue->busy_refcount > 0) in qmgr_queue_unthrottle()
252 queue->window = queue->busy_refcount; in qmgr_queue_unthrottle()
254 queue->window = transport->init_dest_concurrency; in qmgr_queue_unthrottle()
255 queue->success = queue->failure = 0; in qmgr_queue_unthrottle()
256 QMGR_LOG_WINDOW(queue); in qmgr_queue_unthrottle()
276 || transport->dest_concurrency_limit > queue->window) in qmgr_queue_unthrottle()
277 if (queue->window < queue->busy_refcount + transport->init_dest_concurrency) { in qmgr_queue_unthrottle()
278 feedback = QMGR_FEEDBACK_VAL(transport->pos_feedback, queue->window); in qmgr_queue_unthrottle()
280 queue->success += feedback; in qmgr_queue_unthrottle()
282 while (queue->success + feedback / 2 >= transport->pos_feedback.hysteresis) { in qmgr_queue_unthrottle()
283 queue->window += transport->pos_feedback.hysteresis; in qmgr_queue_unthrottle()
284 queue->success -= transport->pos_feedback.hysteresis; in qmgr_queue_unthrottle()
285 queue->failure = 0; in qmgr_queue_unthrottle()
289 && queue->window > transport->dest_concurrency_limit) in qmgr_queue_unthrottle()
290 queue->window = transport->dest_concurrency_limit; in qmgr_queue_unthrottle()
292 QMGR_LOG_WINDOW(queue); in qmgr_queue_unthrottle()
297 void qmgr_queue_throttle(QMGR_QUEUE *queue, DSN *dsn) in qmgr_queue_throttle() argument
300 QMGR_TRANSPORT *transport = queue->transport; in qmgr_queue_throttle()
306 if (!QMGR_QUEUE_READY(queue)) in qmgr_queue_throttle()
307 msg_panic("%s: bad queue status: %s", myname, QMGR_QUEUE_STATUS(queue)); in qmgr_queue_throttle()
308 if (queue->dsn) in qmgr_queue_throttle()
309 msg_panic("%s: queue %s: spurious reason %s", in qmgr_queue_throttle()
310 myname, queue->name, queue->dsn->reason); in qmgr_queue_throttle()
312 msg_info("%s: queue %s: %s %s", in qmgr_queue_throttle()
313 myname, queue->name, dsn->status, dsn->reason); in qmgr_queue_throttle()
324 * This queue is declared dead after a configurable number of in qmgr_queue_throttle()
327 if (QMGR_QUEUE_READY(queue)) { in qmgr_queue_throttle()
328 queue->fail_cohorts += 1.0 / queue->window; in qmgr_queue_throttle()
330 && queue->fail_cohorts >= transport->fail_cohort_limit) in qmgr_queue_throttle()
331 queue->window = QMGR_QUEUE_STAT_THROTTLED; in qmgr_queue_throttle()
343 if (QMGR_QUEUE_READY(queue)) { in qmgr_queue_throttle()
344 feedback = QMGR_FEEDBACK_VAL(transport->neg_feedback, queue->window); in qmgr_queue_throttle()
346 queue->failure -= feedback; in qmgr_queue_throttle()
348 while (queue->failure - feedback / 2 < 0) { in qmgr_queue_throttle()
349 queue->window -= transport->neg_feedback.hysteresis; in qmgr_queue_throttle()
350 queue->success = 0; in qmgr_queue_throttle()
351 queue->failure += transport->neg_feedback.hysteresis; in qmgr_queue_throttle()
354 if (queue->window < 1) in qmgr_queue_throttle()
355 queue->window = 1; in qmgr_queue_throttle()
361 if (QMGR_QUEUE_THROTTLED(queue)) { in qmgr_queue_throttle()
362 queue->dsn = DSN_COPY(dsn); in qmgr_queue_throttle()
364 (void *) queue, var_min_backoff_time); in qmgr_queue_throttle()
365 queue->dflags = 0; in qmgr_queue_throttle()
367 QMGR_LOG_WINDOW(queue); in qmgr_queue_throttle()
370 /* qmgr_queue_done - delete in-core queue for site */
372 void qmgr_queue_done(QMGR_QUEUE *queue) in qmgr_queue_done() argument
375 QMGR_TRANSPORT *transport = queue->transport; in qmgr_queue_done()
378 * Sanity checks. It is an error to delete an in-core queue with pending in qmgr_queue_done()
381 if (queue->busy_refcount != 0 || queue->todo_refcount != 0) in qmgr_queue_done()
383 queue->busy_refcount + queue->todo_refcount); in qmgr_queue_done()
384 if (queue->todo.next || queue->busy.next) in qmgr_queue_done()
385 msg_panic("%s: queue not empty: %s", myname, queue->name); in qmgr_queue_done()
386 if (!QMGR_QUEUE_READY(queue)) in qmgr_queue_done()
387 msg_panic("%s: bad queue status: %s", myname, QMGR_QUEUE_STATUS(queue)); in qmgr_queue_done()
388 if (queue->dsn) in qmgr_queue_done()
389 msg_panic("%s: queue %s: spurious reason %s", in qmgr_queue_done()
390 myname, queue->name, queue->dsn->reason); in qmgr_queue_done()
393 * Clean up this in-core queue. in qmgr_queue_done()
395 QMGR_LIST_UNLINK(transport->queue_list, QMGR_QUEUE *, queue, peers); in qmgr_queue_done()
396 htable_delete(transport->queue_byname, queue->name, (void (*) (void *)) 0); in qmgr_queue_done()
397 myfree(queue->name); in qmgr_queue_done()
398 myfree(queue->nexthop); in qmgr_queue_done()
400 myfree((void *) queue); in qmgr_queue_done()
403 /* qmgr_queue_create - create in-core queue for site */
408 QMGR_QUEUE *queue; in qmgr_queue_create() local
415 queue = (QMGR_QUEUE *) mymalloc(sizeof(QMGR_QUEUE)); in qmgr_queue_create()
417 queue->dflags = 0; in qmgr_queue_create()
418 queue->last_done = 0; in qmgr_queue_create()
419 queue->name = mystrdup(name); in qmgr_queue_create()
420 queue->nexthop = mystrdup(nexthop); in qmgr_queue_create()
421 queue->todo_refcount = 0; in qmgr_queue_create()
422 queue->busy_refcount = 0; in qmgr_queue_create()
423 queue->transport = transport; in qmgr_queue_create()
424 queue->window = transport->init_dest_concurrency; in qmgr_queue_create()
425 queue->success = queue->failure = queue->fail_cohorts = 0; in qmgr_queue_create()
426 QMGR_LIST_INIT(queue->todo); in qmgr_queue_create()
427 QMGR_LIST_INIT(queue->busy); in qmgr_queue_create()
428 queue->dsn = 0; in qmgr_queue_create()
429 queue->clog_time_to_warn = 0; in qmgr_queue_create()
430 queue->blocker_tag = 0; in qmgr_queue_create()
431 QMGR_LIST_APPEND(transport->queue_list, queue, peers); in qmgr_queue_create()
432 htable_enter(transport->queue_byname, name, (void *) queue); in qmgr_queue_create()
433 return (queue); in qmgr_queue_create()
436 /* qmgr_queue_find - find in-core named queue */