1 /* $NetBSD: qmgr_active.c,v 1.2 2017/02/14 01:16:46 christos Exp $ */ 2 3 /*++ 4 /* NAME 5 /* qmgr_active 3 6 /* SUMMARY 7 /* active queue management 8 /* SYNOPSIS 9 /* #include "qmgr.h" 10 /* 11 /* void qmgr_active_feed(scan_info, queue_id) 12 /* QMGR_SCAN *scan_info; 13 /* const char *queue_id; 14 /* 15 /* void qmgr_active_drain() 16 /* 17 /* int qmgr_active_done(message) 18 /* QMGR_MESSAGE *message; 19 /* DESCRIPTION 20 /* These functions maintain the active message queue: the set 21 /* of messages that the queue manager is actually working on. 22 /* The active queue is limited in size. Messages are drained 23 /* from the active queue by allocating a delivery process and 24 /* by delivering mail via that process. Messages leak into the 25 /* active queue only when the active queue is small enough. 26 /* Damaged message files are saved to the "corrupt" directory. 27 /* 28 /* qmgr_active_feed() inserts the named message file into 29 /* the active queue. Message files with the wrong name or 30 /* with other wrong properties are skipped but not removed. 31 /* The following queue flags are recognized, other flags being 32 /* ignored: 33 /* .IP QMGR_SCAN_ALL 34 /* Examine all queue files. Normally, deferred queue files with 35 /* future time stamps are ignored, and incoming queue files with 36 /* future time stamps are frowned upon. 37 /* .PP 38 /* qmgr_active_drain() allocates one delivery process. 39 /* Process allocation is asynchronous. Once the delivery 40 /* process is available, an attempt is made to deliver 41 /* a message via it. Message delivery is asynchronous, too. 42 /* 43 /* qmgr_active_done() deals with a message after delivery 44 /* has been tried for all in-core recipients. If the message 45 /* was bounced, a bounce message is sent to the sender, or 46 /* to the Errors-To: address if one was specified. 47 /* If there are more on-file recipients, a new batch of 48 /* in-core recipients is read from the queue file. Otherwise, 49 /* if a delivery agent marked the queue file as corrupt, 50 /* the queue file is moved to the "corrupt" queue (surprise); 51 /* if at least one delivery failed, the message is moved 52 /* to the deferred queue. The time stamps of a deferred queue 53 /* file are set to the nearest wakeup time of its recipient 54 /* sites (if delivery failed due to a problem with a next-hop 55 /* host), are set into the future by the amount of time the 56 /* message was queued (per-message exponential backoff), or are set 57 /* into the future by a minimal backoff time, whichever is more. 58 /* The minimal_backoff_time parameter specifies the minimal 59 /* amount of time between delivery attempts; maximal_backoff_time 60 /* specifies an upper limit. 61 /* DIAGNOSTICS 62 /* Fatal: queue file access failures, out of memory. 63 /* Panic: interface violations, internal consistency errors. 64 /* Warnings: corrupt message file. A corrupt message is saved 65 /* to the "corrupt" queue for further inspection. 66 /* LICENSE 67 /* .ad 68 /* .fi 69 /* The Secure Mailer license must be distributed with this software. 70 /* AUTHOR(S) 71 /* Wietse Venema 72 /* IBM T.J. Watson Research 73 /* P.O. Box 704 74 /* Yorktown Heights, NY 10598, USA 75 /*--*/ 76 77 /* System library. */ 78 79 #include <sys_defs.h> 80 #include <sys/stat.h> 81 #include <dirent.h> 82 #include <stdlib.h> 83 #include <unistd.h> 84 #include <string.h> 85 #include <utime.h> 86 #include <errno.h> 87 88 #ifndef S_IRWXU /* What? no POSIX system? */ 89 #define S_IRWXU 0700 90 #endif 91 92 /* Utility library. */ 93 94 #include <msg.h> 95 #include <events.h> 96 #include <mymalloc.h> 97 #include <vstream.h> 98 #include <warn_stat.h> 99 100 /* Global library. */ 101 102 #include <mail_params.h> 103 #include <mail_open_ok.h> 104 #include <mail_queue.h> 105 #include <recipient_list.h> 106 #include <bounce.h> 107 #include <defer.h> 108 #include <trace.h> 109 #include <abounce.h> 110 #include <rec_type.h> 111 #include <qmgr_user.h> 112 113 /* Application-specific. */ 114 115 #include "qmgr.h" 116 117 /* 118 * A bunch of call-back routines. 119 */ 120 static void qmgr_active_done_2_bounce_flush(int, void *); 121 static void qmgr_active_done_2_generic(QMGR_MESSAGE *); 122 static void qmgr_active_done_25_trace_flush(int, void *); 123 static void qmgr_active_done_25_generic(QMGR_MESSAGE *); 124 static void qmgr_active_done_3_defer_flush(int, void *); 125 static void qmgr_active_done_3_defer_warn(int, void *); 126 static void qmgr_active_done_3_generic(QMGR_MESSAGE *); 127 128 /* qmgr_active_corrupt - move corrupted file out of the way */ 129 130 static void qmgr_active_corrupt(const char *queue_id) 131 { 132 const char *myname = "qmgr_active_corrupt"; 133 134 if (mail_queue_rename(queue_id, MAIL_QUEUE_ACTIVE, MAIL_QUEUE_CORRUPT)) { 135 if (errno != ENOENT) 136 msg_fatal("%s: save corrupt file queue %s id %s: %m", 137 myname, MAIL_QUEUE_ACTIVE, queue_id); 138 } else { 139 msg_warn("saving corrupt file \"%s\" from queue \"%s\" to queue \"%s\"", 140 queue_id, MAIL_QUEUE_ACTIVE, MAIL_QUEUE_CORRUPT); 141 } 142 } 143 144 /* qmgr_active_defer - defer queue file */ 145 146 static void qmgr_active_defer(const char *queue_name, const char *queue_id, 147 const char *dest_queue, int delay) 148 { 149 const char *myname = "qmgr_active_defer"; 150 const char *path; 151 struct utimbuf tbuf; 152 153 if (msg_verbose) 154 msg_info("wakeup %s after %ld secs", queue_id, (long) delay); 155 156 tbuf.actime = tbuf.modtime = event_time() + delay; 157 path = mail_queue_path((VSTRING *) 0, queue_name, queue_id); 158 if (utime(path, &tbuf) < 0 && errno != ENOENT) 159 msg_fatal("%s: update %s time stamps: %m", myname, path); 160 if (mail_queue_rename(queue_id, queue_name, dest_queue)) { 161 if (errno != ENOENT) 162 msg_fatal("%s: rename %s from %s to %s: %m", myname, 163 queue_id, queue_name, dest_queue); 164 msg_warn("%s: rename %s from %s to %s: %m", myname, 165 queue_id, queue_name, dest_queue); 166 } else if (msg_verbose) { 167 msg_info("%s: defer %s", myname, queue_id); 168 } 169 } 170 171 /* qmgr_active_feed - feed one message into active queue */ 172 173 int qmgr_active_feed(QMGR_SCAN *scan_info, const char *queue_id) 174 { 175 const char *myname = "qmgr_active_feed"; 176 QMGR_MESSAGE *message; 177 struct stat st; 178 const char *path; 179 180 if (strcmp(scan_info->queue, MAIL_QUEUE_ACTIVE) == 0) 181 msg_panic("%s: bad queue %s", myname, scan_info->queue); 182 if (msg_verbose) 183 msg_info("%s: queue %s", myname, scan_info->queue); 184 185 /* 186 * Make sure this is something we are willing to open. 187 */ 188 if (mail_open_ok(scan_info->queue, queue_id, &st, &path) == MAIL_OPEN_NO) 189 return (0); 190 191 if (msg_verbose) 192 msg_info("%s: %s", myname, path); 193 194 /* 195 * Skip files that have time stamps into the future. They need to cool 196 * down. Incoming and deferred files can have future time stamps. 197 */ 198 if ((scan_info->flags & QMGR_SCAN_ALL) == 0 199 && st.st_mtime > time((time_t *) 0) + 1) { 200 if (msg_verbose) 201 msg_info("%s: skip %s (%ld seconds)", myname, queue_id, 202 (long) (st.st_mtime - event_time())); 203 return (0); 204 } 205 206 /* 207 * Move the message to the active queue. File access errors are fatal. 208 */ 209 if (mail_queue_rename(queue_id, scan_info->queue, MAIL_QUEUE_ACTIVE)) { 210 if (errno != ENOENT) 211 msg_fatal("%s: %s: rename from %s to %s: %m", myname, 212 queue_id, scan_info->queue, MAIL_QUEUE_ACTIVE); 213 msg_warn("%s: %s: rename from %s to %s: %m", myname, 214 queue_id, scan_info->queue, MAIL_QUEUE_ACTIVE); 215 return (0); 216 } 217 218 /* 219 * Extract envelope information: sender and recipients. At this point, 220 * mail addresses have been processed by the cleanup service so they 221 * should be in canonical form. Generate requests to deliver this 222 * message. 223 * 224 * Throwing away queue files seems bad, especially when they made it this 225 * far into the mail system. Therefore we save bad files to a separate 226 * directory for further inspection. 227 * 228 * After queue manager restart it is possible that a queue file is still 229 * being delivered. In that case (the file is locked), defer delivery by 230 * a minimal amount of time. 231 */ 232 #define QMGR_FLUSH_AFTER (QMGR_FLUSH_EACH | QMGR_FLUSH_DFXP) 233 234 if ((message = qmgr_message_alloc(MAIL_QUEUE_ACTIVE, queue_id, 235 (st.st_mode & MAIL_QUEUE_STAT_UNTHROTTLE) ? 236 scan_info->flags | QMGR_FLUSH_AFTER : 237 scan_info->flags, 238 (st.st_mode & MAIL_QUEUE_STAT_UNTHROTTLE) ? 239 st.st_mode & ~MAIL_QUEUE_STAT_UNTHROTTLE : 240 0)) == 0) { 241 qmgr_active_corrupt(queue_id); 242 return (0); 243 } else if (message == QMGR_MESSAGE_LOCKED) { 244 qmgr_active_defer(MAIL_QUEUE_ACTIVE, queue_id, MAIL_QUEUE_INCOMING, 60); 245 return (0); 246 } else { 247 248 /* 249 * Special case if all recipients were already delivered. Send any 250 * bounces and clean up. 251 */ 252 if (message->refcount == 0) 253 qmgr_active_done(message); 254 return (1); 255 } 256 } 257 258 /* qmgr_active_done - dispose of message after recipients have been tried */ 259 260 void qmgr_active_done(QMGR_MESSAGE *message) 261 { 262 const char *myname = "qmgr_active_done"; 263 struct stat st; 264 265 if (msg_verbose) 266 msg_info("%s: %s", myname, message->queue_id); 267 268 /* 269 * During a previous iteration, an attempt to bounce this message may 270 * have failed, so there may still be a bounce log lying around. XXX By 271 * groping around in the bounce queue, we're trespassing on the bounce 272 * service's territory. But doing so is more robust than depending on the 273 * bounce daemon to do the lookup for us, and for us to do the deleting 274 * after we have received a successful status from the bounce service. 275 * The bounce queue directory blocks are most likely in memory anyway. If 276 * these lookups become a performance problem we will have to build an 277 * in-core cache into the bounce daemon. 278 * 279 * Don't bounce when the bounce log is empty. The bounce process obviously 280 * failed, and the delivery agent will have requested that the message be 281 * deferred. 282 * 283 * Bounces are sent asynchronously to avoid stalling while the cleanup 284 * daemon waits for the qmgr to accept the "new mail" trigger. 285 * 286 * See also code in cleanup_bounce.c. 287 */ 288 if (stat(mail_queue_path((VSTRING *) 0, MAIL_QUEUE_BOUNCE, message->queue_id), &st) == 0) { 289 if (st.st_size == 0) { 290 if (mail_queue_remove(MAIL_QUEUE_BOUNCE, message->queue_id)) 291 msg_fatal("remove %s %s: %m", 292 MAIL_QUEUE_BOUNCE, message->queue_id); 293 } else { 294 if (msg_verbose) 295 msg_info("%s: bounce %s", myname, message->queue_id); 296 if (message->verp_delims == 0 || var_verp_bounce_off) 297 abounce_flush(BOUNCE_FLAG_KEEP, 298 message->queue_name, 299 message->queue_id, 300 message->encoding, 301 message->smtputf8, 302 message->sender, 303 message->dsn_envid, 304 message->dsn_ret, 305 qmgr_active_done_2_bounce_flush, 306 (void *) message); 307 else 308 abounce_flush_verp(BOUNCE_FLAG_KEEP, 309 message->queue_name, 310 message->queue_id, 311 message->encoding, 312 message->smtputf8, 313 message->sender, 314 message->dsn_envid, 315 message->dsn_ret, 316 message->verp_delims, 317 qmgr_active_done_2_bounce_flush, 318 (void *) message); 319 return; 320 } 321 } 322 323 /* 324 * Asynchronous processing does not reach this point. 325 */ 326 qmgr_active_done_2_generic(message); 327 } 328 329 /* qmgr_active_done_2_bounce_flush - process abounce_flush() status */ 330 331 static void qmgr_active_done_2_bounce_flush(int status, void *context) 332 { 333 QMGR_MESSAGE *message = (QMGR_MESSAGE *) context; 334 335 /* 336 * Process abounce_flush() status and continue processing. 337 */ 338 message->flags |= status; 339 qmgr_active_done_2_generic(message); 340 } 341 342 /* qmgr_active_done_2_generic - continue processing */ 343 344 static void qmgr_active_done_2_generic(QMGR_MESSAGE *message) 345 { 346 const char *path; 347 struct stat st; 348 349 /* 350 * A delivery agent marks a queue file as corrupt by changing its 351 * attributes, and by pretending that delivery was deferred. 352 */ 353 if (message->flags 354 && mail_open_ok(MAIL_QUEUE_ACTIVE, message->queue_id, &st, &path) == MAIL_OPEN_NO) { 355 qmgr_active_corrupt(message->queue_id); 356 qmgr_message_free(message); 357 return; 358 } 359 360 /* 361 * If we did not read all recipients from this file, go read some more, 362 * but remember whether some recipients have to be tried again. 363 * 364 * Throwing away queue files seems bad, especially when they made it this 365 * far into the mail system. Therefore we save bad files to a separate 366 * directory for further inspection by a human being. 367 */ 368 if (message->rcpt_offset > 0) { 369 if (qmgr_message_realloc(message) == 0) { 370 qmgr_active_corrupt(message->queue_id); 371 qmgr_message_free(message); 372 } else { 373 if (message->refcount == 0) 374 qmgr_active_done(message); /* recurse for consistency */ 375 } 376 return; 377 } 378 379 /* 380 * XXX With multi-recipient mail, some recipients may have NOTIFY=SUCCESS 381 * and others not. Depending on what subset of recipients are delivered, 382 * a trace file may or may not be created. Even when the last partial 383 * delivery attempt had no NOTIFY=SUCCESS recipients, a trace file may 384 * still exist from a previous partial delivery attempt. So as long as 385 * any recipient has NOTIFY=SUCCESS we have to always look for the trace 386 * file and be prepared for the file not to exist. 387 * 388 * See also comments in bounce/bounce_notify_util.c. 389 */ 390 if ((message->tflags & (DEL_REQ_FLAG_USR_VRFY | DEL_REQ_FLAG_RECORD 391 | DEL_REQ_FLAG_REC_DLY_SENT)) 392 || (message->rflags & QMGR_READ_FLAG_NOTIFY_SUCCESS)) { 393 atrace_flush(message->tflags, 394 message->queue_name, 395 message->queue_id, 396 message->encoding, 397 message->smtputf8, 398 message->sender, 399 message->dsn_envid, 400 message->dsn_ret, 401 qmgr_active_done_25_trace_flush, 402 (void *) message); 403 return; 404 } 405 406 /* 407 * Asynchronous processing does not reach this point. 408 */ 409 qmgr_active_done_25_generic(message); 410 } 411 412 /* qmgr_active_done_25_trace_flush - continue after atrace_flush() completion */ 413 414 static void qmgr_active_done_25_trace_flush(int status, void *context) 415 { 416 QMGR_MESSAGE *message = (QMGR_MESSAGE *) context; 417 418 /* 419 * Process atrace_flush() status and continue processing. 420 */ 421 if (status == 0 && message->tflags_offset) 422 qmgr_message_kill_record(message, message->tflags_offset); 423 message->flags |= status; 424 qmgr_active_done_25_generic(message); 425 } 426 427 /* qmgr_active_done_25_generic - continue processing */ 428 429 static void qmgr_active_done_25_generic(QMGR_MESSAGE *message) 430 { 431 const char *myname = "qmgr_active_done_25_generic"; 432 433 /* 434 * If we get to this point we have tried all recipients for this message. 435 * If the message is too old, try to bounce it. 436 * 437 * Bounces are sent asynchronously to avoid stalling while the cleanup 438 * daemon waits for the qmgr to accept the "new mail" trigger. 439 */ 440 if (message->flags) { 441 if (event_time() >= message->create_time + 442 (*message->sender ? var_max_queue_time : var_dsn_queue_time)) { 443 msg_info("%s: from=<%s>, status=expired, returned to sender", 444 message->queue_id, message->sender); 445 if (message->verp_delims == 0 || var_verp_bounce_off) 446 adefer_flush(BOUNCE_FLAG_KEEP, 447 message->queue_name, 448 message->queue_id, 449 message->encoding, 450 message->smtputf8, 451 message->sender, 452 message->dsn_envid, 453 message->dsn_ret, 454 qmgr_active_done_3_defer_flush, 455 (void *) message); 456 else 457 adefer_flush_verp(BOUNCE_FLAG_KEEP, 458 message->queue_name, 459 message->queue_id, 460 message->encoding, 461 message->smtputf8, 462 message->sender, 463 message->dsn_envid, 464 message->dsn_ret, 465 message->verp_delims, 466 qmgr_active_done_3_defer_flush, 467 (void *) message); 468 return; 469 } else if (message->warn_time > 0 470 && event_time() >= message->warn_time - 1) { 471 if (msg_verbose) 472 msg_info("%s: sending defer warning for %s", myname, message->queue_id); 473 adefer_warn(BOUNCE_FLAG_KEEP, 474 message->queue_name, 475 message->queue_id, 476 message->encoding, 477 message->smtputf8, 478 message->sender, 479 message->dsn_envid, 480 message->dsn_ret, 481 qmgr_active_done_3_defer_warn, 482 (void *) message); 483 return; 484 } 485 } 486 487 /* 488 * Asynchronous processing does not reach this point. 489 */ 490 qmgr_active_done_3_generic(message); 491 } 492 493 /* qmgr_active_done_3_defer_warn - continue after adefer_warn() completion */ 494 495 static void qmgr_active_done_3_defer_warn(int status, void *context) 496 { 497 QMGR_MESSAGE *message = (QMGR_MESSAGE *) context; 498 499 /* 500 * Process adefer_warn() completion status and continue processing. 501 */ 502 if (status == 0) 503 qmgr_message_update_warn(message); 504 qmgr_active_done_3_generic(message); 505 } 506 507 /* qmgr_active_done_3_defer_flush - continue after adefer_flush() completion */ 508 509 static void qmgr_active_done_3_defer_flush(int status, void *context) 510 { 511 QMGR_MESSAGE *message = (QMGR_MESSAGE *) context; 512 513 /* 514 * Process adefer_flush() status and continue processing. 515 */ 516 message->flags = status; 517 qmgr_active_done_3_generic(message); 518 } 519 520 /* qmgr_active_done_3_generic - continue processing */ 521 522 static void qmgr_active_done_3_generic(QMGR_MESSAGE *message) 523 { 524 const char *myname = "qmgr_active_done_3_generic"; 525 int delay; 526 527 /* 528 * Some recipients need to be tried again. Move the queue file time 529 * stamps into the future by the amount of time that the message is 530 * delayed, and move the message to the deferred queue. Impose minimal 531 * and maximal backoff times. 532 * 533 * Since we look at actual time in queue, not time since last delivery 534 * attempt, backoff times will be distributed. However, we can still see 535 * spikes in delivery activity because the interval between deferred 536 * queue scans is finite. 537 */ 538 if (message->flags) { 539 if (message->create_time > 0) { 540 delay = event_time() - message->create_time; 541 if (delay > var_max_backoff_time) 542 delay = var_max_backoff_time; 543 if (delay < var_min_backoff_time) 544 delay = var_min_backoff_time; 545 } else { 546 delay = var_min_backoff_time; 547 } 548 qmgr_active_defer(message->queue_name, message->queue_id, 549 MAIL_QUEUE_DEFERRED, delay); 550 } 551 552 /* 553 * All recipients done. Remove the queue file. 554 */ 555 else { 556 if (mail_queue_remove(message->queue_name, message->queue_id)) { 557 if (errno != ENOENT) 558 msg_fatal("%s: remove %s from %s: %m", myname, 559 message->queue_id, message->queue_name); 560 msg_warn("%s: remove %s from %s: %m", myname, 561 message->queue_id, message->queue_name); 562 } else { 563 /* Same format as logged by postsuper. */ 564 msg_info("%s: removed", message->queue_id); 565 } 566 } 567 568 /* 569 * Finally, delete the in-core message structure. 570 */ 571 qmgr_message_free(message); 572 } 573 574 /* qmgr_active_drain - drain active queue by allocating a delivery process */ 575 576 void qmgr_active_drain(void) 577 { 578 QMGR_TRANSPORT *transport; 579 580 /* 581 * Allocate one delivery process for every transport with pending mail. 582 * The process allocation completes asynchronously. 583 */ 584 while ((transport = qmgr_transport_select()) != 0) { 585 if (msg_verbose) 586 msg_info("qmgr_active_drain: allocate %s", transport->name); 587 qmgr_transport_alloc(transport, qmgr_deliver); 588 } 589 } 590