Lines Matching full:queue

18 /*	void	qmgr_queue_done(queue)
19 /* QMGR_QUEUE *queue;
28 /* void qmgr_queue_throttle(queue, dsn)
29 /* QMGR_QUEUE *queue;
32 /* void qmgr_queue_unthrottle(queue)
33 /* QMGR_QUEUE *queue;
35 /* void qmgr_queue_suspend(queue, delay)
36 /* QMGR_QUEUE *queue;
40 /* Each queue corresponds to a specific transport and destination.
41 /* Each queue has a `todo' list of delivery requests for that
45 /* of in-core queue structures.
47 /* qmgr_queue_create() creates an empty named queue for the named
48 /* transport and destination. The queue is given an initial
54 /* qmgr_queue_done() disposes of a per-destination queue after all
56 /* of a dead queue.
58 /* qmgr_queue_find() looks up the named queue for the named
59 /* transport. A null result means that the queue was not found.
62 /* from the named transport one per-destination queue with a
116 #define QMGR_ERROR_OR_RETRY_QUEUE(queue) \ argument
117 (strcmp(queue->transport->name, MAIL_SERVICE_RETRY) == 0 \
118 || strcmp(queue->transport->name, MAIL_SERVICE_ERROR) == 0)
121 if (var_conc_feedback_debug && !QMGR_ERROR_OR_RETRY_QUEUE(queue)) \
124 #define QMGR_LOG_WINDOW(queue) \ argument
125 if (var_conc_feedback_debug && !QMGR_ERROR_OR_RETRY_QUEUE(queue)) \
126 msg_info("%s: queue %s: limit %d window %d success %g failure %g fail_cohorts %g", \
127 myname, queue->name, queue->transport->dest_concurrency_limit, \
128 queue->window, queue->success, queue->failure, queue->fail_cohorts);
134 QMGR_QUEUE *queue = (QMGR_QUEUE *) context; in qmgr_queue_resume() local
140 if (!QMGR_QUEUE_SUSPENDED(queue)) in qmgr_queue_resume()
141 msg_panic("%s: bad queue status: %s", myname, QMGR_QUEUE_STATUS(queue)); in qmgr_queue_resume()
144 * We can't simply force delivery on this queue: the transport's pending in qmgr_queue_resume()
150 queue->window = 1; in qmgr_queue_resume()
153 * Every event handler that leaves a queue in the "ready" state should in qmgr_queue_resume()
154 * remove the queue when it is empty. in qmgr_queue_resume()
156 if (QMGR_QUEUE_READY(queue) && queue->todo.next == 0 && queue->busy.next == 0) in qmgr_queue_resume()
157 qmgr_queue_done(queue); in qmgr_queue_resume()
162 void qmgr_queue_suspend(QMGR_QUEUE *queue, int delay) in qmgr_queue_suspend() argument
169 if (!QMGR_QUEUE_READY(queue)) in qmgr_queue_suspend()
170 msg_panic("%s: bad queue status: %s", myname, QMGR_QUEUE_STATUS(queue)); in qmgr_queue_suspend()
171 if (queue->busy_refcount > 0) in qmgr_queue_suspend()
172 msg_panic("%s: queue is busy", myname); in qmgr_queue_suspend()
175 * Set the queue status to "suspended". No-one is supposed to remove a in qmgr_queue_suspend()
176 * queue in suspended state. in qmgr_queue_suspend()
178 queue->window = QMGR_QUEUE_STAT_SUSPENDED; in qmgr_queue_suspend()
179 event_request_timer(qmgr_queue_resume, (void *) queue, delay); in qmgr_queue_suspend()
186 QMGR_QUEUE *queue = (QMGR_QUEUE *) context; in qmgr_queue_unthrottle_wrapper() local
190 * context of some queue manipulation. Therefore, it is safe to discard in qmgr_queue_unthrottle_wrapper()
191 * this in-core queue when it is empty and when this site is not dead. in qmgr_queue_unthrottle_wrapper()
193 qmgr_queue_unthrottle(queue); in qmgr_queue_unthrottle_wrapper()
194 if (QMGR_QUEUE_READY(queue) && queue->todo.next == 0 && queue->busy.next == 0) in qmgr_queue_unthrottle_wrapper()
195 qmgr_queue_done(queue); in qmgr_queue_unthrottle_wrapper()
200 void qmgr_queue_unthrottle(QMGR_QUEUE *queue) in qmgr_queue_unthrottle() argument
203 QMGR_TRANSPORT *transport = queue->transport; in qmgr_queue_unthrottle()
207 msg_info("%s: queue %s", myname, queue->name); in qmgr_queue_unthrottle()
212 if (!QMGR_QUEUE_THROTTLED(queue) && !QMGR_QUEUE_READY(queue)) in qmgr_queue_unthrottle()
213 msg_panic("%s: bad queue status: %s", myname, QMGR_QUEUE_STATUS(queue)); in qmgr_queue_unthrottle()
222 queue->fail_cohorts = 0; in qmgr_queue_unthrottle()
227 if (QMGR_QUEUE_THROTTLED(queue)) { in qmgr_queue_unthrottle()
228 event_cancel_timer(qmgr_queue_unthrottle_wrapper, (void *) queue); in qmgr_queue_unthrottle()
229 if (queue->dsn == 0) in qmgr_queue_unthrottle()
230 msg_panic("%s: queue %s: window 0 status 0", myname, queue->name); in qmgr_queue_unthrottle()
231 dsn_free(queue->dsn); in qmgr_queue_unthrottle()
232 queue->dsn = 0; in qmgr_queue_unthrottle()
234 if (queue->busy_refcount > 0) in qmgr_queue_unthrottle()
235 queue->window = queue->busy_refcount; in qmgr_queue_unthrottle()
237 queue->window = transport->init_dest_concurrency; in qmgr_queue_unthrottle()
238 queue->success = queue->failure = 0; in qmgr_queue_unthrottle()
239 QMGR_LOG_WINDOW(queue); in qmgr_queue_unthrottle()
259 || transport->dest_concurrency_limit > queue->window) in qmgr_queue_unthrottle()
260 if (queue->window < queue->busy_refcount + transport->init_dest_concurrency) { in qmgr_queue_unthrottle()
261 feedback = QMGR_FEEDBACK_VAL(transport->pos_feedback, queue->window); in qmgr_queue_unthrottle()
263 queue->success += feedback; in qmgr_queue_unthrottle()
265 while (queue->success + feedback / 2 >= transport->pos_feedback.hysteresis) { in qmgr_queue_unthrottle()
266 queue->window += transport->pos_feedback.hysteresis; in qmgr_queue_unthrottle()
267 queue->success -= transport->pos_feedback.hysteresis; in qmgr_queue_unthrottle()
268 queue->failure = 0; in qmgr_queue_unthrottle()
272 && queue->window > transport->dest_concurrency_limit) in qmgr_queue_unthrottle()
273 queue->window = transport->dest_concurrency_limit; in qmgr_queue_unthrottle()
275 QMGR_LOG_WINDOW(queue); in qmgr_queue_unthrottle()
280 void qmgr_queue_throttle(QMGR_QUEUE *queue, DSN *dsn) in qmgr_queue_throttle() argument
283 QMGR_TRANSPORT *transport = queue->transport; in qmgr_queue_throttle()
289 if (!QMGR_QUEUE_READY(queue)) in qmgr_queue_throttle()
290 msg_panic("%s: bad queue status: %s", myname, QMGR_QUEUE_STATUS(queue)); in qmgr_queue_throttle()
291 if (queue->dsn) in qmgr_queue_throttle()
292 msg_panic("%s: queue %s: spurious reason %s", in qmgr_queue_throttle()
293 myname, queue->name, queue->dsn->reason); in qmgr_queue_throttle()
295 msg_info("%s: queue %s: %s %s", in qmgr_queue_throttle()
296 myname, queue->name, dsn->status, dsn->reason); in qmgr_queue_throttle()
307 * This queue is declared dead after a configurable number of in qmgr_queue_throttle()
310 if (QMGR_QUEUE_READY(queue)) { in qmgr_queue_throttle()
311 queue->fail_cohorts += 1.0 / queue->window; in qmgr_queue_throttle()
313 && queue->fail_cohorts >= transport->fail_cohort_limit) in qmgr_queue_throttle()
314 queue->window = QMGR_QUEUE_STAT_THROTTLED; in qmgr_queue_throttle()
326 if (QMGR_QUEUE_READY(queue)) { in qmgr_queue_throttle()
327 feedback = QMGR_FEEDBACK_VAL(transport->neg_feedback, queue->window); in qmgr_queue_throttle()
329 queue->failure -= feedback; in qmgr_queue_throttle()
331 while (queue->failure - feedback / 2 < 0) { in qmgr_queue_throttle()
332 queue->window -= transport->neg_feedback.hysteresis; in qmgr_queue_throttle()
333 queue->success = 0; in qmgr_queue_throttle()
334 queue->failure += transport->neg_feedback.hysteresis; in qmgr_queue_throttle()
337 if (queue->window < 1) in qmgr_queue_throttle()
338 queue->window = 1; in qmgr_queue_throttle()
344 if (QMGR_QUEUE_THROTTLED(queue)) { in qmgr_queue_throttle()
345 queue->dsn = DSN_COPY(dsn); in qmgr_queue_throttle()
347 (void *) queue, var_min_backoff_time); in qmgr_queue_throttle()
348 queue->dflags = 0; in qmgr_queue_throttle()
350 QMGR_LOG_WINDOW(queue); in qmgr_queue_throttle()
353 /* qmgr_queue_select - select in-core queue for delivery */
357 QMGR_QUEUE *queue; in qmgr_queue_select() local
363 for (queue = transport->queue_list.next; queue; queue = queue->peers.next) { in qmgr_queue_select()
364 if (queue->window > queue->busy_refcount && queue->todo.next != 0) { in qmgr_queue_select()
365 QMGR_LIST_ROTATE(transport->queue_list, queue); in qmgr_queue_select()
367 msg_info("qmgr_queue_select: %s", queue->name); in qmgr_queue_select()
368 return (queue); in qmgr_queue_select()
374 /* qmgr_queue_done - delete in-core queue for site */
376 void qmgr_queue_done(QMGR_QUEUE *queue) in qmgr_queue_done() argument
379 QMGR_TRANSPORT *transport = queue->transport; in qmgr_queue_done()
382 * Sanity checks. It is an error to delete an in-core queue with pending in qmgr_queue_done()
385 if (queue->busy_refcount != 0 || queue->todo_refcount != 0) in qmgr_queue_done()
387 queue->busy_refcount + queue->todo_refcount); in qmgr_queue_done()
388 if (queue->todo.next || queue->busy.next) in qmgr_queue_done()
389 msg_panic("%s: queue not empty: %s", myname, queue->name); in qmgr_queue_done()
390 if (!QMGR_QUEUE_READY(queue)) in qmgr_queue_done()
391 msg_panic("%s: bad queue status: %s", myname, QMGR_QUEUE_STATUS(queue)); in qmgr_queue_done()
392 if (queue->dsn) in qmgr_queue_done()
393 msg_panic("%s: queue %s: spurious reason %s", in qmgr_queue_done()
394 myname, queue->name, queue->dsn->reason); in qmgr_queue_done()
397 * Clean up this in-core queue. in qmgr_queue_done()
399 QMGR_LIST_UNLINK(transport->queue_list, QMGR_QUEUE *, queue); in qmgr_queue_done()
400 htable_delete(transport->queue_byname, queue->name, (void (*) (void *)) 0); in qmgr_queue_done()
401 myfree(queue->name); in qmgr_queue_done()
402 myfree(queue->nexthop); in qmgr_queue_done()
404 myfree((void *) queue); in qmgr_queue_done()
407 /* qmgr_queue_create - create in-core queue for site */
412 QMGR_QUEUE *queue; in qmgr_queue_create() local
419 queue = (QMGR_QUEUE *) mymalloc(sizeof(QMGR_QUEUE)); in qmgr_queue_create()
421 queue->dflags = 0; in qmgr_queue_create()
422 queue->last_done = 0; in qmgr_queue_create()
423 queue->name = mystrdup(name); in qmgr_queue_create()
424 queue->nexthop = mystrdup(nexthop); in qmgr_queue_create()
425 queue->todo_refcount = 0; in qmgr_queue_create()
426 queue->busy_refcount = 0; in qmgr_queue_create()
427 queue->transport = transport; in qmgr_queue_create()
428 queue->window = transport->init_dest_concurrency; in qmgr_queue_create()
429 queue->success = queue->failure = queue->fail_cohorts = 0; in qmgr_queue_create()
430 QMGR_LIST_INIT(queue->todo); in qmgr_queue_create()
431 QMGR_LIST_INIT(queue->busy); in qmgr_queue_create()
432 queue->dsn = 0; in qmgr_queue_create()
433 queue->clog_time_to_warn = 0; in qmgr_queue_create()
434 QMGR_LIST_PREPEND(transport->queue_list, queue); in qmgr_queue_create()
435 htable_enter(transport->queue_byname, name, (void *) queue); in qmgr_queue_create()
436 return (queue); in qmgr_queue_create()
439 /* qmgr_queue_find - find in-core named queue */