1 /* $NetBSD: qmgr_transport.c,v 1.2 2017/02/14 01:16:46 christos Exp $ */
2
3 /*++
4 /* NAME
5 /* qmgr_transport 3
6 /* SUMMARY
7 /* per-transport data structures
8 /* SYNOPSIS
9 /* #include "qmgr.h"
10 /*
11 /* QMGR_TRANSPORT *qmgr_transport_create(name)
12 /* const char *name;
13 /*
14 /* QMGR_TRANSPORT *qmgr_transport_find(name)
15 /* const char *name;
16 /*
17 /* QMGR_TRANSPORT *qmgr_transport_select()
18 /*
19 /* void qmgr_transport_alloc(transport, notify)
20 /* QMGR_TRANSPORT *transport;
21 /* void (*notify)(QMGR_TRANSPORT *transport, VSTREAM *fp);
22 /*
23 /* void qmgr_transport_throttle(transport, dsn)
24 /* QMGR_TRANSPORT *transport;
25 /* DSN *dsn;
26 /*
27 /* void qmgr_transport_unthrottle(transport)
28 /* QMGR_TRANSPORT *transport;
29 /* DESCRIPTION
30 /* This module organizes the world by message transport type.
31 /* Each transport can have zero or more destination queues
32 /* associated with it.
33 /*
34 /* qmgr_transport_create() instantiates a data structure for the
35 /* named transport type.
36 /*
37 /* qmgr_transport_find() looks up an existing message transport
38 /* data structure.
39 /*
40 /* qmgr_transport_select() attempts to find a transport that
41 /* has messages pending delivery. This routine implements
42 /* round-robin search among transports.
43 /*
44 /* qmgr_transport_alloc() allocates a delivery process for the
45 /* specified transport type. Allocation is performed asynchronously.
/* When a process becomes available, the application callback routine
/* is invoked with two arguments: the transport, and a stream that
/* is connected to a delivery process. It is an error to call
49 /* qmgr_transport_alloc() while delivery process allocation for
50 /* the same transport is in progress.
51 /*
/* qmgr_transport_throttle() blocks further allocation of delivery
53 /* processes for the named transport. Attempts to throttle a
54 /* throttled transport are ignored.
55 /*
56 /* qmgr_transport_unthrottle() undoes qmgr_transport_throttle().
57 /* Attempts to unthrottle a non-throttled transport are ignored.
58 /* DIAGNOSTICS
59 /* Panic: consistency check failure. Fatal: out of memory.
60 /* LICENSE
61 /* .ad
62 /* .fi
63 /* The Secure Mailer license must be distributed with this software.
64 /* AUTHOR(S)
65 /* Wietse Venema
66 /* IBM T.J. Watson Research
67 /* P.O. Box 704
68 /* Yorktown Heights, NY 10598, USA
69 /*
70 /* Wietse Venema
71 /* Google, Inc.
72 /* 111 8th Avenue
73 /* New York, NY 10011, USA
74 /*--*/
75
76 /* System library. */
77
78 #include <sys_defs.h>
79 #include <unistd.h>
80
81 #include <sys/time.h> /* FD_SETSIZE */
82 #include <sys/types.h> /* FD_SETSIZE */
83 #include <unistd.h> /* FD_SETSIZE */
84
85 #ifdef USE_SYS_SELECT_H
86 #include <sys/select.h> /* FD_SETSIZE */
87 #endif
88
89 /* Utility library. */
90
91 #include <msg.h>
92 #include <htable.h>
93 #include <events.h>
94 #include <mymalloc.h>
95 #include <vstream.h>
96 #include <iostuff.h>
97
98 /* Global library. */
99
100 #include <mail_proto.h>
101 #include <recipient_list.h>
102 #include <mail_conf.h>
103 #include <mail_params.h>
104
105 /* Application-specific. */
106
107 #include "qmgr.h"
108
109 HTABLE *qmgr_transport_byname; /* transport by name */
110 QMGR_TRANSPORT_LIST qmgr_transport_list;/* transports, round robin */
111
112 /*
113 * A local structure to remember a delivery process allocation request.
114 */
115 typedef struct QMGR_TRANSPORT_ALLOC QMGR_TRANSPORT_ALLOC;
116
117 struct QMGR_TRANSPORT_ALLOC {
118 QMGR_TRANSPORT *transport; /* transport context */
119 VSTREAM *stream; /* delivery service stream */
120 QMGR_TRANSPORT_ALLOC_NOTIFY notify; /* application call-back routine */
121 };
122
123 /*
124 * Connections to delivery agents are managed asynchronously. Each delivery
125 * agent connection goes through multiple wait states:
126 *
127 * - With Linux/Solaris and old queue manager implementations only, wait for
128 * the server to invoke accept().
129 *
130 * - Wait for the delivery agent's announcement that it is ready to receive a
131 * delivery request.
132 *
133 * - Wait for the delivery request completion status.
134 *
135 * Older queue manager implementations had only one pending delivery agent
136 * connection per transport. With low-latency destinations, the output rates
137 * were reduced on Linux/Solaris systems that had the extra wait state.
138 *
139 * To maximize delivery agent output rates with low-latency destinations, the
140 * following changes were made to the queue manager by the end of the 2.4
141 * development cycle:
142 *
143 * - The Linux/Solaris accept() wait state was eliminated.
144 *
145 * - A pipeline was implemented for pending delivery agent connections. The
146 * number of pending delivery agent connections was increased from one to
147 * two: the number of before-delivery wait states, plus one extra pipeline
148 * slot to prevent the pipeline from stalling easily. Increasing the
149 * pipeline much further actually hurt performance.
150 *
151 * - To reduce queue manager disk competition with delivery agents, the queue
152 * scanning algorithm was modified to import only one message per interrupt.
153 * The incoming and deferred queue scans now happen on alternate interrupts.
154 *
155 * Simplistically reasoned, a non-zero (incoming + active) queue length is
156 * equivalent to a time shift for mail deliveries; this is undesirable when
157 * delivery agents are not fully utilized.
158 *
159 * On the other hand a non-empty active queue is what allows us to do clever
160 * things such as queue file prefetch, concurrency windows, and connection
161 * caching; the idea is that such "thinking time" is affordable only after
162 * the output channels are maxed out.
163 */
/* Per-transport cap on pending delivery agent connections; overridable at build time. */
#if !defined(QMGR_TRANSPORT_MAX_PEND)
#define QMGR_TRANSPORT_MAX_PEND 2
#endif
167
168 /*
169 * Important note on the _transport_rate_delay implementation: after
170 * qmgr_transport_alloc() sets the QMGR_TRANSPORT_STAT_RATE_LOCK flag, all
171 * code paths must directly or indirectly invoke qmgr_transport_unthrottle()
172 * or qmgr_transport_throttle(). Otherwise, transports with non-zero
173 * _transport_rate_delay will become stuck.
174 */
175
176 /* qmgr_transport_unthrottle_wrapper - in case (char *) != (struct *) */
177
qmgr_transport_unthrottle_wrapper(int unused_event,void * context)178 static void qmgr_transport_unthrottle_wrapper(int unused_event, void *context)
179 {
180 qmgr_transport_unthrottle((QMGR_TRANSPORT *) context);
181 }
182
183 /* qmgr_transport_unthrottle - open the throttle */
184
qmgr_transport_unthrottle(QMGR_TRANSPORT * transport)185 void qmgr_transport_unthrottle(QMGR_TRANSPORT *transport)
186 {
187 const char *myname = "qmgr_transport_unthrottle";
188
189 /*
190 * This routine runs after expiration of the timer set by
191 * qmgr_transport_throttle(), or whenever a delivery transport has been
192 * used without malfunction. In either case, we enable delivery again if
193 * the transport was throttled. We always reset the transport rate lock.
194 */
195 if ((transport->flags & QMGR_TRANSPORT_STAT_DEAD) != 0) {
196 if (msg_verbose)
197 msg_info("%s: transport %s", myname, transport->name);
198 transport->flags &= ~QMGR_TRANSPORT_STAT_DEAD;
199 if (transport->dsn == 0)
200 msg_panic("%s: transport %s: null reason",
201 myname, transport->name);
202 dsn_free(transport->dsn);
203 transport->dsn = 0;
204 event_cancel_timer(qmgr_transport_unthrottle_wrapper,
205 (void *) transport);
206 }
207 if (transport->flags & QMGR_TRANSPORT_STAT_RATE_LOCK)
208 transport->flags &= ~QMGR_TRANSPORT_STAT_RATE_LOCK;
209 }
210
211 /* qmgr_transport_throttle - disable delivery process allocation */
212
qmgr_transport_throttle(QMGR_TRANSPORT * transport,DSN * dsn)213 void qmgr_transport_throttle(QMGR_TRANSPORT *transport, DSN *dsn)
214 {
215 const char *myname = "qmgr_transport_throttle";
216
217 /*
218 * We are unable to connect to a deliver process for this type of message
219 * transport. Instead of hosing the system by retrying in a tight loop,
220 * back off and disable this transport type for a while.
221 */
222 if ((transport->flags & QMGR_TRANSPORT_STAT_DEAD) == 0) {
223 if (msg_verbose)
224 msg_info("%s: transport %s: status: %s reason: %s",
225 myname, transport->name, dsn->status, dsn->reason);
226 transport->flags |= QMGR_TRANSPORT_STAT_DEAD;
227 if (transport->dsn)
228 msg_panic("%s: transport %s: spurious reason: %s",
229 myname, transport->name, transport->dsn->reason);
230 transport->dsn = DSN_COPY(dsn);
231 event_request_timer(qmgr_transport_unthrottle_wrapper,
232 (void *) transport, var_transport_retry_time);
233 }
234 }
235
236 /* qmgr_transport_abort - transport connect watchdog */
237
qmgr_transport_abort(int unused_event,void * context)238 static void qmgr_transport_abort(int unused_event, void *context)
239 {
240 QMGR_TRANSPORT_ALLOC *alloc = (QMGR_TRANSPORT_ALLOC *) context;
241
242 msg_fatal("timeout connecting to transport: %s", alloc->transport->name);
243 }
244
245 /* qmgr_transport_rate_event - delivery process availability notice */
246
qmgr_transport_rate_event(int unused_event,void * context)247 static void qmgr_transport_rate_event(int unused_event, void *context)
248 {
249 QMGR_TRANSPORT_ALLOC *alloc = (QMGR_TRANSPORT_ALLOC *) context;
250
251 alloc->notify(alloc->transport, alloc->stream);
252 myfree((void *) alloc);
253 }
254
255 /* qmgr_transport_event - delivery process availability notice */
256
qmgr_transport_event(int unused_event,void * context)257 static void qmgr_transport_event(int unused_event, void *context)
258 {
259 QMGR_TRANSPORT_ALLOC *alloc = (QMGR_TRANSPORT_ALLOC *) context;
260
261 /*
262 * This routine notifies the application when the request given to
263 * qmgr_transport_alloc() completes.
264 */
265 if (msg_verbose)
266 msg_info("transport_event: %s", alloc->transport->name);
267
268 /*
269 * Connection request completed. Stop the watchdog timer.
270 */
271 event_cancel_timer(qmgr_transport_abort, context);
272
273 /*
274 * Disable further read events that end up calling this function, and
275 * free up this pending connection pipeline slot.
276 */
277 if (alloc->stream) {
278 event_disable_readwrite(vstream_fileno(alloc->stream));
279 non_blocking(vstream_fileno(alloc->stream), BLOCKING);
280 }
281 alloc->transport->pending -= 1;
282
283 /*
284 * Notify the requestor.
285 */
286 if (alloc->transport->xport_rate_delay > 0) {
287 if ((alloc->transport->flags & QMGR_TRANSPORT_STAT_RATE_LOCK) == 0)
288 msg_panic("transport_event: missing rate lock for transport %s",
289 alloc->transport->name);
290 event_request_timer(qmgr_transport_rate_event, (void *) alloc,
291 alloc->transport->xport_rate_delay);
292 } else {
293 alloc->notify(alloc->transport, alloc->stream);
294 myfree((void *) alloc);
295 }
296 }
297
298 /* qmgr_transport_select - select transport for allocation */
299
qmgr_transport_select(void)300 QMGR_TRANSPORT *qmgr_transport_select(void)
301 {
302 QMGR_TRANSPORT *xport;
303 QMGR_QUEUE *queue;
304 int need;
305
306 /*
307 * If we find a suitable transport, rotate the list of transports to
308 * effectuate round-robin selection. See similar selection code in
309 * qmgr_queue_select().
310 *
311 * This function is called repeatedly until all transports have maxed out
312 * the number of pending delivery agent connections, until all delivery
313 * agent concurrency windows are maxed out, or until we run out of "todo"
314 * queue entries.
315 */
316 #define MIN5af51743e4eef(x, y) ((x) < (y) ? (x) : (y))
317
318 for (xport = qmgr_transport_list.next; xport; xport = xport->peers.next) {
319 if ((xport->flags & QMGR_TRANSPORT_STAT_DEAD) != 0
320 || (xport->flags & QMGR_TRANSPORT_STAT_RATE_LOCK) != 0
321 || xport->pending >= QMGR_TRANSPORT_MAX_PEND)
322 continue;
323 need = xport->pending + 1;
324 for (queue = xport->queue_list.next; queue; queue = queue->peers.next) {
325 if (QMGR_QUEUE_READY(queue) == 0)
326 continue;
327 if ((need -= MIN5af51743e4eef(queue->window - queue->busy_refcount,
328 queue->todo_refcount)) <= 0) {
329 QMGR_LIST_ROTATE(qmgr_transport_list, xport);
330 if (msg_verbose)
331 msg_info("qmgr_transport_select: %s", xport->name);
332 return (xport);
333 }
334 }
335 }
336 return (0);
337 }
338
339 /* qmgr_transport_alloc - allocate delivery process */
340
qmgr_transport_alloc(QMGR_TRANSPORT * transport,QMGR_TRANSPORT_ALLOC_NOTIFY notify)341 void qmgr_transport_alloc(QMGR_TRANSPORT *transport, QMGR_TRANSPORT_ALLOC_NOTIFY notify)
342 {
343 QMGR_TRANSPORT_ALLOC *alloc;
344
345 /*
346 * Sanity checks.
347 */
348 if (transport->flags & QMGR_TRANSPORT_STAT_DEAD)
349 msg_panic("qmgr_transport: dead transport: %s", transport->name);
350 if (transport->flags & QMGR_TRANSPORT_STAT_RATE_LOCK)
351 msg_panic("qmgr_transport: rate-locked transport: %s", transport->name);
352 if (transport->pending >= QMGR_TRANSPORT_MAX_PEND)
353 msg_panic("qmgr_transport: excess allocation: %s", transport->name);
354
355 /*
356 * When this message delivery transport is rate-limited, do not select it
357 * again before the end of a message delivery transaction.
358 */
359 if (transport->xport_rate_delay > 0)
360 transport->flags |= QMGR_TRANSPORT_STAT_RATE_LOCK;
361
362 /*
363 * Connect to the well-known port for this delivery service, and wake up
364 * when a process announces its availability. Allow only a limited number
365 * of delivery process allocation attempts for this transport. In case of
366 * problems, back off. Do not hose the system when it is in trouble
367 * already.
368 *
369 * Use non-blocking connect(), so that Linux won't block the queue manager
370 * until the delivery agent calls accept().
371 *
372 * When the connection to delivery agent cannot be completed, notify the
373 * event handler so that it can throttle the transport and defer the todo
374 * queues, just like it does when communication fails *after* connection
375 * completion.
376 *
377 * Before Postfix 2.4, the event handler was not invoked after connect()
378 * error, and mail was not deferred. Because of this, mail would be stuck
379 * in the active queue after triggering a "connection refused" condition.
380 */
381 alloc = (QMGR_TRANSPORT_ALLOC *) mymalloc(sizeof(*alloc));
382 alloc->transport = transport;
383 alloc->notify = notify;
384 transport->pending += 1;
385 if ((alloc->stream = mail_connect(MAIL_CLASS_PRIVATE, transport->name,
386 NON_BLOCKING)) == 0) {
387 msg_warn("connect to transport %s/%s: %m",
388 MAIL_CLASS_PRIVATE, transport->name);
389 event_request_timer(qmgr_transport_event, (void *) alloc, 0);
390 return;
391 }
392 #if (EVENTS_STYLE != EVENTS_STYLE_SELECT) && defined(CA_VSTREAM_CTL_DUPFD)
393 #ifndef THRESHOLD_FD_WORKAROUND
394 #define THRESHOLD_FD_WORKAROUND 128
395 #endif
396 vstream_control(alloc->stream,
397 CA_VSTREAM_CTL_DUPFD(THRESHOLD_FD_WORKAROUND),
398 CA_VSTREAM_CTL_END);
399 #endif
400 event_enable_read(vstream_fileno(alloc->stream), qmgr_transport_event,
401 (void *) alloc);
402
403 /*
404 * Guard against broken systems.
405 */
406 event_request_timer(qmgr_transport_abort, (void *) alloc,
407 var_daemon_timeout);
408 }
409
410 /* qmgr_transport_create - create transport instance */
411
qmgr_transport_create(const char * name)412 QMGR_TRANSPORT *qmgr_transport_create(const char *name)
413 {
414 QMGR_TRANSPORT *transport;
415
416 if (htable_find(qmgr_transport_byname, name) != 0)
417 msg_panic("qmgr_transport_create: transport exists: %s", name);
418 transport = (QMGR_TRANSPORT *) mymalloc(sizeof(QMGR_TRANSPORT));
419 transport->flags = 0;
420 transport->pending = 0;
421 transport->name = mystrdup(name);
422
423 /*
424 * Use global configuration settings or transport-specific settings.
425 */
426 transport->dest_concurrency_limit =
427 get_mail_conf_int2(name, _DEST_CON_LIMIT,
428 var_dest_con_limit, 0, 0);
429 transport->recipient_limit =
430 get_mail_conf_int2(name, _DEST_RCPT_LIMIT,
431 var_dest_rcpt_limit, 0, 0);
432 transport->init_dest_concurrency =
433 get_mail_conf_int2(name, _INIT_DEST_CON,
434 var_init_dest_concurrency, 1, 0);
435 transport->xport_rate_delay = get_mail_conf_time2(name, _XPORT_RATE_DELAY,
436 var_xport_rate_delay,
437 's', 0, 0);
438 transport->rate_delay = get_mail_conf_time2(name, _DEST_RATE_DELAY,
439 var_dest_rate_delay,
440 's', 0, 0);
441
442 if (transport->rate_delay > 0)
443 transport->dest_concurrency_limit = 1;
444 if (transport->dest_concurrency_limit != 0
445 && transport->dest_concurrency_limit < transport->init_dest_concurrency)
446 transport->init_dest_concurrency = transport->dest_concurrency_limit;
447
448 transport->queue_byname = htable_create(0);
449 QMGR_LIST_INIT(transport->queue_list);
450 transport->dsn = 0;
451 qmgr_feedback_init(&transport->pos_feedback, name, _CONC_POS_FDBACK,
452 VAR_CONC_POS_FDBACK, var_conc_pos_feedback);
453 qmgr_feedback_init(&transport->neg_feedback, name, _CONC_NEG_FDBACK,
454 VAR_CONC_NEG_FDBACK, var_conc_neg_feedback);
455 transport->fail_cohort_limit =
456 get_mail_conf_int2(name, _CONC_COHORT_LIM,
457 var_conc_cohort_limit, 0, 0);
458 if (qmgr_transport_byname == 0)
459 qmgr_transport_byname = htable_create(10);
460 htable_enter(qmgr_transport_byname, name, (void *) transport);
461 QMGR_LIST_APPEND(qmgr_transport_list, transport);
462 if (msg_verbose)
463 msg_info("qmgr_transport_create: %s concurrency %d recipients %d",
464 transport->name, transport->dest_concurrency_limit,
465 transport->recipient_limit);
466 return (transport);
467 }
468
469 /* qmgr_transport_find - find transport instance */
470
qmgr_transport_find(const char * name)471 QMGR_TRANSPORT *qmgr_transport_find(const char *name)
472 {
473 return ((QMGR_TRANSPORT *) htable_find(qmgr_transport_byname, name));
474 }
475