Lines Matching refs:queue
116 #define QMGR_ERROR_OR_RETRY_QUEUE(queue) \ argument
117 (strcmp(queue->transport->name, MAIL_SERVICE_RETRY) == 0 \
118 || strcmp(queue->transport->name, MAIL_SERVICE_ERROR) == 0)
121 if (var_conc_feedback_debug && !QMGR_ERROR_OR_RETRY_QUEUE(queue)) \
124 #define QMGR_LOG_WINDOW(queue) \ argument
125 if (var_conc_feedback_debug && !QMGR_ERROR_OR_RETRY_QUEUE(queue)) \
127 myname, queue->name, queue->transport->dest_concurrency_limit, \
128 queue->window, queue->success, queue->failure, queue->fail_cohorts);
134 QMGR_QUEUE *queue = (QMGR_QUEUE *) context; in qmgr_queue_resume() local
140 if (!QMGR_QUEUE_SUSPENDED(queue)) in qmgr_queue_resume()
141 msg_panic("%s: bad queue status: %s", myname, QMGR_QUEUE_STATUS(queue)); in qmgr_queue_resume()
150 queue->window = 1; in qmgr_queue_resume()
156 if (QMGR_QUEUE_READY(queue) && queue->todo.next == 0 && queue->busy.next == 0) in qmgr_queue_resume()
157 qmgr_queue_done(queue); in qmgr_queue_resume()
162 void qmgr_queue_suspend(QMGR_QUEUE *queue, int delay) in qmgr_queue_suspend() argument
169 if (!QMGR_QUEUE_READY(queue)) in qmgr_queue_suspend()
170 msg_panic("%s: bad queue status: %s", myname, QMGR_QUEUE_STATUS(queue)); in qmgr_queue_suspend()
171 if (queue->busy_refcount > 0) in qmgr_queue_suspend()
178 queue->window = QMGR_QUEUE_STAT_SUSPENDED; in qmgr_queue_suspend()
179 event_request_timer(qmgr_queue_resume, (void *) queue, delay); in qmgr_queue_suspend()
186 QMGR_QUEUE *queue = (QMGR_QUEUE *) context; in qmgr_queue_unthrottle_wrapper() local
193 qmgr_queue_unthrottle(queue); in qmgr_queue_unthrottle_wrapper()
194 if (QMGR_QUEUE_READY(queue) && queue->todo.next == 0 && queue->busy.next == 0) in qmgr_queue_unthrottle_wrapper()
195 qmgr_queue_done(queue); in qmgr_queue_unthrottle_wrapper()
200 void qmgr_queue_unthrottle(QMGR_QUEUE *queue) in qmgr_queue_unthrottle() argument
203 QMGR_TRANSPORT *transport = queue->transport; in qmgr_queue_unthrottle()
207 msg_info("%s: queue %s", myname, queue->name); in qmgr_queue_unthrottle()
212 if (!QMGR_QUEUE_THROTTLED(queue) && !QMGR_QUEUE_READY(queue)) in qmgr_queue_unthrottle()
213 msg_panic("%s: bad queue status: %s", myname, QMGR_QUEUE_STATUS(queue)); in qmgr_queue_unthrottle()
222 queue->fail_cohorts = 0; in qmgr_queue_unthrottle()
227 if (QMGR_QUEUE_THROTTLED(queue)) { in qmgr_queue_unthrottle()
228 event_cancel_timer(qmgr_queue_unthrottle_wrapper, (void *) queue); in qmgr_queue_unthrottle()
229 if (queue->dsn == 0) in qmgr_queue_unthrottle()
230 msg_panic("%s: queue %s: window 0 status 0", myname, queue->name); in qmgr_queue_unthrottle()
231 dsn_free(queue->dsn); in qmgr_queue_unthrottle()
232 queue->dsn = 0; in qmgr_queue_unthrottle()
234 if (queue->busy_refcount > 0) in qmgr_queue_unthrottle()
235 queue->window = queue->busy_refcount; in qmgr_queue_unthrottle()
237 queue->window = transport->init_dest_concurrency; in qmgr_queue_unthrottle()
238 queue->success = queue->failure = 0; in qmgr_queue_unthrottle()
239 QMGR_LOG_WINDOW(queue); in qmgr_queue_unthrottle()
259 || transport->dest_concurrency_limit > queue->window) in qmgr_queue_unthrottle()
260 if (queue->window < queue->busy_refcount + transport->init_dest_concurrency) { in qmgr_queue_unthrottle()
261 feedback = QMGR_FEEDBACK_VAL(transport->pos_feedback, queue->window); in qmgr_queue_unthrottle()
263 queue->success += feedback; in qmgr_queue_unthrottle()
265 while (queue->success + feedback / 2 >= transport->pos_feedback.hysteresis) { in qmgr_queue_unthrottle()
266 queue->window += transport->pos_feedback.hysteresis; in qmgr_queue_unthrottle()
267 queue->success -= transport->pos_feedback.hysteresis; in qmgr_queue_unthrottle()
268 queue->failure = 0; in qmgr_queue_unthrottle()
272 && queue->window > transport->dest_concurrency_limit) in qmgr_queue_unthrottle()
273 queue->window = transport->dest_concurrency_limit; in qmgr_queue_unthrottle()
275 QMGR_LOG_WINDOW(queue); in qmgr_queue_unthrottle()
280 void qmgr_queue_throttle(QMGR_QUEUE *queue, DSN *dsn) in qmgr_queue_throttle() argument
283 QMGR_TRANSPORT *transport = queue->transport; in qmgr_queue_throttle()
289 if (!QMGR_QUEUE_READY(queue)) in qmgr_queue_throttle()
290 msg_panic("%s: bad queue status: %s", myname, QMGR_QUEUE_STATUS(queue)); in qmgr_queue_throttle()
291 if (queue->dsn) in qmgr_queue_throttle()
293 myname, queue->name, queue->dsn->reason); in qmgr_queue_throttle()
296 myname, queue->name, dsn->status, dsn->reason); in qmgr_queue_throttle()
310 if (QMGR_QUEUE_READY(queue)) { in qmgr_queue_throttle()
311 queue->fail_cohorts += 1.0 / queue->window; in qmgr_queue_throttle()
313 && queue->fail_cohorts >= transport->fail_cohort_limit) in qmgr_queue_throttle()
314 queue->window = QMGR_QUEUE_STAT_THROTTLED; in qmgr_queue_throttle()
326 if (QMGR_QUEUE_READY(queue)) { in qmgr_queue_throttle()
327 feedback = QMGR_FEEDBACK_VAL(transport->neg_feedback, queue->window); in qmgr_queue_throttle()
329 queue->failure -= feedback; in qmgr_queue_throttle()
331 while (queue->failure - feedback / 2 < 0) { in qmgr_queue_throttle()
332 queue->window -= transport->neg_feedback.hysteresis; in qmgr_queue_throttle()
333 queue->success = 0; in qmgr_queue_throttle()
334 queue->failure += transport->neg_feedback.hysteresis; in qmgr_queue_throttle()
337 if (queue->window < 1) in qmgr_queue_throttle()
338 queue->window = 1; in qmgr_queue_throttle()
344 if (QMGR_QUEUE_THROTTLED(queue)) { in qmgr_queue_throttle()
345 queue->dsn = DSN_COPY(dsn); in qmgr_queue_throttle()
347 (void *) queue, var_min_backoff_time); in qmgr_queue_throttle()
348 queue->dflags = 0; in qmgr_queue_throttle()
350 QMGR_LOG_WINDOW(queue); in qmgr_queue_throttle()
357 QMGR_QUEUE *queue; in qmgr_queue_select() local
363 for (queue = transport->queue_list.next; queue; queue = queue->peers.next) { in qmgr_queue_select()
364 if (queue->window > queue->busy_refcount && queue->todo.next != 0) { in qmgr_queue_select()
365 QMGR_LIST_ROTATE(transport->queue_list, queue); in qmgr_queue_select()
367 msg_info("qmgr_queue_select: %s", queue->name); in qmgr_queue_select()
368 return (queue); in qmgr_queue_select()
376 void qmgr_queue_done(QMGR_QUEUE *queue) in qmgr_queue_done() argument
379 QMGR_TRANSPORT *transport = queue->transport; in qmgr_queue_done()
385 if (queue->busy_refcount != 0 || queue->todo_refcount != 0) in qmgr_queue_done()
387 queue->busy_refcount + queue->todo_refcount); in qmgr_queue_done()
388 if (queue->todo.next || queue->busy.next) in qmgr_queue_done()
389 msg_panic("%s: queue not empty: %s", myname, queue->name); in qmgr_queue_done()
390 if (!QMGR_QUEUE_READY(queue)) in qmgr_queue_done()
391 msg_panic("%s: bad queue status: %s", myname, QMGR_QUEUE_STATUS(queue)); in qmgr_queue_done()
392 if (queue->dsn) in qmgr_queue_done()
394 myname, queue->name, queue->dsn->reason); in qmgr_queue_done()
399 QMGR_LIST_UNLINK(transport->queue_list, QMGR_QUEUE *, queue); in qmgr_queue_done()
400 htable_delete(transport->queue_byname, queue->name, (void (*) (void *)) 0); in qmgr_queue_done()
401 myfree(queue->name); in qmgr_queue_done()
402 myfree(queue->nexthop); in qmgr_queue_done()
404 myfree((void *) queue); in qmgr_queue_done()
412 QMGR_QUEUE *queue; in qmgr_queue_create() local
419 queue = (QMGR_QUEUE *) mymalloc(sizeof(QMGR_QUEUE)); in qmgr_queue_create()
421 queue->dflags = 0; in qmgr_queue_create()
422 queue->last_done = 0; in qmgr_queue_create()
423 queue->name = mystrdup(name); in qmgr_queue_create()
424 queue->nexthop = mystrdup(nexthop); in qmgr_queue_create()
425 queue->todo_refcount = 0; in qmgr_queue_create()
426 queue->busy_refcount = 0; in qmgr_queue_create()
427 queue->transport = transport; in qmgr_queue_create()
428 queue->window = transport->init_dest_concurrency; in qmgr_queue_create()
429 queue->success = queue->failure = queue->fail_cohorts = 0; in qmgr_queue_create()
430 QMGR_LIST_INIT(queue->todo); in qmgr_queue_create()
431 QMGR_LIST_INIT(queue->busy); in qmgr_queue_create()
432 queue->dsn = 0; in qmgr_queue_create()
433 queue->clog_time_to_warn = 0; in qmgr_queue_create()
434 QMGR_LIST_PREPEND(transport->queue_list, queue); in qmgr_queue_create()
435 htable_enter(transport->queue_byname, name, (void *) queue); in qmgr_queue_create()
436 return (queue); in qmgr_queue_create()