1 /* $NetBSD: qmgr_message.c,v 1.1.1.4 2014/07/06 19:27:55 tron Exp $ */ 2 3 /*++ 4 /* NAME 5 /* qmgr_message 3 6 /* SUMMARY 7 /* in-core message structures 8 /* SYNOPSIS 9 /* #include "qmgr.h" 10 /* 11 /* int qmgr_message_count; 12 /* int qmgr_recipient_count; 13 /* 14 /* QMGR_MESSAGE *qmgr_message_alloc(class, name, qflags, mode) 15 /* const char *class; 16 /* const char *name; 17 /* int qflags; 18 /* mode_t mode; 19 /* 20 /* QMGR_MESSAGE *qmgr_message_realloc(message) 21 /* QMGR_MESSAGE *message; 22 /* 23 /* void qmgr_message_free(message) 24 /* QMGR_MESSAGE *message; 25 /* 26 /* void qmgr_message_update_warn(message) 27 /* QMGR_MESSAGE *message; 28 /* 29 /* void qmgr_message_kill_record(message, offset) 30 /* QMGR_MESSAGE *message; 31 /* long offset; 32 /* DESCRIPTION 33 /* This module performs en-gross operations on queue messages. 34 /* 35 /* qmgr_message_count is a global counter for the total number 36 /* of in-core message structures (i.e. the total size of the 37 /* `active' message queue). 38 /* 39 /* qmgr_recipient_count is a global counter for the total number 40 /* of in-core recipient structures (i.e. the sum of all recipients 41 /* in all in-core message structures). 42 /* 43 /* qmgr_message_alloc() creates an in-core message structure 44 /* with sender and recipient information taken from the named queue 45 /* file. A null result means the queue file could not be read or 46 /* that the queue file contained incorrect information. A result 47 /* QMGR_MESSAGE_LOCKED means delivery must be deferred. The number 48 /* of recipients read from a queue file is limited by the global 49 /* var_qmgr_rcpt_limit configuration parameter. When the limit 50 /* is reached, the \fIrcpt_offset\fR structure member is set to 51 /* the position where the read was terminated. Recipients are 52 /* run through the resolver, and are assigned to destination 53 /* queues. Recipients that cannot be assigned are deferred or 54 /* bounced. Mail that has bounced twice is silently absorbed. 55 /* A non-zero mode means change the queue file permissions. 56 /* 57 /* qmgr_message_realloc() resumes reading recipients from the queue 58 /* file, and updates the recipient list and \fIrcpt_offset\fR message 59 /* structure members. A null result means that the file could not be 60 /* read or that the file contained incorrect information. Recipient 61 /* limit imposed this time is based on the position of the message 62 /* job(s) on corresponding transport job list(s). It's considered 63 /* an error to call this when the recipient slots can't be allocated. 64 /* 65 /* qmgr_message_free() destroys an in-core message structure and makes 66 /* the resources available for reuse. It is an error to destroy 67 /* a message structure that is still referenced by queue entry structures. 68 /* 69 /* qmgr_message_update_warn() takes a closed message, opens it, updates 70 /* the warning field, and closes it again. 71 /* 72 /* qmgr_message_kill_record() takes a closed message, opens it, updates 73 /* the record type at the given offset to "killed", and closes the file. 74 /* A killed envelope record is ignored. Killed records are not allowed 75 /* inside the message content. 76 /* DIAGNOSTICS 77 /* Warnings: malformed message file. Fatal errors: out of memory. 78 /* SEE ALSO 79 /* envelope(3) message envelope parser 80 /* LICENSE 81 /* .ad 82 /* .fi 83 /* The Secure Mailer license must be distributed with this software. 84 /* AUTHOR(S) 85 /* Wietse Venema 86 /* IBM T.J. Watson Research 87 /* P.O. Box 704 88 /* Yorktown Heights, NY 10598, USA 89 /* 90 /* Preemptive scheduler enhancements: 91 /* Patrik Rak 92 /* Modra 6 93 /* 155 00, Prague, Czech Republic 94 /*--*/ 95 96 /* System library. */ 97 98 #include <sys_defs.h> 99 #include <sys/stat.h> 100 #include <stdlib.h> 101 #include <stdio.h> /* sscanf() */ 102 #include <fcntl.h> 103 #include <errno.h> 104 #include <unistd.h> 105 #include <string.h> 106 #include <ctype.h> 107 108 #ifdef STRCASECMP_IN_STRINGS_H 109 #include <strings.h> 110 #endif 111 112 /* Utility library. */ 113 114 #include <msg.h> 115 #include <mymalloc.h> 116 #include <vstring.h> 117 #include <vstream.h> 118 #include <split_at.h> 119 #include <valid_hostname.h> 120 #include <argv.h> 121 #include <stringops.h> 122 #include <myflock.h> 123 #include <sane_time.h> 124 125 /* Global library. */ 126 127 #include <dict.h> 128 #include <mail_queue.h> 129 #include <mail_params.h> 130 #include <canon_addr.h> 131 #include <record.h> 132 #include <rec_type.h> 133 #include <sent.h> 134 #include <deliver_completed.h> 135 #include <opened.h> 136 #include <verp_sender.h> 137 #include <mail_proto.h> 138 #include <qmgr_user.h> 139 #include <split_addr.h> 140 #include <dsn_mask.h> 141 #include <rec_attr_map.h> 142 143 /* Client stubs. */ 144 145 #include <rewrite_clnt.h> 146 #include <resolve_clnt.h> 147 148 /* Application-specific. */ 149 150 #include "qmgr.h" 151 152 int qmgr_message_count; 153 int qmgr_recipient_count; 154 155 /* qmgr_message_create - create in-core message structure */ 156 157 static QMGR_MESSAGE *qmgr_message_create(const char *queue_name, 158 const char *queue_id, int qflags) 159 { 160 QMGR_MESSAGE *message; 161 162 message = (QMGR_MESSAGE *) mymalloc(sizeof(QMGR_MESSAGE)); 163 qmgr_message_count++; 164 message->flags = 0; 165 message->qflags = qflags; 166 message->tflags = 0; 167 message->tflags_offset = 0; 168 message->rflags = QMGR_READ_FLAG_DEFAULT; 169 message->fp = 0; 170 message->refcount = 0; 171 message->single_rcpt = 0; 172 message->arrival_time.tv_sec = message->arrival_time.tv_usec = 0; 173 message->create_time = 0; 174 GETTIMEOFDAY(&message->active_time); 175 message->queued_time = sane_time(); 176 message->refill_time = 0; 177 message->data_offset = 0; 178 message->queue_id = mystrdup(queue_id); 179 message->queue_name = mystrdup(queue_name); 180 message->encoding = 0; 181 message->sender = 0; 182 message->dsn_envid = 0; 183 message->dsn_ret = 0; 184 message->filter_xport = 0; 185 message->inspect_xport = 0; 186 message->redirect_addr = 0; 187 message->data_size = 0; 188 message->cont_length = 0; 189 message->warn_offset = 0; 190 message->warn_time = 0; 191 message->rcpt_offset = 0; 192 message->verp_delims = 0; 193 message->client_name = 0; 194 message->client_addr = 0; 195 message->client_port = 0; 196 message->client_proto = 0; 197 message->client_helo = 0; 198 message->sasl_method = 0; 199 message->sasl_username = 0; 200 message->sasl_sender = 0; 201 message->log_ident = 0; 202 message->rewrite_context = 0; 203 recipient_list_init(&message->rcpt_list, RCPT_LIST_INIT_QUEUE); 204 message->rcpt_count = 0; 205 message->rcpt_limit = var_qmgr_msg_rcpt_limit; 206 message->rcpt_unread = 0; 207 QMGR_LIST_INIT(message->job_list); 208 return (message); 209 } 210 211 /* qmgr_message_close - close queue file */ 212 213 static void qmgr_message_close(QMGR_MESSAGE *message) 214 { 215 vstream_fclose(message->fp); 216 message->fp = 0; 217 } 218 219 /* qmgr_message_open - open queue file */ 220 221 static int qmgr_message_open(QMGR_MESSAGE *message) 222 { 223 224 /* 225 * Sanity check. 226 */ 227 if (message->fp) 228 msg_panic("%s: queue file is open", message->queue_id); 229 230 /* 231 * Open this queue file. Skip files that we cannot open. Back off when 232 * the system appears to be running out of resources. 233 */ 234 if ((message->fp = mail_queue_open(message->queue_name, 235 message->queue_id, 236 O_RDWR, 0)) == 0) { 237 if (errno != ENOENT) 238 msg_fatal("open %s %s: %m", message->queue_name, message->queue_id); 239 msg_warn("open %s %s: %m", message->queue_name, message->queue_id); 240 return (-1); 241 } 242 return (0); 243 } 244 245 /* qmgr_message_oldstyle_scan - support for Postfix < 1.0 queue files */ 246 247 static void qmgr_message_oldstyle_scan(QMGR_MESSAGE *message) 248 { 249 VSTRING *buf; 250 long orig_offset, extra_offset; 251 int rec_type; 252 char *start; 253 254 /* 255 * Initialize. No early returns or we have a memory leak. 256 */ 257 buf = vstring_alloc(100); 258 if ((orig_offset = vstream_ftell(message->fp)) < 0) 259 msg_fatal("vstream_ftell %s: %m", VSTREAM_PATH(message->fp)); 260 261 /* 262 * Rewind to the very beginning to make sure we see all records. 263 */ 264 if (vstream_fseek(message->fp, 0, SEEK_SET) < 0) 265 msg_fatal("seek file %s: %m", VSTREAM_PATH(message->fp)); 266 267 /* 268 * Scan through the old style queue file. Count the total number of 269 * recipients and find the data/extra sections offsets. Note that the new 270 * queue files require that data_size equals extra_offset - data_offset, 271 * so we set data_size to this as well and ignore the size record itself 272 * completely. 273 */ 274 message->rcpt_unread = 0; 275 for (;;) { 276 rec_type = rec_get(message->fp, buf, 0); 277 if (rec_type <= 0) 278 /* Report missing end record later. */ 279 break; 280 start = vstring_str(buf); 281 if (msg_verbose > 1) 282 msg_info("old-style scan record %c %s", rec_type, start); 283 if (rec_type == REC_TYPE_END) 284 break; 285 if (rec_type == REC_TYPE_DONE 286 || rec_type == REC_TYPE_RCPT 287 || rec_type == REC_TYPE_DRCP) { 288 message->rcpt_unread++; 289 continue; 290 } 291 if (rec_type == REC_TYPE_MESG) { 292 if (message->data_offset == 0) { 293 if ((message->data_offset = vstream_ftell(message->fp)) < 0) 294 msg_fatal("vstream_ftell %s: %m", VSTREAM_PATH(message->fp)); 295 if ((extra_offset = atol(start)) <= message->data_offset) 296 msg_fatal("bad extra offset %s file %s", 297 start, VSTREAM_PATH(message->fp)); 298 if (vstream_fseek(message->fp, extra_offset, SEEK_SET) < 0) 299 msg_fatal("seek file %s: %m", VSTREAM_PATH(message->fp)); 300 message->data_size = extra_offset - message->data_offset; 301 } 302 continue; 303 } 304 } 305 306 /* 307 * Clean up. 308 */ 309 if (vstream_fseek(message->fp, orig_offset, SEEK_SET) < 0) 310 msg_fatal("seek file %s: %m", VSTREAM_PATH(message->fp)); 311 vstring_free(buf); 312 313 /* 314 * Sanity checks. Verify that all required information was found, 315 * including the queue file end marker. 316 */ 317 if (message->data_offset == 0 || rec_type != REC_TYPE_END) 318 msg_fatal("%s: envelope records out of order", message->queue_id); 319 } 320 321 /* qmgr_message_read - read envelope records */ 322 323 static int qmgr_message_read(QMGR_MESSAGE *message) 324 { 325 VSTRING *buf; 326 int rec_type; 327 long curr_offset; 328 long save_offset = message->rcpt_offset; /* save a flag */ 329 int save_unread = message->rcpt_unread; /* save a count */ 330 char *start; 331 int recipient_limit; 332 const char *error_text; 333 char *name; 334 char *value; 335 char *orig_rcpt = 0; 336 int count; 337 int dsn_notify = 0; 338 char *dsn_orcpt = 0; 339 int n; 340 int have_log_client_attr = 0; 341 342 /* 343 * Initialize. No early returns or we have a memory leak. 344 */ 345 buf = vstring_alloc(100); 346 347 /* 348 * If we re-open this file, skip over on-file recipient records that we 349 * already looked at, and refill the in-core recipient address list. 350 * 351 * For the first time, the message recipient limit is calculated from the 352 * global recipient limit. This is to avoid reading little recipients 353 * when the active queue is near empty. When the queue becomes full, only 354 * the necessary amount is read in core. Such priming is necessary 355 * because there are no message jobs yet. 356 * 357 * For the next time, the recipient limit is based solely on the message 358 * jobs' positions in the job lists and/or job stacks. 359 */ 360 if (message->rcpt_offset) { 361 if (message->rcpt_list.len) 362 msg_panic("%s: recipient list not empty on recipient reload", 363 message->queue_id); 364 if (vstream_fseek(message->fp, message->rcpt_offset, SEEK_SET) < 0) 365 msg_fatal("seek file %s: %m", VSTREAM_PATH(message->fp)); 366 message->rcpt_offset = 0; 367 recipient_limit = message->rcpt_limit - message->rcpt_count; 368 } else { 369 recipient_limit = var_qmgr_rcpt_limit - qmgr_recipient_count; 370 if (recipient_limit < message->rcpt_limit) 371 recipient_limit = message->rcpt_limit; 372 } 373 /* Keep interrupt latency in check. */ 374 if (recipient_limit > 5000) 375 recipient_limit = 5000; 376 if (recipient_limit <= 0) 377 msg_panic("%s: no recipient slots available", message->queue_id); 378 if (msg_verbose) 379 msg_info("%s: recipient limit %d", message->queue_id, recipient_limit); 380 381 /* 382 * Read envelope records. XXX Rely on the front-end programs to enforce 383 * record size limits. Read up to recipient_limit recipients from the 384 * queue file, to protect against memory exhaustion. Recipient records 385 * may appear before or after the message content, so we keep reading 386 * from the queue file until we have enough recipients (rcpt_offset != 0) 387 * and until we know all the non-recipient information. 388 * 389 * Note that the total recipient count record is accurate only for fresh 390 * queue files. After some of the recipients are marked as done and the 391 * queue file is deferred, it can be used as upper bound estimate only. 392 * Fortunately, this poses no major problem on the scheduling algorithm, 393 * as the only impact is that the already deferred messages are not 394 * chosen by qmgr_job_candidate() as often as they could. 395 * 396 * On the first open, we must examine all non-recipient records. 397 * 398 * Optimization: when we know that recipient records are not mixed with 399 * non-recipient records, as is typical with mailing list mail, then we 400 * can avoid having to examine all the queue file records before we can 401 * start deliveries. This avoids some file system thrashing with huge 402 * mailing lists. 403 */ 404 for (;;) { 405 if ((curr_offset = vstream_ftell(message->fp)) < 0) 406 msg_fatal("vstream_ftell %s: %m", VSTREAM_PATH(message->fp)); 407 if (curr_offset == message->data_offset && curr_offset > 0) { 408 if (vstream_fseek(message->fp, message->data_size, SEEK_CUR) < 0) 409 msg_fatal("seek file %s: %m", VSTREAM_PATH(message->fp)); 410 curr_offset += message->data_size; 411 } 412 rec_type = rec_get_raw(message->fp, buf, 0, REC_FLAG_NONE); 413 start = vstring_str(buf); 414 if (msg_verbose > 1) 415 msg_info("record %c %s", rec_type, start); 416 if (rec_type == REC_TYPE_PTR) { 417 if ((rec_type = rec_goto(message->fp, start)) == REC_TYPE_ERROR) 418 break; 419 /* Need to update curr_offset after pointer jump. */ 420 continue; 421 } 422 if (rec_type <= 0) { 423 msg_warn("%s: message rejected: missing end record", 424 message->queue_id); 425 break; 426 } 427 if (rec_type == REC_TYPE_END) { 428 message->rflags |= QMGR_READ_FLAG_SEEN_ALL_NON_RCPT; 429 break; 430 } 431 432 /* 433 * Map named attributes to pseudo record types, so that we don't have 434 * to pollute the queue file with records that are incompatible with 435 * past Postfix versions. Preferably, people should be able to back 436 * out from an upgrade without losing mail. 437 */ 438 if (rec_type == REC_TYPE_ATTR) { 439 if ((error_text = split_nameval(start, &name, &value)) != 0) { 440 msg_warn("%s: ignoring bad attribute: %s: %.200s", 441 message->queue_id, error_text, start); 442 rec_type = REC_TYPE_ERROR; 443 break; 444 } 445 if ((n = rec_attr_map(name)) != 0) { 446 start = value; 447 rec_type = n; 448 } 449 } 450 451 /* 452 * Process recipient records. 453 */ 454 if (rec_type == REC_TYPE_RCPT) { 455 /* See also below for code setting orig_rcpt etc. */ 456 if (message->rcpt_offset == 0) { 457 message->rcpt_unread--; 458 recipient_list_add(&message->rcpt_list, curr_offset, 459 dsn_orcpt ? dsn_orcpt : "", 460 dsn_notify ? dsn_notify : 0, 461 orig_rcpt ? orig_rcpt : "", start); 462 if (dsn_orcpt) { 463 myfree(dsn_orcpt); 464 dsn_orcpt = 0; 465 } 466 if (orig_rcpt) { 467 myfree(orig_rcpt); 468 orig_rcpt = 0; 469 } 470 if (dsn_notify) 471 dsn_notify = 0; 472 if (message->rcpt_list.len >= recipient_limit) { 473 if ((message->rcpt_offset = vstream_ftell(message->fp)) < 0) 474 msg_fatal("vstream_ftell %s: %m", 475 VSTREAM_PATH(message->fp)); 476 if (message->rflags & QMGR_READ_FLAG_SEEN_ALL_NON_RCPT) 477 /* We already examined all non-recipient records. */ 478 break; 479 if (message->rflags & QMGR_READ_FLAG_MIXED_RCPT_OTHER) 480 /* Examine all remaining non-recipient records. */ 481 continue; 482 /* Optimizations for "pure recipient" record sections. */ 483 if (curr_offset > message->data_offset) { 484 /* We already examined all non-recipient records. */ 485 message->rflags |= QMGR_READ_FLAG_SEEN_ALL_NON_RCPT; 486 break; 487 } 488 /* Examine non-recipient records in extracted segment. */ 489 if (vstream_fseek(message->fp, message->data_offset 490 + message->data_size, SEEK_SET) < 0) 491 msg_fatal("seek file %s: %m", VSTREAM_PATH(message->fp)); 492 continue; 493 } 494 } 495 continue; 496 } 497 if (rec_type == REC_TYPE_DONE || rec_type == REC_TYPE_DRCP) { 498 if (message->rcpt_offset == 0) { 499 message->rcpt_unread--; 500 if (dsn_orcpt) { 501 myfree(dsn_orcpt); 502 dsn_orcpt = 0; 503 } 504 if (orig_rcpt) { 505 myfree(orig_rcpt); 506 orig_rcpt = 0; 507 } 508 if (dsn_notify) 509 dsn_notify = 0; 510 } 511 continue; 512 } 513 if (rec_type == REC_TYPE_DSN_ORCPT) { 514 /* See also above for code clearing dsn_orcpt. */ 515 if (dsn_orcpt != 0) { 516 msg_warn("%s: ignoring out-of-order DSN original recipient address <%.200s>", 517 message->queue_id, dsn_orcpt); 518 myfree(dsn_orcpt); 519 dsn_orcpt = 0; 520 } 521 if (message->rcpt_offset == 0) 522 dsn_orcpt = mystrdup(start); 523 continue; 524 } 525 if (rec_type == REC_TYPE_DSN_NOTIFY) { 526 /* See also above for code clearing dsn_notify. */ 527 if (dsn_notify != 0) { 528 msg_warn("%s: ignoring out-of-order DSN notify flags <%d>", 529 message->queue_id, dsn_notify); 530 dsn_notify = 0; 531 } 532 if (message->rcpt_offset == 0) { 533 if (!alldig(start) || (n = atoi(start)) == 0 || !DSN_NOTIFY_OK(n)) 534 msg_warn("%s: ignoring malformed DSN notify flags <%.200s>", 535 message->queue_id, start); 536 else 537 dsn_notify = n; 538 continue; 539 } 540 } 541 if (rec_type == REC_TYPE_ORCP) { 542 /* See also above for code clearing orig_rcpt. */ 543 if (orig_rcpt != 0) { 544 msg_warn("%s: ignoring out-of-order original recipient <%.200s>", 545 message->queue_id, orig_rcpt); 546 myfree(orig_rcpt); 547 orig_rcpt = 0; 548 } 549 if (message->rcpt_offset == 0) 550 orig_rcpt = mystrdup(start); 551 continue; 552 } 553 554 /* 555 * Process non-recipient records. 556 */ 557 if (message->rflags & QMGR_READ_FLAG_SEEN_ALL_NON_RCPT) 558 /* We already examined all non-recipient records. */ 559 continue; 560 if (rec_type == REC_TYPE_SIZE) { 561 if (message->data_offset == 0) { 562 if ((count = sscanf(start, "%ld %ld %d %d %ld", 563 &message->data_size, &message->data_offset, 564 &message->rcpt_unread, &message->rflags, 565 &message->cont_length)) >= 3) { 566 /* Postfix >= 1.0 (a.k.a. 20010228). */ 567 if (message->data_offset <= 0 || message->data_size <= 0) { 568 msg_warn("%s: invalid size record: %.100s", 569 message->queue_id, start); 570 rec_type = REC_TYPE_ERROR; 571 break; 572 } 573 if (message->rflags & ~QMGR_READ_FLAG_USER) { 574 msg_warn("%s: invalid flags in size record: %.100s", 575 message->queue_id, start); 576 rec_type = REC_TYPE_ERROR; 577 break; 578 } 579 } else if (count == 1) { 580 /* Postfix < 1.0 (a.k.a. 20010228). */ 581 qmgr_message_oldstyle_scan(message); 582 } else { 583 /* Can't happen. */ 584 msg_warn("%s: message rejected: weird size record", 585 message->queue_id); 586 rec_type = REC_TYPE_ERROR; 587 break; 588 } 589 } 590 /* Postfix < 2.4 compatibility. */ 591 if (message->cont_length == 0) { 592 message->cont_length = message->data_size; 593 } else if (message->cont_length < 0) { 594 msg_warn("%s: invalid size record: %.100s", 595 message->queue_id, start); 596 rec_type = REC_TYPE_ERROR; 597 break; 598 } 599 continue; 600 } 601 if (rec_type == REC_TYPE_TIME) { 602 if (message->arrival_time.tv_sec == 0) 603 REC_TYPE_TIME_SCAN(start, message->arrival_time); 604 continue; 605 } 606 if (rec_type == REC_TYPE_CTIME) { 607 if (message->create_time == 0) 608 message->create_time = atol(start); 609 continue; 610 } 611 if (rec_type == REC_TYPE_FILT) { 612 if (message->filter_xport != 0) 613 myfree(message->filter_xport); 614 message->filter_xport = mystrdup(start); 615 continue; 616 } 617 if (rec_type == REC_TYPE_INSP) { 618 if (message->inspect_xport != 0) 619 myfree(message->inspect_xport); 620 message->inspect_xport = mystrdup(start); 621 continue; 622 } 623 if (rec_type == REC_TYPE_RDR) { 624 if (message->redirect_addr != 0) 625 myfree(message->redirect_addr); 626 message->redirect_addr = mystrdup(start); 627 continue; 628 } 629 if (rec_type == REC_TYPE_FROM) { 630 if (message->sender == 0) { 631 message->sender = mystrdup(start); 632 opened(message->queue_id, message->sender, 633 message->cont_length, message->rcpt_unread, 634 "queue %s", message->queue_name); 635 } 636 continue; 637 } 638 if (rec_type == REC_TYPE_DSN_ENVID) { 639 if (message->dsn_envid == 0) 640 message->dsn_envid = mystrdup(start); 641 } 642 if (rec_type == REC_TYPE_DSN_RET) { 643 if (message->dsn_ret == 0) { 644 if (!alldig(start) || (n = atoi(start)) == 0 || !DSN_RET_OK(n)) 645 msg_warn("%s: ignoring malformed DSN RET flags in queue file record:%.100s", 646 message->queue_id, start); 647 else 648 message->dsn_ret = n; 649 } 650 } 651 if (rec_type == REC_TYPE_ATTR) { 652 /* Allow extra segment to override envelope segment info. */ 653 if (strcmp(name, MAIL_ATTR_ENCODING) == 0) { 654 if (message->encoding != 0) 655 myfree(message->encoding); 656 message->encoding = mystrdup(value); 657 } 658 659 /* 660 * Backwards compatibility. Before Postfix 2.3, the logging 661 * attributes were called client_name, etc. Now they are called 662 * log_client_name. etc., and client_name is used for the actual 663 * client information. To support old queue files we accept both 664 * names for the purpose of logging; the new name overrides the 665 * old one. 666 * 667 * XXX Do not use the "legacy" client_name etc. attribute values for 668 * initializing the logging attributes, when this file already 669 * contains the "modern" log_client_name etc. logging attributes. 670 * Otherwise, logging attributes that are not present in the 671 * queue file would be set with information from the real client. 672 */ 673 else if (strcmp(name, MAIL_ATTR_ACT_CLIENT_NAME) == 0) { 674 if (have_log_client_attr == 0 && message->client_name == 0) 675 message->client_name = mystrdup(value); 676 } else if (strcmp(name, MAIL_ATTR_ACT_CLIENT_ADDR) == 0) { 677 if (have_log_client_attr == 0 && message->client_addr == 0) 678 message->client_addr = mystrdup(value); 679 } else if (strcmp(name, MAIL_ATTR_ACT_CLIENT_PORT) == 0) { 680 if (have_log_client_attr == 0 && message->client_port == 0) 681 message->client_port = mystrdup(value); 682 } else if (strcmp(name, MAIL_ATTR_ACT_PROTO_NAME) == 0) { 683 if (have_log_client_attr == 0 && message->client_proto == 0) 684 message->client_proto = mystrdup(value); 685 } else if (strcmp(name, MAIL_ATTR_ACT_HELO_NAME) == 0) { 686 if (have_log_client_attr == 0 && message->client_helo == 0) 687 message->client_helo = mystrdup(value); 688 } 689 /* Original client attributes. */ 690 else if (strcmp(name, MAIL_ATTR_LOG_CLIENT_NAME) == 0) { 691 if (message->client_name != 0) 692 myfree(message->client_name); 693 message->client_name = mystrdup(value); 694 have_log_client_attr = 1; 695 } else if (strcmp(name, MAIL_ATTR_LOG_CLIENT_ADDR) == 0) { 696 if (message->client_addr != 0) 697 myfree(message->client_addr); 698 message->client_addr = mystrdup(value); 699 have_log_client_attr = 1; 700 } else if (strcmp(name, MAIL_ATTR_LOG_CLIENT_PORT) == 0) { 701 if (message->client_port != 0) 702 myfree(message->client_port); 703 message->client_port = mystrdup(value); 704 have_log_client_attr = 1; 705 } else if (strcmp(name, MAIL_ATTR_LOG_PROTO_NAME) == 0) { 706 if (message->client_proto != 0) 707 myfree(message->client_proto); 708 message->client_proto = mystrdup(value); 709 have_log_client_attr = 1; 710 } else if (strcmp(name, MAIL_ATTR_LOG_HELO_NAME) == 0) { 711 if (message->client_helo != 0) 712 myfree(message->client_helo); 713 message->client_helo = mystrdup(value); 714 have_log_client_attr = 1; 715 } else if (strcmp(name, MAIL_ATTR_SASL_METHOD) == 0) { 716 if (message->sasl_method == 0) 717 message->sasl_method = mystrdup(value); 718 else 719 msg_warn("%s: ignoring multiple %s attribute: %s", 720 message->queue_id, MAIL_ATTR_SASL_METHOD, value); 721 } else if (strcmp(name, MAIL_ATTR_SASL_USERNAME) == 0) { 722 if (message->sasl_username == 0) 723 message->sasl_username = mystrdup(value); 724 else 725 msg_warn("%s: ignoring multiple %s attribute: %s", 726 message->queue_id, MAIL_ATTR_SASL_USERNAME, value); 727 } else if (strcmp(name, MAIL_ATTR_SASL_SENDER) == 0) { 728 if (message->sasl_sender == 0) 729 message->sasl_sender = mystrdup(value); 730 else 731 msg_warn("%s: ignoring multiple %s attribute: %s", 732 message->queue_id, MAIL_ATTR_SASL_SENDER, value); 733 } else if (strcmp(name, MAIL_ATTR_LOG_IDENT) == 0) { 734 if (message->log_ident == 0) 735 message->log_ident = mystrdup(value); 736 else 737 msg_warn("%s: ignoring multiple %s attribute: %s", 738 message->queue_id, MAIL_ATTR_LOG_IDENT, value); 739 } else if (strcmp(name, MAIL_ATTR_RWR_CONTEXT) == 0) { 740 if (message->rewrite_context == 0) 741 message->rewrite_context = mystrdup(value); 742 else 743 msg_warn("%s: ignoring multiple %s attribute: %s", 744 message->queue_id, MAIL_ATTR_RWR_CONTEXT, value); 745 } 746 747 /* 748 * Optional tracing flags (verify, sendmail -v, sendmail -bv). 749 * This record is killed after a trace logfile report is sent and 750 * after the logfile is deleted. 751 */ 752 else if (strcmp(name, MAIL_ATTR_TRACE_FLAGS) == 0) { 753 message->tflags = DEL_REQ_TRACE_FLAGS(atoi(value)); 754 if (message->tflags == DEL_REQ_FLAG_RECORD) 755 message->tflags_offset = curr_offset; 756 else 757 message->tflags_offset = 0; 758 } 759 continue; 760 } 761 if (rec_type == REC_TYPE_WARN) { 762 if (message->warn_offset == 0) { 763 message->warn_offset = curr_offset; 764 REC_TYPE_WARN_SCAN(start, message->warn_time); 765 } 766 continue; 767 } 768 if (rec_type == REC_TYPE_VERP) { 769 if (message->verp_delims == 0) { 770 if (message->sender == 0 || message->sender[0] == 0) { 771 msg_warn("%s: ignoring VERP request for null sender", 772 message->queue_id); 773 } else if (verp_delims_verify(start) != 0) { 774 msg_warn("%s: ignoring bad VERP request: \"%.100s\"", 775 message->queue_id, start); 776 } else { 777 if (msg_verbose) 778 msg_info("%s: enabling VERP for sender \"%.100s\"", 779 message->queue_id, message->sender); 780 message->single_rcpt = 1; 781 message->verp_delims = mystrdup(start); 782 } 783 } 784 continue; 785 } 786 } 787 788 /* 789 * Grr. 790 */ 791 if (dsn_orcpt != 0) { 792 if (rec_type > 0) 793 msg_warn("%s: ignoring out-of-order DSN original recipient <%.200s>", 794 message->queue_id, dsn_orcpt); 795 myfree(dsn_orcpt); 796 } 797 if (orig_rcpt != 0) { 798 if (rec_type > 0) 799 msg_warn("%s: ignoring out-of-order original recipient <%.200s>", 800 message->queue_id, orig_rcpt); 801 myfree(orig_rcpt); 802 } 803 804 /* 805 * Remember when we have read the last recipient batch. Note that we do 806 * it here after reading as reading might have used considerable amount 807 * of time. 808 */ 809 message->refill_time = sane_time(); 810 811 /* 812 * Avoid clumsiness elsewhere in the program. When sending data across an 813 * IPC channel, sending an empty string is more convenient than sending a 814 * null pointer. 815 */ 816 if (message->dsn_envid == 0) 817 message->dsn_envid = mystrdup(""); 818 if (message->encoding == 0) 819 message->encoding = mystrdup(MAIL_ATTR_ENC_NONE); 820 if (message->client_name == 0) 821 message->client_name = mystrdup(""); 822 if (message->client_addr == 0) 823 message->client_addr = mystrdup(""); 824 if (message->client_port == 0) 825 message->client_port = mystrdup(""); 826 if (message->client_proto == 0) 827 message->client_proto = mystrdup(""); 828 if (message->client_helo == 0) 829 message->client_helo = mystrdup(""); 830 if (message->sasl_method == 0) 831 message->sasl_method = mystrdup(""); 832 if (message->sasl_username == 0) 833 message->sasl_username = mystrdup(""); 834 if (message->sasl_sender == 0) 835 message->sasl_sender = mystrdup(""); 836 if (message->log_ident == 0) 837 message->log_ident = mystrdup(""); 838 if (message->rewrite_context == 0) 839 message->rewrite_context = mystrdup(MAIL_ATTR_RWR_LOCAL); 840 /* Postfix < 2.3 compatibility. */ 841 if (message->create_time == 0) 842 message->create_time = message->arrival_time.tv_sec; 843 844 /* 845 * Clean up. 846 */ 847 vstring_free(buf); 848 849 /* 850 * Sanity checks. Verify that all required information was found, 851 * including the queue file end marker. 852 */ 853 if (message->rcpt_unread < 0 854 || (message->rcpt_offset == 0 && message->rcpt_unread != 0)) { 855 msg_warn("%s: rcpt count mismatch (%d)", 856 message->queue_id, message->rcpt_unread); 857 message->rcpt_unread = 0; 858 } 859 if (rec_type <= 0) { 860 /* Already logged warning. */ 861 } else if (message->arrival_time.tv_sec == 0) { 862 msg_warn("%s: message rejected: missing arrival time record", 863 message->queue_id); 864 } else if (message->sender == 0) { 865 msg_warn("%s: message rejected: missing sender record", 866 message->queue_id); 867 } else if (message->data_offset == 0) { 868 msg_warn("%s: message rejected: missing size record", 869 message->queue_id); 870 } else { 871 return (0); 872 } 873 message->rcpt_offset = save_offset; /* restore flag */ 874 message->rcpt_unread = save_unread; /* restore count */ 875 recipient_list_free(&message->rcpt_list); 876 recipient_list_init(&message->rcpt_list, RCPT_LIST_INIT_QUEUE); 877 return (-1); 878 } 879 880 /* qmgr_message_update_warn - update the time of next delay warning */ 881 882 void qmgr_message_update_warn(QMGR_MESSAGE *message) 883 { 884 885 /* 886 * XXX eventually this should let us schedule multiple warnings, right 887 * now it just allows for one. 888 */ 889 if (qmgr_message_open(message) 890 || vstream_fseek(message->fp, message->warn_offset, SEEK_SET) < 0 891 || rec_fprintf(message->fp, REC_TYPE_WARN, REC_TYPE_WARN_FORMAT, 892 REC_TYPE_WARN_ARG(0)) < 0 893 || vstream_fflush(message->fp)) 894 msg_fatal("update queue file %s: %m", VSTREAM_PATH(message->fp)); 895 qmgr_message_close(message); 896 } 897 898 /* qmgr_message_kill_record - mark one message record as killed */ 899 900 void qmgr_message_kill_record(QMGR_MESSAGE *message, long offset) 901 { 902 if (offset <= 0) 903 msg_panic("qmgr_message_kill_record: bad offset 0x%lx", offset); 904 if (qmgr_message_open(message) 905 || rec_put_type(message->fp, REC_TYPE_KILL, offset) < 0 906 || vstream_fflush(message->fp)) 907 msg_fatal("update queue file %s: %m", VSTREAM_PATH(message->fp)); 908 qmgr_message_close(message); 909 } 910 911 /* qmgr_message_sort_compare - compare recipient information */ 912 913 static int qmgr_message_sort_compare(const void *p1, const void *p2) 914 { 915 RECIPIENT *rcpt1 = (RECIPIENT *) p1; 916 RECIPIENT *rcpt2 = (RECIPIENT *) p2; 917 QMGR_QUEUE *queue1; 918 QMGR_QUEUE *queue2; 919 char *at1; 920 char *at2; 921 int result; 922 923 /* 924 * Compare most significant to least significant recipient attributes. 925 * The comparison function must be transitive, so NULL values need to be 926 * assigned an ordinal (we set NULL last). 927 */ 928 929 queue1 = rcpt1->u.queue; 930 queue2 = rcpt2->u.queue; 931 if (queue1 != 0 && queue2 == 0) 932 return (-1); 933 if (queue1 == 0 && queue2 != 0) 934 return (1); 935 if (queue1 != 0 && queue2 != 0) { 936 937 /* 938 * Compare message transport. 939 */ 940 if ((result = strcmp(queue1->transport->name, 941 queue2->transport->name)) != 0) 942 return (result); 943 944 /* 945 * Compare queue name (nexthop or recipient@nexthop). 946 */ 947 if ((result = strcmp(queue1->name, queue2->name)) != 0) 948 return (result); 949 } 950 951 /* 952 * Compare recipient domain. 953 */ 954 at1 = strrchr(rcpt1->address, '@'); 955 at2 = strrchr(rcpt2->address, '@'); 956 if (at1 == 0 && at2 != 0) 957 return (1); 958 if (at1 != 0 && at2 == 0) 959 return (-1); 960 if (at1 != 0 && at2 != 0 961 && (result = strcasecmp(at1, at2)) != 0) 962 return (result); 963 964 /* 965 * Compare recipient address. 966 */ 967 return (strcmp(rcpt1->address, rcpt2->address)); 968 } 969 970 /* qmgr_message_sort - sort message recipient addresses by domain */ 971 972 static void qmgr_message_sort(QMGR_MESSAGE *message) 973 { 974 qsort((char *) message->rcpt_list.info, message->rcpt_list.len, 975 sizeof(message->rcpt_list.info[0]), qmgr_message_sort_compare); 976 if (msg_verbose) { 977 RECIPIENT_LIST list = message->rcpt_list; 978 RECIPIENT *rcpt; 979 980 msg_info("start sorted recipient list"); 981 for (rcpt = list.info; rcpt < list.info + list.len; rcpt++) 982 msg_info("qmgr_message_sort: %s", rcpt->address); 983 msg_info("end sorted recipient list"); 984 } 985 } 986 987 /* qmgr_resolve_one - resolve or skip one recipient */ 988 989 static int qmgr_resolve_one(QMGR_MESSAGE *message, RECIPIENT *recipient, 990 const char *addr, RESOLVE_REPLY *reply) 991 { 992 #define QMGR_REDIRECT(rp, tp, np) do { \ 993 (rp)->flags = 0; \ 994 vstring_strcpy((rp)->transport, (tp)); \ 995 vstring_strcpy((rp)->nexthop, (np)); \ 996 } while (0) 997 998 if ((message->tflags & DEL_REQ_FLAG_MTA_VRFY) == 0) 999 resolve_clnt_query_from(message->sender, addr, reply); 1000 else 1001 resolve_clnt_verify_from(message->sender, addr, reply); 1002 if (reply->flags & RESOLVE_FLAG_FAIL) { 1003 QMGR_REDIRECT(reply, MAIL_SERVICE_RETRY, 1004 "4.3.0 address resolver failure"); 1005 return (0); 1006 } else if (reply->flags & RESOLVE_FLAG_ERROR) { 1007 QMGR_REDIRECT(reply, MAIL_SERVICE_ERROR, 1008 "5.1.3 bad address syntax"); 1009 return (0); 1010 } else { 1011 return (0); 1012 } 1013 } 1014 1015 /* qmgr_message_resolve - resolve recipients */ 1016 1017 static void qmgr_message_resolve(QMGR_MESSAGE *message) 1018 { 1019 static ARGV *defer_xport_argv; 1020 RECIPIENT_LIST list = message->rcpt_list; 1021 RECIPIENT *recipient; 1022 QMGR_TRANSPORT *transport = 0; 1023 QMGR_QUEUE *queue = 0; 1024 RESOLVE_REPLY reply; 1025 VSTRING *queue_name; 1026 char *at; 1027 char **cpp; 1028 char *nexthop; 1029 ssize_t len; 1030 int status; 1031 DSN dsn; 1032 MSG_STATS stats; 1033 DSN *saved_dsn; 1034 1035 #define STREQ(x,y) (strcmp(x,y) == 0) 1036 #define STR vstring_str 1037 #define LEN VSTRING_LEN 1038 1039 resolve_clnt_init(&reply); 1040 queue_name = vstring_alloc(1); 1041 for (recipient = list.info; recipient < list.info + list.len; recipient++) { 1042 1043 /* 1044 * Redirect overrides all else. But only once (per entire message). 1045 * For consistency with the remainder of Postfix, rewrite the address 1046 * to canonical form before resolving it. 1047 */ 1048 if (message->redirect_addr) { 1049 if (recipient > list.info) { 1050 recipient->u.queue = 0; 1051 continue; 1052 } 1053 message->rcpt_offset = 0; 1054 message->rcpt_unread = 0; 1055 1056 rewrite_clnt_internal(REWRITE_CANON, message->redirect_addr, 1057 reply.recipient); 1058 RECIPIENT_UPDATE(recipient->address, STR(reply.recipient)); 1059 if (qmgr_resolve_one(message, recipient, 1060 recipient->address, &reply) < 0) 1061 continue; 1062 if (!STREQ(recipient->address, STR(reply.recipient))) 1063 RECIPIENT_UPDATE(recipient->address, STR(reply.recipient)); 1064 } 1065 1066 /* 1067 * Content filtering overrides the address resolver. 1068 * 1069 * XXX Bypass content_filter inspection for user-generated probes 1070 * (sendmail -bv). MTA-generated probes never have the "please filter 1071 * me" bits turned on, but we handle them here anyway for the sake of 1072 * future proofing. 1073 */ 1074 #define FILTER_WITHOUT_NEXTHOP(filter, next) \ 1075 (((next) = split_at((filter), ':')) == 0 || *(next) == 0) 1076 1077 #define RCPT_WITHOUT_DOMAIN(rcpt, next) \ 1078 ((next = strrchr(rcpt, '@')) == 0 || *++(next) == 0) 1079 1080 else if (message->filter_xport 1081 && (message->tflags & DEL_REQ_TRACE_ONLY_MASK) == 0) { 1082 reply.flags = 0; 1083 vstring_strcpy(reply.transport, message->filter_xport); 1084 if (FILTER_WITHOUT_NEXTHOP(STR(reply.transport), nexthop) 1085 && *(nexthop = var_def_filter_nexthop) == 0 1086 && RCPT_WITHOUT_DOMAIN(recipient->address, nexthop)) 1087 nexthop = var_myhostname; 1088 vstring_strcpy(reply.nexthop, nexthop); 1089 vstring_strcpy(reply.recipient, recipient->address); 1090 } 1091 1092 /* 1093 * Resolve the destination to (transport, nexthop, address). The 1094 * result address may differ from the one specified by the sender. 1095 */ 1096 else { 1097 if (qmgr_resolve_one(message, recipient, 1098 recipient->address, &reply) < 0) 1099 continue; 1100 if (!STREQ(recipient->address, STR(reply.recipient))) 1101 RECIPIENT_UPDATE(recipient->address, STR(reply.recipient)); 1102 } 1103 1104 /* 1105 * Bounce null recipients. This should never happen, but is most 1106 * likely the result of a fault in a different program, so aborting 1107 * the queue manager process does not help. 1108 */ 1109 if (recipient->address[0] == 0) { 1110 QMGR_REDIRECT(&reply, MAIL_SERVICE_ERROR, 1111 "5.1.3 null recipient address"); 1112 } 1113 1114 /* 1115 * Discard mail to the local double bounce address here, so this 1116 * system can run without a local delivery agent. They'd still have 1117 * to configure something for mail directed to the local postmaster, 1118 * though, but that is an RFC requirement anyway. 1119 * 1120 * XXX This lookup should be done in the resolver, and the mail should 1121 * be directed to a general-purpose null delivery agent. 1122 */ 1123 if (reply.flags & RESOLVE_CLASS_LOCAL) { 1124 at = strrchr(STR(reply.recipient), '@'); 1125 len = (at ? (at - STR(reply.recipient)) 1126 : strlen(STR(reply.recipient))); 1127 if (strncasecmp(STR(reply.recipient), var_double_bounce_sender, 1128 len) == 0 1129 && !var_double_bounce_sender[len]) { 1130 status = sent(message->tflags, message->queue_id, 1131 QMGR_MSG_STATS(&stats, message), recipient, 1132 "none", DSN_SIMPLE(&dsn, "2.0.0", 1133 "undeliverable postmaster notification discarded")); 1134 if (status == 0) { 1135 deliver_completed(message->fp, recipient->offset); 1136 #if 0 1137 /* It's the default verification probe sender address. */ 1138 msg_warn("%s: undeliverable postmaster notification discarded", 1139 message->queue_id); 1140 #endif 1141 } else 1142 message->flags |= status; 1143 continue; 1144 } 1145 } 1146 1147 /* 1148 * Optionally defer deliveries over specific transports, unless the 1149 * restriction is lifted temporarily. 1150 */ 1151 if (*var_defer_xports && (message->qflags & QMGR_FLUSH_DFXP) == 0) { 1152 if (defer_xport_argv == 0) 1153 defer_xport_argv = argv_split(var_defer_xports, " \t\r\n,"); 1154 for (cpp = defer_xport_argv->argv; *cpp; cpp++) 1155 if (strcmp(*cpp, STR(reply.transport)) == 0) 1156 break; 1157 if (*cpp) { 1158 QMGR_REDIRECT(&reply, MAIL_SERVICE_RETRY, 1159 "4.3.2 deferred transport"); 1160 } 1161 } 1162 1163 /* 1164 * Look up or instantiate the proper transport. 1165 */ 1166 if (transport == 0 || !STREQ(transport->name, STR(reply.transport))) { 1167 if ((transport = qmgr_transport_find(STR(reply.transport))) == 0) 1168 transport = qmgr_transport_create(STR(reply.transport)); 1169 queue = 0; 1170 } 1171 1172 /* 1173 * This message is being flushed. If need-be unthrottle the 1174 * transport. 1175 */ 1176 if ((message->qflags & QMGR_FLUSH_EACH) != 0 1177 && QMGR_TRANSPORT_THROTTLED(transport)) 1178 qmgr_transport_unthrottle(transport); 1179 1180 /* 1181 * This transport is dead. Defer delivery to this recipient. 1182 */ 1183 if (QMGR_TRANSPORT_THROTTLED(transport)) { 1184 saved_dsn = transport->dsn; 1185 if ((transport = qmgr_error_transport(MAIL_SERVICE_RETRY)) != 0) { 1186 nexthop = qmgr_error_nexthop(saved_dsn); 1187 vstring_strcpy(reply.nexthop, nexthop); 1188 myfree(nexthop); 1189 queue = 0; 1190 } else { 1191 qmgr_defer_recipient(message, recipient, saved_dsn); 1192 continue; 1193 } 1194 } 1195 1196 /* 1197 * The nexthop destination provides the default name for the 1198 * per-destination queue. When the delivery agent accepts only one 1199 * recipient per delivery, give each recipient its own queue, so that 1200 * deliveries to different recipients of the same message can happen 1201 * in parallel, and so that we can enforce per-recipient concurrency 1202 * limits and prevent one recipient from tying up all the delivery 1203 * agent resources. We use recipient@nexthop as queue name rather 1204 * than the actual recipient domain name, so that one recipient in 1205 * multiple equivalent domains cannot evade the per-recipient 1206 * concurrency limit. Split the address on the recipient delimiter if 1207 * one is defined, so that extended addresses don't get extra 1208 * delivery slots. 1209 * 1210 * Fold the result to lower case so that we don't have multiple queues 1211 * for the same name. 1212 * 1213 * Important! All recipients in a queue must have the same nexthop 1214 * value. It is OK to have multiple queues with the same nexthop 1215 * value, but only when those queues are named after recipients. 1216 * 1217 * The single-recipient code below was written for local(8) like 1218 * delivery agents, and assumes that all domains that deliver to the 1219 * same (transport + nexthop) are aliases for $nexthop. Delivery 1220 * concurrency is changed from per-domain into per-recipient, by 1221 * changing the queue name from nexthop into localpart@nexthop. 1222 * 1223 * XXX This assumption is incorrect when different destinations share 1224 * the same (transport + nexthop). In reality, such transports are 1225 * rarely configured to use single-recipient deliveries. The fix is 1226 * to decouple the per-destination recipient limit from the 1227 * per-destination concurrency. 1228 */ 1229 vstring_strcpy(queue_name, STR(reply.nexthop)); 1230 if (strcmp(transport->name, MAIL_SERVICE_ERROR) != 0 1231 && strcmp(transport->name, MAIL_SERVICE_RETRY) != 0 1232 && transport->recipient_limit == 1) { 1233 /* Copy the recipient localpart. */ 1234 at = strrchr(STR(reply.recipient), '@'); 1235 len = (at ? (at - STR(reply.recipient)) 1236 : strlen(STR(reply.recipient))); 1237 vstring_strncpy(queue_name, STR(reply.recipient), len); 1238 /* Remove the address extension from the recipient localpart. */ 1239 if (*var_rcpt_delim && split_addr(STR(queue_name), var_rcpt_delim)) 1240 vstring_truncate(queue_name, strlen(STR(queue_name))); 1241 /* Assume the recipient domain is equivalent to nexthop. */ 1242 vstring_sprintf_append(queue_name, "@%s", STR(reply.nexthop)); 1243 } 1244 lowercase(STR(queue_name)); 1245 1246 /* 1247 * This transport is alive. Find or instantiate a queue for this 1248 * recipient. 1249 */ 1250 if (queue == 0 || !STREQ(queue->name, STR(queue_name))) { 1251 if ((queue = qmgr_queue_find(transport, STR(queue_name))) == 0) 1252 queue = qmgr_queue_create(transport, STR(queue_name), 1253 STR(reply.nexthop)); 1254 } 1255 1256 /* 1257 * This message is being flushed. If need-be unthrottle the queue. 1258 */ 1259 if ((message->qflags & QMGR_FLUSH_EACH) != 0 1260 && QMGR_QUEUE_THROTTLED(queue)) 1261 qmgr_queue_unthrottle(queue); 1262 1263 /* 1264 * This queue is dead. Defer delivery to this recipient. 1265 */ 1266 if (QMGR_QUEUE_THROTTLED(queue)) { 1267 saved_dsn = queue->dsn; 1268 if ((queue = qmgr_error_queue(MAIL_SERVICE_RETRY, saved_dsn)) == 0) { 1269 qmgr_defer_recipient(message, recipient, saved_dsn); 1270 continue; 1271 } 1272 } 1273 1274 /* 1275 * This queue is alive. Bind this recipient to this queue instance. 1276 */ 1277 recipient->u.queue = queue; 1278 } 1279 resolve_clnt_free(&reply); 1280 vstring_free(queue_name); 1281 } 1282 1283 /* qmgr_message_assign - assign recipients to specific delivery requests */ 1284 1285 static void qmgr_message_assign(QMGR_MESSAGE *message) 1286 { 1287 RECIPIENT_LIST list = message->rcpt_list; 1288 RECIPIENT *recipient; 1289 QMGR_ENTRY *entry = 0; 1290 QMGR_QUEUE *queue; 1291 QMGR_JOB *job = 0; 1292 QMGR_PEER *peer = 0; 1293 1294 /* 1295 * Try to bundle as many recipients in a delivery request as we can. When 1296 * the recipient resolves to the same site and transport as an existing 1297 * recipient, do not create a new queue entry, just move that recipient 1298 * to the recipient list of the existing queue entry. All this provided 1299 * that we do not exceed the transport-specific limit on the number of 1300 * recipients per transaction. 1301 */ 1302 #define LIMIT_OK(limit, count) ((limit) == 0 || ((count) < (limit))) 1303 1304 for (recipient = list.info; recipient < list.info + list.len; recipient++) { 1305 1306 /* 1307 * Skip recipients with a dead transport or destination. 1308 */ 1309 if ((queue = recipient->u.queue) == 0) 1310 continue; 1311 1312 /* 1313 * Lookup or instantiate the message job if necessary. 1314 */ 1315 if (job == 0 || queue->transport != job->transport) { 1316 job = qmgr_job_obtain(message, queue->transport); 1317 peer = 0; 1318 } 1319 1320 /* 1321 * Lookup or instantiate job peer if necessary. 1322 */ 1323 if (peer == 0 || queue != peer->queue) 1324 peer = qmgr_peer_obtain(job, queue); 1325 1326 /* 1327 * Lookup old or instantiate new recipient entry. We try to reuse the 1328 * last existing entry whenever the recipient limit permits. 1329 */ 1330 entry = peer->entry_list.prev; 1331 if (message->single_rcpt || entry == 0 1332 || !LIMIT_OK(queue->transport->recipient_limit, entry->rcpt_list.len)) 1333 entry = qmgr_entry_create(peer, message); 1334 1335 /* 1336 * Add the recipient to the current entry and increase all those 1337 * recipient counters accordingly. 1338 */ 1339 recipient_list_add(&entry->rcpt_list, recipient->offset, 1340 recipient->dsn_orcpt, recipient->dsn_notify, 1341 recipient->orig_addr, recipient->address); 1342 job->rcpt_count++; 1343 message->rcpt_count++; 1344 qmgr_recipient_count++; 1345 } 1346 1347 /* 1348 * Release the message recipient list and reinitialize it for the next 1349 * time. 1350 */ 1351 recipient_list_free(&message->rcpt_list); 1352 recipient_list_init(&message->rcpt_list, RCPT_LIST_INIT_QUEUE); 1353 1354 /* 1355 * Note that even if qmgr_job_obtain() reset the job candidate cache of 1356 * all transports to which we assigned new recipients, this message may 1357 * have other jobs which we didn't touch at all this time. But the number 1358 * of unread recipients affecting the candidate selection might have 1359 * changed considerably, so we must invalidate the caches if it might be 1360 * of some use. 1361 */ 1362 for (job = message->job_list.next; job; job = job->message_peers.next) 1363 if (job->selected_entries < job->read_entries 1364 && job->blocker_tag != job->transport->blocker_tag) 1365 job->transport->candidate_cache_current = 0; 1366 } 1367 1368 /* qmgr_message_move_limits - recycle unused recipient slots */ 1369 1370 static void qmgr_message_move_limits(QMGR_MESSAGE *message) 1371 { 1372 QMGR_JOB *job; 1373 1374 for (job = message->job_list.next; job; job = job->message_peers.next) 1375 qmgr_job_move_limits(job); 1376 } 1377 1378 /* qmgr_message_free - release memory for in-core message structure */ 1379 1380 void qmgr_message_free(QMGR_MESSAGE *message) 1381 { 1382 QMGR_JOB *job; 1383 1384 if (message->refcount != 0) 1385 msg_panic("qmgr_message_free: reference len: %d", message->refcount); 1386 if (message->fp) 1387 msg_panic("qmgr_message_free: queue file is open"); 1388 while ((job = message->job_list.next) != 0) 1389 qmgr_job_free(job); 1390 myfree(message->queue_id); 1391 myfree(message->queue_name); 1392 if (message->dsn_envid) 1393 myfree(message->dsn_envid); 1394 if (message->encoding) 1395 myfree(message->encoding); 1396 if (message->sender) 1397 myfree(message->sender); 1398 if (message->verp_delims) 1399 myfree(message->verp_delims); 1400 if (message->filter_xport) 1401 myfree(message->filter_xport); 1402 if (message->inspect_xport) 1403 myfree(message->inspect_xport); 1404 if (message->redirect_addr) 1405 myfree(message->redirect_addr); 1406 if (message->client_name) 1407 myfree(message->client_name); 1408 if (message->client_addr) 1409 myfree(message->client_addr); 1410 if (message->client_port) 1411 myfree(message->client_port); 1412 if (message->client_proto) 1413 myfree(message->client_proto); 1414 if (message->client_helo) 1415 myfree(message->client_helo); 1416 if (message->sasl_method) 1417 myfree(message->sasl_method); 1418 if (message->sasl_username) 1419 myfree(message->sasl_username); 1420 if (message->sasl_sender) 1421 myfree(message->sasl_sender); 1422 if (message->log_ident) 1423 myfree(message->log_ident); 1424 if (message->rewrite_context) 1425 myfree(message->rewrite_context); 1426 recipient_list_free(&message->rcpt_list); 1427 qmgr_message_count--; 1428 myfree((char *) message); 1429 } 1430 1431 /* qmgr_message_alloc - create in-core message structure */ 1432 1433 QMGR_MESSAGE *qmgr_message_alloc(const char *queue_name, const char *queue_id, 1434 int qflags, mode_t mode) 1435 { 1436 const char *myname = "qmgr_message_alloc"; 1437 QMGR_MESSAGE *message; 1438 1439 if (msg_verbose) 1440 msg_info("%s: %s %s", myname, queue_name, queue_id); 1441 1442 /* 1443 * Create an in-core message structure. 1444 */ 1445 message = qmgr_message_create(queue_name, queue_id, qflags); 1446 1447 /* 1448 * Extract message envelope information: time of arrival, sender address, 1449 * recipient addresses. Skip files with malformed envelope information. 1450 */ 1451 #define QMGR_LOCK_MODE (MYFLOCK_OP_EXCLUSIVE | MYFLOCK_OP_NOWAIT) 1452 1453 if (qmgr_message_open(message) < 0) { 1454 qmgr_message_free(message); 1455 return (0); 1456 } 1457 if (myflock(vstream_fileno(message->fp), INTERNAL_LOCK, QMGR_LOCK_MODE) < 0) { 1458 msg_info("%s: skipped, still being delivered", queue_id); 1459 qmgr_message_close(message); 1460 qmgr_message_free(message); 1461 return (QMGR_MESSAGE_LOCKED); 1462 } 1463 if (qmgr_message_read(message) < 0) { 1464 qmgr_message_close(message); 1465 qmgr_message_free(message); 1466 return (0); 1467 } else { 1468 1469 /* 1470 * We have validated the queue file content, so it is safe to modify 1471 * the file properties now. 1472 */ 1473 if (mode != 0 && fchmod(vstream_fileno(message->fp), mode) < 0) 1474 msg_fatal("fchmod %s: %m", VSTREAM_PATH(message->fp)); 1475 1476 /* 1477 * Reset the defer log. This code should not be here, but we must 1478 * reset the defer log *after* acquiring the exclusive lock on the 1479 * queue file and *before* resolving new recipients. Since all those 1480 * operations are encapsulated so nicely by this routine, the defer 1481 * log reset has to be done here as well. 1482 * 1483 * Note: it is safe to remove the defer logfile from a previous queue 1484 * run of this queue file, because the defer log contains information 1485 * about recipients that still exist in this queue file. 1486 */ 1487 if (mail_queue_remove(MAIL_QUEUE_DEFER, queue_id) && errno != ENOENT) 1488 msg_fatal("%s: %s: remove %s %s: %m", myname, 1489 queue_id, MAIL_QUEUE_DEFER, queue_id); 1490 qmgr_message_sort(message); 1491 qmgr_message_resolve(message); 1492 qmgr_message_sort(message); 1493 qmgr_message_assign(message); 1494 qmgr_message_close(message); 1495 if (message->rcpt_offset == 0) 1496 qmgr_message_move_limits(message); 1497 return (message); 1498 } 1499 } 1500 1501 /* qmgr_message_realloc - refresh in-core message structure */ 1502 1503 QMGR_MESSAGE *qmgr_message_realloc(QMGR_MESSAGE *message) 1504 { 1505 const char *myname = "qmgr_message_realloc"; 1506 1507 /* 1508 * Sanity checks. 1509 */ 1510 if (message->rcpt_offset <= 0) 1511 msg_panic("%s: invalid offset: %ld", myname, message->rcpt_offset); 1512 if (msg_verbose) 1513 msg_info("%s: %s %s offset %ld", myname, message->queue_name, 1514 message->queue_id, message->rcpt_offset); 1515 1516 /* 1517 * Extract recipient addresses. Skip files with malformed envelope 1518 * information. 1519 */ 1520 if (qmgr_message_open(message) < 0) 1521 return (0); 1522 if (qmgr_message_read(message) < 0) { 1523 qmgr_message_close(message); 1524 return (0); 1525 } else { 1526 qmgr_message_sort(message); 1527 qmgr_message_resolve(message); 1528 qmgr_message_sort(message); 1529 qmgr_message_assign(message); 1530 qmgr_message_close(message); 1531 if (message->rcpt_offset == 0) 1532 qmgr_message_move_limits(message); 1533 return (message); 1534 } 1535 } 1536