xref: /netbsd-src/external/ibm-public/postfix/dist/src/oqmgr/qmgr_active.c (revision 796c32c94f6e154afc9de0f63da35c91bb739b45)
1 /*	$NetBSD: qmgr_active.c,v 1.2 2017/02/14 01:16:46 christos Exp $	*/
2 
3 /*++
4 /* NAME
5 /*	qmgr_active 3
6 /* SUMMARY
7 /*	active queue management
8 /* SYNOPSIS
9 /*	#include "qmgr.h"
10 /*
/*	int	qmgr_active_feed(scan_info, queue_id)
12 /*	QMGR_SCAN *scan_info;
13 /*	const char *queue_id;
14 /*
15 /*	void	qmgr_active_drain()
16 /*
/*	void	qmgr_active_done(message)
18 /*	QMGR_MESSAGE *message;
19 /* DESCRIPTION
20 /*	These functions maintain the active message queue: the set
21 /*	of messages that the queue manager is actually working on.
22 /*	The active queue is limited in size. Messages are drained
23 /*	from the active queue by allocating a delivery process and
24 /*	by delivering mail via that process.  Messages leak into the
25 /*	active queue only when the active queue is small enough.
26 /*	Damaged message files are saved to the "corrupt" directory.
27 /*
28 /*	qmgr_active_feed() inserts the named message file into
29 /*	the active queue. Message files with the wrong name or
30 /*	with other wrong properties are skipped but not removed.
31 /*	The following queue flags are recognized, other flags being
32 /*	ignored:
33 /* .IP QMGR_SCAN_ALL
34 /*	Examine all queue files. Normally, deferred queue files with
35 /*	future time stamps are ignored, and incoming queue files with
36 /*	future time stamps are frowned upon.
37 /* .PP
/*	qmgr_active_drain() allocates one delivery process for every
/*	transport that is selected as having pending mail.
39 /*	Process allocation is asynchronous. Once the delivery
40 /*	process is available, an attempt is made to deliver
41 /*	a message via it. Message delivery is asynchronous, too.
42 /*
43 /*	qmgr_active_done() deals with a message after delivery
44 /*	has been tried for all in-core recipients. If the message
45 /*	was bounced, a bounce message is sent to the sender, or
46 /*	to the Errors-To: address if one was specified.
47 /*	If there are more on-file recipients, a new batch of
48 /*	in-core recipients is read from the queue file. Otherwise,
49 /*	if a delivery agent marked the queue file as corrupt,
50 /*	the queue file is moved to the "corrupt" queue (surprise);
51 /*	if at least one delivery failed, the message is moved
52 /*	to the deferred queue. The time stamps of a deferred queue
53 /*	file are set to the nearest wakeup time of its recipient
54 /*	sites (if delivery failed due to a problem with a next-hop
55 /*	host), are set into the future by the amount of time the
56 /*	message was queued (per-message exponential backoff), or are set
57 /*	into the future by a minimal backoff time, whichever is more.
58 /*	The minimal_backoff_time parameter specifies the minimal
59 /*	amount of time between delivery attempts; maximal_backoff_time
60 /*	specifies an upper limit.
61 /* DIAGNOSTICS
62 /*	Fatal: queue file access failures, out of memory.
63 /*	Panic: interface violations, internal consistency errors.
64 /*	Warnings: corrupt message file. A corrupt message is saved
65 /*	to the "corrupt" queue for further inspection.
66 /* LICENSE
67 /* .ad
68 /* .fi
69 /*	The Secure Mailer license must be distributed with this software.
70 /* AUTHOR(S)
71 /*	Wietse Venema
72 /*	IBM T.J. Watson Research
73 /*	P.O. Box 704
74 /*	Yorktown Heights, NY 10598, USA
75 /*--*/
76 
77 /* System library. */
78 
79 #include <sys_defs.h>
80 #include <sys/stat.h>
81 #include <dirent.h>
82 #include <stdlib.h>
83 #include <unistd.h>
84 #include <string.h>
85 #include <utime.h>
86 #include <errno.h>
87 
88 #ifndef S_IRWXU				/* What? no POSIX system? */
89 #define S_IRWXU 0700
90 #endif
91 
92 /* Utility library. */
93 
94 #include <msg.h>
95 #include <events.h>
96 #include <mymalloc.h>
97 #include <vstream.h>
98 #include <warn_stat.h>
99 
100 /* Global library. */
101 
102 #include <mail_params.h>
103 #include <mail_open_ok.h>
104 #include <mail_queue.h>
105 #include <recipient_list.h>
106 #include <bounce.h>
107 #include <defer.h>
108 #include <trace.h>
109 #include <abounce.h>
110 #include <rec_type.h>
111 #include <qmgr_user.h>
112 
113 /* Application-specific. */
114 
115 #include "qmgr.h"
116 
 /*
  * A bunch of call-back routines. Message disposal after delivery is split
  * into numbered stages (2, 25, 3). The routines that take (int, void *)
  * are handed to the asynchronous bounce/trace/defer requests in this file
  * as completion call-backs: the int is the request status and the void
  * pointer is the QMGR_MESSAGE that was passed as context. The "generic"
  * routines continue processing for a stage, and are also called directly
  * when no asynchronous request is needed.
  */
static void qmgr_active_done_2_bounce_flush(int, void *);
static void qmgr_active_done_2_generic(QMGR_MESSAGE *);
static void qmgr_active_done_25_trace_flush(int, void *);
static void qmgr_active_done_25_generic(QMGR_MESSAGE *);
static void qmgr_active_done_3_defer_flush(int, void *);
static void qmgr_active_done_3_defer_warn(int, void *);
static void qmgr_active_done_3_generic(QMGR_MESSAGE *);
127 
128 /* qmgr_active_corrupt - move corrupted file out of the way */
129 
130 static void qmgr_active_corrupt(const char *queue_id)
131 {
132     const char *myname = "qmgr_active_corrupt";
133 
134     if (mail_queue_rename(queue_id, MAIL_QUEUE_ACTIVE, MAIL_QUEUE_CORRUPT)) {
135 	if (errno != ENOENT)
136 	    msg_fatal("%s: save corrupt file queue %s id %s: %m",
137 		      myname, MAIL_QUEUE_ACTIVE, queue_id);
138     } else {
139 	msg_warn("saving corrupt file \"%s\" from queue \"%s\" to queue \"%s\"",
140 		 queue_id, MAIL_QUEUE_ACTIVE, MAIL_QUEUE_CORRUPT);
141     }
142 }
143 
144 /* qmgr_active_defer - defer queue file */
145 
146 static void qmgr_active_defer(const char *queue_name, const char *queue_id,
147 			              const char *dest_queue, int delay)
148 {
149     const char *myname = "qmgr_active_defer";
150     const char *path;
151     struct utimbuf tbuf;
152 
153     if (msg_verbose)
154 	msg_info("wakeup %s after %ld secs", queue_id, (long) delay);
155 
156     tbuf.actime = tbuf.modtime = event_time() + delay;
157     path = mail_queue_path((VSTRING *) 0, queue_name, queue_id);
158     if (utime(path, &tbuf) < 0 && errno != ENOENT)
159 	msg_fatal("%s: update %s time stamps: %m", myname, path);
160     if (mail_queue_rename(queue_id, queue_name, dest_queue)) {
161 	if (errno != ENOENT)
162 	    msg_fatal("%s: rename %s from %s to %s: %m", myname,
163 		      queue_id, queue_name, dest_queue);
164 	msg_warn("%s: rename %s from %s to %s: %m", myname,
165 		 queue_id, queue_name, dest_queue);
166     } else if (msg_verbose) {
167 	msg_info("%s: defer %s", myname, queue_id);
168     }
169 }
170 
171 /* qmgr_active_feed - feed one message into active queue */
172 
173 int     qmgr_active_feed(QMGR_SCAN *scan_info, const char *queue_id)
174 {
175     const char *myname = "qmgr_active_feed";
176     QMGR_MESSAGE *message;
177     struct stat st;
178     const char *path;
179 
180     if (strcmp(scan_info->queue, MAIL_QUEUE_ACTIVE) == 0)
181 	msg_panic("%s: bad queue %s", myname, scan_info->queue);
182     if (msg_verbose)
183 	msg_info("%s: queue %s", myname, scan_info->queue);
184 
185     /*
186      * Make sure this is something we are willing to open.
187      */
188     if (mail_open_ok(scan_info->queue, queue_id, &st, &path) == MAIL_OPEN_NO)
189 	return (0);
190 
191     if (msg_verbose)
192 	msg_info("%s: %s", myname, path);
193 
194     /*
195      * Skip files that have time stamps into the future. They need to cool
196      * down. Incoming and deferred files can have future time stamps.
197      */
198     if ((scan_info->flags & QMGR_SCAN_ALL) == 0
199 	&& st.st_mtime > time((time_t *) 0) + 1) {
200 	if (msg_verbose)
201 	    msg_info("%s: skip %s (%ld seconds)", myname, queue_id,
202 		     (long) (st.st_mtime - event_time()));
203 	return (0);
204     }
205 
206     /*
207      * Move the message to the active queue. File access errors are fatal.
208      */
209     if (mail_queue_rename(queue_id, scan_info->queue, MAIL_QUEUE_ACTIVE)) {
210 	if (errno != ENOENT)
211 	    msg_fatal("%s: %s: rename from %s to %s: %m", myname,
212 		      queue_id, scan_info->queue, MAIL_QUEUE_ACTIVE);
213 	msg_warn("%s: %s: rename from %s to %s: %m", myname,
214 		 queue_id, scan_info->queue, MAIL_QUEUE_ACTIVE);
215 	return (0);
216     }
217 
218     /*
219      * Extract envelope information: sender and recipients. At this point,
220      * mail addresses have been processed by the cleanup service so they
221      * should be in canonical form. Generate requests to deliver this
222      * message.
223      *
224      * Throwing away queue files seems bad, especially when they made it this
225      * far into the mail system. Therefore we save bad files to a separate
226      * directory for further inspection.
227      *
228      * After queue manager restart it is possible that a queue file is still
229      * being delivered. In that case (the file is locked), defer delivery by
230      * a minimal amount of time.
231      */
232 #define QMGR_FLUSH_AFTER	(QMGR_FLUSH_EACH | QMGR_FLUSH_DFXP)
233 
234     if ((message = qmgr_message_alloc(MAIL_QUEUE_ACTIVE, queue_id,
235 				 (st.st_mode & MAIL_QUEUE_STAT_UNTHROTTLE) ?
236 				      scan_info->flags | QMGR_FLUSH_AFTER :
237 				      scan_info->flags,
238 				 (st.st_mode & MAIL_QUEUE_STAT_UNTHROTTLE) ?
239 				  st.st_mode & ~MAIL_QUEUE_STAT_UNTHROTTLE :
240 				      0)) == 0) {
241 	qmgr_active_corrupt(queue_id);
242 	return (0);
243     } else if (message == QMGR_MESSAGE_LOCKED) {
244 	qmgr_active_defer(MAIL_QUEUE_ACTIVE, queue_id, MAIL_QUEUE_INCOMING, 60);
245 	return (0);
246     } else {
247 
248 	/*
249 	 * Special case if all recipients were already delivered. Send any
250 	 * bounces and clean up.
251 	 */
252 	if (message->refcount == 0)
253 	    qmgr_active_done(message);
254 	return (1);
255     }
256 }
257 
258 /* qmgr_active_done - dispose of message after recipients have been tried */
259 
260 void    qmgr_active_done(QMGR_MESSAGE *message)
261 {
262     const char *myname = "qmgr_active_done";
263     struct stat st;
264 
265     if (msg_verbose)
266 	msg_info("%s: %s", myname, message->queue_id);
267 
268     /*
269      * During a previous iteration, an attempt to bounce this message may
270      * have failed, so there may still be a bounce log lying around. XXX By
271      * groping around in the bounce queue, we're trespassing on the bounce
272      * service's territory. But doing so is more robust than depending on the
273      * bounce daemon to do the lookup for us, and for us to do the deleting
274      * after we have received a successful status from the bounce service.
275      * The bounce queue directory blocks are most likely in memory anyway. If
276      * these lookups become a performance problem we will have to build an
277      * in-core cache into the bounce daemon.
278      *
279      * Don't bounce when the bounce log is empty. The bounce process obviously
280      * failed, and the delivery agent will have requested that the message be
281      * deferred.
282      *
283      * Bounces are sent asynchronously to avoid stalling while the cleanup
284      * daemon waits for the qmgr to accept the "new mail" trigger.
285      *
286      * See also code in cleanup_bounce.c.
287      */
288     if (stat(mail_queue_path((VSTRING *) 0, MAIL_QUEUE_BOUNCE, message->queue_id), &st) == 0) {
289 	if (st.st_size == 0) {
290 	    if (mail_queue_remove(MAIL_QUEUE_BOUNCE, message->queue_id))
291 		msg_fatal("remove %s %s: %m",
292 			  MAIL_QUEUE_BOUNCE, message->queue_id);
293 	} else {
294 	    if (msg_verbose)
295 		msg_info("%s: bounce %s", myname, message->queue_id);
296 	    if (message->verp_delims == 0 || var_verp_bounce_off)
297 		abounce_flush(BOUNCE_FLAG_KEEP,
298 			      message->queue_name,
299 			      message->queue_id,
300 			      message->encoding,
301 			      message->smtputf8,
302 			      message->sender,
303 			      message->dsn_envid,
304 			      message->dsn_ret,
305 			      qmgr_active_done_2_bounce_flush,
306 			      (void *) message);
307 	    else
308 		abounce_flush_verp(BOUNCE_FLAG_KEEP,
309 				   message->queue_name,
310 				   message->queue_id,
311 				   message->encoding,
312 				   message->smtputf8,
313 				   message->sender,
314 				   message->dsn_envid,
315 				   message->dsn_ret,
316 				   message->verp_delims,
317 				   qmgr_active_done_2_bounce_flush,
318 				   (void *) message);
319 	    return;
320 	}
321     }
322 
323     /*
324      * Asynchronous processing does not reach this point.
325      */
326     qmgr_active_done_2_generic(message);
327 }
328 
329 /* qmgr_active_done_2_bounce_flush - process abounce_flush() status */
330 
331 static void qmgr_active_done_2_bounce_flush(int status, void *context)
332 {
333     QMGR_MESSAGE *message = (QMGR_MESSAGE *) context;
334 
335     /*
336      * Process abounce_flush() status and continue processing.
337      */
338     message->flags |= status;
339     qmgr_active_done_2_generic(message);
340 }
341 
342 /* qmgr_active_done_2_generic - continue processing */
343 
344 static void qmgr_active_done_2_generic(QMGR_MESSAGE *message)
345 {
346     const char *path;
347     struct stat st;
348 
349     /*
350      * A delivery agent marks a queue file as corrupt by changing its
351      * attributes, and by pretending that delivery was deferred.
352      */
353     if (message->flags
354 	&& mail_open_ok(MAIL_QUEUE_ACTIVE, message->queue_id, &st, &path) == MAIL_OPEN_NO) {
355 	qmgr_active_corrupt(message->queue_id);
356 	qmgr_message_free(message);
357 	return;
358     }
359 
360     /*
361      * If we did not read all recipients from this file, go read some more,
362      * but remember whether some recipients have to be tried again.
363      *
364      * Throwing away queue files seems bad, especially when they made it this
365      * far into the mail system. Therefore we save bad files to a separate
366      * directory for further inspection by a human being.
367      */
368     if (message->rcpt_offset > 0) {
369 	if (qmgr_message_realloc(message) == 0) {
370 	    qmgr_active_corrupt(message->queue_id);
371 	    qmgr_message_free(message);
372 	} else {
373 	    if (message->refcount == 0)
374 		qmgr_active_done(message);	/* recurse for consistency */
375 	}
376 	return;
377     }
378 
379     /*
380      * XXX With multi-recipient mail, some recipients may have NOTIFY=SUCCESS
381      * and others not. Depending on what subset of recipients are delivered,
382      * a trace file may or may not be created. Even when the last partial
383      * delivery attempt had no NOTIFY=SUCCESS recipients, a trace file may
384      * still exist from a previous partial delivery attempt. So as long as
385      * any recipient has NOTIFY=SUCCESS we have to always look for the trace
386      * file and be prepared for the file not to exist.
387      *
388      * See also comments in bounce/bounce_notify_util.c.
389      */
390     if ((message->tflags & (DEL_REQ_FLAG_USR_VRFY | DEL_REQ_FLAG_RECORD
391 			    | DEL_REQ_FLAG_REC_DLY_SENT))
392 	|| (message->rflags & QMGR_READ_FLAG_NOTIFY_SUCCESS)) {
393 	atrace_flush(message->tflags,
394 		     message->queue_name,
395 		     message->queue_id,
396 		     message->encoding,
397 		     message->smtputf8,
398 		     message->sender,
399 		     message->dsn_envid,
400 		     message->dsn_ret,
401 		     qmgr_active_done_25_trace_flush,
402 		     (void *) message);
403 	return;
404     }
405 
406     /*
407      * Asynchronous processing does not reach this point.
408      */
409     qmgr_active_done_25_generic(message);
410 }
411 
412 /* qmgr_active_done_25_trace_flush - continue after atrace_flush() completion */
413 
414 static void qmgr_active_done_25_trace_flush(int status, void *context)
415 {
416     QMGR_MESSAGE *message = (QMGR_MESSAGE *) context;
417 
418     /*
419      * Process atrace_flush() status and continue processing.
420      */
421     if (status == 0 && message->tflags_offset)
422 	qmgr_message_kill_record(message, message->tflags_offset);
423     message->flags |= status;
424     qmgr_active_done_25_generic(message);
425 }
426 
427 /* qmgr_active_done_25_generic - continue processing */
428 
429 static void qmgr_active_done_25_generic(QMGR_MESSAGE *message)
430 {
431     const char *myname = "qmgr_active_done_25_generic";
432 
433     /*
434      * If we get to this point we have tried all recipients for this message.
435      * If the message is too old, try to bounce it.
436      *
437      * Bounces are sent asynchronously to avoid stalling while the cleanup
438      * daemon waits for the qmgr to accept the "new mail" trigger.
439      */
440     if (message->flags) {
441 	if (event_time() >= message->create_time +
442 	    (*message->sender ? var_max_queue_time : var_dsn_queue_time)) {
443 	    msg_info("%s: from=<%s>, status=expired, returned to sender",
444 		     message->queue_id, message->sender);
445 	    if (message->verp_delims == 0 || var_verp_bounce_off)
446 		adefer_flush(BOUNCE_FLAG_KEEP,
447 			     message->queue_name,
448 			     message->queue_id,
449 			     message->encoding,
450 			     message->smtputf8,
451 			     message->sender,
452 			     message->dsn_envid,
453 			     message->dsn_ret,
454 			     qmgr_active_done_3_defer_flush,
455 			     (void *) message);
456 	    else
457 		adefer_flush_verp(BOUNCE_FLAG_KEEP,
458 				  message->queue_name,
459 				  message->queue_id,
460 				  message->encoding,
461 				  message->smtputf8,
462 				  message->sender,
463 				  message->dsn_envid,
464 				  message->dsn_ret,
465 				  message->verp_delims,
466 				  qmgr_active_done_3_defer_flush,
467 				  (void *) message);
468 	    return;
469 	} else if (message->warn_time > 0
470 		   && event_time() >= message->warn_time - 1) {
471 	    if (msg_verbose)
472 		msg_info("%s: sending defer warning for %s", myname, message->queue_id);
473 	    adefer_warn(BOUNCE_FLAG_KEEP,
474 			message->queue_name,
475 			message->queue_id,
476 			message->encoding,
477 			message->smtputf8,
478 			message->sender,
479 			message->dsn_envid,
480 			message->dsn_ret,
481 			qmgr_active_done_3_defer_warn,
482 			(void *) message);
483 	    return;
484 	}
485     }
486 
487     /*
488      * Asynchronous processing does not reach this point.
489      */
490     qmgr_active_done_3_generic(message);
491 }
492 
493 /* qmgr_active_done_3_defer_warn - continue after adefer_warn() completion */
494 
495 static void qmgr_active_done_3_defer_warn(int status, void *context)
496 {
497     QMGR_MESSAGE *message = (QMGR_MESSAGE *) context;
498 
499     /*
500      * Process adefer_warn() completion status and continue processing.
501      */
502     if (status == 0)
503 	qmgr_message_update_warn(message);
504     qmgr_active_done_3_generic(message);
505 }
506 
507 /* qmgr_active_done_3_defer_flush - continue after adefer_flush() completion */
508 
509 static void qmgr_active_done_3_defer_flush(int status, void *context)
510 {
511     QMGR_MESSAGE *message = (QMGR_MESSAGE *) context;
512 
513     /*
514      * Process adefer_flush() status and continue processing.
515      */
516     message->flags = status;
517     qmgr_active_done_3_generic(message);
518 }
519 
520 /* qmgr_active_done_3_generic - continue processing */
521 
522 static void qmgr_active_done_3_generic(QMGR_MESSAGE *message)
523 {
524     const char *myname = "qmgr_active_done_3_generic";
525     int     delay;
526 
527     /*
528      * Some recipients need to be tried again. Move the queue file time
529      * stamps into the future by the amount of time that the message is
530      * delayed, and move the message to the deferred queue. Impose minimal
531      * and maximal backoff times.
532      *
533      * Since we look at actual time in queue, not time since last delivery
534      * attempt, backoff times will be distributed. However, we can still see
535      * spikes in delivery activity because the interval between deferred
536      * queue scans is finite.
537      */
538     if (message->flags) {
539 	if (message->create_time > 0) {
540 	    delay = event_time() - message->create_time;
541 	    if (delay > var_max_backoff_time)
542 		delay = var_max_backoff_time;
543 	    if (delay < var_min_backoff_time)
544 		delay = var_min_backoff_time;
545 	} else {
546 	    delay = var_min_backoff_time;
547 	}
548 	qmgr_active_defer(message->queue_name, message->queue_id,
549 			  MAIL_QUEUE_DEFERRED, delay);
550     }
551 
552     /*
553      * All recipients done. Remove the queue file.
554      */
555     else {
556 	if (mail_queue_remove(message->queue_name, message->queue_id)) {
557 	    if (errno != ENOENT)
558 		msg_fatal("%s: remove %s from %s: %m", myname,
559 			  message->queue_id, message->queue_name);
560 	    msg_warn("%s: remove %s from %s: %m", myname,
561 		     message->queue_id, message->queue_name);
562 	} else {
563 	    /* Same format as logged by postsuper. */
564 	    msg_info("%s: removed", message->queue_id);
565 	}
566     }
567 
568     /*
569      * Finally, delete the in-core message structure.
570      */
571     qmgr_message_free(message);
572 }
573 
574 /* qmgr_active_drain - drain active queue by allocating a delivery process */
575 
576 void    qmgr_active_drain(void)
577 {
578     QMGR_TRANSPORT *transport;
579 
580     /*
581      * Allocate one delivery process for every transport with pending mail.
582      * The process allocation completes asynchronously.
583      */
584     while ((transport = qmgr_transport_select()) != 0) {
585 	if (msg_verbose)
586 	    msg_info("qmgr_active_drain: allocate %s", transport->name);
587 	qmgr_transport_alloc(transport, qmgr_deliver);
588     }
589 }
590