xref: /netbsd-src/external/ibm-public/postfix/dist/src/qmgr/qmgr_message.c (revision a24efa7dea9f1f56c3bdb15a927d3516792ace1c)
1 /*	$NetBSD: qmgr_message.c,v 1.1.1.4 2014/07/06 19:27:55 tron Exp $	*/
2 
3 /*++
4 /* NAME
5 /*	qmgr_message 3
6 /* SUMMARY
7 /*	in-core message structures
8 /* SYNOPSIS
9 /*	#include "qmgr.h"
10 /*
11 /*	int	qmgr_message_count;
12 /*	int	qmgr_recipient_count;
13 /*
14 /*	QMGR_MESSAGE *qmgr_message_alloc(class, name, qflags, mode)
15 /*	const char *class;
16 /*	const char *name;
17 /*	int	qflags;
18 /*	mode_t	mode;
19 /*
20 /*	QMGR_MESSAGE *qmgr_message_realloc(message)
21 /*	QMGR_MESSAGE *message;
22 /*
23 /*	void	qmgr_message_free(message)
24 /*	QMGR_MESSAGE *message;
25 /*
26 /*	void	qmgr_message_update_warn(message)
27 /*	QMGR_MESSAGE *message;
28 /*
29 /*	void	qmgr_message_kill_record(message, offset)
30 /*	QMGR_MESSAGE *message;
31 /*	long	offset;
32 /* DESCRIPTION
33 /*	This module performs en-gross operations on queue messages.
34 /*
35 /*	qmgr_message_count is a global counter for the total number
36 /*	of in-core message structures (i.e. the total size of the
37 /*	`active' message queue).
38 /*
39 /*	qmgr_recipient_count is a global counter for the total number
40 /*	of in-core recipient structures (i.e. the sum of all recipients
41 /*	in all in-core message structures).
42 /*
43 /*	qmgr_message_alloc() creates an in-core message structure
44 /*	with sender and recipient information taken from the named queue
45 /*	file. A null result means the queue file could not be read or
46 /*	that the queue file contained incorrect information. A result
47 /*	QMGR_MESSAGE_LOCKED means delivery must be deferred. The number
48 /*	of recipients read from a queue file is limited by the global
49 /*	var_qmgr_rcpt_limit configuration parameter. When the limit
50 /*	is reached, the \fIrcpt_offset\fR structure member is set to
51 /*	the position where the read was terminated. Recipients are
52 /*	run through the resolver, and are assigned to destination
53 /*	queues. Recipients that cannot be assigned are deferred or
54 /*	bounced. Mail that has bounced twice is silently absorbed.
55 /*	A non-zero mode means change the queue file permissions.
56 /*
57 /*	qmgr_message_realloc() resumes reading recipients from the queue
58 /*	file, and updates the recipient list and \fIrcpt_offset\fR message
59 /*	structure members. A null result means that the file could not be
60 /*	read or that the file contained incorrect information. Recipient
61 /*	limit imposed this time is based on the position of the message
62 /*	job(s) on corresponding transport job list(s). It's considered
63 /*	an error to call this when the recipient slots can't be allocated.
64 /*
65 /*	qmgr_message_free() destroys an in-core message structure and makes
66 /*	the resources available for reuse. It is an error to destroy
67 /*	a message structure that is still referenced by queue entry structures.
68 /*
69 /*	qmgr_message_update_warn() takes a closed message, opens it, updates
70 /*	the warning field, and closes it again.
71 /*
72 /*	qmgr_message_kill_record() takes a closed message, opens it, updates
73 /*	the record type at the given offset to "killed", and closes the file.
74 /*	A killed envelope record is ignored. Killed records are not allowed
75 /*	inside the message content.
76 /* DIAGNOSTICS
77 /*	Warnings: malformed message file. Fatal errors: out of memory.
78 /* SEE ALSO
79 /*	envelope(3) message envelope parser
80 /* LICENSE
81 /* .ad
82 /* .fi
83 /*	The Secure Mailer license must be distributed with this software.
84 /* AUTHOR(S)
85 /*	Wietse Venema
86 /*	IBM T.J. Watson Research
87 /*	P.O. Box 704
88 /*	Yorktown Heights, NY 10598, USA
89 /*
90 /*	Preemptive scheduler enhancements:
91 /*	Patrik Rak
92 /*	Modra 6
93 /*	155 00, Prague, Czech Republic
94 /*--*/
95 
96 /* System library. */
97 
98 #include <sys_defs.h>
99 #include <sys/stat.h>
100 #include <stdlib.h>
101 #include <stdio.h>			/* sscanf() */
102 #include <fcntl.h>
103 #include <errno.h>
104 #include <unistd.h>
105 #include <string.h>
106 #include <ctype.h>
107 
108 #ifdef STRCASECMP_IN_STRINGS_H
109 #include <strings.h>
110 #endif
111 
112 /* Utility library. */
113 
114 #include <msg.h>
115 #include <mymalloc.h>
116 #include <vstring.h>
117 #include <vstream.h>
118 #include <split_at.h>
119 #include <valid_hostname.h>
120 #include <argv.h>
121 #include <stringops.h>
122 #include <myflock.h>
123 #include <sane_time.h>
124 
125 /* Global library. */
126 
127 #include <dict.h>
128 #include <mail_queue.h>
129 #include <mail_params.h>
130 #include <canon_addr.h>
131 #include <record.h>
132 #include <rec_type.h>
133 #include <sent.h>
134 #include <deliver_completed.h>
135 #include <opened.h>
136 #include <verp_sender.h>
137 #include <mail_proto.h>
138 #include <qmgr_user.h>
139 #include <split_addr.h>
140 #include <dsn_mask.h>
141 #include <rec_attr_map.h>
142 
143 /* Client stubs. */
144 
145 #include <rewrite_clnt.h>
146 #include <resolve_clnt.h>
147 
148 /* Application-specific. */
149 
150 #include "qmgr.h"
151 
152 int     qmgr_message_count;
153 int     qmgr_recipient_count;
154 
155 /* qmgr_message_create - create in-core message structure */
156 
157 static QMGR_MESSAGE *qmgr_message_create(const char *queue_name,
158 				           const char *queue_id, int qflags)
159 {
160     QMGR_MESSAGE *message;
161 
162     message = (QMGR_MESSAGE *) mymalloc(sizeof(QMGR_MESSAGE));
163     qmgr_message_count++;
164     message->flags = 0;
165     message->qflags = qflags;
166     message->tflags = 0;
167     message->tflags_offset = 0;
168     message->rflags = QMGR_READ_FLAG_DEFAULT;
169     message->fp = 0;
170     message->refcount = 0;
171     message->single_rcpt = 0;
172     message->arrival_time.tv_sec = message->arrival_time.tv_usec = 0;
173     message->create_time = 0;
174     GETTIMEOFDAY(&message->active_time);
175     message->queued_time = sane_time();
176     message->refill_time = 0;
177     message->data_offset = 0;
178     message->queue_id = mystrdup(queue_id);
179     message->queue_name = mystrdup(queue_name);
180     message->encoding = 0;
181     message->sender = 0;
182     message->dsn_envid = 0;
183     message->dsn_ret = 0;
184     message->filter_xport = 0;
185     message->inspect_xport = 0;
186     message->redirect_addr = 0;
187     message->data_size = 0;
188     message->cont_length = 0;
189     message->warn_offset = 0;
190     message->warn_time = 0;
191     message->rcpt_offset = 0;
192     message->verp_delims = 0;
193     message->client_name = 0;
194     message->client_addr = 0;
195     message->client_port = 0;
196     message->client_proto = 0;
197     message->client_helo = 0;
198     message->sasl_method = 0;
199     message->sasl_username = 0;
200     message->sasl_sender = 0;
201     message->log_ident = 0;
202     message->rewrite_context = 0;
203     recipient_list_init(&message->rcpt_list, RCPT_LIST_INIT_QUEUE);
204     message->rcpt_count = 0;
205     message->rcpt_limit = var_qmgr_msg_rcpt_limit;
206     message->rcpt_unread = 0;
207     QMGR_LIST_INIT(message->job_list);
208     return (message);
209 }
210 
211 /* qmgr_message_close - close queue file */
212 
213 static void qmgr_message_close(QMGR_MESSAGE *message)
214 {
215     vstream_fclose(message->fp);
216     message->fp = 0;
217 }
218 
219 /* qmgr_message_open - open queue file */
220 
221 static int qmgr_message_open(QMGR_MESSAGE *message)
222 {
223 
224     /*
225      * Sanity check.
226      */
227     if (message->fp)
228 	msg_panic("%s: queue file is open", message->queue_id);
229 
230     /*
231      * Open this queue file. Skip files that we cannot open. Back off when
232      * the system appears to be running out of resources.
233      */
234     if ((message->fp = mail_queue_open(message->queue_name,
235 				       message->queue_id,
236 				       O_RDWR, 0)) == 0) {
237 	if (errno != ENOENT)
238 	    msg_fatal("open %s %s: %m", message->queue_name, message->queue_id);
239 	msg_warn("open %s %s: %m", message->queue_name, message->queue_id);
240 	return (-1);
241     }
242     return (0);
243 }
244 
245 /* qmgr_message_oldstyle_scan - support for Postfix < 1.0 queue files */
246 
247 static void qmgr_message_oldstyle_scan(QMGR_MESSAGE *message)
248 {
249     VSTRING *buf;
250     long    orig_offset, extra_offset;
251     int     rec_type;
252     char   *start;
253 
254     /*
255      * Initialize. No early returns or we have a memory leak.
256      */
257     buf = vstring_alloc(100);
258     if ((orig_offset = vstream_ftell(message->fp)) < 0)
259 	msg_fatal("vstream_ftell %s: %m", VSTREAM_PATH(message->fp));
260 
261     /*
262      * Rewind to the very beginning to make sure we see all records.
263      */
264     if (vstream_fseek(message->fp, 0, SEEK_SET) < 0)
265 	msg_fatal("seek file %s: %m", VSTREAM_PATH(message->fp));
266 
267     /*
268      * Scan through the old style queue file. Count the total number of
269      * recipients and find the data/extra sections offsets. Note that the new
270      * queue files require that data_size equals extra_offset - data_offset,
271      * so we set data_size to this as well and ignore the size record itself
272      * completely.
273      */
274     message->rcpt_unread = 0;
275     for (;;) {
276 	rec_type = rec_get(message->fp, buf, 0);
277 	if (rec_type <= 0)
278 	    /* Report missing end record later. */
279 	    break;
280 	start = vstring_str(buf);
281 	if (msg_verbose > 1)
282 	    msg_info("old-style scan record %c %s", rec_type, start);
283 	if (rec_type == REC_TYPE_END)
284 	    break;
285 	if (rec_type == REC_TYPE_DONE
286 	    || rec_type == REC_TYPE_RCPT
287 	    || rec_type == REC_TYPE_DRCP) {
288 	    message->rcpt_unread++;
289 	    continue;
290 	}
291 	if (rec_type == REC_TYPE_MESG) {
292 	    if (message->data_offset == 0) {
293 		if ((message->data_offset = vstream_ftell(message->fp)) < 0)
294 		    msg_fatal("vstream_ftell %s: %m", VSTREAM_PATH(message->fp));
295 		if ((extra_offset = atol(start)) <= message->data_offset)
296 		    msg_fatal("bad extra offset %s file %s",
297 			      start, VSTREAM_PATH(message->fp));
298 		if (vstream_fseek(message->fp, extra_offset, SEEK_SET) < 0)
299 		    msg_fatal("seek file %s: %m", VSTREAM_PATH(message->fp));
300 		message->data_size = extra_offset - message->data_offset;
301 	    }
302 	    continue;
303 	}
304     }
305 
306     /*
307      * Clean up.
308      */
309     if (vstream_fseek(message->fp, orig_offset, SEEK_SET) < 0)
310 	msg_fatal("seek file %s: %m", VSTREAM_PATH(message->fp));
311     vstring_free(buf);
312 
313     /*
314      * Sanity checks. Verify that all required information was found,
315      * including the queue file end marker.
316      */
317     if (message->data_offset == 0 || rec_type != REC_TYPE_END)
318 	msg_fatal("%s: envelope records out of order", message->queue_id);
319 }
320 
321 /* qmgr_message_read - read envelope records */
322 
323 static int qmgr_message_read(QMGR_MESSAGE *message)
324 {
325     VSTRING *buf;
326     int     rec_type;
327     long    curr_offset;
328     long    save_offset = message->rcpt_offset;	/* save a flag */
329     int     save_unread = message->rcpt_unread;	/* save a count */
330     char   *start;
331     int     recipient_limit;
332     const char *error_text;
333     char   *name;
334     char   *value;
335     char   *orig_rcpt = 0;
336     int     count;
337     int     dsn_notify = 0;
338     char   *dsn_orcpt = 0;
339     int     n;
340     int     have_log_client_attr = 0;
341 
342     /*
343      * Initialize. No early returns or we have a memory leak.
344      */
345     buf = vstring_alloc(100);
346 
347     /*
348      * If we re-open this file, skip over on-file recipient records that we
349      * already looked at, and refill the in-core recipient address list.
350      *
351      * For the first time, the message recipient limit is calculated from the
352      * global recipient limit. This is to avoid reading little recipients
353      * when the active queue is near empty. When the queue becomes full, only
354      * the necessary amount is read in core. Such priming is necessary
355      * because there are no message jobs yet.
356      *
357      * For the next time, the recipient limit is based solely on the message
358      * jobs' positions in the job lists and/or job stacks.
359      */
360     if (message->rcpt_offset) {
361 	if (message->rcpt_list.len)
362 	    msg_panic("%s: recipient list not empty on recipient reload",
363 		      message->queue_id);
364 	if (vstream_fseek(message->fp, message->rcpt_offset, SEEK_SET) < 0)
365 	    msg_fatal("seek file %s: %m", VSTREAM_PATH(message->fp));
366 	message->rcpt_offset = 0;
367 	recipient_limit = message->rcpt_limit - message->rcpt_count;
368     } else {
369 	recipient_limit = var_qmgr_rcpt_limit - qmgr_recipient_count;
370 	if (recipient_limit < message->rcpt_limit)
371 	    recipient_limit = message->rcpt_limit;
372     }
373     /* Keep interrupt latency in check. */
374     if (recipient_limit > 5000)
375 	recipient_limit = 5000;
376     if (recipient_limit <= 0)
377 	msg_panic("%s: no recipient slots available", message->queue_id);
378     if (msg_verbose)
379 	msg_info("%s: recipient limit %d", message->queue_id, recipient_limit);
380 
381     /*
382      * Read envelope records. XXX Rely on the front-end programs to enforce
383      * record size limits. Read up to recipient_limit recipients from the
384      * queue file, to protect against memory exhaustion. Recipient records
385      * may appear before or after the message content, so we keep reading
386      * from the queue file until we have enough recipients (rcpt_offset != 0)
387      * and until we know all the non-recipient information.
388      *
389      * Note that the total recipient count record is accurate only for fresh
390      * queue files. After some of the recipients are marked as done and the
391      * queue file is deferred, it can be used as upper bound estimate only.
392      * Fortunately, this poses no major problem on the scheduling algorithm,
393      * as the only impact is that the already deferred messages are not
394      * chosen by qmgr_job_candidate() as often as they could.
395      *
396      * On the first open, we must examine all non-recipient records.
397      *
398      * Optimization: when we know that recipient records are not mixed with
399      * non-recipient records, as is typical with mailing list mail, then we
400      * can avoid having to examine all the queue file records before we can
401      * start deliveries. This avoids some file system thrashing with huge
402      * mailing lists.
403      */
404     for (;;) {
405 	if ((curr_offset = vstream_ftell(message->fp)) < 0)
406 	    msg_fatal("vstream_ftell %s: %m", VSTREAM_PATH(message->fp));
407 	if (curr_offset == message->data_offset && curr_offset > 0) {
408 	    if (vstream_fseek(message->fp, message->data_size, SEEK_CUR) < 0)
409 		msg_fatal("seek file %s: %m", VSTREAM_PATH(message->fp));
410 	    curr_offset += message->data_size;
411 	}
412 	rec_type = rec_get_raw(message->fp, buf, 0, REC_FLAG_NONE);
413 	start = vstring_str(buf);
414 	if (msg_verbose > 1)
415 	    msg_info("record %c %s", rec_type, start);
416 	if (rec_type == REC_TYPE_PTR) {
417 	    if ((rec_type = rec_goto(message->fp, start)) == REC_TYPE_ERROR)
418 		break;
419 	    /* Need to update curr_offset after pointer jump. */
420 	    continue;
421 	}
422 	if (rec_type <= 0) {
423 	    msg_warn("%s: message rejected: missing end record",
424 		     message->queue_id);
425 	    break;
426 	}
427 	if (rec_type == REC_TYPE_END) {
428 	    message->rflags |= QMGR_READ_FLAG_SEEN_ALL_NON_RCPT;
429 	    break;
430 	}
431 
432 	/*
433 	 * Map named attributes to pseudo record types, so that we don't have
434 	 * to pollute the queue file with records that are incompatible with
435 	 * past Postfix versions. Preferably, people should be able to back
436 	 * out from an upgrade without losing mail.
437 	 */
438 	if (rec_type == REC_TYPE_ATTR) {
439 	    if ((error_text = split_nameval(start, &name, &value)) != 0) {
440 		msg_warn("%s: ignoring bad attribute: %s: %.200s",
441 			 message->queue_id, error_text, start);
442 		rec_type = REC_TYPE_ERROR;
443 		break;
444 	    }
445 	    if ((n = rec_attr_map(name)) != 0) {
446 		start = value;
447 		rec_type = n;
448 	    }
449 	}
450 
451 	/*
452 	 * Process recipient records.
453 	 */
454 	if (rec_type == REC_TYPE_RCPT) {
455 	    /* See also below for code setting orig_rcpt etc. */
456 	    if (message->rcpt_offset == 0) {
457 		message->rcpt_unread--;
458 		recipient_list_add(&message->rcpt_list, curr_offset,
459 				   dsn_orcpt ? dsn_orcpt : "",
460 				   dsn_notify ? dsn_notify : 0,
461 				   orig_rcpt ? orig_rcpt : "", start);
462 		if (dsn_orcpt) {
463 		    myfree(dsn_orcpt);
464 		    dsn_orcpt = 0;
465 		}
466 		if (orig_rcpt) {
467 		    myfree(orig_rcpt);
468 		    orig_rcpt = 0;
469 		}
470 		if (dsn_notify)
471 		    dsn_notify = 0;
472 		if (message->rcpt_list.len >= recipient_limit) {
473 		    if ((message->rcpt_offset = vstream_ftell(message->fp)) < 0)
474 			msg_fatal("vstream_ftell %s: %m",
475 				  VSTREAM_PATH(message->fp));
476 		    if (message->rflags & QMGR_READ_FLAG_SEEN_ALL_NON_RCPT)
477 			/* We already examined all non-recipient records. */
478 			break;
479 		    if (message->rflags & QMGR_READ_FLAG_MIXED_RCPT_OTHER)
480 			/* Examine all remaining non-recipient records. */
481 			continue;
482 		    /* Optimizations for "pure recipient" record sections. */
483 		    if (curr_offset > message->data_offset) {
484 			/* We already examined all non-recipient records. */
485 			message->rflags |= QMGR_READ_FLAG_SEEN_ALL_NON_RCPT;
486 			break;
487 		    }
488 		    /* Examine non-recipient records in extracted segment. */
489 		    if (vstream_fseek(message->fp, message->data_offset
490 				      + message->data_size, SEEK_SET) < 0)
491 			msg_fatal("seek file %s: %m", VSTREAM_PATH(message->fp));
492 		    continue;
493 		}
494 	    }
495 	    continue;
496 	}
497 	if (rec_type == REC_TYPE_DONE || rec_type == REC_TYPE_DRCP) {
498 	    if (message->rcpt_offset == 0) {
499 		message->rcpt_unread--;
500 		if (dsn_orcpt) {
501 		    myfree(dsn_orcpt);
502 		    dsn_orcpt = 0;
503 		}
504 		if (orig_rcpt) {
505 		    myfree(orig_rcpt);
506 		    orig_rcpt = 0;
507 		}
508 		if (dsn_notify)
509 		    dsn_notify = 0;
510 	    }
511 	    continue;
512 	}
513 	if (rec_type == REC_TYPE_DSN_ORCPT) {
514 	    /* See also above for code clearing dsn_orcpt. */
515 	    if (dsn_orcpt != 0) {
516 		msg_warn("%s: ignoring out-of-order DSN original recipient address <%.200s>",
517 			 message->queue_id, dsn_orcpt);
518 		myfree(dsn_orcpt);
519 		dsn_orcpt = 0;
520 	    }
521 	    if (message->rcpt_offset == 0)
522 		dsn_orcpt = mystrdup(start);
523 	    continue;
524 	}
525 	if (rec_type == REC_TYPE_DSN_NOTIFY) {
526 	    /* See also above for code clearing dsn_notify. */
527 	    if (dsn_notify != 0) {
528 		msg_warn("%s: ignoring out-of-order DSN notify flags <%d>",
529 			 message->queue_id, dsn_notify);
530 		dsn_notify = 0;
531 	    }
532 	    if (message->rcpt_offset == 0) {
533 		if (!alldig(start) || (n = atoi(start)) == 0 || !DSN_NOTIFY_OK(n))
534 		    msg_warn("%s: ignoring malformed DSN notify flags <%.200s>",
535 			     message->queue_id, start);
536 		else
537 		    dsn_notify = n;
538 		continue;
539 	    }
540 	}
541 	if (rec_type == REC_TYPE_ORCP) {
542 	    /* See also above for code clearing orig_rcpt. */
543 	    if (orig_rcpt != 0) {
544 		msg_warn("%s: ignoring out-of-order original recipient <%.200s>",
545 			 message->queue_id, orig_rcpt);
546 		myfree(orig_rcpt);
547 		orig_rcpt = 0;
548 	    }
549 	    if (message->rcpt_offset == 0)
550 		orig_rcpt = mystrdup(start);
551 	    continue;
552 	}
553 
554 	/*
555 	 * Process non-recipient records.
556 	 */
557 	if (message->rflags & QMGR_READ_FLAG_SEEN_ALL_NON_RCPT)
558 	    /* We already examined all non-recipient records. */
559 	    continue;
560 	if (rec_type == REC_TYPE_SIZE) {
561 	    if (message->data_offset == 0) {
562 		if ((count = sscanf(start, "%ld %ld %d %d %ld",
563 				 &message->data_size, &message->data_offset,
564 				    &message->rcpt_unread, &message->rflags,
565 				    &message->cont_length)) >= 3) {
566 		    /* Postfix >= 1.0 (a.k.a. 20010228). */
567 		    if (message->data_offset <= 0 || message->data_size <= 0) {
568 			msg_warn("%s: invalid size record: %.100s",
569 				 message->queue_id, start);
570 			rec_type = REC_TYPE_ERROR;
571 			break;
572 		    }
573 		    if (message->rflags & ~QMGR_READ_FLAG_USER) {
574 			msg_warn("%s: invalid flags in size record: %.100s",
575 				 message->queue_id, start);
576 			rec_type = REC_TYPE_ERROR;
577 			break;
578 		    }
579 		} else if (count == 1) {
580 		    /* Postfix < 1.0 (a.k.a. 20010228). */
581 		    qmgr_message_oldstyle_scan(message);
582 		} else {
583 		    /* Can't happen. */
584 		    msg_warn("%s: message rejected: weird size record",
585 			     message->queue_id);
586 		    rec_type = REC_TYPE_ERROR;
587 		    break;
588 		}
589 	    }
590 	    /* Postfix < 2.4 compatibility. */
591 	    if (message->cont_length == 0) {
592 		message->cont_length = message->data_size;
593 	    } else if (message->cont_length < 0) {
594 		msg_warn("%s: invalid size record: %.100s",
595 			 message->queue_id, start);
596 		rec_type = REC_TYPE_ERROR;
597 		break;
598 	    }
599 	    continue;
600 	}
601 	if (rec_type == REC_TYPE_TIME) {
602 	    if (message->arrival_time.tv_sec == 0)
603 		REC_TYPE_TIME_SCAN(start, message->arrival_time);
604 	    continue;
605 	}
606 	if (rec_type == REC_TYPE_CTIME) {
607 	    if (message->create_time == 0)
608 		message->create_time = atol(start);
609 	    continue;
610 	}
611 	if (rec_type == REC_TYPE_FILT) {
612 	    if (message->filter_xport != 0)
613 		myfree(message->filter_xport);
614 	    message->filter_xport = mystrdup(start);
615 	    continue;
616 	}
617 	if (rec_type == REC_TYPE_INSP) {
618 	    if (message->inspect_xport != 0)
619 		myfree(message->inspect_xport);
620 	    message->inspect_xport = mystrdup(start);
621 	    continue;
622 	}
623 	if (rec_type == REC_TYPE_RDR) {
624 	    if (message->redirect_addr != 0)
625 		myfree(message->redirect_addr);
626 	    message->redirect_addr = mystrdup(start);
627 	    continue;
628 	}
629 	if (rec_type == REC_TYPE_FROM) {
630 	    if (message->sender == 0) {
631 		message->sender = mystrdup(start);
632 		opened(message->queue_id, message->sender,
633 		       message->cont_length, message->rcpt_unread,
634 		       "queue %s", message->queue_name);
635 	    }
636 	    continue;
637 	}
638 	if (rec_type == REC_TYPE_DSN_ENVID) {
639 	    if (message->dsn_envid == 0)
640 		message->dsn_envid = mystrdup(start);
641 	}
642 	if (rec_type == REC_TYPE_DSN_RET) {
643 	    if (message->dsn_ret == 0) {
644 		if (!alldig(start) || (n = atoi(start)) == 0 || !DSN_RET_OK(n))
645 		    msg_warn("%s: ignoring malformed DSN RET flags in queue file record:%.100s",
646 			     message->queue_id, start);
647 		else
648 		    message->dsn_ret = n;
649 	    }
650 	}
651 	if (rec_type == REC_TYPE_ATTR) {
652 	    /* Allow extra segment to override envelope segment info. */
653 	    if (strcmp(name, MAIL_ATTR_ENCODING) == 0) {
654 		if (message->encoding != 0)
655 		    myfree(message->encoding);
656 		message->encoding = mystrdup(value);
657 	    }
658 
659 	    /*
660 	     * Backwards compatibility. Before Postfix 2.3, the logging
661 	     * attributes were called client_name, etc. Now they are called
662 	     * log_client_name. etc., and client_name is used for the actual
663 	     * client information. To support old queue files we accept both
664 	     * names for the purpose of logging; the new name overrides the
665 	     * old one.
666 	     *
667 	     * XXX Do not use the "legacy" client_name etc. attribute values for
668 	     * initializing the logging attributes, when this file already
669 	     * contains the "modern" log_client_name etc. logging attributes.
670 	     * Otherwise, logging attributes that are not present in the
671 	     * queue file would be set with information from the real client.
672 	     */
673 	    else if (strcmp(name, MAIL_ATTR_ACT_CLIENT_NAME) == 0) {
674 		if (have_log_client_attr == 0 && message->client_name == 0)
675 		    message->client_name = mystrdup(value);
676 	    } else if (strcmp(name, MAIL_ATTR_ACT_CLIENT_ADDR) == 0) {
677 		if (have_log_client_attr == 0 && message->client_addr == 0)
678 		    message->client_addr = mystrdup(value);
679 	    } else if (strcmp(name, MAIL_ATTR_ACT_CLIENT_PORT) == 0) {
680 		if (have_log_client_attr == 0 && message->client_port == 0)
681 		    message->client_port = mystrdup(value);
682 	    } else if (strcmp(name, MAIL_ATTR_ACT_PROTO_NAME) == 0) {
683 		if (have_log_client_attr == 0 && message->client_proto == 0)
684 		    message->client_proto = mystrdup(value);
685 	    } else if (strcmp(name, MAIL_ATTR_ACT_HELO_NAME) == 0) {
686 		if (have_log_client_attr == 0 && message->client_helo == 0)
687 		    message->client_helo = mystrdup(value);
688 	    }
689 	    /* Original client attributes. */
690 	    else if (strcmp(name, MAIL_ATTR_LOG_CLIENT_NAME) == 0) {
691 		if (message->client_name != 0)
692 		    myfree(message->client_name);
693 		message->client_name = mystrdup(value);
694 		have_log_client_attr = 1;
695 	    } else if (strcmp(name, MAIL_ATTR_LOG_CLIENT_ADDR) == 0) {
696 		if (message->client_addr != 0)
697 		    myfree(message->client_addr);
698 		message->client_addr = mystrdup(value);
699 		have_log_client_attr = 1;
700 	    } else if (strcmp(name, MAIL_ATTR_LOG_CLIENT_PORT) == 0) {
701 		if (message->client_port != 0)
702 		    myfree(message->client_port);
703 		message->client_port = mystrdup(value);
704 		have_log_client_attr = 1;
705 	    } else if (strcmp(name, MAIL_ATTR_LOG_PROTO_NAME) == 0) {
706 		if (message->client_proto != 0)
707 		    myfree(message->client_proto);
708 		message->client_proto = mystrdup(value);
709 		have_log_client_attr = 1;
710 	    } else if (strcmp(name, MAIL_ATTR_LOG_HELO_NAME) == 0) {
711 		if (message->client_helo != 0)
712 		    myfree(message->client_helo);
713 		message->client_helo = mystrdup(value);
714 		have_log_client_attr = 1;
715 	    } else if (strcmp(name, MAIL_ATTR_SASL_METHOD) == 0) {
716 		if (message->sasl_method == 0)
717 		    message->sasl_method = mystrdup(value);
718 		else
719 		    msg_warn("%s: ignoring multiple %s attribute: %s",
720 			   message->queue_id, MAIL_ATTR_SASL_METHOD, value);
721 	    } else if (strcmp(name, MAIL_ATTR_SASL_USERNAME) == 0) {
722 		if (message->sasl_username == 0)
723 		    message->sasl_username = mystrdup(value);
724 		else
725 		    msg_warn("%s: ignoring multiple %s attribute: %s",
726 			 message->queue_id, MAIL_ATTR_SASL_USERNAME, value);
727 	    } else if (strcmp(name, MAIL_ATTR_SASL_SENDER) == 0) {
728 		if (message->sasl_sender == 0)
729 		    message->sasl_sender = mystrdup(value);
730 		else
731 		    msg_warn("%s: ignoring multiple %s attribute: %s",
732 			   message->queue_id, MAIL_ATTR_SASL_SENDER, value);
733 	    } else if (strcmp(name, MAIL_ATTR_LOG_IDENT) == 0) {
734 		if (message->log_ident == 0)
735 		    message->log_ident = mystrdup(value);
736 		else
737 		    msg_warn("%s: ignoring multiple %s attribute: %s",
738 			     message->queue_id, MAIL_ATTR_LOG_IDENT, value);
739 	    } else if (strcmp(name, MAIL_ATTR_RWR_CONTEXT) == 0) {
740 		if (message->rewrite_context == 0)
741 		    message->rewrite_context = mystrdup(value);
742 		else
743 		    msg_warn("%s: ignoring multiple %s attribute: %s",
744 			   message->queue_id, MAIL_ATTR_RWR_CONTEXT, value);
745 	    }
746 
747 	    /*
748 	     * Optional tracing flags (verify, sendmail -v, sendmail -bv).
749 	     * This record is killed after a trace logfile report is sent and
750 	     * after the logfile is deleted.
751 	     */
752 	    else if (strcmp(name, MAIL_ATTR_TRACE_FLAGS) == 0) {
753 		message->tflags = DEL_REQ_TRACE_FLAGS(atoi(value));
754 		if (message->tflags == DEL_REQ_FLAG_RECORD)
755 		    message->tflags_offset = curr_offset;
756 		else
757 		    message->tflags_offset = 0;
758 	    }
759 	    continue;
760 	}
761 	if (rec_type == REC_TYPE_WARN) {
762 	    if (message->warn_offset == 0) {
763 		message->warn_offset = curr_offset;
764 		REC_TYPE_WARN_SCAN(start, message->warn_time);
765 	    }
766 	    continue;
767 	}
768 	if (rec_type == REC_TYPE_VERP) {
769 	    if (message->verp_delims == 0) {
770 		if (message->sender == 0 || message->sender[0] == 0) {
771 		    msg_warn("%s: ignoring VERP request for null sender",
772 			     message->queue_id);
773 		} else if (verp_delims_verify(start) != 0) {
774 		    msg_warn("%s: ignoring bad VERP request: \"%.100s\"",
775 			     message->queue_id, start);
776 		} else {
777 		    if (msg_verbose)
778 			msg_info("%s: enabling VERP for sender \"%.100s\"",
779 				 message->queue_id, message->sender);
780 		    message->single_rcpt = 1;
781 		    message->verp_delims = mystrdup(start);
782 		}
783 	    }
784 	    continue;
785 	}
786     }
787 
788     /*
789      * Grr.
790      */
791     if (dsn_orcpt != 0) {
792 	if (rec_type > 0)
793 	    msg_warn("%s: ignoring out-of-order DSN original recipient <%.200s>",
794 		     message->queue_id, dsn_orcpt);
795 	myfree(dsn_orcpt);
796     }
797     if (orig_rcpt != 0) {
798 	if (rec_type > 0)
799 	    msg_warn("%s: ignoring out-of-order original recipient <%.200s>",
800 		     message->queue_id, orig_rcpt);
801 	myfree(orig_rcpt);
802     }
803 
804     /*
805      * Remember when we have read the last recipient batch. Note that we do
806      * it here after reading as reading might have used considerable amount
807      * of time.
808      */
809     message->refill_time = sane_time();
810 
811     /*
812      * Avoid clumsiness elsewhere in the program. When sending data across an
813      * IPC channel, sending an empty string is more convenient than sending a
814      * null pointer.
815      */
816     if (message->dsn_envid == 0)
817 	message->dsn_envid = mystrdup("");
818     if (message->encoding == 0)
819 	message->encoding = mystrdup(MAIL_ATTR_ENC_NONE);
820     if (message->client_name == 0)
821 	message->client_name = mystrdup("");
822     if (message->client_addr == 0)
823 	message->client_addr = mystrdup("");
824     if (message->client_port == 0)
825 	message->client_port = mystrdup("");
826     if (message->client_proto == 0)
827 	message->client_proto = mystrdup("");
828     if (message->client_helo == 0)
829 	message->client_helo = mystrdup("");
830     if (message->sasl_method == 0)
831 	message->sasl_method = mystrdup("");
832     if (message->sasl_username == 0)
833 	message->sasl_username = mystrdup("");
834     if (message->sasl_sender == 0)
835 	message->sasl_sender = mystrdup("");
836     if (message->log_ident == 0)
837 	message->log_ident = mystrdup("");
838     if (message->rewrite_context == 0)
839 	message->rewrite_context = mystrdup(MAIL_ATTR_RWR_LOCAL);
840     /* Postfix < 2.3 compatibility. */
841     if (message->create_time == 0)
842 	message->create_time = message->arrival_time.tv_sec;
843 
844     /*
845      * Clean up.
846      */
847     vstring_free(buf);
848 
849     /*
850      * Sanity checks. Verify that all required information was found,
851      * including the queue file end marker.
852      */
853     if (message->rcpt_unread < 0
854 	|| (message->rcpt_offset == 0 && message->rcpt_unread != 0)) {
855 	msg_warn("%s: rcpt count mismatch (%d)",
856 		 message->queue_id, message->rcpt_unread);
857 	message->rcpt_unread = 0;
858     }
859     if (rec_type <= 0) {
860 	/* Already logged warning. */
861     } else if (message->arrival_time.tv_sec == 0) {
862 	msg_warn("%s: message rejected: missing arrival time record",
863 		 message->queue_id);
864     } else if (message->sender == 0) {
865 	msg_warn("%s: message rejected: missing sender record",
866 		 message->queue_id);
867     } else if (message->data_offset == 0) {
868 	msg_warn("%s: message rejected: missing size record",
869 		 message->queue_id);
870     } else {
871 	return (0);
872     }
873     message->rcpt_offset = save_offset;		/* restore flag */
874     message->rcpt_unread = save_unread;		/* restore count */
875     recipient_list_free(&message->rcpt_list);
876     recipient_list_init(&message->rcpt_list, RCPT_LIST_INIT_QUEUE);
877     return (-1);
878 }
879 
880 /* qmgr_message_update_warn - update the time of next delay warning */
881 
882 void    qmgr_message_update_warn(QMGR_MESSAGE *message)
883 {
884 
885     /*
886      * XXX eventually this should let us schedule multiple warnings, right
887      * now it just allows for one.
888      */
889     if (qmgr_message_open(message)
890 	|| vstream_fseek(message->fp, message->warn_offset, SEEK_SET) < 0
891 	|| rec_fprintf(message->fp, REC_TYPE_WARN, REC_TYPE_WARN_FORMAT,
892 		       REC_TYPE_WARN_ARG(0)) < 0
893 	|| vstream_fflush(message->fp))
894 	msg_fatal("update queue file %s: %m", VSTREAM_PATH(message->fp));
895     qmgr_message_close(message);
896 }
897 
898 /* qmgr_message_kill_record - mark one message record as killed */
899 
900 void    qmgr_message_kill_record(QMGR_MESSAGE *message, long offset)
901 {
902     if (offset <= 0)
903 	msg_panic("qmgr_message_kill_record: bad offset 0x%lx", offset);
904     if (qmgr_message_open(message)
905 	|| rec_put_type(message->fp, REC_TYPE_KILL, offset) < 0
906 	|| vstream_fflush(message->fp))
907 	msg_fatal("update queue file %s: %m", VSTREAM_PATH(message->fp));
908     qmgr_message_close(message);
909 }
910 
911 /* qmgr_message_sort_compare - compare recipient information */
912 
913 static int qmgr_message_sort_compare(const void *p1, const void *p2)
914 {
915     RECIPIENT *rcpt1 = (RECIPIENT *) p1;
916     RECIPIENT *rcpt2 = (RECIPIENT *) p2;
917     QMGR_QUEUE *queue1;
918     QMGR_QUEUE *queue2;
919     char   *at1;
920     char   *at2;
921     int     result;
922 
923     /*
924      * Compare most significant to least significant recipient attributes.
925      * The comparison function must be transitive, so NULL values need to be
926      * assigned an ordinal (we set NULL last).
927      */
928 
929     queue1 = rcpt1->u.queue;
930     queue2 = rcpt2->u.queue;
931     if (queue1 != 0 && queue2 == 0)
932 	return (-1);
933     if (queue1 == 0 && queue2 != 0)
934 	return (1);
935     if (queue1 != 0 && queue2 != 0) {
936 
937 	/*
938 	 * Compare message transport.
939 	 */
940 	if ((result = strcmp(queue1->transport->name,
941 			     queue2->transport->name)) != 0)
942 	    return (result);
943 
944 	/*
945 	 * Compare queue name (nexthop or recipient@nexthop).
946 	 */
947 	if ((result = strcmp(queue1->name, queue2->name)) != 0)
948 	    return (result);
949     }
950 
951     /*
952      * Compare recipient domain.
953      */
954     at1 = strrchr(rcpt1->address, '@');
955     at2 = strrchr(rcpt2->address, '@');
956     if (at1 == 0 && at2 != 0)
957 	return (1);
958     if (at1 != 0 && at2 == 0)
959 	return (-1);
960     if (at1 != 0 && at2 != 0
961 	&& (result = strcasecmp(at1, at2)) != 0)
962 	return (result);
963 
964     /*
965      * Compare recipient address.
966      */
967     return (strcmp(rcpt1->address, rcpt2->address));
968 }
969 
970 /* qmgr_message_sort - sort message recipient addresses by domain */
971 
972 static void qmgr_message_sort(QMGR_MESSAGE *message)
973 {
974     qsort((char *) message->rcpt_list.info, message->rcpt_list.len,
975 	  sizeof(message->rcpt_list.info[0]), qmgr_message_sort_compare);
976     if (msg_verbose) {
977 	RECIPIENT_LIST list = message->rcpt_list;
978 	RECIPIENT *rcpt;
979 
980 	msg_info("start sorted recipient list");
981 	for (rcpt = list.info; rcpt < list.info + list.len; rcpt++)
982 	    msg_info("qmgr_message_sort: %s", rcpt->address);
983 	msg_info("end sorted recipient list");
984     }
985 }
986 
987 /* qmgr_resolve_one - resolve or skip one recipient */
988 
989 static int qmgr_resolve_one(QMGR_MESSAGE *message, RECIPIENT *recipient,
990 			            const char *addr, RESOLVE_REPLY *reply)
991 {
992 #define QMGR_REDIRECT(rp, tp, np) do { \
993 	(rp)->flags = 0; \
994 	vstring_strcpy((rp)->transport, (tp)); \
995 	vstring_strcpy((rp)->nexthop, (np)); \
996     } while (0)
997 
998     if ((message->tflags & DEL_REQ_FLAG_MTA_VRFY) == 0)
999 	resolve_clnt_query_from(message->sender, addr, reply);
1000     else
1001 	resolve_clnt_verify_from(message->sender, addr, reply);
1002     if (reply->flags & RESOLVE_FLAG_FAIL) {
1003 	QMGR_REDIRECT(reply, MAIL_SERVICE_RETRY,
1004 		      "4.3.0 address resolver failure");
1005 	return (0);
1006     } else if (reply->flags & RESOLVE_FLAG_ERROR) {
1007 	QMGR_REDIRECT(reply, MAIL_SERVICE_ERROR,
1008 		      "5.1.3 bad address syntax");
1009 	return (0);
1010     } else {
1011 	return (0);
1012     }
1013 }
1014 
1015 /* qmgr_message_resolve - resolve recipients */
1016 
1017 static void qmgr_message_resolve(QMGR_MESSAGE *message)
1018 {
1019     static ARGV *defer_xport_argv;
1020     RECIPIENT_LIST list = message->rcpt_list;
1021     RECIPIENT *recipient;
1022     QMGR_TRANSPORT *transport = 0;
1023     QMGR_QUEUE *queue = 0;
1024     RESOLVE_REPLY reply;
1025     VSTRING *queue_name;
1026     char   *at;
1027     char  **cpp;
1028     char   *nexthop;
1029     ssize_t len;
1030     int     status;
1031     DSN     dsn;
1032     MSG_STATS stats;
1033     DSN    *saved_dsn;
1034 
1035 #define STREQ(x,y)	(strcmp(x,y) == 0)
1036 #define STR		vstring_str
1037 #define LEN		VSTRING_LEN
1038 
1039     resolve_clnt_init(&reply);
1040     queue_name = vstring_alloc(1);
1041     for (recipient = list.info; recipient < list.info + list.len; recipient++) {
1042 
1043 	/*
1044 	 * Redirect overrides all else. But only once (per entire message).
1045 	 * For consistency with the remainder of Postfix, rewrite the address
1046 	 * to canonical form before resolving it.
1047 	 */
1048 	if (message->redirect_addr) {
1049 	    if (recipient > list.info) {
1050 		recipient->u.queue = 0;
1051 		continue;
1052 	    }
1053 	    message->rcpt_offset = 0;
1054 	    message->rcpt_unread = 0;
1055 
1056 	    rewrite_clnt_internal(REWRITE_CANON, message->redirect_addr,
1057 				  reply.recipient);
1058 	    RECIPIENT_UPDATE(recipient->address, STR(reply.recipient));
1059 	    if (qmgr_resolve_one(message, recipient,
1060 				 recipient->address, &reply) < 0)
1061 		continue;
1062 	    if (!STREQ(recipient->address, STR(reply.recipient)))
1063 		RECIPIENT_UPDATE(recipient->address, STR(reply.recipient));
1064 	}
1065 
1066 	/*
1067 	 * Content filtering overrides the address resolver.
1068 	 *
1069 	 * XXX Bypass content_filter inspection for user-generated probes
1070 	 * (sendmail -bv). MTA-generated probes never have the "please filter
1071 	 * me" bits turned on, but we handle them here anyway for the sake of
1072 	 * future proofing.
1073 	 */
1074 #define FILTER_WITHOUT_NEXTHOP(filter, next) \
1075 	(((next) = split_at((filter), ':')) == 0 || *(next) == 0)
1076 
1077 #define RCPT_WITHOUT_DOMAIN(rcpt, next) \
1078 	((next = strrchr(rcpt, '@')) == 0 || *++(next) == 0)
1079 
1080 	else if (message->filter_xport
1081 		 && (message->tflags & DEL_REQ_TRACE_ONLY_MASK) == 0) {
1082 	    reply.flags = 0;
1083 	    vstring_strcpy(reply.transport, message->filter_xport);
1084 	    if (FILTER_WITHOUT_NEXTHOP(STR(reply.transport), nexthop)
1085 		&& *(nexthop = var_def_filter_nexthop) == 0
1086 		&& RCPT_WITHOUT_DOMAIN(recipient->address, nexthop))
1087 		nexthop = var_myhostname;
1088 	    vstring_strcpy(reply.nexthop, nexthop);
1089 	    vstring_strcpy(reply.recipient, recipient->address);
1090 	}
1091 
1092 	/*
1093 	 * Resolve the destination to (transport, nexthop, address). The
1094 	 * result address may differ from the one specified by the sender.
1095 	 */
1096 	else {
1097 	    if (qmgr_resolve_one(message, recipient,
1098 				 recipient->address, &reply) < 0)
1099 		continue;
1100 	    if (!STREQ(recipient->address, STR(reply.recipient)))
1101 		RECIPIENT_UPDATE(recipient->address, STR(reply.recipient));
1102 	}
1103 
1104 	/*
1105 	 * Bounce null recipients. This should never happen, but is most
1106 	 * likely the result of a fault in a different program, so aborting
1107 	 * the queue manager process does not help.
1108 	 */
1109 	if (recipient->address[0] == 0) {
1110 	    QMGR_REDIRECT(&reply, MAIL_SERVICE_ERROR,
1111 			  "5.1.3 null recipient address");
1112 	}
1113 
1114 	/*
1115 	 * Discard mail to the local double bounce address here, so this
1116 	 * system can run without a local delivery agent. They'd still have
1117 	 * to configure something for mail directed to the local postmaster,
1118 	 * though, but that is an RFC requirement anyway.
1119 	 *
1120 	 * XXX This lookup should be done in the resolver, and the mail should
1121 	 * be directed to a general-purpose null delivery agent.
1122 	 */
1123 	if (reply.flags & RESOLVE_CLASS_LOCAL) {
1124 	    at = strrchr(STR(reply.recipient), '@');
1125 	    len = (at ? (at - STR(reply.recipient))
1126 		   : strlen(STR(reply.recipient)));
1127 	    if (strncasecmp(STR(reply.recipient), var_double_bounce_sender,
1128 			    len) == 0
1129 		&& !var_double_bounce_sender[len]) {
1130 		status = sent(message->tflags, message->queue_id,
1131 			      QMGR_MSG_STATS(&stats, message), recipient,
1132 			      "none", DSN_SIMPLE(&dsn, "2.0.0",
1133 			"undeliverable postmaster notification discarded"));
1134 		if (status == 0) {
1135 		    deliver_completed(message->fp, recipient->offset);
1136 #if 0
1137 		    /* It's the default verification probe sender address. */
1138 		    msg_warn("%s: undeliverable postmaster notification discarded",
1139 			     message->queue_id);
1140 #endif
1141 		} else
1142 		    message->flags |= status;
1143 		continue;
1144 	    }
1145 	}
1146 
1147 	/*
1148 	 * Optionally defer deliveries over specific transports, unless the
1149 	 * restriction is lifted temporarily.
1150 	 */
1151 	if (*var_defer_xports && (message->qflags & QMGR_FLUSH_DFXP) == 0) {
1152 	    if (defer_xport_argv == 0)
1153 		defer_xport_argv = argv_split(var_defer_xports, " \t\r\n,");
1154 	    for (cpp = defer_xport_argv->argv; *cpp; cpp++)
1155 		if (strcmp(*cpp, STR(reply.transport)) == 0)
1156 		    break;
1157 	    if (*cpp) {
1158 		QMGR_REDIRECT(&reply, MAIL_SERVICE_RETRY,
1159 			      "4.3.2 deferred transport");
1160 	    }
1161 	}
1162 
1163 	/*
1164 	 * Look up or instantiate the proper transport.
1165 	 */
1166 	if (transport == 0 || !STREQ(transport->name, STR(reply.transport))) {
1167 	    if ((transport = qmgr_transport_find(STR(reply.transport))) == 0)
1168 		transport = qmgr_transport_create(STR(reply.transport));
1169 	    queue = 0;
1170 	}
1171 
1172 	/*
1173 	 * This message is being flushed. If need-be unthrottle the
1174 	 * transport.
1175 	 */
1176 	if ((message->qflags & QMGR_FLUSH_EACH) != 0
1177 	    && QMGR_TRANSPORT_THROTTLED(transport))
1178 	    qmgr_transport_unthrottle(transport);
1179 
1180 	/*
1181 	 * This transport is dead. Defer delivery to this recipient.
1182 	 */
1183 	if (QMGR_TRANSPORT_THROTTLED(transport)) {
1184 	    saved_dsn = transport->dsn;
1185 	    if ((transport = qmgr_error_transport(MAIL_SERVICE_RETRY)) != 0) {
1186 		nexthop = qmgr_error_nexthop(saved_dsn);
1187 		vstring_strcpy(reply.nexthop, nexthop);
1188 		myfree(nexthop);
1189 		queue = 0;
1190 	    } else {
1191 		qmgr_defer_recipient(message, recipient, saved_dsn);
1192 		continue;
1193 	    }
1194 	}
1195 
1196 	/*
1197 	 * The nexthop destination provides the default name for the
1198 	 * per-destination queue. When the delivery agent accepts only one
1199 	 * recipient per delivery, give each recipient its own queue, so that
1200 	 * deliveries to different recipients of the same message can happen
1201 	 * in parallel, and so that we can enforce per-recipient concurrency
1202 	 * limits and prevent one recipient from tying up all the delivery
1203 	 * agent resources. We use recipient@nexthop as queue name rather
1204 	 * than the actual recipient domain name, so that one recipient in
1205 	 * multiple equivalent domains cannot evade the per-recipient
1206 	 * concurrency limit. Split the address on the recipient delimiter if
1207 	 * one is defined, so that extended addresses don't get extra
1208 	 * delivery slots.
1209 	 *
1210 	 * Fold the result to lower case so that we don't have multiple queues
1211 	 * for the same name.
1212 	 *
1213 	 * Important! All recipients in a queue must have the same nexthop
1214 	 * value. It is OK to have multiple queues with the same nexthop
1215 	 * value, but only when those queues are named after recipients.
1216 	 *
1217 	 * The single-recipient code below was written for local(8) like
1218 	 * delivery agents, and assumes that all domains that deliver to the
1219 	 * same (transport + nexthop) are aliases for $nexthop. Delivery
1220 	 * concurrency is changed from per-domain into per-recipient, by
1221 	 * changing the queue name from nexthop into localpart@nexthop.
1222 	 *
1223 	 * XXX This assumption is incorrect when different destinations share
1224 	 * the same (transport + nexthop). In reality, such transports are
1225 	 * rarely configured to use single-recipient deliveries. The fix is
1226 	 * to decouple the per-destination recipient limit from the
1227 	 * per-destination concurrency.
1228 	 */
1229 	vstring_strcpy(queue_name, STR(reply.nexthop));
1230 	if (strcmp(transport->name, MAIL_SERVICE_ERROR) != 0
1231 	    && strcmp(transport->name, MAIL_SERVICE_RETRY) != 0
1232 	    && transport->recipient_limit == 1) {
1233 	    /* Copy the recipient localpart. */
1234 	    at = strrchr(STR(reply.recipient), '@');
1235 	    len = (at ? (at - STR(reply.recipient))
1236 		   : strlen(STR(reply.recipient)));
1237 	    vstring_strncpy(queue_name, STR(reply.recipient), len);
1238 	    /* Remove the address extension from the recipient localpart. */
1239 	    if (*var_rcpt_delim && split_addr(STR(queue_name), var_rcpt_delim))
1240 		vstring_truncate(queue_name, strlen(STR(queue_name)));
1241 	    /* Assume the recipient domain is equivalent to nexthop. */
1242 	    vstring_sprintf_append(queue_name, "@%s", STR(reply.nexthop));
1243 	}
1244 	lowercase(STR(queue_name));
1245 
1246 	/*
1247 	 * This transport is alive. Find or instantiate a queue for this
1248 	 * recipient.
1249 	 */
1250 	if (queue == 0 || !STREQ(queue->name, STR(queue_name))) {
1251 	    if ((queue = qmgr_queue_find(transport, STR(queue_name))) == 0)
1252 		queue = qmgr_queue_create(transport, STR(queue_name),
1253 					  STR(reply.nexthop));
1254 	}
1255 
1256 	/*
1257 	 * This message is being flushed. If need-be unthrottle the queue.
1258 	 */
1259 	if ((message->qflags & QMGR_FLUSH_EACH) != 0
1260 	    && QMGR_QUEUE_THROTTLED(queue))
1261 	    qmgr_queue_unthrottle(queue);
1262 
1263 	/*
1264 	 * This queue is dead. Defer delivery to this recipient.
1265 	 */
1266 	if (QMGR_QUEUE_THROTTLED(queue)) {
1267 	    saved_dsn = queue->dsn;
1268 	    if ((queue = qmgr_error_queue(MAIL_SERVICE_RETRY, saved_dsn)) == 0) {
1269 		qmgr_defer_recipient(message, recipient, saved_dsn);
1270 		continue;
1271 	    }
1272 	}
1273 
1274 	/*
1275 	 * This queue is alive. Bind this recipient to this queue instance.
1276 	 */
1277 	recipient->u.queue = queue;
1278     }
1279     resolve_clnt_free(&reply);
1280     vstring_free(queue_name);
1281 }
1282 
1283 /* qmgr_message_assign - assign recipients to specific delivery requests */
1284 
1285 static void qmgr_message_assign(QMGR_MESSAGE *message)
1286 {
1287     RECIPIENT_LIST list = message->rcpt_list;
1288     RECIPIENT *recipient;
1289     QMGR_ENTRY *entry = 0;
1290     QMGR_QUEUE *queue;
1291     QMGR_JOB *job = 0;
1292     QMGR_PEER *peer = 0;
1293 
1294     /*
1295      * Try to bundle as many recipients in a delivery request as we can. When
1296      * the recipient resolves to the same site and transport as an existing
1297      * recipient, do not create a new queue entry, just move that recipient
1298      * to the recipient list of the existing queue entry. All this provided
1299      * that we do not exceed the transport-specific limit on the number of
1300      * recipients per transaction.
1301      */
1302 #define LIMIT_OK(limit, count) ((limit) == 0 || ((count) < (limit)))
1303 
1304     for (recipient = list.info; recipient < list.info + list.len; recipient++) {
1305 
1306 	/*
1307 	 * Skip recipients with a dead transport or destination.
1308 	 */
1309 	if ((queue = recipient->u.queue) == 0)
1310 	    continue;
1311 
1312 	/*
1313 	 * Lookup or instantiate the message job if necessary.
1314 	 */
1315 	if (job == 0 || queue->transport != job->transport) {
1316 	    job = qmgr_job_obtain(message, queue->transport);
1317 	    peer = 0;
1318 	}
1319 
1320 	/*
1321 	 * Lookup or instantiate job peer if necessary.
1322 	 */
1323 	if (peer == 0 || queue != peer->queue)
1324 	    peer = qmgr_peer_obtain(job, queue);
1325 
1326 	/*
1327 	 * Lookup old or instantiate new recipient entry. We try to reuse the
1328 	 * last existing entry whenever the recipient limit permits.
1329 	 */
1330 	entry = peer->entry_list.prev;
1331 	if (message->single_rcpt || entry == 0
1332 	    || !LIMIT_OK(queue->transport->recipient_limit, entry->rcpt_list.len))
1333 	    entry = qmgr_entry_create(peer, message);
1334 
1335 	/*
1336 	 * Add the recipient to the current entry and increase all those
1337 	 * recipient counters accordingly.
1338 	 */
1339 	recipient_list_add(&entry->rcpt_list, recipient->offset,
1340 			   recipient->dsn_orcpt, recipient->dsn_notify,
1341 			   recipient->orig_addr, recipient->address);
1342 	job->rcpt_count++;
1343 	message->rcpt_count++;
1344 	qmgr_recipient_count++;
1345     }
1346 
1347     /*
1348      * Release the message recipient list and reinitialize it for the next
1349      * time.
1350      */
1351     recipient_list_free(&message->rcpt_list);
1352     recipient_list_init(&message->rcpt_list, RCPT_LIST_INIT_QUEUE);
1353 
1354     /*
1355      * Note that even if qmgr_job_obtain() reset the job candidate cache of
1356      * all transports to which we assigned new recipients, this message may
1357      * have other jobs which we didn't touch at all this time. But the number
1358      * of unread recipients affecting the candidate selection might have
1359      * changed considerably, so we must invalidate the caches if it might be
1360      * of some use.
1361      */
1362     for (job = message->job_list.next; job; job = job->message_peers.next)
1363 	if (job->selected_entries < job->read_entries
1364 	    && job->blocker_tag != job->transport->blocker_tag)
1365 	    job->transport->candidate_cache_current = 0;
1366 }
1367 
1368 /* qmgr_message_move_limits - recycle unused recipient slots */
1369 
1370 static void qmgr_message_move_limits(QMGR_MESSAGE *message)
1371 {
1372     QMGR_JOB *job;
1373 
1374     for (job = message->job_list.next; job; job = job->message_peers.next)
1375 	qmgr_job_move_limits(job);
1376 }
1377 
1378 /* qmgr_message_free - release memory for in-core message structure */
1379 
1380 void    qmgr_message_free(QMGR_MESSAGE *message)
1381 {
1382     QMGR_JOB *job;
1383 
1384     if (message->refcount != 0)
1385 	msg_panic("qmgr_message_free: reference len: %d", message->refcount);
1386     if (message->fp)
1387 	msg_panic("qmgr_message_free: queue file is open");
1388     while ((job = message->job_list.next) != 0)
1389 	qmgr_job_free(job);
1390     myfree(message->queue_id);
1391     myfree(message->queue_name);
1392     if (message->dsn_envid)
1393 	myfree(message->dsn_envid);
1394     if (message->encoding)
1395 	myfree(message->encoding);
1396     if (message->sender)
1397 	myfree(message->sender);
1398     if (message->verp_delims)
1399 	myfree(message->verp_delims);
1400     if (message->filter_xport)
1401 	myfree(message->filter_xport);
1402     if (message->inspect_xport)
1403 	myfree(message->inspect_xport);
1404     if (message->redirect_addr)
1405 	myfree(message->redirect_addr);
1406     if (message->client_name)
1407 	myfree(message->client_name);
1408     if (message->client_addr)
1409 	myfree(message->client_addr);
1410     if (message->client_port)
1411 	myfree(message->client_port);
1412     if (message->client_proto)
1413 	myfree(message->client_proto);
1414     if (message->client_helo)
1415 	myfree(message->client_helo);
1416     if (message->sasl_method)
1417 	myfree(message->sasl_method);
1418     if (message->sasl_username)
1419 	myfree(message->sasl_username);
1420     if (message->sasl_sender)
1421 	myfree(message->sasl_sender);
1422     if (message->log_ident)
1423 	myfree(message->log_ident);
1424     if (message->rewrite_context)
1425 	myfree(message->rewrite_context);
1426     recipient_list_free(&message->rcpt_list);
1427     qmgr_message_count--;
1428     myfree((char *) message);
1429 }
1430 
1431 /* qmgr_message_alloc - create in-core message structure */
1432 
1433 QMGR_MESSAGE *qmgr_message_alloc(const char *queue_name, const char *queue_id,
1434 				         int qflags, mode_t mode)
1435 {
1436     const char *myname = "qmgr_message_alloc";
1437     QMGR_MESSAGE *message;
1438 
1439     if (msg_verbose)
1440 	msg_info("%s: %s %s", myname, queue_name, queue_id);
1441 
1442     /*
1443      * Create an in-core message structure.
1444      */
1445     message = qmgr_message_create(queue_name, queue_id, qflags);
1446 
1447     /*
1448      * Extract message envelope information: time of arrival, sender address,
1449      * recipient addresses. Skip files with malformed envelope information.
1450      */
1451 #define QMGR_LOCK_MODE (MYFLOCK_OP_EXCLUSIVE | MYFLOCK_OP_NOWAIT)
1452 
1453     if (qmgr_message_open(message) < 0) {
1454 	qmgr_message_free(message);
1455 	return (0);
1456     }
1457     if (myflock(vstream_fileno(message->fp), INTERNAL_LOCK, QMGR_LOCK_MODE) < 0) {
1458 	msg_info("%s: skipped, still being delivered", queue_id);
1459 	qmgr_message_close(message);
1460 	qmgr_message_free(message);
1461 	return (QMGR_MESSAGE_LOCKED);
1462     }
1463     if (qmgr_message_read(message) < 0) {
1464 	qmgr_message_close(message);
1465 	qmgr_message_free(message);
1466 	return (0);
1467     } else {
1468 
1469 	/*
1470 	 * We have validated the queue file content, so it is safe to modify
1471 	 * the file properties now.
1472 	 */
1473 	if (mode != 0 && fchmod(vstream_fileno(message->fp), mode) < 0)
1474 	    msg_fatal("fchmod %s: %m", VSTREAM_PATH(message->fp));
1475 
1476 	/*
1477 	 * Reset the defer log. This code should not be here, but we must
1478 	 * reset the defer log *after* acquiring the exclusive lock on the
1479 	 * queue file and *before* resolving new recipients. Since all those
1480 	 * operations are encapsulated so nicely by this routine, the defer
1481 	 * log reset has to be done here as well.
1482 	 *
1483 	 * Note: it is safe to remove the defer logfile from a previous queue
1484 	 * run of this queue file, because the defer log contains information
1485 	 * about recipients that still exist in this queue file.
1486 	 */
1487 	if (mail_queue_remove(MAIL_QUEUE_DEFER, queue_id) && errno != ENOENT)
1488 	    msg_fatal("%s: %s: remove %s %s: %m", myname,
1489 		      queue_id, MAIL_QUEUE_DEFER, queue_id);
1490 	qmgr_message_sort(message);
1491 	qmgr_message_resolve(message);
1492 	qmgr_message_sort(message);
1493 	qmgr_message_assign(message);
1494 	qmgr_message_close(message);
1495 	if (message->rcpt_offset == 0)
1496 	    qmgr_message_move_limits(message);
1497 	return (message);
1498     }
1499 }
1500 
1501 /* qmgr_message_realloc - refresh in-core message structure */
1502 
1503 QMGR_MESSAGE *qmgr_message_realloc(QMGR_MESSAGE *message)
1504 {
1505     const char *myname = "qmgr_message_realloc";
1506 
1507     /*
1508      * Sanity checks.
1509      */
1510     if (message->rcpt_offset <= 0)
1511 	msg_panic("%s: invalid offset: %ld", myname, message->rcpt_offset);
1512     if (msg_verbose)
1513 	msg_info("%s: %s %s offset %ld", myname, message->queue_name,
1514 		 message->queue_id, message->rcpt_offset);
1515 
1516     /*
1517      * Extract recipient addresses. Skip files with malformed envelope
1518      * information.
1519      */
1520     if (qmgr_message_open(message) < 0)
1521 	return (0);
1522     if (qmgr_message_read(message) < 0) {
1523 	qmgr_message_close(message);
1524 	return (0);
1525     } else {
1526 	qmgr_message_sort(message);
1527 	qmgr_message_resolve(message);
1528 	qmgr_message_sort(message);
1529 	qmgr_message_assign(message);
1530 	qmgr_message_close(message);
1531 	if (message->rcpt_offset == 0)
1532 	    qmgr_message_move_limits(message);
1533 	return (message);
1534     }
1535 }
1536