xref: /netbsd-src/external/ibm-public/postfix/dist/src/oqmgr/qmgr_entry.c (revision 67b9b338a7386232ac596b5fd0cd5a9cc8a03c71)
1 /*	$NetBSD: qmgr_entry.c,v 1.3 2022/10/08 16:12:46 christos Exp $	*/
2 
3 /*++
4 /* NAME
5 /*	qmgr_entry 3
6 /* SUMMARY
7 /*	per-site queue entries
8 /* SYNOPSIS
9 /*	#include "qmgr.h"
10 /*
11 /*	QMGR_ENTRY *qmgr_entry_create(queue, message)
12 /*	QMGR_QUEUE *queue;
13 /*	QMGR_MESSAGE *message;
14 /*
15 /*	void	qmgr_entry_done(entry, which)
16 /*	QMGR_ENTRY *entry;
17 /*	int	which;
18 /*
19 /*	QMGR_ENTRY *qmgr_entry_select(queue)
20 /*	QMGR_QUEUE *queue;
21 /*
22 /*	void	qmgr_entry_unselect(queue, entry)
23 /*	QMGR_QUEUE *queue;
24 /*	QMGR_ENTRY *entry;
25 /*
26 /*	void	qmgr_entry_move_todo(dst, entry)
27 /*	QMGR_QUEUE *dst;
28 /*	QMGR_ENTRY *entry;
29 /* DESCRIPTION
30 /*	These routines add/delete/manipulate per-site message
31 /*	delivery requests.
32 /*
33 /*	qmgr_entry_create() creates an entry for the named queue and
34 /*	message, and appends the entry to the queue's todo list.
35 /*	Filling in and cleaning up the recipients is the responsibility
36 /*	of the caller.
37 /*
38 /*	qmgr_entry_done() discards a per-site queue entry.  The
39 /*	\fIwhich\fR argument is either QMGR_QUEUE_BUSY for an entry
40 /*	of the site's `busy' list (i.e. queue entries that have been
41 /*	selected for actual delivery), or QMGR_QUEUE_TODO for an entry
42 /*	of the site's `todo' list (i.e. queue entries awaiting selection
43 /*	for actual delivery).
44 /*
45 /*	qmgr_entry_done() triggers cleanup of the per-site queue when
46 /*	the site has no pending deliveries, and the site is either
47 /*	alive, or the site is dead and the number of in-core queues
48 /*	exceeds a configurable limit (see qmgr_queue_done()).
49 /*
50 /*	qmgr_entry_done() triggers special action when the last in-core
51 /*	queue entry for a message is done with: either read more
52 /*	recipients from the queue file, delete the queue file, or move
53 /*	the queue file to the deferred queue; send bounce reports to the
54 /*	message originator (see qmgr_active_done()).
55 /*
56 /*	qmgr_entry_select() selects the next entry from the named
57 /*	per-site queue's `todo' list for actual delivery. The entry is
58 /*	moved to the queue's `busy' list: the list of messages being
59 /*	delivered.
60 /*
61 /*	qmgr_entry_unselect() takes the named entry off the named
62 /*	per-site queue's `busy' list and moves it to the queue's
63 /*	`todo' list.
64 /*
65 /*	qmgr_entry_move_todo() moves the specified "todo" queue entry
66 /*	to the specified "todo" queue.
67 /* DIAGNOSTICS
68 /*	Panic: interface violations, internal inconsistencies.
69 /* LICENSE
70 /* .ad
71 /* .fi
72 /*	The Secure Mailer license must be distributed with this software.
73 /* AUTHOR(S)
74 /*	Wietse Venema
75 /*	IBM T.J. Watson Research
76 /*	P.O. Box 704
77 /*	Yorktown Heights, NY 10598, USA
78 /*--*/
79 
80 /* System library. */
81 
82 #include <sys_defs.h>
83 #include <stdlib.h>
84 #include <time.h>
85 
86 /* Utility library. */
87 
88 #include <msg.h>
89 #include <mymalloc.h>
90 #include <events.h>
91 #include <vstream.h>
92 
93 /* Global library. */
94 
95 #include <mail_params.h>
96 #include <deliver_request.h>		/* opportunistic session caching */
97 
98 /* Application-specific. */
99 
100 #include "qmgr.h"
101 
102 /* qmgr_entry_select - select queue entry for delivery */
103 
qmgr_entry_select(QMGR_QUEUE * queue)104 QMGR_ENTRY *qmgr_entry_select(QMGR_QUEUE *queue)
105 {
106     const char *myname = "qmgr_entry_select";
107     QMGR_ENTRY *entry;
108 
109     if ((entry = queue->todo.prev) != 0) {
110 	QMGR_LIST_UNLINK(queue->todo, QMGR_ENTRY *, entry);
111 	queue->todo_refcount--;
112 	QMGR_LIST_APPEND(queue->busy, entry);
113 	queue->busy_refcount++;
114 
115 	/*
116 	 * With opportunistic session caching, the delivery agent must not
117 	 * only 1) save a session upon completion, but also 2) reuse a cached
118 	 * session upon the next delivery request. In order to not miss out
119 	 * on 2), we have to make caching sticky or else we get silly
120 	 * behavior when the in-memory queue drains. Specifically, new
121 	 * connections must not be made as long as cached connections exist.
122 	 *
123 	 * Safety: don't enable opportunistic session caching unless the queue
124 	 * manager is able to schedule concurrent or back-to-back deliveries
125 	 * (we need to recognize back-to-back deliveries for transports with
126 	 * concurrency 1).
127 	 *
128 	 * If caching has previously been enabled, but is not now, fetch any
129 	 * existing entries from the cache, but don't add new ones.
130 	 */
131 #define CONCURRENT_OR_BACK_TO_BACK_DELIVERY() \
132 	    (queue->busy_refcount > 1 || BACK_TO_BACK_DELIVERY())
133 
134 #define BACK_TO_BACK_DELIVERY() \
135 		(queue->last_done + 1 >= event_time())
136 
137 	/*
138 	 * Turn on session caching after we get up to speed. Don't enable
139 	 * session caching just because we have concurrent deliveries. This
140 	 * prevents unnecessary session caching when we have a burst of mail
141 	 * <= the initial concurrency limit.
142 	 */
143 	if ((queue->dflags & DEL_REQ_FLAG_CONN_STORE) == 0) {
144 	    if (BACK_TO_BACK_DELIVERY()) {
145 		if (msg_verbose)
146 		    msg_info("%s: allowing on-demand session caching for %s",
147 			     myname, queue->name);
148 		queue->dflags |= DEL_REQ_FLAG_CONN_MASK;
149 	    }
150 	}
151 
152 	/*
153 	 * Turn off session caching when concurrency drops and we're running
154 	 * out of steam. This is what prevents from turning off session
155 	 * caching too early, and from making new connections while old ones
156 	 * are still cached.
157 	 */
158 	else {
159 	    if (!CONCURRENT_OR_BACK_TO_BACK_DELIVERY()) {
160 		if (msg_verbose)
161 		    msg_info("%s: disallowing on-demand session caching for %s",
162 			     myname, queue->name);
163 		queue->dflags &= ~DEL_REQ_FLAG_CONN_STORE;
164 	    }
165 	}
166     }
167     return (entry);
168 }
169 
170 /* qmgr_entry_unselect - unselect queue entry for delivery */
171 
qmgr_entry_unselect(QMGR_QUEUE * queue,QMGR_ENTRY * entry)172 void    qmgr_entry_unselect(QMGR_QUEUE *queue, QMGR_ENTRY *entry)
173 {
174     QMGR_LIST_UNLINK(queue->busy, QMGR_ENTRY *, entry);
175     queue->busy_refcount--;
176     QMGR_LIST_APPEND(queue->todo, entry);
177     queue->todo_refcount++;
178 }
179 
180 /* qmgr_entry_move_todo - move entry between todo queues */
181 
qmgr_entry_move_todo(QMGR_QUEUE * dst,QMGR_ENTRY * entry)182 void    qmgr_entry_move_todo(QMGR_QUEUE *dst, QMGR_ENTRY *entry)
183 {
184     const char *myname = "qmgr_entry_move_todo";
185     QMGR_MESSAGE *message = entry->message;
186     QMGR_QUEUE *src = entry->queue;
187     QMGR_ENTRY *new_entry;
188 
189     if (entry->stream != 0)
190 	msg_panic("%s: queue %s entry is busy", myname, src->name);
191     if (QMGR_QUEUE_THROTTLED(dst))
192 	msg_panic("%s: destination queue %s is throttled", myname, dst->name);
193     if (QMGR_TRANSPORT_THROTTLED(dst->transport))
194 	msg_panic("%s: destination transport %s is throttled",
195 		  myname, dst->transport->name);
196 
197     /*
198      * Create new entry, swap the recipients between the old and new entries,
199      * then dispose of the old entry. This gives us any end-game actions that
200      * are implemented by qmgr_entry_done(), so we don't have to duplicate
201      * those actions here.
202      *
203      * XXX This does not enforce the per-entry recipient limit, but that is not
204      * a problem as long as qmgr_entry_move_todo() is called only to bounce
205      * or defer mail.
206      */
207     new_entry = qmgr_entry_create(dst, message);
208     recipient_list_swap(&entry->rcpt_list, &new_entry->rcpt_list);
209     qmgr_entry_done(entry, QMGR_QUEUE_TODO);
210 }
211 
212 /* qmgr_entry_done - dispose of queue entry */
213 
qmgr_entry_done(QMGR_ENTRY * entry,int which)214 void    qmgr_entry_done(QMGR_ENTRY *entry, int which)
215 {
216     const char *myname = "qmgr_entry_done";
217     QMGR_QUEUE *queue = entry->queue;
218     QMGR_MESSAGE *message = entry->message;
219     QMGR_TRANSPORT *transport = queue->transport;
220 
221     /*
222      * Take this entry off the in-core queue.
223      */
224     if (entry->stream != 0)
225 	msg_panic("%s: file is open", myname);
226     if (which == QMGR_QUEUE_BUSY) {
227 	QMGR_LIST_UNLINK(queue->busy, QMGR_ENTRY *, entry);
228 	queue->busy_refcount--;
229     } else if (which == QMGR_QUEUE_TODO) {
230 	QMGR_LIST_UNLINK(queue->todo, QMGR_ENTRY *, entry);
231 	queue->todo_refcount--;
232     } else {
233 	msg_panic("%s: bad queue spec: %d", myname, which);
234     }
235 
236     /*
237      * Free the recipient list and decrease the in-core recipient count
238      * accordingly.
239      */
240     qmgr_recipient_count -= entry->rcpt_list.len;
241     recipient_list_free(&entry->rcpt_list);
242 
243     myfree((void *) entry);
244 
245     /*
246      * Maintain back-to-back delivery status.
247      */
248     if (which == QMGR_QUEUE_BUSY)
249 	queue->last_done = event_time();
250 
251     /*
252      * Suspend a rate-limited queue, so that mail trickles out.
253      */
254     if (which == QMGR_QUEUE_BUSY && transport->rate_delay > 0) {
255 	if (queue->window > 1)
256 	    msg_panic("%s: queue %s/%s: window %d > 1 on rate-limited service",
257 		      myname, transport->name, queue->name, queue->window);
258 	if (QMGR_QUEUE_THROTTLED(queue))	/* XXX */
259 	    qmgr_queue_unthrottle(queue);
260 	if (QMGR_QUEUE_READY(queue))
261 	    qmgr_queue_suspend(queue, transport->rate_delay);
262     }
263 
264     /*
265      * When the in-core queue for this site is empty and when this site is
266      * not dead, discard the in-core queue. When this site is dead, but the
267      * number of in-core queues exceeds some threshold, get rid of this
268      * in-core queue anyway, in order to avoid running out of memory.
269      *
270      * See also: qmgr_entry_move_todo().
271      */
272     if (queue->todo.next == 0 && queue->busy.next == 0) {
273 	if (QMGR_QUEUE_THROTTLED(queue) && qmgr_queue_count > 2 * var_qmgr_rcpt_limit)
274 	    qmgr_queue_unthrottle(queue);
275 	if (QMGR_QUEUE_READY(queue))
276 	    qmgr_queue_done(queue);
277     }
278 
279     /*
280      * Update the in-core message reference count. When the in-core message
281      * structure has no more references, dispose of the message.
282      *
283      * When the in-core recipient count falls below a threshold, and this
284      * message has more recipients, read more recipients now. If we read more
285      * recipients as soon as the recipient count falls below the in-core
286      * recipient limit, we do not give other messages a chance until this
287      * message is delivered. That's good for mailing list deliveries, bad for
288      * one-to-one mail. If we wait until the in-core recipient count drops
289      * well below the in-core recipient limit, we give other mail a chance,
290      * but we also allow list deliveries to become interleaved. In the worst
291      * case, people near the start of a mailing list get a burst of postings
292      * today, while people near the end of the list get that same burst of
293      * postings a whole day later.
294      */
295 #define FUDGE(x)	((x) * (var_qmgr_fudge / 100.0))
296     message->refcount--;
297     if (message->rcpt_offset > 0
298 	&& qmgr_recipient_count < FUDGE(var_qmgr_rcpt_limit) - 100)
299 	qmgr_message_realloc(message);
300     if (message->refcount == 0)
301 	qmgr_active_done(message);
302 }
303 
304 /* qmgr_entry_create - create queue todo entry */
305 
qmgr_entry_create(QMGR_QUEUE * queue,QMGR_MESSAGE * message)306 QMGR_ENTRY *qmgr_entry_create(QMGR_QUEUE *queue, QMGR_MESSAGE *message)
307 {
308     QMGR_ENTRY *entry;
309 
310     /*
311      * Sanity check.
312      */
313     if (QMGR_QUEUE_THROTTLED(queue))
314 	msg_panic("qmgr_entry_create: dead queue: %s", queue->name);
315 
316     /*
317      * Create the delivery request.
318      */
319     entry = (QMGR_ENTRY *) mymalloc(sizeof(QMGR_ENTRY));
320     entry->stream = 0;
321     entry->message = message;
322     recipient_list_init(&entry->rcpt_list, RCPT_LIST_INIT_QUEUE);
323     message->refcount++;
324     entry->queue = queue;
325     QMGR_LIST_APPEND(queue->todo, entry);
326     queue->todo_refcount++;
327 
328     /*
329      * Warn if a destination is falling behind while the active queue
330      * contains a non-trivial amount of single-recipient email. When a
331      * destination takes up more and more space in the active queue, then
332      * other mail will not get through and delivery performance will suffer.
333      *
334      * XXX At this point in the code, the busy reference count is still less
335      * than the concurrency limit (otherwise this code would not be invoked
336      * in the first place) so we have to make some awkward adjustments
337      * below.
338      *
339      * XXX The queue length test below looks at the active queue share of an
340      * individual destination. This catches the case where mail for one
341      * destination is falling behind because it has to round-robin compete
342      * with many other destinations. However, Postfix will also perform
343      * poorly when most of the active queue is tied up by a small number of
344      * concurrency limited destinations. The queue length test below detects
345      * such conditions only indirectly.
346      *
347      * XXX This code does not detect the case that the active queue is being
348      * starved because incoming mail is pounding the disk.
349      */
350     if (var_helpful_warnings && var_qmgr_clog_warn_time > 0) {
351 	int     queue_length = queue->todo_refcount + queue->busy_refcount;
352 	time_t  now;
353 	QMGR_TRANSPORT *transport;
354 	double  active_share;
355 
356 	if (queue_length > var_qmgr_active_limit / 5
357 	    && (now = event_time()) >= queue->clog_time_to_warn) {
358 	    active_share = queue_length / (double) qmgr_message_count;
359 	    msg_warn("mail for %s is using up %d of %d active queue entries",
360 		     queue->nexthop, queue_length, qmgr_message_count);
361 	    if (active_share < 0.9)
362 		msg_warn("this may slow down other mail deliveries");
363 	    transport = queue->transport;
364 	    if (transport->dest_concurrency_limit > 0
365 	    && transport->dest_concurrency_limit <= queue->busy_refcount + 1)
366 		msg_warn("you may need to increase the main.cf %s%s from %d",
367 			 transport->name, _DEST_CON_LIMIT,
368 			 transport->dest_concurrency_limit);
369 	    else if (queue->window > var_qmgr_active_limit * active_share)
370 		msg_warn("you may need to increase the main.cf %s from %d",
371 			 VAR_QMGR_ACT_LIMIT, var_qmgr_active_limit);
372 	    else if (queue->peers.next != queue->peers.prev)
373 		msg_warn("you may need a separate master.cf transport for %s",
374 			 queue->nexthop);
375 	    else {
376 		msg_warn("you may need to reduce %s connect and helo timeouts",
377 			 transport->name);
378 		msg_warn("so that Postfix quickly skips unavailable hosts");
379 		msg_warn("you may need to increase the main.cf %s and %s",
380 			 VAR_MIN_BACKOFF_TIME, VAR_MAX_BACKOFF_TIME);
381 		msg_warn("so that Postfix wastes less time on undeliverable mail");
382 		msg_warn("you may need to increase the master.cf %s process limit",
383 			 transport->name);
384 	    }
385 	    msg_warn("please avoid flushing the whole queue when you have");
386 	    msg_warn("lots of deferred mail, that is bad for performance");
387 	    msg_warn("to turn off these warnings specify: %s = 0",
388 		     VAR_QMGR_CLOG_WARN_TIME);
389 	    queue->clog_time_to_warn = now + var_qmgr_clog_warn_time;
390 	}
391     }
392     return (entry);
393 }
394