xref: /netbsd-src/external/ibm-public/postfix/dist/src/qmgr/qmgr_transport.c (revision e89934bbf778a6d6d6894877c4da59d0c7835b0f)
1 /*	$NetBSD: qmgr_transport.c,v 1.2 2017/02/14 01:16:47 christos Exp $	*/
2 
3 /*++
4 /* NAME
5 /*	qmgr_transport 3
6 /* SUMMARY
7 /*	per-transport data structures
8 /* SYNOPSIS
9 /*	#include "qmgr.h"
10 /*
11 /*	QMGR_TRANSPORT *qmgr_transport_create(name)
12 /*	const char *name;
13 /*
14 /*	QMGR_TRANSPORT *qmgr_transport_find(name)
15 /*	const char *name;
16 /*
17 /*	QMGR_TRANSPORT *qmgr_transport_select()
18 /*
19 /*	void	qmgr_transport_alloc(transport, notify)
20 /*	QMGR_TRANSPORT *transport;
21 /*	void	(*notify)(QMGR_TRANSPORT *transport, VSTREAM *fp);
22 /*
23 /*	void	qmgr_transport_throttle(transport, dsn)
24 /*	QMGR_TRANSPORT *transport;
25 /*	DSN	*dsn;
26 /*
27 /*	void	qmgr_transport_unthrottle(transport)
28 /*	QMGR_TRANSPORT *transport;
29 /* DESCRIPTION
30 /*	This module organizes the world by message transport type.
31 /*	Each transport can have zero or more destination queues
32 /*	associated with it.
33 /*
34 /*	qmgr_transport_create() instantiates a data structure for the
35 /*	named transport type.
36 /*
37 /*	qmgr_transport_find() looks up an existing message transport
38 /*	data structure.
39 /*
40 /*	qmgr_transport_select() attempts to find a transport that
41 /*	has messages pending delivery.  This routine implements
42 /*	round-robin search among transports.
43 /*
44 /*	qmgr_transport_alloc() allocates a delivery process for the
45 /*	specified transport type. Allocation is performed asynchronously.
46 /*	When a process becomes available, the application callback routine
47 /*	is invoked with as arguments the transport and a stream that
48 /*	is connected to a delivery process. It is an error to call
49 /*	qmgr_transport_alloc() while delivery process allocation for
50 /*	the same transport is in progress.
51 /*
/*	qmgr_transport_throttle() blocks further allocation of delivery
/*	processes for the named transport. Attempts to throttle a
/*	throttled transport are ignored.
55 /*
56 /*	qmgr_transport_unthrottle() undoes qmgr_transport_throttle().
57 /*	Attempts to unthrottle a non-throttled transport are ignored.
58 /* DIAGNOSTICS
59 /*	Panic: consistency check failure. Fatal: out of memory.
60 /* LICENSE
61 /* .ad
62 /* .fi
63 /*	The Secure Mailer license must be distributed with this software.
64 /* AUTHOR(S)
65 /*	Wietse Venema
66 /*	IBM T.J. Watson Research
67 /*	P.O. Box 704
68 /*	Yorktown Heights, NY 10598, USA
69 /*
70 /*	Preemptive scheduler enhancements:
71 /*	Patrik Rak
72 /*	Modra 6
73 /*	155 00, Prague, Czech Republic
74 /*
75 /*	Wietse Venema
76 /*	Google, Inc.
77 /*	111 8th Avenue
78 /*	New York, NY 10011, USA
79 /*--*/
80 
81 /* System library. */
82 
83 #include <sys_defs.h>
84 #include <unistd.h>
85 
86 #include <sys/time.h>			/* FD_SETSIZE */
87 #include <sys/types.h>			/* FD_SETSIZE */
88 #include <unistd.h>			/* FD_SETSIZE */
89 
90 #ifdef USE_SYS_SELECT_H
91 #include <sys/select.h>			/* FD_SETSIZE */
92 #endif
93 
94 /* Utility library. */
95 
96 #include <msg.h>
97 #include <htable.h>
98 #include <events.h>
99 #include <mymalloc.h>
100 #include <vstream.h>
101 #include <iostuff.h>
102 
103 /* Global library. */
104 
105 #include <mail_proto.h>
106 #include <recipient_list.h>
107 #include <mail_conf.h>
108 #include <mail_params.h>
109 
110 /* Application-specific. */
111 
112 #include "qmgr.h"
113 
114 HTABLE *qmgr_transport_byname;		/* transport by name */
115 QMGR_TRANSPORT_LIST qmgr_transport_list;/* transports, round robin */
116 
117  /*
118   * A local structure to remember a delivery process allocation request.
119   */
120 typedef struct QMGR_TRANSPORT_ALLOC QMGR_TRANSPORT_ALLOC;
121 
122 struct QMGR_TRANSPORT_ALLOC {
123     QMGR_TRANSPORT *transport;		/* transport context */
124     VSTREAM *stream;			/* delivery service stream */
125     QMGR_TRANSPORT_ALLOC_NOTIFY notify;	/* application call-back routine */
126 };
127 
128  /*
129   * Connections to delivery agents are managed asynchronously. Each delivery
130   * agent connection goes through multiple wait states:
131   *
132   * - With Linux/Solaris and old queue manager implementations only, wait for
133   * the server to invoke accept().
134   *
135   * - Wait for the delivery agent's announcement that it is ready to receive a
136   * delivery request.
137   *
138   * - Wait for the delivery request completion status.
139   *
140   * Older queue manager implementations had only one pending delivery agent
141   * connection per transport. With low-latency destinations, the output rates
142   * were reduced on Linux/Solaris systems that had the extra wait state.
143   *
144   * To maximize delivery agent output rates with low-latency destinations, the
145   * following changes were made to the queue manager by the end of the 2.4
146   * development cycle:
147   *
148   * - The Linux/Solaris accept() wait state was eliminated.
149   *
150   * - A pipeline was implemented for pending delivery agent connections. The
151   * number of pending delivery agent connections was increased from one to
152   * two: the number of before-delivery wait states, plus one extra pipeline
153   * slot to prevent the pipeline from stalling easily. Increasing the
154   * pipeline much further actually hurt performance.
155   *
156   * - To reduce queue manager disk competition with delivery agents, the queue
157   * scanning algorithm was modified to import only one message per interrupt.
158   * The incoming and deferred queue scans now happen on alternate interrupts.
159   *
160   * Simplistically reasoned, a non-zero (incoming + active) queue length is
161   * equivalent to a time shift for mail deliveries; this is undesirable when
162   * delivery agents are not fully utilized.
163   *
164   * On the other hand a non-empty active queue is what allows us to do clever
165   * things such as queue file prefetch, concurrency windows, and connection
166   * caching; the idea is that such "thinking time" is affordable only after
167   * the output channels are maxed out.
168   */
169 #ifndef QMGR_TRANSPORT_MAX_PEND
170 #define QMGR_TRANSPORT_MAX_PEND	2
171 #endif
172 
173  /*
174   * Important note on the _transport_rate_delay implementation: after
175   * qmgr_transport_alloc() sets the QMGR_TRANSPORT_STAT_RATE_LOCK flag, all
176   * code paths must directly or indirectly invoke qmgr_transport_unthrottle()
177   * or qmgr_transport_throttle(). Otherwise, transports with non-zero
178   * _transport_rate_delay will become stuck.
179   */
180 
181 /* qmgr_transport_unthrottle_wrapper - in case (char *) != (struct *) */
182 
qmgr_transport_unthrottle_wrapper(int unused_event,void * context)183 static void qmgr_transport_unthrottle_wrapper(int unused_event, void *context)
184 {
185     qmgr_transport_unthrottle((QMGR_TRANSPORT *) context);
186 }
187 
188 /* qmgr_transport_unthrottle - open the throttle */
189 
qmgr_transport_unthrottle(QMGR_TRANSPORT * transport)190 void    qmgr_transport_unthrottle(QMGR_TRANSPORT *transport)
191 {
192     const char *myname = "qmgr_transport_unthrottle";
193 
194     /*
195      * This routine runs after expiration of the timer set by
196      * qmgr_transport_throttle(), or whenever a delivery transport has been
197      * used without malfunction. In either case, we enable delivery again if
198      * the transport was throttled. We always reset the transport rate lock.
199      */
200     if ((transport->flags & QMGR_TRANSPORT_STAT_DEAD) != 0) {
201 	if (msg_verbose)
202 	    msg_info("%s: transport %s", myname, transport->name);
203 	transport->flags &= ~QMGR_TRANSPORT_STAT_DEAD;
204 	if (transport->dsn == 0)
205 	    msg_panic("%s: transport %s: null reason",
206 		      myname, transport->name);
207 	dsn_free(transport->dsn);
208 	transport->dsn = 0;
209 	event_cancel_timer(qmgr_transport_unthrottle_wrapper,
210 			   (void *) transport);
211     }
212     if (transport->flags & QMGR_TRANSPORT_STAT_RATE_LOCK)
213 	transport->flags &= ~QMGR_TRANSPORT_STAT_RATE_LOCK;
214 }
215 
216 /* qmgr_transport_throttle - disable delivery process allocation */
217 
qmgr_transport_throttle(QMGR_TRANSPORT * transport,DSN * dsn)218 void    qmgr_transport_throttle(QMGR_TRANSPORT *transport, DSN *dsn)
219 {
220     const char *myname = "qmgr_transport_throttle";
221 
222     /*
223      * We are unable to connect to a deliver process for this type of message
224      * transport. Instead of hosing the system by retrying in a tight loop,
225      * back off and disable this transport type for a while.
226      */
227     if ((transport->flags & QMGR_TRANSPORT_STAT_DEAD) == 0) {
228 	if (msg_verbose)
229 	    msg_info("%s: transport %s: status: %s reason: %s",
230 		     myname, transport->name, dsn->status, dsn->reason);
231 	transport->flags |= QMGR_TRANSPORT_STAT_DEAD;
232 	if (transport->dsn)
233 	    msg_panic("%s: transport %s: spurious reason: %s",
234 		      myname, transport->name, transport->dsn->reason);
235 	transport->dsn = DSN_COPY(dsn);
236 	event_request_timer(qmgr_transport_unthrottle_wrapper,
237 			    (void *) transport, var_transport_retry_time);
238     }
239 }
240 
241 /* qmgr_transport_abort - transport connect watchdog */
242 
qmgr_transport_abort(int unused_event,void * context)243 static void qmgr_transport_abort(int unused_event, void *context)
244 {
245     QMGR_TRANSPORT_ALLOC *alloc = (QMGR_TRANSPORT_ALLOC *) context;
246 
247     msg_fatal("timeout connecting to transport: %s", alloc->transport->name);
248 }
249 
250 /* qmgr_transport_rate_event - delivery process availability notice */
251 
qmgr_transport_rate_event(int unused_event,void * context)252 static void qmgr_transport_rate_event(int unused_event, void *context)
253 {
254     QMGR_TRANSPORT_ALLOC *alloc = (QMGR_TRANSPORT_ALLOC *) context;
255 
256     alloc->notify(alloc->transport, alloc->stream);
257     myfree((void *) alloc);
258 }
259 
260 /* qmgr_transport_event - delivery process availability notice */
261 
qmgr_transport_event(int unused_event,void * context)262 static void qmgr_transport_event(int unused_event, void *context)
263 {
264     QMGR_TRANSPORT_ALLOC *alloc = (QMGR_TRANSPORT_ALLOC *) context;
265 
266     /*
267      * This routine notifies the application when the request given to
268      * qmgr_transport_alloc() completes.
269      */
270     if (msg_verbose)
271 	msg_info("transport_event: %s", alloc->transport->name);
272 
273     /*
274      * Connection request completed. Stop the watchdog timer.
275      */
276     event_cancel_timer(qmgr_transport_abort, context);
277 
278     /*
279      * Disable further read events that end up calling this function, and
280      * free up this pending connection pipeline slot.
281      */
282     if (alloc->stream) {
283 	event_disable_readwrite(vstream_fileno(alloc->stream));
284 	non_blocking(vstream_fileno(alloc->stream), BLOCKING);
285     }
286     alloc->transport->pending -= 1;
287 
288     /*
289      * Notify the requestor.
290      */
291     if (alloc->transport->xport_rate_delay > 0) {
292 	if ((alloc->transport->flags & QMGR_TRANSPORT_STAT_RATE_LOCK) == 0)
293 	    msg_panic("transport_event: missing rate lock for transport %s",
294 		      alloc->transport->name);
295 	event_request_timer(qmgr_transport_rate_event, (void *) alloc,
296 			    alloc->transport->xport_rate_delay);
297     } else {
298 	alloc->notify(alloc->transport, alloc->stream);
299 	myfree((void *) alloc);
300     }
301 }
302 
303 /* qmgr_transport_select - select transport for allocation */
304 
qmgr_transport_select(void)305 QMGR_TRANSPORT *qmgr_transport_select(void)
306 {
307     QMGR_TRANSPORT *xport;
308     QMGR_QUEUE *queue;
309     int     need;
310 
311     /*
312      * If we find a suitable transport, rotate the list of transports to
313      * effectuate round-robin selection. See similar selection code in
314      * qmgr_peer_select().
315      *
316      * This function is called repeatedly until all transports have maxed out
317      * the number of pending delivery agent connections, until all delivery
318      * agent concurrency windows are maxed out, or until we run out of "todo"
319      * queue entries.
320      */
321 #define MIN5af51743e4eef(x, y) ((x) < (y) ? (x) : (y))
322 
323     for (xport = qmgr_transport_list.next; xport; xport = xport->peers.next) {
324 	if ((xport->flags & QMGR_TRANSPORT_STAT_DEAD) != 0
325 	    || (xport->flags & QMGR_TRANSPORT_STAT_RATE_LOCK) != 0
326 	    || xport->pending >= QMGR_TRANSPORT_MAX_PEND)
327 	    continue;
328 	need = xport->pending + 1;
329 	for (queue = xport->queue_list.next; queue; queue = queue->peers.next) {
330 	    if (QMGR_QUEUE_READY(queue) == 0)
331 		continue;
332 	    if ((need -= MIN5af51743e4eef(queue->window - queue->busy_refcount,
333 					  queue->todo_refcount)) <= 0) {
334 		QMGR_LIST_ROTATE(qmgr_transport_list, xport, peers);
335 		if (msg_verbose)
336 		    msg_info("qmgr_transport_select: %s", xport->name);
337 		return (xport);
338 	    }
339 	}
340     }
341     return (0);
342 }
343 
344 /* qmgr_transport_alloc - allocate delivery process */
345 
qmgr_transport_alloc(QMGR_TRANSPORT * transport,QMGR_TRANSPORT_ALLOC_NOTIFY notify)346 void    qmgr_transport_alloc(QMGR_TRANSPORT *transport, QMGR_TRANSPORT_ALLOC_NOTIFY notify)
347 {
348     QMGR_TRANSPORT_ALLOC *alloc;
349 
350     /*
351      * Sanity checks.
352      */
353     if (transport->flags & QMGR_TRANSPORT_STAT_DEAD)
354 	msg_panic("qmgr_transport: dead transport: %s", transport->name);
355     if (transport->flags & QMGR_TRANSPORT_STAT_RATE_LOCK)
356 	msg_panic("qmgr_transport: rate-locked transport: %s", transport->name);
357     if (transport->pending >= QMGR_TRANSPORT_MAX_PEND)
358 	msg_panic("qmgr_transport: excess allocation: %s", transport->name);
359 
360     /*
361      * When this message delivery transport is rate-limited, do not select it
362      * again before the end of a message delivery transaction.
363      */
364     if (transport->xport_rate_delay > 0)
365 	transport->flags |= QMGR_TRANSPORT_STAT_RATE_LOCK;
366 
367     /*
368      * Connect to the well-known port for this delivery service, and wake up
369      * when a process announces its availability. Allow only a limited number
370      * of delivery process allocation attempts for this transport. In case of
371      * problems, back off. Do not hose the system when it is in trouble
372      * already.
373      *
374      * Use non-blocking connect(), so that Linux won't block the queue manager
375      * until the delivery agent calls accept().
376      *
377      * When the connection to delivery agent cannot be completed, notify the
378      * event handler so that it can throttle the transport and defer the todo
379      * queues, just like it does when communication fails *after* connection
380      * completion.
381      *
382      * Before Postfix 2.4, the event handler was not invoked after connect()
383      * error, and mail was not deferred. Because of this, mail would be stuck
384      * in the active queue after triggering a "connection refused" condition.
385      */
386     alloc = (QMGR_TRANSPORT_ALLOC *) mymalloc(sizeof(*alloc));
387     alloc->transport = transport;
388     alloc->notify = notify;
389     transport->pending += 1;
390     if ((alloc->stream = mail_connect(MAIL_CLASS_PRIVATE, transport->name,
391 				      NON_BLOCKING)) == 0) {
392 	msg_warn("connect to transport %s/%s: %m",
393 		 MAIL_CLASS_PRIVATE, transport->name);
394 	event_request_timer(qmgr_transport_event, (void *) alloc, 0);
395 	return;
396     }
397 #if (EVENTS_STYLE != EVENTS_STYLE_SELECT) && defined(CA_VSTREAM_CTL_DUPFD)
398 #ifndef THRESHOLD_FD_WORKAROUND
399 #define THRESHOLD_FD_WORKAROUND 128
400 #endif
401     vstream_control(alloc->stream,
402 		    CA_VSTREAM_CTL_DUPFD(THRESHOLD_FD_WORKAROUND),
403 		    CA_VSTREAM_CTL_END);
404 #endif
405     event_enable_read(vstream_fileno(alloc->stream), qmgr_transport_event,
406 		      (void *) alloc);
407 
408     /*
409      * Guard against broken systems.
410      */
411     event_request_timer(qmgr_transport_abort, (void *) alloc,
412 			var_daemon_timeout);
413 }
414 
415 /* qmgr_transport_create - create transport instance */
416 
qmgr_transport_create(const char * name)417 QMGR_TRANSPORT *qmgr_transport_create(const char *name)
418 {
419     QMGR_TRANSPORT *transport;
420 
421     if (htable_find(qmgr_transport_byname, name) != 0)
422 	msg_panic("qmgr_transport_create: transport exists: %s", name);
423     transport = (QMGR_TRANSPORT *) mymalloc(sizeof(QMGR_TRANSPORT));
424     transport->flags = 0;
425     transport->pending = 0;
426     transport->name = mystrdup(name);
427 
428     /*
429      * Use global configuration settings or transport-specific settings.
430      */
431     transport->dest_concurrency_limit =
432 	get_mail_conf_int2(name, _DEST_CON_LIMIT,
433 			   var_dest_con_limit, 0, 0);
434     transport->recipient_limit =
435 	get_mail_conf_int2(name, _DEST_RCPT_LIMIT,
436 			   var_dest_rcpt_limit, 0, 0);
437     transport->init_dest_concurrency =
438 	get_mail_conf_int2(name, _INIT_DEST_CON,
439 			   var_init_dest_concurrency, 1, 0);
440     transport->xport_rate_delay = get_mail_conf_time2(name, _XPORT_RATE_DELAY,
441 						      var_xport_rate_delay,
442 						      's', 0, 0);
443     transport->rate_delay = get_mail_conf_time2(name, _DEST_RATE_DELAY,
444 						var_dest_rate_delay,
445 						's', 0, 0);
446 
447     if (transport->rate_delay > 0)
448 	transport->dest_concurrency_limit = 1;
449     if (transport->dest_concurrency_limit != 0
450     && transport->dest_concurrency_limit < transport->init_dest_concurrency)
451 	transport->init_dest_concurrency = transport->dest_concurrency_limit;
452 
453     transport->slot_cost = get_mail_conf_int2(name, _DELIVERY_SLOT_COST,
454 					      var_delivery_slot_cost, 0, 0);
455     transport->slot_loan = get_mail_conf_int2(name, _DELIVERY_SLOT_LOAN,
456 					      var_delivery_slot_loan, 0, 0);
457     transport->slot_loan_factor =
458 	100 - get_mail_conf_int2(name, _DELIVERY_SLOT_DISCOUNT,
459 				 var_delivery_slot_discount, 0, 100);
460     transport->min_slots = get_mail_conf_int2(name, _MIN_DELIVERY_SLOTS,
461 					      var_min_delivery_slots, 0, 0);
462     transport->rcpt_unused = get_mail_conf_int2(name, _XPORT_RCPT_LIMIT,
463 						var_xport_rcpt_limit, 0, 0);
464     transport->rcpt_per_stack = get_mail_conf_int2(name, _STACK_RCPT_LIMIT,
465 						var_stack_rcpt_limit, 0, 0);
466     transport->refill_limit = get_mail_conf_int2(name, _XPORT_REFILL_LIMIT,
467 					      var_xport_refill_limit, 1, 0);
468     transport->refill_delay = get_mail_conf_time2(name, _XPORT_REFILL_DELAY,
469 					 var_xport_refill_delay, 's', 1, 0);
470 
471     transport->queue_byname = htable_create(0);
472     QMGR_LIST_INIT(transport->queue_list);
473     transport->job_byname = htable_create(0);
474     QMGR_LIST_INIT(transport->job_list);
475     QMGR_LIST_INIT(transport->job_bytime);
476     transport->job_current = 0;
477     transport->job_next_unread = 0;
478     transport->candidate_cache = 0;
479     transport->candidate_cache_current = 0;
480     transport->candidate_cache_time = (time_t) 0;
481     transport->blocker_tag = 1;
482     transport->dsn = 0;
483     qmgr_feedback_init(&transport->pos_feedback, name, _CONC_POS_FDBACK,
484 		       VAR_CONC_POS_FDBACK, var_conc_pos_feedback);
485     qmgr_feedback_init(&transport->neg_feedback, name, _CONC_NEG_FDBACK,
486 		       VAR_CONC_NEG_FDBACK, var_conc_neg_feedback);
487     transport->fail_cohort_limit =
488 	get_mail_conf_int2(name, _CONC_COHORT_LIM,
489 			   var_conc_cohort_limit, 0, 0);
490     if (qmgr_transport_byname == 0)
491 	qmgr_transport_byname = htable_create(10);
492     htable_enter(qmgr_transport_byname, name, (void *) transport);
493     QMGR_LIST_PREPEND(qmgr_transport_list, transport, peers);
494     if (msg_verbose)
495 	msg_info("qmgr_transport_create: %s concurrency %d recipients %d",
496 		 transport->name, transport->dest_concurrency_limit,
497 		 transport->recipient_limit);
498     return (transport);
499 }
500 
501 /* qmgr_transport_find - find transport instance */
502 
qmgr_transport_find(const char * name)503 QMGR_TRANSPORT *qmgr_transport_find(const char *name)
504 {
505     return ((QMGR_TRANSPORT *) htable_find(qmgr_transport_byname, name));
506 }
507