/*	$NetBSD: qmgr_transport.c,v 1.2 2017/02/14 01:16:47 christos Exp $	*/

/*++
/* NAME
/*	qmgr_transport 3
/* SUMMARY
/*	per-transport data structures
/* SYNOPSIS
/*	#include "qmgr.h"
/*
/*	QMGR_TRANSPORT *qmgr_transport_create(name)
/*	const char *name;
/*
/*	QMGR_TRANSPORT *qmgr_transport_find(name)
/*	const char *name;
/*
/*	QMGR_TRANSPORT *qmgr_transport_select()
/*
/*	void	qmgr_transport_alloc(transport, notify)
/*	QMGR_TRANSPORT *transport;
/*	void	(*notify)(QMGR_TRANSPORT *transport, VSTREAM *fp);
/*
/*	void	qmgr_transport_throttle(transport, dsn)
/*	QMGR_TRANSPORT *transport;
/*	DSN	*dsn;
/*
/*	void	qmgr_transport_unthrottle(transport)
/*	QMGR_TRANSPORT *transport;
/* DESCRIPTION
/*	This module organizes the world by message transport type.
/*	Each transport can have zero or more destination queues
/*	associated with it.
/*
/*	qmgr_transport_create() instantiates a data structure for the
/*	named transport type.
/*
/*	qmgr_transport_find() looks up an existing message transport
/*	data structure.
/*
/*	qmgr_transport_select() attempts to find a transport that
/*	has messages pending delivery. This routine implements
/*	round-robin search among transports.
/*
/*	qmgr_transport_alloc() allocates a delivery process for the
/*	specified transport type. Allocation is performed asynchronously.
/*	When a process becomes available, the application callback routine
/*	is invoked with two arguments: the transport, and a stream that
/*	is connected to a delivery process. The stream argument is a null
/*	pointer when the connection to the delivery agent could not be
/*	established. It is an error to call qmgr_transport_alloc() when
/*	the maximal number of pending delivery process allocations
/*	(QMGR_TRANSPORT_MAX_PEND) for the same transport is already in
/*	progress.
/*
/*	qmgr_transport_throttle() blocks further allocation of delivery
/*	processes for the specified transport. Attempts to throttle a
/*	throttled transport are ignored.
55 /* 56 /* qmgr_transport_unthrottle() undoes qmgr_transport_throttle(). 57 /* Attempts to unthrottle a non-throttled transport are ignored. 58 /* DIAGNOSTICS 59 /* Panic: consistency check failure. Fatal: out of memory. 60 /* LICENSE 61 /* .ad 62 /* .fi 63 /* The Secure Mailer license must be distributed with this software. 64 /* AUTHOR(S) 65 /* Wietse Venema 66 /* IBM T.J. Watson Research 67 /* P.O. Box 704 68 /* Yorktown Heights, NY 10598, USA 69 /* 70 /* Preemptive scheduler enhancements: 71 /* Patrik Rak 72 /* Modra 6 73 /* 155 00, Prague, Czech Republic 74 /* 75 /* Wietse Venema 76 /* Google, Inc. 77 /* 111 8th Avenue 78 /* New York, NY 10011, USA 79 /*--*/ 80 81 /* System library. */ 82 83 #include <sys_defs.h> 84 #include <unistd.h> 85 86 #include <sys/time.h> /* FD_SETSIZE */ 87 #include <sys/types.h> /* FD_SETSIZE */ 88 #include <unistd.h> /* FD_SETSIZE */ 89 90 #ifdef USE_SYS_SELECT_H 91 #include <sys/select.h> /* FD_SETSIZE */ 92 #endif 93 94 /* Utility library. */ 95 96 #include <msg.h> 97 #include <htable.h> 98 #include <events.h> 99 #include <mymalloc.h> 100 #include <vstream.h> 101 #include <iostuff.h> 102 103 /* Global library. */ 104 105 #include <mail_proto.h> 106 #include <recipient_list.h> 107 #include <mail_conf.h> 108 #include <mail_params.h> 109 110 /* Application-specific. */ 111 112 #include "qmgr.h" 113 114 HTABLE *qmgr_transport_byname; /* transport by name */ 115 QMGR_TRANSPORT_LIST qmgr_transport_list;/* transports, round robin */ 116 117 /* 118 * A local structure to remember a delivery process allocation request. 119 */ 120 typedef struct QMGR_TRANSPORT_ALLOC QMGR_TRANSPORT_ALLOC; 121 122 struct QMGR_TRANSPORT_ALLOC { 123 QMGR_TRANSPORT *transport; /* transport context */ 124 VSTREAM *stream; /* delivery service stream */ 125 QMGR_TRANSPORT_ALLOC_NOTIFY notify; /* application call-back routine */ 126 }; 127 128 /* 129 * Connections to delivery agents are managed asynchronously. 
Each delivery 130 * agent connection goes through multiple wait states: 131 * 132 * - With Linux/Solaris and old queue manager implementations only, wait for 133 * the server to invoke accept(). 134 * 135 * - Wait for the delivery agent's announcement that it is ready to receive a 136 * delivery request. 137 * 138 * - Wait for the delivery request completion status. 139 * 140 * Older queue manager implementations had only one pending delivery agent 141 * connection per transport. With low-latency destinations, the output rates 142 * were reduced on Linux/Solaris systems that had the extra wait state. 143 * 144 * To maximize delivery agent output rates with low-latency destinations, the 145 * following changes were made to the queue manager by the end of the 2.4 146 * development cycle: 147 * 148 * - The Linux/Solaris accept() wait state was eliminated. 149 * 150 * - A pipeline was implemented for pending delivery agent connections. The 151 * number of pending delivery agent connections was increased from one to 152 * two: the number of before-delivery wait states, plus one extra pipeline 153 * slot to prevent the pipeline from stalling easily. Increasing the 154 * pipeline much further actually hurt performance. 155 * 156 * - To reduce queue manager disk competition with delivery agents, the queue 157 * scanning algorithm was modified to import only one message per interrupt. 158 * The incoming and deferred queue scans now happen on alternate interrupts. 159 * 160 * Simplistically reasoned, a non-zero (incoming + active) queue length is 161 * equivalent to a time shift for mail deliveries; this is undesirable when 162 * delivery agents are not fully utilized. 163 * 164 * On the other hand a non-empty active queue is what allows us to do clever 165 * things such as queue file prefetch, concurrency windows, and connection 166 * caching; the idea is that such "thinking time" is affordable only after 167 * the output channels are maxed out. 
168 */ 169 #ifndef QMGR_TRANSPORT_MAX_PEND 170 #define QMGR_TRANSPORT_MAX_PEND 2 171 #endif 172 173 /* 174 * Important note on the _transport_rate_delay implementation: after 175 * qmgr_transport_alloc() sets the QMGR_TRANSPORT_STAT_RATE_LOCK flag, all 176 * code paths must directly or indirectly invoke qmgr_transport_unthrottle() 177 * or qmgr_transport_throttle(). Otherwise, transports with non-zero 178 * _transport_rate_delay will become stuck. 179 */ 180 181 /* qmgr_transport_unthrottle_wrapper - in case (char *) != (struct *) */ 182 183 static void qmgr_transport_unthrottle_wrapper(int unused_event, void *context) 184 { 185 qmgr_transport_unthrottle((QMGR_TRANSPORT *) context); 186 } 187 188 /* qmgr_transport_unthrottle - open the throttle */ 189 190 void qmgr_transport_unthrottle(QMGR_TRANSPORT *transport) 191 { 192 const char *myname = "qmgr_transport_unthrottle"; 193 194 /* 195 * This routine runs after expiration of the timer set by 196 * qmgr_transport_throttle(), or whenever a delivery transport has been 197 * used without malfunction. In either case, we enable delivery again if 198 * the transport was throttled. We always reset the transport rate lock. 
199 */ 200 if ((transport->flags & QMGR_TRANSPORT_STAT_DEAD) != 0) { 201 if (msg_verbose) 202 msg_info("%s: transport %s", myname, transport->name); 203 transport->flags &= ~QMGR_TRANSPORT_STAT_DEAD; 204 if (transport->dsn == 0) 205 msg_panic("%s: transport %s: null reason", 206 myname, transport->name); 207 dsn_free(transport->dsn); 208 transport->dsn = 0; 209 event_cancel_timer(qmgr_transport_unthrottle_wrapper, 210 (void *) transport); 211 } 212 if (transport->flags & QMGR_TRANSPORT_STAT_RATE_LOCK) 213 transport->flags &= ~QMGR_TRANSPORT_STAT_RATE_LOCK; 214 } 215 216 /* qmgr_transport_throttle - disable delivery process allocation */ 217 218 void qmgr_transport_throttle(QMGR_TRANSPORT *transport, DSN *dsn) 219 { 220 const char *myname = "qmgr_transport_throttle"; 221 222 /* 223 * We are unable to connect to a deliver process for this type of message 224 * transport. Instead of hosing the system by retrying in a tight loop, 225 * back off and disable this transport type for a while. 226 */ 227 if ((transport->flags & QMGR_TRANSPORT_STAT_DEAD) == 0) { 228 if (msg_verbose) 229 msg_info("%s: transport %s: status: %s reason: %s", 230 myname, transport->name, dsn->status, dsn->reason); 231 transport->flags |= QMGR_TRANSPORT_STAT_DEAD; 232 if (transport->dsn) 233 msg_panic("%s: transport %s: spurious reason: %s", 234 myname, transport->name, transport->dsn->reason); 235 transport->dsn = DSN_COPY(dsn); 236 event_request_timer(qmgr_transport_unthrottle_wrapper, 237 (void *) transport, var_transport_retry_time); 238 } 239 } 240 241 /* qmgr_transport_abort - transport connect watchdog */ 242 243 static void qmgr_transport_abort(int unused_event, void *context) 244 { 245 QMGR_TRANSPORT_ALLOC *alloc = (QMGR_TRANSPORT_ALLOC *) context; 246 247 msg_fatal("timeout connecting to transport: %s", alloc->transport->name); 248 } 249 250 /* qmgr_transport_rate_event - delivery process availability notice */ 251 252 static void qmgr_transport_rate_event(int unused_event, void 
*context) 253 { 254 QMGR_TRANSPORT_ALLOC *alloc = (QMGR_TRANSPORT_ALLOC *) context; 255 256 alloc->notify(alloc->transport, alloc->stream); 257 myfree((void *) alloc); 258 } 259 260 /* qmgr_transport_event - delivery process availability notice */ 261 262 static void qmgr_transport_event(int unused_event, void *context) 263 { 264 QMGR_TRANSPORT_ALLOC *alloc = (QMGR_TRANSPORT_ALLOC *) context; 265 266 /* 267 * This routine notifies the application when the request given to 268 * qmgr_transport_alloc() completes. 269 */ 270 if (msg_verbose) 271 msg_info("transport_event: %s", alloc->transport->name); 272 273 /* 274 * Connection request completed. Stop the watchdog timer. 275 */ 276 event_cancel_timer(qmgr_transport_abort, context); 277 278 /* 279 * Disable further read events that end up calling this function, and 280 * free up this pending connection pipeline slot. 281 */ 282 if (alloc->stream) { 283 event_disable_readwrite(vstream_fileno(alloc->stream)); 284 non_blocking(vstream_fileno(alloc->stream), BLOCKING); 285 } 286 alloc->transport->pending -= 1; 287 288 /* 289 * Notify the requestor. 290 */ 291 if (alloc->transport->xport_rate_delay > 0) { 292 if ((alloc->transport->flags & QMGR_TRANSPORT_STAT_RATE_LOCK) == 0) 293 msg_panic("transport_event: missing rate lock for transport %s", 294 alloc->transport->name); 295 event_request_timer(qmgr_transport_rate_event, (void *) alloc, 296 alloc->transport->xport_rate_delay); 297 } else { 298 alloc->notify(alloc->transport, alloc->stream); 299 myfree((void *) alloc); 300 } 301 } 302 303 /* qmgr_transport_select - select transport for allocation */ 304 305 QMGR_TRANSPORT *qmgr_transport_select(void) 306 { 307 QMGR_TRANSPORT *xport; 308 QMGR_QUEUE *queue; 309 int need; 310 311 /* 312 * If we find a suitable transport, rotate the list of transports to 313 * effectuate round-robin selection. See similar selection code in 314 * qmgr_peer_select(). 
315 * 316 * This function is called repeatedly until all transports have maxed out 317 * the number of pending delivery agent connections, until all delivery 318 * agent concurrency windows are maxed out, or until we run out of "todo" 319 * queue entries. 320 */ 321 #define MIN5af51743e4eef(x, y) ((x) < (y) ? (x) : (y)) 322 323 for (xport = qmgr_transport_list.next; xport; xport = xport->peers.next) { 324 if ((xport->flags & QMGR_TRANSPORT_STAT_DEAD) != 0 325 || (xport->flags & QMGR_TRANSPORT_STAT_RATE_LOCK) != 0 326 || xport->pending >= QMGR_TRANSPORT_MAX_PEND) 327 continue; 328 need = xport->pending + 1; 329 for (queue = xport->queue_list.next; queue; queue = queue->peers.next) { 330 if (QMGR_QUEUE_READY(queue) == 0) 331 continue; 332 if ((need -= MIN5af51743e4eef(queue->window - queue->busy_refcount, 333 queue->todo_refcount)) <= 0) { 334 QMGR_LIST_ROTATE(qmgr_transport_list, xport, peers); 335 if (msg_verbose) 336 msg_info("qmgr_transport_select: %s", xport->name); 337 return (xport); 338 } 339 } 340 } 341 return (0); 342 } 343 344 /* qmgr_transport_alloc - allocate delivery process */ 345 346 void qmgr_transport_alloc(QMGR_TRANSPORT *transport, QMGR_TRANSPORT_ALLOC_NOTIFY notify) 347 { 348 QMGR_TRANSPORT_ALLOC *alloc; 349 350 /* 351 * Sanity checks. 352 */ 353 if (transport->flags & QMGR_TRANSPORT_STAT_DEAD) 354 msg_panic("qmgr_transport: dead transport: %s", transport->name); 355 if (transport->flags & QMGR_TRANSPORT_STAT_RATE_LOCK) 356 msg_panic("qmgr_transport: rate-locked transport: %s", transport->name); 357 if (transport->pending >= QMGR_TRANSPORT_MAX_PEND) 358 msg_panic("qmgr_transport: excess allocation: %s", transport->name); 359 360 /* 361 * When this message delivery transport is rate-limited, do not select it 362 * again before the end of a message delivery transaction. 
363 */ 364 if (transport->xport_rate_delay > 0) 365 transport->flags |= QMGR_TRANSPORT_STAT_RATE_LOCK; 366 367 /* 368 * Connect to the well-known port for this delivery service, and wake up 369 * when a process announces its availability. Allow only a limited number 370 * of delivery process allocation attempts for this transport. In case of 371 * problems, back off. Do not hose the system when it is in trouble 372 * already. 373 * 374 * Use non-blocking connect(), so that Linux won't block the queue manager 375 * until the delivery agent calls accept(). 376 * 377 * When the connection to delivery agent cannot be completed, notify the 378 * event handler so that it can throttle the transport and defer the todo 379 * queues, just like it does when communication fails *after* connection 380 * completion. 381 * 382 * Before Postfix 2.4, the event handler was not invoked after connect() 383 * error, and mail was not deferred. Because of this, mail would be stuck 384 * in the active queue after triggering a "connection refused" condition. 385 */ 386 alloc = (QMGR_TRANSPORT_ALLOC *) mymalloc(sizeof(*alloc)); 387 alloc->transport = transport; 388 alloc->notify = notify; 389 transport->pending += 1; 390 if ((alloc->stream = mail_connect(MAIL_CLASS_PRIVATE, transport->name, 391 NON_BLOCKING)) == 0) { 392 msg_warn("connect to transport %s/%s: %m", 393 MAIL_CLASS_PRIVATE, transport->name); 394 event_request_timer(qmgr_transport_event, (void *) alloc, 0); 395 return; 396 } 397 #if (EVENTS_STYLE != EVENTS_STYLE_SELECT) && defined(CA_VSTREAM_CTL_DUPFD) 398 #ifndef THRESHOLD_FD_WORKAROUND 399 #define THRESHOLD_FD_WORKAROUND 128 400 #endif 401 vstream_control(alloc->stream, 402 CA_VSTREAM_CTL_DUPFD(THRESHOLD_FD_WORKAROUND), 403 CA_VSTREAM_CTL_END); 404 #endif 405 event_enable_read(vstream_fileno(alloc->stream), qmgr_transport_event, 406 (void *) alloc); 407 408 /* 409 * Guard against broken systems. 
410 */ 411 event_request_timer(qmgr_transport_abort, (void *) alloc, 412 var_daemon_timeout); 413 } 414 415 /* qmgr_transport_create - create transport instance */ 416 417 QMGR_TRANSPORT *qmgr_transport_create(const char *name) 418 { 419 QMGR_TRANSPORT *transport; 420 421 if (htable_find(qmgr_transport_byname, name) != 0) 422 msg_panic("qmgr_transport_create: transport exists: %s", name); 423 transport = (QMGR_TRANSPORT *) mymalloc(sizeof(QMGR_TRANSPORT)); 424 transport->flags = 0; 425 transport->pending = 0; 426 transport->name = mystrdup(name); 427 428 /* 429 * Use global configuration settings or transport-specific settings. 430 */ 431 transport->dest_concurrency_limit = 432 get_mail_conf_int2(name, _DEST_CON_LIMIT, 433 var_dest_con_limit, 0, 0); 434 transport->recipient_limit = 435 get_mail_conf_int2(name, _DEST_RCPT_LIMIT, 436 var_dest_rcpt_limit, 0, 0); 437 transport->init_dest_concurrency = 438 get_mail_conf_int2(name, _INIT_DEST_CON, 439 var_init_dest_concurrency, 1, 0); 440 transport->xport_rate_delay = get_mail_conf_time2(name, _XPORT_RATE_DELAY, 441 var_xport_rate_delay, 442 's', 0, 0); 443 transport->rate_delay = get_mail_conf_time2(name, _DEST_RATE_DELAY, 444 var_dest_rate_delay, 445 's', 0, 0); 446 447 if (transport->rate_delay > 0) 448 transport->dest_concurrency_limit = 1; 449 if (transport->dest_concurrency_limit != 0 450 && transport->dest_concurrency_limit < transport->init_dest_concurrency) 451 transport->init_dest_concurrency = transport->dest_concurrency_limit; 452 453 transport->slot_cost = get_mail_conf_int2(name, _DELIVERY_SLOT_COST, 454 var_delivery_slot_cost, 0, 0); 455 transport->slot_loan = get_mail_conf_int2(name, _DELIVERY_SLOT_LOAN, 456 var_delivery_slot_loan, 0, 0); 457 transport->slot_loan_factor = 458 100 - get_mail_conf_int2(name, _DELIVERY_SLOT_DISCOUNT, 459 var_delivery_slot_discount, 0, 100); 460 transport->min_slots = get_mail_conf_int2(name, _MIN_DELIVERY_SLOTS, 461 var_min_delivery_slots, 0, 0); 462 transport->rcpt_unused = 
get_mail_conf_int2(name, _XPORT_RCPT_LIMIT, 463 var_xport_rcpt_limit, 0, 0); 464 transport->rcpt_per_stack = get_mail_conf_int2(name, _STACK_RCPT_LIMIT, 465 var_stack_rcpt_limit, 0, 0); 466 transport->refill_limit = get_mail_conf_int2(name, _XPORT_REFILL_LIMIT, 467 var_xport_refill_limit, 1, 0); 468 transport->refill_delay = get_mail_conf_time2(name, _XPORT_REFILL_DELAY, 469 var_xport_refill_delay, 's', 1, 0); 470 471 transport->queue_byname = htable_create(0); 472 QMGR_LIST_INIT(transport->queue_list); 473 transport->job_byname = htable_create(0); 474 QMGR_LIST_INIT(transport->job_list); 475 QMGR_LIST_INIT(transport->job_bytime); 476 transport->job_current = 0; 477 transport->job_next_unread = 0; 478 transport->candidate_cache = 0; 479 transport->candidate_cache_current = 0; 480 transport->candidate_cache_time = (time_t) 0; 481 transport->blocker_tag = 1; 482 transport->dsn = 0; 483 qmgr_feedback_init(&transport->pos_feedback, name, _CONC_POS_FDBACK, 484 VAR_CONC_POS_FDBACK, var_conc_pos_feedback); 485 qmgr_feedback_init(&transport->neg_feedback, name, _CONC_NEG_FDBACK, 486 VAR_CONC_NEG_FDBACK, var_conc_neg_feedback); 487 transport->fail_cohort_limit = 488 get_mail_conf_int2(name, _CONC_COHORT_LIM, 489 var_conc_cohort_limit, 0, 0); 490 if (qmgr_transport_byname == 0) 491 qmgr_transport_byname = htable_create(10); 492 htable_enter(qmgr_transport_byname, name, (void *) transport); 493 QMGR_LIST_PREPEND(qmgr_transport_list, transport, peers); 494 if (msg_verbose) 495 msg_info("qmgr_transport_create: %s concurrency %d recipients %d", 496 transport->name, transport->dest_concurrency_limit, 497 transport->recipient_limit); 498 return (transport); 499 } 500 501 /* qmgr_transport_find - find transport instance */ 502 503 QMGR_TRANSPORT *qmgr_transport_find(const char *name) 504 { 505 return ((QMGR_TRANSPORT *) htable_find(qmgr_transport_byname, name)); 506 } 507