1 /* $NetBSD: qmgr_message.c,v 1.4 2022/10/08 16:12:48 christos Exp $ */ 2 3 /*++ 4 /* NAME 5 /* qmgr_message 3 6 /* SUMMARY 7 /* in-core message structures 8 /* SYNOPSIS 9 /* #include "qmgr.h" 10 /* 11 /* int qmgr_message_count; 12 /* int qmgr_recipient_count; 13 /* int qmgr_vrfy_pend_count; 14 /* 15 /* QMGR_MESSAGE *qmgr_message_alloc(class, name, qflags, mode) 16 /* const char *class; 17 /* const char *name; 18 /* int qflags; 19 /* mode_t mode; 20 /* 21 /* QMGR_MESSAGE *qmgr_message_realloc(message) 22 /* QMGR_MESSAGE *message; 23 /* 24 /* void qmgr_message_free(message) 25 /* QMGR_MESSAGE *message; 26 /* 27 /* void qmgr_message_update_warn(message) 28 /* QMGR_MESSAGE *message; 29 /* 30 /* void qmgr_message_kill_record(message, offset) 31 /* QMGR_MESSAGE *message; 32 /* long offset; 33 /* DESCRIPTION 34 /* This module performs en-gross operations on queue messages. 35 /* 36 /* qmgr_message_count is a global counter for the total number 37 /* of in-core message structures (i.e. the total size of the 38 /* `active' message queue). 39 /* 40 /* qmgr_recipient_count is a global counter for the total number 41 /* of in-core recipient structures (i.e. the sum of all recipients 42 /* in all in-core message structures). 43 /* 44 /* qmgr_vrfy_pend_count is a global counter for the total 45 /* number of in-core message structures that are associated 46 /* with an address verification request. Requests that exceed 47 /* the address_verify_pending_limit are deferred immediately. 48 /* This is a backup mechanism for a more refined enforcement 49 /* mechanism in the verify(8) daemon. 50 /* 51 /* qmgr_message_alloc() creates an in-core message structure 52 /* with sender and recipient information taken from the named queue 53 /* file. A null result means the queue file could not be read or 54 /* that the queue file contained incorrect information. A result 55 /* QMGR_MESSAGE_LOCKED means delivery must be deferred. The number 56 /* of recipients read from a queue file is limited by the global 57 /* var_qmgr_rcpt_limit configuration parameter. When the limit 58 /* is reached, the \fIrcpt_offset\fR structure member is set to 59 /* the position where the read was terminated. Recipients are 60 /* run through the resolver, and are assigned to destination 61 /* queues. Recipients that cannot be assigned are deferred or 62 /* bounced. Mail that has bounced twice is silently absorbed. 63 /* A non-zero mode means change the queue file permissions. 64 /* 65 /* qmgr_message_realloc() resumes reading recipients from the queue 66 /* file, and updates the recipient list and \fIrcpt_offset\fR message 67 /* structure members. A null result means that the file could not be 68 /* read or that the file contained incorrect information. Recipient 69 /* limit imposed this time is based on the position of the message 70 /* job(s) on corresponding transport job list(s). It's considered 71 /* an error to call this when the recipient slots can't be allocated. 72 /* 73 /* qmgr_message_free() destroys an in-core message structure and makes 74 /* the resources available for reuse. It is an error to destroy 75 /* a message structure that is still referenced by queue entry structures. 76 /* 77 /* qmgr_message_update_warn() takes a closed message, opens it, updates 78 /* the warning field, and closes it again. 79 /* 80 /* qmgr_message_kill_record() takes a closed message, opens it, updates 81 /* the record type at the given offset to "killed", and closes the file. 82 /* A killed envelope record is ignored. Killed records are not allowed 83 /* inside the message content. 84 /* DIAGNOSTICS 85 /* Warnings: malformed message file. Fatal errors: out of memory. 86 /* SEE ALSO 87 /* envelope(3) message envelope parser 88 /* LICENSE 89 /* .ad 90 /* .fi 91 /* The Secure Mailer license must be distributed with this software. 92 /* AUTHOR(S) 93 /* Wietse Venema 94 /* IBM T.J. Watson Research 95 /* P.O. Box 704 96 /* Yorktown Heights, NY 10598, USA 97 /* 98 /* Wietse Venema 99 /* Google, Inc. 100 /* 111 8th Avenue 101 /* New York, NY 10011, USA 102 /* 103 /* Preemptive scheduler enhancements: 104 /* Patrik Rak 105 /* Modra 6 106 /* 155 00, Prague, Czech Republic 107 /*--*/ 108 109 /* System library. */ 110 111 #include <sys_defs.h> 112 #include <sys/stat.h> 113 #include <stdlib.h> 114 #include <stdio.h> /* sscanf() */ 115 #include <fcntl.h> 116 #include <errno.h> 117 #include <unistd.h> 118 #include <string.h> 119 #include <ctype.h> 120 121 /* Utility library. */ 122 123 #include <msg.h> 124 #include <mymalloc.h> 125 #include <vstring.h> 126 #include <vstream.h> 127 #include <split_at.h> 128 #include <valid_hostname.h> 129 #include <argv.h> 130 #include <stringops.h> 131 #include <myflock.h> 132 #include <sane_time.h> 133 134 /* Global library. */ 135 136 #include <dict.h> 137 #include <mail_queue.h> 138 #include <mail_params.h> 139 #include <canon_addr.h> 140 #include <record.h> 141 #include <rec_type.h> 142 #include <sent.h> 143 #include <deliver_completed.h> 144 #include <opened.h> 145 #include <verp_sender.h> 146 #include <mail_proto.h> 147 #include <qmgr_user.h> 148 #include <split_addr.h> 149 #include <dsn_mask.h> 150 #include <rec_attr_map.h> 151 152 /* Client stubs. */ 153 154 #include <rewrite_clnt.h> 155 #include <resolve_clnt.h> 156 157 /* Application-specific. */ 158 159 #include "qmgr.h" 160 161 int qmgr_message_count; 162 int qmgr_recipient_count; 163 int qmgr_vrfy_pend_count; 164 165 /* qmgr_message_create - create in-core message structure */ 166 167 static QMGR_MESSAGE *qmgr_message_create(const char *queue_name, 168 const char *queue_id, int qflags) 169 { 170 QMGR_MESSAGE *message; 171 172 message = (QMGR_MESSAGE *) mymalloc(sizeof(QMGR_MESSAGE)); 173 qmgr_message_count++; 174 message->flags = 0; 175 message->qflags = qflags; 176 message->tflags = 0; 177 message->tflags_offset = 0; 178 message->rflags = QMGR_READ_FLAG_DEFAULT; 179 message->fp = 0; 180 message->refcount = 0; 181 message->single_rcpt = 0; 182 message->arrival_time.tv_sec = message->arrival_time.tv_usec = 0; 183 message->create_time = 0; 184 GETTIMEOFDAY(&message->active_time); 185 message->queued_time = sane_time(); 186 message->refill_time = 0; 187 message->data_offset = 0; 188 message->queue_id = mystrdup(queue_id); 189 message->queue_name = mystrdup(queue_name); 190 message->encoding = 0; 191 message->sender = 0; 192 message->dsn_envid = 0; 193 message->dsn_ret = 0; 194 message->smtputf8 = 0; 195 message->filter_xport = 0; 196 message->inspect_xport = 0; 197 message->redirect_addr = 0; 198 message->data_size = 0; 199 message->cont_length = 0; 200 message->warn_offset = 0; 201 message->warn_time = 0; 202 message->rcpt_offset = 0; 203 message->verp_delims = 0; 204 message->client_name = 0; 205 message->client_addr = 0; 206 message->client_port = 0; 207 message->client_proto = 0; 208 message->client_helo = 0; 209 message->sasl_method = 0; 210 message->sasl_username = 0; 211 message->sasl_sender = 0; 212 message->log_ident = 0; 213 message->rewrite_context = 0; 214 recipient_list_init(&message->rcpt_list, RCPT_LIST_INIT_QUEUE); 215 message->rcpt_count = 0; 216 message->rcpt_limit = var_qmgr_msg_rcpt_limit; 217 message->rcpt_unread = 0; 218 QMGR_LIST_INIT(message->job_list); 219 return (message); 220 } 221 222 /* qmgr_message_close - close queue file */ 223 224 static void qmgr_message_close(QMGR_MESSAGE *message) 225 { 226 vstream_fclose(message->fp); 227 message->fp = 0; 228 } 229 230 /* qmgr_message_open - open queue file */ 231 232 static int qmgr_message_open(QMGR_MESSAGE *message) 233 { 234 235 /* 236 * Sanity check. 237 */ 238 if (message->fp) 239 msg_panic("%s: queue file is open", message->queue_id); 240 241 /* 242 * Open this queue file. Skip files that we cannot open. Back off when 243 * the system appears to be running out of resources. 244 */ 245 if ((message->fp = mail_queue_open(message->queue_name, 246 message->queue_id, 247 O_RDWR, 0)) == 0) { 248 if (errno != ENOENT) 249 msg_fatal("open %s %s: %m", message->queue_name, message->queue_id); 250 msg_warn("open %s %s: %m", message->queue_name, message->queue_id); 251 return (-1); 252 } 253 return (0); 254 } 255 256 /* qmgr_message_oldstyle_scan - support for Postfix < 1.0 queue files */ 257 258 static void qmgr_message_oldstyle_scan(QMGR_MESSAGE *message) 259 { 260 VSTRING *buf; 261 long orig_offset, extra_offset; 262 int rec_type; 263 char *start; 264 265 /* 266 * Initialize. No early returns or we have a memory leak. 267 */ 268 buf = vstring_alloc(100); 269 if ((orig_offset = vstream_ftell(message->fp)) < 0) 270 msg_fatal("vstream_ftell %s: %m", VSTREAM_PATH(message->fp)); 271 272 /* 273 * Rewind to the very beginning to make sure we see all records. 274 */ 275 if (vstream_fseek(message->fp, 0, SEEK_SET) < 0) 276 msg_fatal("seek file %s: %m", VSTREAM_PATH(message->fp)); 277 278 /* 279 * Scan through the old style queue file. Count the total number of 280 * recipients and find the data/extra sections offsets. Note that the new 281 * queue files require that data_size equals extra_offset - data_offset, 282 * so we set data_size to this as well and ignore the size record itself 283 * completely. 284 */ 285 message->rcpt_unread = 0; 286 for (;;) { 287 rec_type = rec_get(message->fp, buf, 0); 288 if (rec_type <= 0) 289 /* Report missing end record later. */ 290 break; 291 start = vstring_str(buf); 292 if (msg_verbose > 1) 293 msg_info("old-style scan record %c %s", rec_type, start); 294 if (rec_type == REC_TYPE_END) 295 break; 296 if (rec_type == REC_TYPE_DONE 297 || rec_type == REC_TYPE_RCPT 298 || rec_type == REC_TYPE_DRCP) { 299 message->rcpt_unread++; 300 continue; 301 } 302 if (rec_type == REC_TYPE_MESG) { 303 if (message->data_offset == 0) { 304 if ((message->data_offset = vstream_ftell(message->fp)) < 0) 305 msg_fatal("vstream_ftell %s: %m", VSTREAM_PATH(message->fp)); 306 if ((extra_offset = atol(start)) <= message->data_offset) 307 msg_fatal("bad extra offset %s file %s", 308 start, VSTREAM_PATH(message->fp)); 309 if (vstream_fseek(message->fp, extra_offset, SEEK_SET) < 0) 310 msg_fatal("seek file %s: %m", VSTREAM_PATH(message->fp)); 311 message->data_size = extra_offset - message->data_offset; 312 } 313 continue; 314 } 315 } 316 317 /* 318 * Clean up. 319 */ 320 if (vstream_fseek(message->fp, orig_offset, SEEK_SET) < 0) 321 msg_fatal("seek file %s: %m", VSTREAM_PATH(message->fp)); 322 vstring_free(buf); 323 324 /* 325 * Sanity checks. Verify that all required information was found, 326 * including the queue file end marker. 327 */ 328 if (message->data_offset == 0 || rec_type != REC_TYPE_END) 329 msg_fatal("%s: envelope records out of order", message->queue_id); 330 } 331 332 /* qmgr_message_read - read envelope records */ 333 334 static int qmgr_message_read(QMGR_MESSAGE *message) 335 { 336 VSTRING *buf; 337 int rec_type; 338 long curr_offset; 339 long save_offset = message->rcpt_offset; /* save a flag */ 340 int save_unread = message->rcpt_unread; /* save a count */ 341 char *start; 342 int recipient_limit; 343 const char *error_text; 344 char *name; 345 char *value; 346 char *orig_rcpt = 0; 347 int count; 348 int dsn_notify = 0; 349 char *dsn_orcpt = 0; 350 int n; 351 int have_log_client_attr = 0; 352 static const char env_rec_types[] = REC_TYPE_ENVELOPE REC_TYPE_EXTRACT; 353 static const char extra_rec_type[] = {REC_TYPE_XTRA, 0}; 354 const char *expected_rec_types; 355 356 /* 357 * Initialize. No early returns or we have a memory leak. 358 */ 359 buf = vstring_alloc(100); 360 361 /* 362 * If we re-open this file, skip over on-file recipient records that we 363 * already looked at, and refill the in-core recipient address list. 364 * 365 * For the first time, the message recipient limit is calculated from the 366 * global recipient limit. This is to avoid reading little recipients 367 * when the active queue is near empty. When the queue becomes full, only 368 * the necessary amount is read in core. Such priming is necessary 369 * because there are no message jobs yet. 370 * 371 * For the next time, the recipient limit is based solely on the message 372 * jobs' positions in the job lists and/or job stacks. 373 */ 374 if (message->rcpt_offset) { 375 if (message->rcpt_list.len) 376 msg_panic("%s: recipient list not empty on recipient reload", 377 message->queue_id); 378 if (vstream_fseek(message->fp, message->rcpt_offset, SEEK_SET) < 0) 379 msg_fatal("seek file %s: %m", VSTREAM_PATH(message->fp)); 380 message->rcpt_offset = 0; 381 recipient_limit = message->rcpt_limit - message->rcpt_count; 382 } else { 383 recipient_limit = var_qmgr_rcpt_limit - qmgr_recipient_count; 384 if (recipient_limit < message->rcpt_limit) 385 recipient_limit = message->rcpt_limit; 386 } 387 /* Keep interrupt latency in check. */ 388 if (recipient_limit > 5000) 389 recipient_limit = 5000; 390 if (recipient_limit <= 0) 391 msg_panic("%s: no recipient slots available", message->queue_id); 392 if (msg_verbose) 393 msg_info("%s: recipient limit %d", message->queue_id, recipient_limit); 394 395 /* 396 * Read envelope records. XXX Rely on the front-end programs to enforce 397 * record size limits. Read up to recipient_limit recipients from the 398 * queue file, to protect against memory exhaustion. Recipient records 399 * may appear before or after the message content, so we keep reading 400 * from the queue file until we have enough recipients (rcpt_offset != 0) 401 * and until we know all the non-recipient information. 402 * 403 * Note that the total recipient count record is accurate only for fresh 404 * queue files. After some of the recipients are marked as done and the 405 * queue file is deferred, it can be used as upper bound estimate only. 406 * Fortunately, this poses no major problem on the scheduling algorithm, 407 * as the only impact is that the already deferred messages are not 408 * chosen by qmgr_job_candidate() as often as they could. 409 * 410 * On the first open, we must examine all non-recipient records. 411 * 412 * Optimization: when we know that recipient records are not mixed with 413 * non-recipient records, as is typical with mailing list mail, then we 414 * can avoid having to examine all the queue file records before we can 415 * start deliveries. This avoids some file system thrashing with huge 416 * mailing lists. 417 */ 418 for (;;) { 419 expected_rec_types = env_rec_types; 420 if ((curr_offset = vstream_ftell(message->fp)) < 0) 421 msg_fatal("vstream_ftell %s: %m", VSTREAM_PATH(message->fp)); 422 if (curr_offset == message->data_offset && curr_offset > 0) { 423 if (vstream_fseek(message->fp, message->data_size, SEEK_CUR) < 0) 424 msg_fatal("seek file %s: %m", VSTREAM_PATH(message->fp)); 425 curr_offset += message->data_size; 426 expected_rec_types = extra_rec_type; 427 } 428 rec_type = rec_get_raw(message->fp, buf, 0, REC_FLAG_NONE); 429 start = vstring_str(buf); 430 if (msg_verbose > 1) 431 msg_info("record %c %s", rec_type, start); 432 if (rec_type == REC_TYPE_PTR) { 433 if ((rec_type = rec_goto(message->fp, start)) == REC_TYPE_ERROR) 434 break; 435 /* Need to update curr_offset after pointer jump. */ 436 continue; 437 } 438 if (rec_type <= 0) { 439 msg_warn("%s: message rejected: missing end record", 440 message->queue_id); 441 break; 442 } 443 if (strchr(expected_rec_types, rec_type) == 0) { 444 msg_warn("Unexpected record type '%c' at offset %ld", 445 rec_type, (long) curr_offset); 446 rec_type = REC_TYPE_ERROR; 447 break; 448 } 449 if (rec_type == REC_TYPE_END) { 450 message->rflags |= QMGR_READ_FLAG_SEEN_ALL_NON_RCPT; 451 break; 452 } 453 454 /* 455 * Map named attributes to pseudo record types, so that we don't have 456 * to pollute the queue file with records that are incompatible with 457 * past Postfix versions. Preferably, people should be able to back 458 * out from an upgrade without losing mail. 459 */ 460 if (rec_type == REC_TYPE_ATTR) { 461 if ((error_text = split_nameval(start, &name, &value)) != 0) { 462 msg_warn("%s: bad attribute record: %s: %.200s", 463 message->queue_id, error_text, start); 464 rec_type = REC_TYPE_ERROR; 465 break; 466 } 467 if ((n = rec_attr_map(name)) != 0) { 468 start = value; 469 rec_type = n; 470 } 471 } 472 473 /* 474 * Process recipient records. 475 */ 476 if (rec_type == REC_TYPE_RCPT) { 477 /* See also below for code setting orig_rcpt etc. */ 478 if (message->rcpt_offset == 0) { 479 message->rcpt_unread--; 480 recipient_list_add(&message->rcpt_list, curr_offset, 481 dsn_orcpt ? dsn_orcpt : "", 482 dsn_notify ? dsn_notify : 0, 483 orig_rcpt ? orig_rcpt : "", start); 484 if (dsn_orcpt) { 485 myfree(dsn_orcpt); 486 dsn_orcpt = 0; 487 } 488 if (orig_rcpt) { 489 myfree(orig_rcpt); 490 orig_rcpt = 0; 491 } 492 if (dsn_notify) 493 dsn_notify = 0; 494 if (message->rcpt_list.len >= recipient_limit) { 495 if ((message->rcpt_offset = vstream_ftell(message->fp)) < 0) 496 msg_fatal("vstream_ftell %s: %m", 497 VSTREAM_PATH(message->fp)); 498 if (message->rflags & QMGR_READ_FLAG_SEEN_ALL_NON_RCPT) 499 /* We already examined all non-recipient records. */ 500 break; 501 if (message->rflags & QMGR_READ_FLAG_MIXED_RCPT_OTHER) 502 /* Examine all remaining non-recipient records. */ 503 continue; 504 /* Optimizations for "pure recipient" record sections. */ 505 if (curr_offset > message->data_offset) { 506 /* We already examined all non-recipient records. */ 507 message->rflags |= QMGR_READ_FLAG_SEEN_ALL_NON_RCPT; 508 break; 509 } 510 511 /* 512 * Examine non-recipient records in the extracted 513 * segment. Note that this skips to the message start 514 * record, because the handler for that record changes 515 * the expectations for allowed record types. 516 */ 517 if (vstream_fseek(message->fp, message->data_offset, 518 SEEK_SET) < 0) 519 msg_fatal("seek file %s: %m", VSTREAM_PATH(message->fp)); 520 continue; 521 } 522 } 523 continue; 524 } 525 if (rec_type == REC_TYPE_DONE || rec_type == REC_TYPE_DRCP) { 526 if (message->rcpt_offset == 0) { 527 message->rcpt_unread--; 528 if (dsn_orcpt) { 529 myfree(dsn_orcpt); 530 dsn_orcpt = 0; 531 } 532 if (orig_rcpt) { 533 myfree(orig_rcpt); 534 orig_rcpt = 0; 535 } 536 if (dsn_notify) 537 dsn_notify = 0; 538 } 539 continue; 540 } 541 if (rec_type == REC_TYPE_DSN_ORCPT) { 542 /* See also above for code clearing dsn_orcpt. */ 543 if (dsn_orcpt != 0) { 544 msg_warn("%s: ignoring out-of-order DSN original recipient address <%.200s>", 545 message->queue_id, dsn_orcpt); 546 myfree(dsn_orcpt); 547 dsn_orcpt = 0; 548 } 549 if (message->rcpt_offset == 0) 550 dsn_orcpt = mystrdup(start); 551 continue; 552 } 553 if (rec_type == REC_TYPE_DSN_NOTIFY) { 554 /* See also above for code clearing dsn_notify. */ 555 if (dsn_notify != 0) { 556 msg_warn("%s: ignoring out-of-order DSN notify flags <%d>", 557 message->queue_id, dsn_notify); 558 dsn_notify = 0; 559 } 560 if (message->rcpt_offset == 0) { 561 if (!alldig(start) || (n = atoi(start)) == 0 || !DSN_NOTIFY_OK(n)) 562 msg_warn("%s: ignoring malformed DSN notify flags <%.200s>", 563 message->queue_id, start); 564 else 565 dsn_notify = n; 566 continue; 567 } 568 } 569 if (rec_type == REC_TYPE_ORCP) { 570 /* See also above for code clearing orig_rcpt. */ 571 if (orig_rcpt != 0) { 572 msg_warn("%s: ignoring out-of-order original recipient <%.200s>", 573 message->queue_id, orig_rcpt); 574 myfree(orig_rcpt); 575 orig_rcpt = 0; 576 } 577 if (message->rcpt_offset == 0) 578 orig_rcpt = mystrdup(start); 579 continue; 580 } 581 582 /* 583 * Process non-recipient records. 584 */ 585 if (message->rflags & QMGR_READ_FLAG_SEEN_ALL_NON_RCPT) 586 /* We already examined all non-recipient records. */ 587 continue; 588 if (rec_type == REC_TYPE_SIZE) { 589 if (message->data_offset == 0) { 590 if ((count = sscanf(start, "%ld %ld %d %d %ld %d", 591 &message->data_size, &message->data_offset, 592 &message->rcpt_unread, &message->rflags, 593 &message->cont_length, 594 &message->smtputf8)) >= 3) { 595 /* Postfix >= 1.0 (a.k.a. 20010228). */ 596 if (message->data_offset <= 0 || message->data_size <= 0) { 597 msg_warn("%s: invalid size record: %.100s", 598 message->queue_id, start); 599 rec_type = REC_TYPE_ERROR; 600 break; 601 } 602 if (message->rflags & ~QMGR_READ_FLAG_USER) { 603 msg_warn("%s: invalid flags in size record: %.100s", 604 message->queue_id, start); 605 rec_type = REC_TYPE_ERROR; 606 break; 607 } 608 } else if (count == 1) { 609 /* Postfix < 1.0 (a.k.a. 20010228). */ 610 qmgr_message_oldstyle_scan(message); 611 } else { 612 /* Can't happen. */ 613 msg_warn("%s: message rejected: weird size record", 614 message->queue_id); 615 rec_type = REC_TYPE_ERROR; 616 break; 617 } 618 } 619 /* Postfix < 2.4 compatibility. */ 620 if (message->cont_length == 0) { 621 message->cont_length = message->data_size; 622 } else if (message->cont_length < 0) { 623 msg_warn("%s: invalid size record: %.100s", 624 message->queue_id, start); 625 rec_type = REC_TYPE_ERROR; 626 break; 627 } 628 continue; 629 } 630 if (rec_type == REC_TYPE_TIME) { 631 if (message->arrival_time.tv_sec == 0) 632 REC_TYPE_TIME_SCAN(start, message->arrival_time); 633 continue; 634 } 635 if (rec_type == REC_TYPE_CTIME) { 636 if (message->create_time == 0) 637 message->create_time = atol(start); 638 continue; 639 } 640 if (rec_type == REC_TYPE_FILT) { 641 if (message->filter_xport != 0) 642 myfree(message->filter_xport); 643 message->filter_xport = mystrdup(start); 644 continue; 645 } 646 if (rec_type == REC_TYPE_INSP) { 647 if (message->inspect_xport != 0) 648 myfree(message->inspect_xport); 649 message->inspect_xport = mystrdup(start); 650 continue; 651 } 652 if (rec_type == REC_TYPE_RDR) { 653 if (message->redirect_addr != 0) 654 myfree(message->redirect_addr); 655 message->redirect_addr = mystrdup(start); 656 continue; 657 } 658 if (rec_type == REC_TYPE_FROM) { 659 if (message->sender == 0) { 660 message->sender = mystrdup(start); 661 opened(message->queue_id, message->sender, 662 message->cont_length, message->rcpt_unread, 663 "queue %s", message->queue_name); 664 } 665 continue; 666 } 667 if (rec_type == REC_TYPE_DSN_ENVID) { 668 /* Allow Milter override. */ 669 if (message->dsn_envid != 0) 670 myfree(message->dsn_envid); 671 message->dsn_envid = mystrdup(start); 672 } 673 if (rec_type == REC_TYPE_DSN_RET) { 674 /* Allow Milter override. */ 675 if (!alldig(start) || (n = atoi(start)) == 0 || !DSN_RET_OK(n)) 676 msg_warn("%s: ignoring malformed DSN RET flags in queue file record:%.100s", 677 message->queue_id, start); 678 else 679 message->dsn_ret = n; 680 } 681 if (rec_type == REC_TYPE_ATTR) { 682 /* Allow extra segment to override envelope segment info. */ 683 if (strcmp(name, MAIL_ATTR_ENCODING) == 0) { 684 if (message->encoding != 0) 685 myfree(message->encoding); 686 message->encoding = mystrdup(value); 687 } 688 689 /* 690 * Backwards compatibility. Before Postfix 2.3, the logging 691 * attributes were called client_name, etc. Now they are called 692 * log_client_name. etc., and client_name is used for the actual 693 * client information. To support old queue files we accept both 694 * names for the purpose of logging; the new name overrides the 695 * old one. 696 * 697 * XXX Do not use the "legacy" client_name etc. attribute values for 698 * initializing the logging attributes, when this file already 699 * contains the "modern" log_client_name etc. logging attributes. 700 * Otherwise, logging attributes that are not present in the 701 * queue file would be set with information from the real client. 702 */ 703 else if (strcmp(name, MAIL_ATTR_ACT_CLIENT_NAME) == 0) { 704 if (have_log_client_attr == 0 && message->client_name == 0) 705 message->client_name = mystrdup(value); 706 } else if (strcmp(name, MAIL_ATTR_ACT_CLIENT_ADDR) == 0) { 707 if (have_log_client_attr == 0 && message->client_addr == 0) 708 message->client_addr = mystrdup(value); 709 } else if (strcmp(name, MAIL_ATTR_ACT_CLIENT_PORT) == 0) { 710 if (have_log_client_attr == 0 && message->client_port == 0) 711 message->client_port = mystrdup(value); 712 } else if (strcmp(name, MAIL_ATTR_ACT_PROTO_NAME) == 0) { 713 if (have_log_client_attr == 0 && message->client_proto == 0) 714 message->client_proto = mystrdup(value); 715 } else if (strcmp(name, MAIL_ATTR_ACT_HELO_NAME) == 0) { 716 if (have_log_client_attr == 0 && message->client_helo == 0) 717 message->client_helo = mystrdup(value); 718 } 719 /* Original client attributes. */ 720 else if (strcmp(name, MAIL_ATTR_LOG_CLIENT_NAME) == 0) { 721 if (message->client_name != 0) 722 myfree(message->client_name); 723 message->client_name = mystrdup(value); 724 have_log_client_attr = 1; 725 } else if (strcmp(name, MAIL_ATTR_LOG_CLIENT_ADDR) == 0) { 726 if (message->client_addr != 0) 727 myfree(message->client_addr); 728 message->client_addr = mystrdup(value); 729 have_log_client_attr = 1; 730 } else if (strcmp(name, MAIL_ATTR_LOG_CLIENT_PORT) == 0) { 731 if (message->client_port != 0) 732 myfree(message->client_port); 733 message->client_port = mystrdup(value); 734 have_log_client_attr = 1; 735 } else if (strcmp(name, MAIL_ATTR_LOG_PROTO_NAME) == 0) { 736 if (message->client_proto != 0) 737 myfree(message->client_proto); 738 message->client_proto = mystrdup(value); 739 have_log_client_attr = 1; 740 } else if (strcmp(name, MAIL_ATTR_LOG_HELO_NAME) == 0) { 741 if (message->client_helo != 0) 742 myfree(message->client_helo); 743 message->client_helo = mystrdup(value); 744 have_log_client_attr = 1; 745 } else if (strcmp(name, MAIL_ATTR_SASL_METHOD) == 0) { 746 if (message->sasl_method == 0) 747 message->sasl_method = mystrdup(value); 748 else 749 msg_warn("%s: ignoring multiple %s attribute: %s", 750 message->queue_id, MAIL_ATTR_SASL_METHOD, value); 751 } else if (strcmp(name, MAIL_ATTR_SASL_USERNAME) == 0) { 752 if (message->sasl_username == 0) 753 message->sasl_username = mystrdup(value); 754 else 755 msg_warn("%s: ignoring multiple %s attribute: %s", 756 message->queue_id, MAIL_ATTR_SASL_USERNAME, value); 757 } else if (strcmp(name, MAIL_ATTR_SASL_SENDER) == 0) { 758 if (message->sasl_sender == 0) 759 message->sasl_sender = mystrdup(value); 760 else 761 msg_warn("%s: ignoring multiple %s attribute: %s", 762 message->queue_id, MAIL_ATTR_SASL_SENDER, value); 763 } else if (strcmp(name, MAIL_ATTR_LOG_IDENT) == 0) { 764 if (message->log_ident == 0) 765 message->log_ident = mystrdup(value); 766 else 767 msg_warn("%s: ignoring multiple %s attribute: %s", 768 message->queue_id, MAIL_ATTR_LOG_IDENT, value); 769 } else if (strcmp(name, MAIL_ATTR_RWR_CONTEXT) == 0) { 770 if (message->rewrite_context == 0) 771 message->rewrite_context = mystrdup(value); 772 else 773 msg_warn("%s: ignoring multiple %s attribute: %s", 774 message->queue_id, MAIL_ATTR_RWR_CONTEXT, value); 775 } 776 777 /* 778 * Optional tracing flags (verify, sendmail -v, sendmail -bv). 779 * This record is killed after a trace logfile report is sent and 780 * after the logfile is deleted. 781 */ 782 else if (strcmp(name, MAIL_ATTR_TRACE_FLAGS) == 0) { 783 if (message->tflags == 0) { 784 message->tflags = DEL_REQ_TRACE_FLAGS(atoi(value)); 785 if (message->tflags == DEL_REQ_FLAG_RECORD) 786 message->tflags_offset = curr_offset; 787 else 788 message->tflags_offset = 0; 789 if ((message->tflags & DEL_REQ_FLAG_MTA_VRFY) != 0) 790 qmgr_vrfy_pend_count++; 791 } 792 } 793 continue; 794 } 795 if (rec_type == REC_TYPE_WARN) { 796 if (message->warn_offset == 0) { 797 message->warn_offset = curr_offset; 798 REC_TYPE_WARN_SCAN(start, message->warn_time); 799 } 800 continue; 801 } 802 if (rec_type == REC_TYPE_VERP) { 803 if (message->verp_delims == 0) { 804 if (message->sender == 0 || message->sender[0] == 0) { 805 msg_warn("%s: ignoring VERP request for null sender", 806 message->queue_id); 807 } else if (verp_delims_verify(start) != 0) { 808 msg_warn("%s: ignoring bad VERP request: \"%.100s\"", 809 message->queue_id, start); 810 } else { 811 if (msg_verbose) 812 msg_info("%s: enabling VERP for sender \"%.100s\"", 813 message->queue_id, message->sender); 814 message->single_rcpt = 1; 815 message->verp_delims = mystrdup(start); 816 } 817 } 818 continue; 819 } 820 } 821 822 /* 823 * Grr. 824 */ 825 if (dsn_orcpt != 0) { 826 if (rec_type > 0) 827 msg_warn("%s: ignoring out-of-order DSN original recipient <%.200s>", 828 message->queue_id, dsn_orcpt); 829 myfree(dsn_orcpt); 830 } 831 if (orig_rcpt != 0) { 832 if (rec_type > 0) 833 msg_warn("%s: ignoring out-of-order original recipient <%.200s>", 834 message->queue_id, orig_rcpt); 835 myfree(orig_rcpt); 836 } 837 838 /* 839 * After sending a "delayed" warning, request sender notification when 840 * message delivery is completed. While "mail delayed" notifications are 841 * bad enough because they multiply the amount of email traffic, "delay 842 * cleared" notifications are even worse because they come in a sudden 843 * burst when the queue drains after a network outage. 844 */ 845 if (var_dsn_delay_cleared && message->warn_time < 0) 846 message->tflags |= DEL_REQ_FLAG_REC_DLY_SENT; 847 848 /* 849 * Remember when we have read the last recipient batch. Note that we do 850 * it here after reading as reading might have used considerable amount 851 * of time. 852 */ 853 message->refill_time = sane_time(); 854 855 /* 856 * Avoid clumsiness elsewhere in the program. When sending data across an 857 * IPC channel, sending an empty string is more convenient than sending a 858 * null pointer. 859 */ 860 if (message->dsn_envid == 0) 861 message->dsn_envid = mystrdup(""); 862 if (message->encoding == 0) 863 message->encoding = mystrdup(MAIL_ATTR_ENC_NONE); 864 if (message->client_name == 0) 865 message->client_name = mystrdup(""); 866 if (message->client_addr == 0) 867 message->client_addr = mystrdup(""); 868 if (message->client_port == 0) 869 message->client_port = mystrdup(""); 870 if (message->client_proto == 0) 871 message->client_proto = mystrdup(""); 872 if (message->client_helo == 0) 873 message->client_helo = mystrdup(""); 874 if (message->sasl_method == 0) 875 message->sasl_method = mystrdup(""); 876 if (message->sasl_username == 0) 877 message->sasl_username = mystrdup(""); 878 if (message->sasl_sender == 0) 879 message->sasl_sender = mystrdup(""); 880 if (message->log_ident == 0) 881 message->log_ident = mystrdup(""); 882 if (message->rewrite_context == 0) 883 message->rewrite_context = mystrdup(MAIL_ATTR_RWR_LOCAL); 884 /* Postfix < 2.3 compatibility. */ 885 if (message->create_time == 0) 886 message->create_time = message->arrival_time.tv_sec; 887 888 /* 889 * Clean up. 890 */ 891 vstring_free(buf); 892 893 /* 894 * Sanity checks. Verify that all required information was found, 895 * including the queue file end marker. 896 */ 897 if (message->rcpt_unread < 0 898 || (message->rcpt_offset == 0 && message->rcpt_unread != 0)) { 899 msg_warn("%s: rcpt count mismatch (%d)", 900 message->queue_id, message->rcpt_unread); 901 message->rcpt_unread = 0; 902 } 903 if (rec_type <= 0) { 904 /* Already logged warning. */ 905 } else if (message->arrival_time.tv_sec == 0) { 906 msg_warn("%s: message rejected: missing arrival time record", 907 message->queue_id); 908 } else if (message->sender == 0) { 909 msg_warn("%s: message rejected: missing sender record", 910 message->queue_id); 911 } else if (message->data_offset == 0) { 912 msg_warn("%s: message rejected: missing size record", 913 message->queue_id); 914 } else { 915 return (0); 916 } 917 message->rcpt_offset = save_offset; /* restore flag */ 918 message->rcpt_unread = save_unread; /* restore count */ 919 recipient_list_free(&message->rcpt_list); 920 recipient_list_init(&message->rcpt_list, RCPT_LIST_INIT_QUEUE); 921 return (-1); 922 } 923 924 /* qmgr_message_update_warn - update the time of next delay warning */ 925 926 void qmgr_message_update_warn(QMGR_MESSAGE *message) 927 { 928 929 /* 930 * After the "mail delayed" warning, optionally send a "delay cleared" 931 * notification. 932 */ 933 if (qmgr_message_open(message) 934 || vstream_fseek(message->fp, message->warn_offset, SEEK_SET) < 0 935 || rec_fprintf(message->fp, REC_TYPE_WARN, REC_TYPE_WARN_FORMAT, 936 REC_TYPE_WARN_ARG(-1)) < 0 937 || vstream_fflush(message->fp)) 938 msg_fatal("update queue file %s: %m", VSTREAM_PATH(message->fp)); 939 qmgr_message_close(message); 940 } 941 942 /* qmgr_message_kill_record - mark one message record as killed */ 943 944 void qmgr_message_kill_record(QMGR_MESSAGE *message, long offset) 945 { 946 if (offset <= 0) 947 msg_panic("qmgr_message_kill_record: bad offset 0x%lx", offset); 948 if (qmgr_message_open(message) 949 || rec_put_type(message->fp, REC_TYPE_KILL, offset) < 0 950 || vstream_fflush(message->fp)) 951 msg_fatal("update queue file %s: %m", VSTREAM_PATH(message->fp)); 952 qmgr_message_close(message); 953 } 954 955 /* qmgr_message_sort_compare - compare recipient information */ 956 957 static int qmgr_message_sort_compare(const void *p1, const void *p2) 958 { 959 RECIPIENT *rcpt1 = (RECIPIENT *) p1; 960 RECIPIENT *rcpt2 = (RECIPIENT *) p2; 961 QMGR_QUEUE *queue1; 962 QMGR_QUEUE *queue2; 963 char *at1; 964 char *at2; 965 int result; 966 967 /* 968 * Compare most significant to least significant recipient attributes. 969 * The comparison function must be transitive, so NULL values need to be 970 * assigned an ordinal (we set NULL last). 971 */ 972 973 queue1 = rcpt1->u.queue; 974 queue2 = rcpt2->u.queue; 975 if (queue1 != 0 && queue2 == 0) 976 return (-1); 977 if (queue1 == 0 && queue2 != 0) 978 return (1); 979 if (queue1 != 0 && queue2 != 0) { 980 981 /* 982 * Compare message transport. 983 */ 984 if ((result = strcmp(queue1->transport->name, 985 queue2->transport->name)) != 0) 986 return (result); 987 988 /* 989 * Compare queue name (nexthop or recipient@nexthop). 990 */ 991 if ((result = strcmp(queue1->name, queue2->name)) != 0) 992 return (result); 993 } 994 995 /* 996 * Compare recipient domain. 997 */ 998 at1 = strrchr(rcpt1->address, '@'); 999 at2 = strrchr(rcpt2->address, '@'); 1000 if (at1 == 0 && at2 != 0) 1001 return (1); 1002 if (at1 != 0 && at2 == 0) 1003 return (-1); 1004 if (at1 != 0 && at2 != 0 1005 && (result = strcasecmp_utf8(at1, at2)) != 0) 1006 return (result); 1007 1008 /* 1009 * Compare recipient address. 1010 */ 1011 return (strcmp(rcpt1->address, rcpt2->address)); 1012 } 1013 1014 /* qmgr_message_sort - sort message recipient addresses by domain */ 1015 1016 static void qmgr_message_sort(QMGR_MESSAGE *message) 1017 { 1018 qsort((void *) message->rcpt_list.info, message->rcpt_list.len, 1019 sizeof(message->rcpt_list.info[0]), qmgr_message_sort_compare); 1020 if (msg_verbose) { 1021 RECIPIENT_LIST list = message->rcpt_list; 1022 RECIPIENT *rcpt; 1023 1024 msg_info("start sorted recipient list"); 1025 for (rcpt = list.info; rcpt < list.info + list.len; rcpt++) 1026 msg_info("qmgr_message_sort: %s", rcpt->address); 1027 msg_info("end sorted recipient list"); 1028 } 1029 } 1030 1031 /* qmgr_resolve_one - resolve or skip one recipient */ 1032 1033 static int qmgr_resolve_one(QMGR_MESSAGE *message, RECIPIENT *recipient, 1034 const char *addr, RESOLVE_REPLY *reply) 1035 { 1036 #define QMGR_REDIRECT(rp, tp, np) do { \ 1037 (rp)->flags = 0; \ 1038 vstring_strcpy((rp)->transport, (tp)); \ 1039 vstring_strcpy((rp)->nexthop, (np)); \ 1040 } while (0) 1041 1042 if ((message->tflags & DEL_REQ_FLAG_MTA_VRFY) == 0) 1043 resolve_clnt_query_from(message->sender, addr, reply); 1044 else 1045 resolve_clnt_verify_from(message->sender, addr, reply); 1046 if (reply->flags & RESOLVE_FLAG_FAIL) { 1047 QMGR_REDIRECT(reply, MAIL_SERVICE_RETRY, 1048 "4.3.0 address resolver failure"); 1049 return (0); 1050 } else if (reply->flags & RESOLVE_FLAG_ERROR) { 1051 QMGR_REDIRECT(reply, MAIL_SERVICE_ERROR, 1052 "5.1.3 bad address syntax"); 1053 return (0); 1054 } else { 1055 return (0); 1056 } 1057 } 1058 1059 /* qmgr_message_resolve - resolve recipients */ 1060 1061 static void qmgr_message_resolve(QMGR_MESSAGE *message) 1062 { 1063 static ARGV *defer_xport_argv; 1064 RECIPIENT_LIST list = message->rcpt_list; 1065 RECIPIENT *recipient; 1066 QMGR_TRANSPORT *transport = 0; 1067 QMGR_QUEUE *queue = 0; 1068 RESOLVE_REPLY reply; 1069 VSTRING *queue_name; 1070 char *at; 1071 char **cpp; 1072 char *nexthop; 1073 ssize_t len; 1074 int status; 1075 DSN dsn; 1076 MSG_STATS stats; 1077 DSN *saved_dsn; 1078 1079 #define STREQ(x,y) (strcmp(x,y) == 0) 1080 #define STR vstring_str 1081 #define LEN VSTRING_LEN 1082 1083 resolve_clnt_init(&reply); 1084 queue_name = vstring_alloc(1); 1085 for (recipient = list.info; recipient < list.info + list.len; recipient++) { 1086 1087 /* 1088 * Redirect overrides all else. But only once (per entire message). 1089 * For consistency with the remainder of Postfix, rewrite the address 1090 * to canonical form before resolving it. 1091 */ 1092 if (message->redirect_addr) { 1093 if (recipient > list.info) { 1094 recipient->u.queue = 0; 1095 continue; 1096 } 1097 message->rcpt_offset = 0; 1098 message->rcpt_unread = 0; 1099 1100 rewrite_clnt_internal(REWRITE_CANON, message->redirect_addr, 1101 reply.recipient); 1102 RECIPIENT_UPDATE(recipient->address, STR(reply.recipient)); 1103 if (qmgr_resolve_one(message, recipient, 1104 recipient->address, &reply) < 0) 1105 continue; 1106 if (!STREQ(recipient->address, STR(reply.recipient))) 1107 RECIPIENT_UPDATE(recipient->address, STR(reply.recipient)); 1108 } 1109 1110 /* 1111 * Content filtering overrides the address resolver. 1112 * 1113 * XXX Bypass content_filter inspection for user-generated probes 1114 * (sendmail -bv). MTA-generated probes never have the "please filter 1115 * me" bits turned on, but we handle them here anyway for the sake of 1116 * future proofing. 1117 */ 1118 #define FILTER_WITHOUT_NEXTHOP(filter, next) \ 1119 (((next) = split_at((filter), ':')) == 0 || *(next) == 0) 1120 1121 #define RCPT_WITHOUT_DOMAIN(rcpt, next) \ 1122 ((next = strrchr(rcpt, '@')) == 0 || *++(next) == 0) 1123 1124 else if (message->filter_xport 1125 && (message->tflags & DEL_REQ_TRACE_ONLY_MASK) == 0) { 1126 reply.flags = 0; 1127 vstring_strcpy(reply.transport, message->filter_xport); 1128 if (FILTER_WITHOUT_NEXTHOP(STR(reply.transport), nexthop) 1129 && *(nexthop = var_def_filter_nexthop) == 0 1130 && RCPT_WITHOUT_DOMAIN(recipient->address, nexthop)) 1131 nexthop = var_myhostname; 1132 vstring_strcpy(reply.nexthop, nexthop); 1133 vstring_strcpy(reply.recipient, recipient->address); 1134 } 1135 1136 /* 1137 * Resolve the destination to (transport, nexthop, address). The 1138 * result address may differ from the one specified by the sender. 1139 */ 1140 else { 1141 if (qmgr_resolve_one(message, recipient, 1142 recipient->address, &reply) < 0) 1143 continue; 1144 if (!STREQ(recipient->address, STR(reply.recipient))) 1145 RECIPIENT_UPDATE(recipient->address, STR(reply.recipient)); 1146 } 1147 1148 /* 1149 * Bounce null recipients. This should never happen, but is most 1150 * likely the result of a fault in a different program, so aborting 1151 * the queue manager process does not help. 1152 */ 1153 if (recipient->address[0] == 0) { 1154 QMGR_REDIRECT(&reply, MAIL_SERVICE_ERROR, 1155 "5.1.3 null recipient address"); 1156 } 1157 1158 /* 1159 * Redirect a forced-to-expire message without defer log to the retry 1160 * service, so that its defer log will contain an appropriate reason. 1161 * Do not redirect such a message to the error service, because if 1162 * that request fails, a defer log would be created with reason 1163 * "bounce or trace service failure" which would make no sense. Note 1164 * that if the bounce service fails to create a defer log, the 1165 * message will be returned as undeliverable anyway, because it is 1166 * expired. 1167 */ 1168 if ((message->qflags & QMGR_FORCE_EXPIRE) != 0) { 1169 QMGR_REDIRECT(&reply, MAIL_SERVICE_RETRY, 1170 "4.7.0 message is administratively expired"); 1171 } 1172 1173 /* 1174 * Discard mail to the local double bounce address here, so this 1175 * system can run without a local delivery agent. They'd still have 1176 * to configure something for mail directed to the local postmaster, 1177 * though, but that is an RFC requirement anyway. 1178 * 1179 * XXX This lookup should be done in the resolver, and the mail should 1180 * be directed to a general-purpose null delivery agent. 1181 */ 1182 if (reply.flags & RESOLVE_CLASS_LOCAL) { 1183 at = strrchr(STR(reply.recipient), '@'); 1184 len = (at ? (at - STR(reply.recipient)) 1185 : strlen(STR(reply.recipient))); 1186 if (strncasecmp_utf8(STR(reply.recipient), 1187 var_double_bounce_sender, len) == 0 1188 && !var_double_bounce_sender[len]) { 1189 status = sent(message->tflags, message->queue_id, 1190 QMGR_MSG_STATS(&stats, message), recipient, 1191 "none", DSN_SIMPLE(&dsn, "2.0.0", 1192 "undeliverable postmaster notification discarded")); 1193 if (status == 0) { 1194 deliver_completed(message->fp, recipient->offset); 1195 #if 0 1196 /* It's the default verification probe sender address. */ 1197 msg_warn("%s: undeliverable postmaster notification discarded", 1198 message->queue_id); 1199 #endif 1200 } else 1201 message->flags |= status; 1202 continue; 1203 } 1204 } 1205 1206 /* 1207 * Optionally defer deliveries over specific transports, unless the 1208 * restriction is lifted temporarily. 1209 */ 1210 if (*var_defer_xports && (message->qflags & QMGR_FLUSH_DFXP) == 0) { 1211 if (defer_xport_argv == 0) 1212 defer_xport_argv = argv_split(var_defer_xports, CHARS_COMMA_SP); 1213 for (cpp = defer_xport_argv->argv; *cpp; cpp++) 1214 if (strcmp(*cpp, STR(reply.transport)) == 0) 1215 break; 1216 if (*cpp) { 1217 QMGR_REDIRECT(&reply, MAIL_SERVICE_RETRY, 1218 "4.3.2 deferred transport"); 1219 } 1220 } 1221 1222 /* 1223 * Safety: defer excess address verification requests. 1224 */ 1225 if ((message->tflags & DEL_REQ_FLAG_MTA_VRFY) != 0 1226 && qmgr_vrfy_pend_count > var_vrfy_pend_limit) 1227 QMGR_REDIRECT(&reply, MAIL_SERVICE_RETRY, 1228 "4.3.2 Too many address verification requests"); 1229 1230 /* 1231 * Look up or instantiate the proper transport. 1232 */ 1233 if (transport == 0 || !STREQ(transport->name, STR(reply.transport))) { 1234 if ((transport = qmgr_transport_find(STR(reply.transport))) == 0) 1235 transport = qmgr_transport_create(STR(reply.transport)); 1236 queue = 0; 1237 } 1238 1239 /* 1240 * This message is being flushed. If need-be unthrottle the 1241 * transport. 1242 */ 1243 if ((message->qflags & QMGR_FLUSH_EACH) != 0 1244 && QMGR_TRANSPORT_THROTTLED(transport)) 1245 qmgr_transport_unthrottle(transport); 1246 1247 /* 1248 * This transport is dead. Defer delivery to this recipient. 1249 */ 1250 if (QMGR_TRANSPORT_THROTTLED(transport)) { 1251 saved_dsn = transport->dsn; 1252 if ((transport = qmgr_error_transport(MAIL_SERVICE_RETRY)) != 0) { 1253 nexthop = qmgr_error_nexthop(saved_dsn); 1254 vstring_strcpy(reply.nexthop, nexthop); 1255 myfree(nexthop); 1256 queue = 0; 1257 } else { 1258 qmgr_defer_recipient(message, recipient, saved_dsn); 1259 continue; 1260 } 1261 } 1262 1263 /* 1264 * The nexthop destination provides the default name for the 1265 * per-destination queue. When the delivery agent accepts only one 1266 * recipient per delivery, give each recipient its own queue, so that 1267 * deliveries to different recipients of the same message can happen 1268 * in parallel, and so that we can enforce per-recipient concurrency 1269 * limits and prevent one recipient from tying up all the delivery 1270 * agent resources. We use recipient@nexthop as queue name rather 1271 * than the actual recipient domain name, so that one recipient in 1272 * multiple equivalent domains cannot evade the per-recipient 1273 * concurrency limit. Split the address on the recipient delimiter if 1274 * one is defined, so that extended addresses don't get extra 1275 * delivery slots. 1276 * 1277 * Fold the result to lower case so that we don't have multiple queues 1278 * for the same name. 1279 * 1280 * Important! All recipients in a queue must have the same nexthop 1281 * value. It is OK to have multiple queues with the same nexthop 1282 * value, but only when those queues are named after recipients. 1283 * 1284 * The single-recipient code below was written for local(8) like 1285 * delivery agents, and assumes that all domains that deliver to the 1286 * same (transport + nexthop) are aliases for $nexthop. Delivery 1287 * concurrency is changed from per-domain into per-recipient, by 1288 * changing the queue name from nexthop into localpart@nexthop. 1289 * 1290 * XXX This assumption is incorrect when different destinations share 1291 * the same (transport + nexthop). In reality, such transports are 1292 * rarely configured to use single-recipient deliveries. The fix is 1293 * to decouple the per-destination recipient limit from the 1294 * per-destination concurrency. 1295 */ 1296 vstring_strcpy(queue_name, STR(reply.nexthop)); 1297 if (strcmp(transport->name, MAIL_SERVICE_ERROR) != 0 1298 && strcmp(transport->name, MAIL_SERVICE_RETRY) != 0 1299 && transport->recipient_limit == 1) { 1300 /* Copy the recipient localpart. */ 1301 at = strrchr(STR(reply.recipient), '@'); 1302 len = (at ? (at - STR(reply.recipient)) 1303 : strlen(STR(reply.recipient))); 1304 vstring_strncpy(queue_name, STR(reply.recipient), len); 1305 /* Remove the address extension from the recipient localpart. */ 1306 if (*var_rcpt_delim && split_addr(STR(queue_name), var_rcpt_delim)) 1307 vstring_truncate(queue_name, strlen(STR(queue_name))); 1308 /* Assume the recipient domain is equivalent to nexthop. */ 1309 vstring_sprintf_append(queue_name, "@%s", STR(reply.nexthop)); 1310 } 1311 lowercase(STR(queue_name)); 1312 1313 /* 1314 * This transport is alive. Find or instantiate a queue for this 1315 * recipient. 1316 */ 1317 if (queue == 0 || !STREQ(queue->name, STR(queue_name))) { 1318 if ((queue = qmgr_queue_find(transport, STR(queue_name))) == 0) 1319 queue = qmgr_queue_create(transport, STR(queue_name), 1320 STR(reply.nexthop)); 1321 } 1322 1323 /* 1324 * This message is being flushed. If need-be unthrottle the queue. 1325 */ 1326 if ((message->qflags & QMGR_FLUSH_EACH) != 0 1327 && QMGR_QUEUE_THROTTLED(queue)) 1328 qmgr_queue_unthrottle(queue); 1329 1330 /* 1331 * This queue is dead. Defer delivery to this recipient. 1332 */ 1333 if (QMGR_QUEUE_THROTTLED(queue)) { 1334 saved_dsn = queue->dsn; 1335 if ((queue = qmgr_error_queue(MAIL_SERVICE_RETRY, saved_dsn)) == 0) { 1336 qmgr_defer_recipient(message, recipient, saved_dsn); 1337 continue; 1338 } 1339 } 1340 1341 /* 1342 * This queue is alive. Bind this recipient to this queue instance. 1343 */ 1344 recipient->u.queue = queue; 1345 } 1346 resolve_clnt_free(&reply); 1347 vstring_free(queue_name); 1348 } 1349 1350 /* qmgr_message_assign - assign recipients to specific delivery requests */ 1351 1352 static void qmgr_message_assign(QMGR_MESSAGE *message) 1353 { 1354 RECIPIENT_LIST list = message->rcpt_list; 1355 RECIPIENT *recipient; 1356 QMGR_ENTRY *entry = 0; 1357 QMGR_QUEUE *queue; 1358 QMGR_JOB *job = 0; 1359 QMGR_PEER *peer = 0; 1360 1361 /* 1362 * Try to bundle as many recipients in a delivery request as we can. When 1363 * the recipient resolves to the same site and transport as an existing 1364 * recipient, do not create a new queue entry, just move that recipient 1365 * to the recipient list of the existing queue entry. All this provided 1366 * that we do not exceed the transport-specific limit on the number of 1367 * recipients per transaction. 1368 */ 1369 #define LIMIT_OK(limit, count) ((limit) == 0 || ((count) < (limit))) 1370 1371 for (recipient = list.info; recipient < list.info + list.len; recipient++) { 1372 1373 /* 1374 * Skip recipients with a dead transport or destination. 1375 */ 1376 if ((queue = recipient->u.queue) == 0) 1377 continue; 1378 1379 /* 1380 * Lookup or instantiate the message job if necessary. 1381 */ 1382 if (job == 0 || queue->transport != job->transport) { 1383 job = qmgr_job_obtain(message, queue->transport); 1384 peer = 0; 1385 } 1386 1387 /* 1388 * Lookup or instantiate job peer if necessary. 1389 */ 1390 if (peer == 0 || queue != peer->queue) 1391 peer = qmgr_peer_obtain(job, queue); 1392 1393 /* 1394 * Lookup old or instantiate new recipient entry. We try to reuse the 1395 * last existing entry whenever the recipient limit permits. 1396 */ 1397 entry = peer->entry_list.prev; 1398 if (message->single_rcpt || entry == 0 1399 || !LIMIT_OK(queue->transport->recipient_limit, entry->rcpt_list.len)) 1400 entry = qmgr_entry_create(peer, message); 1401 1402 /* 1403 * Add the recipient to the current entry and increase all those 1404 * recipient counters accordingly. 1405 */ 1406 recipient_list_add(&entry->rcpt_list, recipient->offset, 1407 recipient->dsn_orcpt, recipient->dsn_notify, 1408 recipient->orig_addr, recipient->address); 1409 job->rcpt_count++; 1410 message->rcpt_count++; 1411 qmgr_recipient_count++; 1412 } 1413 1414 /* 1415 * Release the message recipient list and reinitialize it for the next 1416 * time. 1417 */ 1418 recipient_list_free(&message->rcpt_list); 1419 recipient_list_init(&message->rcpt_list, RCPT_LIST_INIT_QUEUE); 1420 1421 /* 1422 * Note that even if qmgr_job_obtain() reset the job candidate cache of 1423 * all transports to which we assigned new recipients, this message may 1424 * have other jobs which we didn't touch at all this time. But the number 1425 * of unread recipients affecting the candidate selection might have 1426 * changed considerably, so we must invalidate the caches if it might be 1427 * of some use. 1428 */ 1429 for (job = message->job_list.next; job; job = job->message_peers.next) 1430 if (job->selected_entries < job->read_entries 1431 && job->blocker_tag != job->transport->blocker_tag) 1432 job->transport->candidate_cache_current = 0; 1433 } 1434 1435 /* qmgr_message_move_limits - recycle unused recipient slots */ 1436 1437 static void qmgr_message_move_limits(QMGR_MESSAGE *message) 1438 { 1439 QMGR_JOB *job; 1440 1441 for (job = message->job_list.next; job; job = job->message_peers.next) 1442 qmgr_job_move_limits(job); 1443 } 1444 1445 /* qmgr_message_free - release memory for in-core message structure */ 1446 1447 void qmgr_message_free(QMGR_MESSAGE *message) 1448 { 1449 QMGR_JOB *job; 1450 1451 if (message->refcount != 0) 1452 msg_panic("qmgr_message_free: reference len: %d", message->refcount); 1453 if (message->fp) 1454 msg_panic("qmgr_message_free: queue file is open"); 1455 while ((job = message->job_list.next) != 0) 1456 qmgr_job_free(job); 1457 myfree(message->queue_id); 1458 myfree(message->queue_name); 1459 if (message->dsn_envid) 1460 myfree(message->dsn_envid); 1461 if (message->encoding) 1462 myfree(message->encoding); 1463 if (message->sender) 1464 myfree(message->sender); 1465 if (message->verp_delims) 1466 myfree(message->verp_delims); 1467 if (message->filter_xport) 1468 myfree(message->filter_xport); 1469 if (message->inspect_xport) 1470 myfree(message->inspect_xport); 1471 if (message->redirect_addr) 1472 myfree(message->redirect_addr); 1473 if (message->client_name) 1474 myfree(message->client_name); 1475 if (message->client_addr) 1476 myfree(message->client_addr); 1477 if (message->client_port) 1478 myfree(message->client_port); 1479 if (message->client_proto) 1480 myfree(message->client_proto); 1481 if (message->client_helo) 1482 myfree(message->client_helo); 1483 if (message->sasl_method) 1484 myfree(message->sasl_method); 1485 if (message->sasl_username) 1486 myfree(message->sasl_username); 1487 if (message->sasl_sender) 1488 myfree(message->sasl_sender); 1489 if (message->log_ident) 1490 myfree(message->log_ident); 1491 if (message->rewrite_context) 1492 myfree(message->rewrite_context); 1493 recipient_list_free(&message->rcpt_list); 1494 qmgr_message_count--; 1495 if ((message->tflags & DEL_REQ_FLAG_MTA_VRFY) != 0) 1496 qmgr_vrfy_pend_count--; 1497 myfree((void *) message); 1498 } 1499 1500 /* qmgr_message_alloc - create in-core message structure */ 1501 1502 QMGR_MESSAGE *qmgr_message_alloc(const char *queue_name, const char *queue_id, 1503 int qflags, mode_t mode) 1504 { 1505 const char *myname = "qmgr_message_alloc"; 1506 QMGR_MESSAGE *message; 1507 struct stat st; 1508 1509 if (msg_verbose) 1510 msg_info("%s: %s %s", myname, queue_name, queue_id); 1511 1512 /* 1513 * Create an in-core message structure. 1514 */ 1515 message = qmgr_message_create(queue_name, queue_id, qflags); 1516 1517 /* 1518 * Extract message envelope information: time of arrival, sender address, 1519 * recipient addresses. Skip files with malformed envelope information. 1520 */ 1521 #define QMGR_LOCK_MODE (MYFLOCK_OP_EXCLUSIVE | MYFLOCK_OP_NOWAIT) 1522 1523 if (qmgr_message_open(message) < 0) { 1524 qmgr_message_free(message); 1525 return (0); 1526 } 1527 if (myflock(vstream_fileno(message->fp), INTERNAL_LOCK, QMGR_LOCK_MODE) < 0) { 1528 msg_info("%s: skipped, still being delivered", queue_id); 1529 qmgr_message_close(message); 1530 qmgr_message_free(message); 1531 return (QMGR_MESSAGE_LOCKED); 1532 } 1533 if (qmgr_message_read(message) < 0) { 1534 qmgr_message_close(message); 1535 qmgr_message_free(message); 1536 return (0); 1537 } else { 1538 1539 /* 1540 * We have validated the queue file content, so it is safe to modify 1541 * the file properties now. 1542 */ 1543 if (mode != 0 && fchmod(vstream_fileno(message->fp), mode) < 0) 1544 msg_fatal("fchmod %s: %m", VSTREAM_PATH(message->fp)); 1545 1546 /* 1547 * If this message is forced to expire, use the existing defer 1548 * logfile records and do not assign any deliveries, leaving the 1549 * refcount at zero. If this message is forced to expire, but no 1550 * defer logfile records are available, assign deliveries to the 1551 * retry transport so that the sender will still find out what 1552 * recipients are affected and why. Either way, do not assign normal 1553 * deliveries because that would be undesirable especially with mail 1554 * that was expired in the 'hold' queue. 1555 */ 1556 if ((message->qflags & QMGR_FORCE_EXPIRE) != 0 1557 && stat(mail_queue_path((VSTRING *) 0, MAIL_QUEUE_DEFER, 1558 queue_id), &st) == 0 && st.st_size > 0) { 1559 /* Use this defer log; don't assign deliveries (refcount == 0). */ 1560 message->flags = 1; /* simplify downstream code */ 1561 qmgr_message_close(message); 1562 return (message); 1563 } 1564 1565 /* 1566 * Reset the defer log. This code should not be here, but we must 1567 * reset the defer log *after* acquiring the exclusive lock on the 1568 * queue file and *before* resolving new recipients. Since all those 1569 * operations are encapsulated so nicely by this routine, the defer 1570 * log reset has to be done here as well. 1571 * 1572 * Note: it is safe to remove the defer logfile from a previous queue 1573 * run of this queue file, because the defer log contains information 1574 * about recipients that still exist in this queue file. 1575 */ 1576 if (mail_queue_remove(MAIL_QUEUE_DEFER, queue_id) && errno != ENOENT) 1577 msg_fatal("%s: %s: remove %s %s: %m", myname, 1578 queue_id, MAIL_QUEUE_DEFER, queue_id); 1579 qmgr_message_sort(message); 1580 qmgr_message_resolve(message); 1581 qmgr_message_sort(message); 1582 qmgr_message_assign(message); 1583 qmgr_message_close(message); 1584 if (message->rcpt_offset == 0) 1585 qmgr_message_move_limits(message); 1586 return (message); 1587 } 1588 } 1589 1590 /* qmgr_message_realloc - refresh in-core message structure */ 1591 1592 QMGR_MESSAGE *qmgr_message_realloc(QMGR_MESSAGE *message) 1593 { 1594 const char *myname = "qmgr_message_realloc"; 1595 1596 /* 1597 * Sanity checks. 1598 */ 1599 if (message->rcpt_offset <= 0) 1600 msg_panic("%s: invalid offset: %ld", myname, message->rcpt_offset); 1601 if (msg_verbose) 1602 msg_info("%s: %s %s offset %ld", myname, message->queue_name, 1603 message->queue_id, message->rcpt_offset); 1604 1605 /* 1606 * Extract recipient addresses. Skip files with malformed envelope 1607 * information. 1608 */ 1609 if (qmgr_message_open(message) < 0) 1610 return (0); 1611 if (qmgr_message_read(message) < 0) { 1612 qmgr_message_close(message); 1613 return (0); 1614 } else { 1615 qmgr_message_sort(message); 1616 qmgr_message_resolve(message); 1617 qmgr_message_sort(message); 1618 qmgr_message_assign(message); 1619 qmgr_message_close(message); 1620 if (message->rcpt_offset == 0) 1621 qmgr_message_move_limits(message); 1622 return (message); 1623 } 1624 } 1625