xref: /netbsd-src/external/ibm-public/postfix/dist/src/oqmgr/qmgr_queue.c (revision e89934bbf778a6d6d6894877c4da59d0c7835b0f)
1 /*	$NetBSD: qmgr_queue.c,v 1.2 2017/02/14 01:16:46 christos Exp $	*/
2 
3 /*++
4 /* NAME
5 /*	qmgr_queue 3
6 /* SUMMARY
7 /*	per-destination queues
8 /* SYNOPSIS
9 /*	#include "qmgr.h"
10 /*
11 /*	int	qmgr_queue_count;
12 /*
13 /*	QMGR_QUEUE *qmgr_queue_create(transport, name, nexthop)
14 /*	QMGR_TRANSPORT *transport;
15 /*	const char *name;
16 /*	const char *nexthop;
17 /*
18 /*	void	qmgr_queue_done(queue)
19 /*	QMGR_QUEUE *queue;
20 /*
21 /*	QMGR_QUEUE *qmgr_queue_find(transport, name)
22 /*	QMGR_TRANSPORT *transport;
23 /*	const char *name;
24 /*
25 /*	QMGR_QUEUE *qmgr_queue_select(transport)
26 /*	QMGR_TRANSPORT *transport;
27 /*
28 /*	void	qmgr_queue_throttle(queue, dsn)
29 /*	QMGR_QUEUE *queue;
30 /*	DSN	*dsn;
31 /*
32 /*	void	qmgr_queue_unthrottle(queue)
33 /*	QMGR_QUEUE *queue;
34 /*
35 /*	void	qmgr_queue_suspend(queue, delay)
36 /*	QMGR_QUEUE *queue;
37 /*	int	delay;
38 /* DESCRIPTION
39 /*	These routines add/delete/manipulate per-destination queues.
40 /*	Each queue corresponds to a specific transport and destination.
41 /*	Each queue has a `todo' list of delivery requests for that
42 /*	destination, and a `busy' list of delivery requests in progress.
43 /*
44 /*	qmgr_queue_count is a global counter for the total number
45 /*	of in-core queue structures.
46 /*
47 /*	qmgr_queue_create() creates an empty named queue for the named
48 /*	transport and destination. The queue is given an initial
49 /*	concurrency limit as specified with the
50 /*	\fIinitial_destination_concurrency\fR configuration parameter,
51 /*	provided that it does not exceed the transport-specific
52 /*	concurrency limit.
53 /*
54 /*	qmgr_queue_done() disposes of a per-destination queue after all
55 /*	its entries have been taken care of. It is an error to dispose
56 /*	of a dead queue.
57 /*
58 /*	qmgr_queue_find() looks up the named queue for the named
59 /*	transport. A null result means that the queue was not found.
60 /*
61 /*	qmgr_queue_select() uses a round-robin strategy to select
62 /*	from the named transport one per-destination queue with a
63 /*	non-empty `todo' list.
64 /*
65 /*	qmgr_queue_throttle() handles a delivery error, and decrements the
66 /*	concurrency limit for the destination, with a lower bound of 1.
67 /*	When the cohort failure bound is reached, qmgr_queue_throttle()
68 /*	sets the concurrency limit to zero and starts a timer
69 /*	to re-enable delivery to the destination after a configurable delay.
70 /*
71 /*	qmgr_queue_unthrottle() undoes qmgr_queue_throttle()'s effects.
72 /*	The concurrency limit for the destination is incremented,
73 /*	provided that it does not exceed the destination concurrency
74 /*	limit specified for the transport. This routine implements
75 /*	"slow open" mode, and eliminates the "thundering herd" problem.
76 /*
77 /*	qmgr_queue_suspend() suspends delivery for this destination
78 /*	briefly.
79 /* DIAGNOSTICS
80 /*	Panic: consistency check failure.
81 /* LICENSE
82 /* .ad
83 /* .fi
84 /*	The Secure Mailer license must be distributed with this software.
85 /* AUTHOR(S)
86 /*	Wietse Venema
87 /*	IBM T.J. Watson Research
88 /*	P.O. Box 704
89 /*	Yorktown Heights, NY 10598, USA
90 /*--*/
91 
92 /* System library. */
93 
94 #include <sys_defs.h>
95 #include <time.h>
96 
97 /* Utility library. */
98 
99 #include <msg.h>
100 #include <mymalloc.h>
101 #include <events.h>
102 #include <htable.h>
103 
104 /* Global library. */
105 
106 #include <mail_params.h>
107 #include <recipient_list.h>
108 #include <mail_proto.h>			/* QMGR_LOG_WINDOW */
109 
110 /* Application-specific. */
111 
112 #include "qmgr.h"
113 
int     qmgr_queue_count;

/*
 * True when the queue belongs to the MAIL_SERVICE_RETRY or the
 * MAIL_SERVICE_ERROR transport. The feedback debug logging below skips
 * those transports.
 */
#define QMGR_ERROR_OR_RETRY_QUEUE(queue) \
	(strcmp((queue)->transport->name, MAIL_SERVICE_RETRY) == 0 \
	    || strcmp((queue)->transport->name, MAIL_SERVICE_ERROR) == 0)

/*
 * Debug-log one concurrency feedback value when var_conc_feedback_debug is
 * set. NOTE: this macro relies on local variables named "queue" and
 * "myname" in the caller's scope. The do/while(0) wrapper makes the macro a
 * single statement, so "QMGR_LOG_FEEDBACK(x);" is safe even as the body of
 * an unbraced if/else.
 */
#define QMGR_LOG_FEEDBACK(feedback) do { \
	if (var_conc_feedback_debug && !QMGR_ERROR_OR_RETRY_QUEUE(queue)) \
	    msg_info("%s: feedback %g", myname, (feedback)); \
    } while (0)

/*
 * Debug-log the queue's concurrency window state when
 * var_conc_feedback_debug is set. Relies on a local "myname" in the
 * caller's scope; wrapped as a single statement like the macro above.
 */
#define QMGR_LOG_WINDOW(queue) do { \
	if (var_conc_feedback_debug && !QMGR_ERROR_OR_RETRY_QUEUE(queue)) \
	    msg_info("%s: queue %s: limit %d window %d success %g failure %g fail_cohorts %g", \
		     myname, (queue)->name, (queue)->transport->dest_concurrency_limit, \
		     (queue)->window, (queue)->success, (queue)->failure, (queue)->fail_cohorts); \
    } while (0)
129 
130 /* qmgr_queue_resume - resume delivery to destination */
131 
qmgr_queue_resume(int event,void * context)132 static void qmgr_queue_resume(int event, void *context)
133 {
134     QMGR_QUEUE *queue = (QMGR_QUEUE *) context;
135     const char *myname = "qmgr_queue_resume";
136 
137     /*
138      * Sanity checks.
139      */
140     if (!QMGR_QUEUE_SUSPENDED(queue))
141 	msg_panic("%s: bad queue status: %s", myname, QMGR_QUEUE_STATUS(queue));
142 
143     /*
144      * We can't simply force delivery on this queue: the transport's pending
145      * count may already be maxed out, and there may be other constraints
146      * that definitely should be none of our business. The best we can do is
147      * to play by the same rules as everyone else: let qmgr_active_drain()
148      * and round-robin selection take care of message selection.
149      */
150     queue->window = 1;
151 
152     /*
153      * Every event handler that leaves a queue in the "ready" state should
154      * remove the queue when it is empty.
155      */
156     if (QMGR_QUEUE_READY(queue) && queue->todo.next == 0 && queue->busy.next == 0)
157 	qmgr_queue_done(queue);
158 }
159 
160 /* qmgr_queue_suspend - briefly suspend a destination */
161 
qmgr_queue_suspend(QMGR_QUEUE * queue,int delay)162 void    qmgr_queue_suspend(QMGR_QUEUE *queue, int delay)
163 {
164     const char *myname = "qmgr_queue_suspend";
165 
166     /*
167      * Sanity checks.
168      */
169     if (!QMGR_QUEUE_READY(queue))
170 	msg_panic("%s: bad queue status: %s", myname, QMGR_QUEUE_STATUS(queue));
171     if (queue->busy_refcount > 0)
172 	msg_panic("%s: queue is busy", myname);
173 
174     /*
175      * Set the queue status to "suspended". No-one is supposed to remove a
176      * queue in suspended state.
177      */
178     queue->window = QMGR_QUEUE_STAT_SUSPENDED;
179     event_request_timer(qmgr_queue_resume, (void *) queue, delay);
180 }
181 
182 /* qmgr_queue_unthrottle_wrapper - in case (char *) != (struct *) */
183 
qmgr_queue_unthrottle_wrapper(int unused_event,void * context)184 static void qmgr_queue_unthrottle_wrapper(int unused_event, void *context)
185 {
186     QMGR_QUEUE *queue = (QMGR_QUEUE *) context;
187 
188     /*
189      * This routine runs when a wakeup timer goes off; it does not run in the
190      * context of some queue manipulation. Therefore, it is safe to discard
191      * this in-core queue when it is empty and when this site is not dead.
192      */
193     qmgr_queue_unthrottle(queue);
194     if (QMGR_QUEUE_READY(queue) && queue->todo.next == 0 && queue->busy.next == 0)
195 	qmgr_queue_done(queue);
196 }
197 
198 /* qmgr_queue_unthrottle - give this destination another chance */
199 
qmgr_queue_unthrottle(QMGR_QUEUE * queue)200 void    qmgr_queue_unthrottle(QMGR_QUEUE *queue)
201 {
202     const char *myname = "qmgr_queue_unthrottle";
203     QMGR_TRANSPORT *transport = queue->transport;
204     double  feedback;
205 
206     if (msg_verbose)
207 	msg_info("%s: queue %s", myname, queue->name);
208 
209     /*
210      * Sanity checks.
211      */
212     if (!QMGR_QUEUE_THROTTLED(queue) && !QMGR_QUEUE_READY(queue))
213 	msg_panic("%s: bad queue status: %s", myname, QMGR_QUEUE_STATUS(queue));
214 
215     /*
216      * Don't restart the negative feedback hysteresis cycle with every
217      * positive feedback. Restart it only when we make a positive concurrency
218      * adjustment (i.e. at the end of a positive feedback hysteresis cycle).
219      * Otherwise negative feedback would be too aggressive: negative feedback
220      * takes effect immediately at the start of its hysteresis cycle.
221      */
222     queue->fail_cohorts = 0;
223 
224     /*
225      * Special case when this site was dead.
226      */
227     if (QMGR_QUEUE_THROTTLED(queue)) {
228 	event_cancel_timer(qmgr_queue_unthrottle_wrapper, (void *) queue);
229 	if (queue->dsn == 0)
230 	    msg_panic("%s: queue %s: window 0 status 0", myname, queue->name);
231 	dsn_free(queue->dsn);
232 	queue->dsn = 0;
233 	/* Back from the almost grave, best concurrency is anyone's guess. */
234 	if (queue->busy_refcount > 0)
235 	    queue->window = queue->busy_refcount;
236 	else
237 	    queue->window = transport->init_dest_concurrency;
238 	queue->success = queue->failure = 0;
239 	QMGR_LOG_WINDOW(queue);
240 	return;
241     }
242 
243     /*
244      * Increase the destination's concurrency limit until we reach the
245      * transport's concurrency limit. Allow for a margin the size of the
246      * initial destination concurrency, so that we're not too gentle.
247      *
248      * Why is the concurrency increment based on preferred concurrency and not
249      * on the number of outstanding delivery requests? The latter fluctuates
250      * wildly when deliveries complete in bursts (artificial benchmark
251      * measurements), and does not account for cached connections.
252      *
253      * Keep the window within reasonable distance from actual concurrency
254      * otherwise negative feedback will be ineffective. This expression
255      * assumes that busy_refcount changes gradually. This is invalid when
256      * deliveries complete in bursts (artificial benchmark measurements).
257      */
258     if (transport->dest_concurrency_limit == 0
259 	|| transport->dest_concurrency_limit > queue->window)
260 	if (queue->window < queue->busy_refcount + transport->init_dest_concurrency) {
261 	    feedback = QMGR_FEEDBACK_VAL(transport->pos_feedback, queue->window);
262 	    QMGR_LOG_FEEDBACK(feedback);
263 	    queue->success += feedback;
264 	    /* Prepare for overshoot (feedback > hysteresis, rounding error). */
265 	    while (queue->success + feedback / 2 >= transport->pos_feedback.hysteresis) {
266 		queue->window += transport->pos_feedback.hysteresis;
267 		queue->success -= transport->pos_feedback.hysteresis;
268 		queue->failure = 0;
269 	    }
270 	    /* Prepare for overshoot. */
271 	    if (transport->dest_concurrency_limit > 0
272 		&& queue->window > transport->dest_concurrency_limit)
273 		queue->window = transport->dest_concurrency_limit;
274 	}
275     QMGR_LOG_WINDOW(queue);
276 }
277 
278 /* qmgr_queue_throttle - handle destination delivery failure */
279 
qmgr_queue_throttle(QMGR_QUEUE * queue,DSN * dsn)280 void    qmgr_queue_throttle(QMGR_QUEUE *queue, DSN *dsn)
281 {
282     const char *myname = "qmgr_queue_throttle";
283     QMGR_TRANSPORT *transport = queue->transport;
284     double  feedback;
285 
286     /*
287      * Sanity checks.
288      */
289     if (!QMGR_QUEUE_READY(queue))
290 	msg_panic("%s: bad queue status: %s", myname, QMGR_QUEUE_STATUS(queue));
291     if (queue->dsn)
292 	msg_panic("%s: queue %s: spurious reason %s",
293 		  myname, queue->name, queue->dsn->reason);
294     if (msg_verbose)
295 	msg_info("%s: queue %s: %s %s",
296 		 myname, queue->name, dsn->status, dsn->reason);
297 
298     /*
299      * Don't restart the positive feedback hysteresis cycle with every
300      * negative feedback. Restart it only when we make a negative concurrency
301      * adjustment (i.e. at the start of a negative feedback hysteresis
302      * cycle). Otherwise positive feedback would be too weak (positive
303      * feedback does not take effect until the end of its hysteresis cycle).
304      */
305 
306     /*
307      * This queue is declared dead after a configurable number of
308      * pseudo-cohort failures.
309      */
310     if (QMGR_QUEUE_READY(queue)) {
311 	queue->fail_cohorts += 1.0 / queue->window;
312 	if (transport->fail_cohort_limit > 0
313 	    && queue->fail_cohorts >= transport->fail_cohort_limit)
314 	    queue->window = QMGR_QUEUE_STAT_THROTTLED;
315     }
316 
317     /*
318      * Decrease the destination's concurrency limit until we reach 1. Base
319      * adjustments on the concurrency limit itself, instead of using the
320      * actual concurrency. The latter fluctuates wildly when deliveries
321      * complete in bursts (artificial benchmark measurements).
322      *
323      * Even after reaching 1, we maintain the negative hysteresis cycle so that
324      * negative feedback can cancel out positive feedback.
325      */
326     if (QMGR_QUEUE_READY(queue)) {
327 	feedback = QMGR_FEEDBACK_VAL(transport->neg_feedback, queue->window);
328 	QMGR_LOG_FEEDBACK(feedback);
329 	queue->failure -= feedback;
330 	/* Prepare for overshoot (feedback > hysteresis, rounding error). */
331 	while (queue->failure - feedback / 2 < 0) {
332 	    queue->window -= transport->neg_feedback.hysteresis;
333 	    queue->success = 0;
334 	    queue->failure += transport->neg_feedback.hysteresis;
335 	}
336 	/* Prepare for overshoot. */
337 	if (queue->window < 1)
338 	    queue->window = 1;
339     }
340 
341     /*
342      * Special case for a site that just was declared dead.
343      */
344     if (QMGR_QUEUE_THROTTLED(queue)) {
345 	queue->dsn = DSN_COPY(dsn);
346 	event_request_timer(qmgr_queue_unthrottle_wrapper,
347 			    (void *) queue, var_min_backoff_time);
348 	queue->dflags = 0;
349     }
350     QMGR_LOG_WINDOW(queue);
351 }
352 
353 /* qmgr_queue_select - select in-core queue for delivery */
354 
qmgr_queue_select(QMGR_TRANSPORT * transport)355 QMGR_QUEUE *qmgr_queue_select(QMGR_TRANSPORT *transport)
356 {
357     QMGR_QUEUE *queue;
358 
359     /*
360      * If we find a suitable site, rotate the list to enforce round-robin
361      * selection. See similar selection code in qmgr_transport_select().
362      */
363     for (queue = transport->queue_list.next; queue; queue = queue->peers.next) {
364 	if (queue->window > queue->busy_refcount && queue->todo.next != 0) {
365 	    QMGR_LIST_ROTATE(transport->queue_list, queue);
366 	    if (msg_verbose)
367 		msg_info("qmgr_queue_select: %s", queue->name);
368 	    return (queue);
369 	}
370     }
371     return (0);
372 }
373 
374 /* qmgr_queue_done - delete in-core queue for site */
375 
qmgr_queue_done(QMGR_QUEUE * queue)376 void    qmgr_queue_done(QMGR_QUEUE *queue)
377 {
378     const char *myname = "qmgr_queue_done";
379     QMGR_TRANSPORT *transport = queue->transport;
380 
381     /*
382      * Sanity checks. It is an error to delete an in-core queue with pending
383      * messages or timers.
384      */
385     if (queue->busy_refcount != 0 || queue->todo_refcount != 0)
386 	msg_panic("%s: refcount: %d", myname,
387 		  queue->busy_refcount + queue->todo_refcount);
388     if (queue->todo.next || queue->busy.next)
389 	msg_panic("%s: queue not empty: %s", myname, queue->name);
390     if (!QMGR_QUEUE_READY(queue))
391 	msg_panic("%s: bad queue status: %s", myname, QMGR_QUEUE_STATUS(queue));
392     if (queue->dsn)
393 	msg_panic("%s: queue %s: spurious reason %s",
394 		  myname, queue->name, queue->dsn->reason);
395 
396     /*
397      * Clean up this in-core queue.
398      */
399     QMGR_LIST_UNLINK(transport->queue_list, QMGR_QUEUE *, queue);
400     htable_delete(transport->queue_byname, queue->name, (void (*) (void *)) 0);
401     myfree(queue->name);
402     myfree(queue->nexthop);
403     qmgr_queue_count--;
404     myfree((void *) queue);
405 }
406 
407 /* qmgr_queue_create - create in-core queue for site */
408 
qmgr_queue_create(QMGR_TRANSPORT * transport,const char * name,const char * nexthop)409 QMGR_QUEUE *qmgr_queue_create(QMGR_TRANSPORT *transport, const char *name,
410 			              const char *nexthop)
411 {
412     QMGR_QUEUE *queue;
413 
414     /*
415      * If possible, choose an initial concurrency of > 1 so that one bad
416      * message or one bad network won't slow us down unnecessarily.
417      */
418 
419     queue = (QMGR_QUEUE *) mymalloc(sizeof(QMGR_QUEUE));
420     qmgr_queue_count++;
421     queue->dflags = 0;
422     queue->last_done = 0;
423     queue->name = mystrdup(name);
424     queue->nexthop = mystrdup(nexthop);
425     queue->todo_refcount = 0;
426     queue->busy_refcount = 0;
427     queue->transport = transport;
428     queue->window = transport->init_dest_concurrency;
429     queue->success = queue->failure = queue->fail_cohorts = 0;
430     QMGR_LIST_INIT(queue->todo);
431     QMGR_LIST_INIT(queue->busy);
432     queue->dsn = 0;
433     queue->clog_time_to_warn = 0;
434     QMGR_LIST_PREPEND(transport->queue_list, queue);
435     htable_enter(transport->queue_byname, name, (void *) queue);
436     return (queue);
437 }
438 
439 /* qmgr_queue_find - find in-core named queue */
440 
qmgr_queue_find(QMGR_TRANSPORT * transport,const char * name)441 QMGR_QUEUE *qmgr_queue_find(QMGR_TRANSPORT *transport, const char *name)
442 {
443     return ((QMGR_QUEUE *) htable_find(transport->queue_byname, name));
444 }
445