xref: /netbsd-src/external/ibm-public/postfix/dist/src/oqmgr/qmgr_active.c (revision 33881f779a77dce6440bdc44610d94de75bebefe)
1 /*	$NetBSD: qmgr_active.c,v 1.3 2020/03/18 19:05:17 christos Exp $	*/
2 
3 /*++
4 /* NAME
5 /*	qmgr_active 3
6 /* SUMMARY
7 /*	active queue management
8 /* SYNOPSIS
9 /*	#include "qmgr.h"
10 /*
11 /*	int	qmgr_active_feed(scan_info, queue_id)
12 /*	QMGR_SCAN *scan_info;
13 /*	const char *queue_id;
14 /*
15 /*	void	qmgr_active_drain()
16 /*
17 /*	void	qmgr_active_done(message)
18 /*	QMGR_MESSAGE *message;
19 /* DESCRIPTION
20 /*	These functions maintain the active message queue: the set
21 /*	of messages that the queue manager is actually working on.
22 /*	The active queue is limited in size. Messages are drained
23 /*	from the active queue by allocating a delivery process and
24 /*	by delivering mail via that process.  Messages leak into the
25 /*	active queue only when the active queue is small enough.
26 /*	Damaged message files are saved to the "corrupt" directory.
27 /*
28 /*	qmgr_active_feed() inserts the named message file into
29 /*	the active queue. Message files with the wrong name or
30 /*	with other wrong properties are skipped but not removed.
31 /*	The following queue flags are recognized, other flags being
32 /*	ignored:
33 /* .IP QMGR_SCAN_ALL
34 /*	Examine all queue files. Normally, deferred queue files with
35 /*	future time stamps are ignored, and incoming queue files with
36 /*	future time stamps are frowned upon.
37 /* .PP
38 /*	qmgr_active_drain() allocates one delivery process.
39 /*	Process allocation is asynchronous. Once the delivery
40 /*	process is available, an attempt is made to deliver
41 /*	a message via it. Message delivery is asynchronous, too.
42 /*
43 /*	qmgr_active_done() deals with a message after delivery
44 /*	has been tried for all in-core recipients. If the message
45 /*	was bounced, a bounce message is sent to the sender, or
46 /*	to the Errors-To: address if one was specified.
47 /*	If there are more on-file recipients, a new batch of
48 /*	in-core recipients is read from the queue file. Otherwise,
49 /*	if a delivery agent marked the queue file as corrupt,
50 /*	the queue file is moved to the "corrupt" queue (surprise);
51 /*	if at least one delivery failed, the message is moved
52 /*	to the deferred queue. The time stamps of a deferred queue
53 /*	file are set to the nearest wakeup time of its recipient
54 /*	sites (if delivery failed due to a problem with a next-hop
55 /*	host), are set into the future by the amount of time the
56 /*	message was queued (per-message exponential backoff), or are set
57 /*	into the future by a minimal backoff time, whichever is more.
58 /*	The minimal_backoff_time parameter specifies the minimal
59 /*	amount of time between delivery attempts; maximal_backoff_time
60 /*	specifies an upper limit.
61 /* DIAGNOSTICS
62 /*	Fatal: queue file access failures, out of memory.
63 /*	Panic: interface violations, internal consistency errors.
64 /*	Warnings: corrupt message file. A corrupt message is saved
65 /*	to the "corrupt" queue for further inspection.
66 /* LICENSE
67 /* .ad
68 /* .fi
69 /*	The Secure Mailer license must be distributed with this software.
70 /* AUTHOR(S)
71 /*	Wietse Venema
72 /*	IBM T.J. Watson Research
73 /*	P.O. Box 704
74 /*	Yorktown Heights, NY 10598, USA
75 /*
76 /*	Wietse Venema
77 /*	Google, Inc.
78 /*	111 8th Avenue
79 /*	New York, NY 10011, USA
80 /*--*/
81 
82 /* System library. */
83 
84 #include <sys_defs.h>
85 #include <sys/stat.h>
86 #include <dirent.h>
87 #include <stdlib.h>
88 #include <unistd.h>
89 #include <string.h>
90 #include <utime.h>
91 #include <errno.h>
92 
93 #ifndef S_IRWXU				/* What? no POSIX system? */
94 #define S_IRWXU 0700
95 #endif
96 
97 /* Utility library. */
98 
99 #include <msg.h>
100 #include <events.h>
101 #include <mymalloc.h>
102 #include <vstream.h>
103 #include <warn_stat.h>
104 
105 /* Global library. */
106 
107 #include <mail_params.h>
108 #include <mail_open_ok.h>
109 #include <mail_queue.h>
110 #include <recipient_list.h>
111 #include <bounce.h>
112 #include <defer.h>
113 #include <trace.h>
114 #include <abounce.h>
115 #include <rec_type.h>
116 #include <qmgr_user.h>
117 #include <info_log_addr_form.h>
118 
119 /* Application-specific. */
120 
121 #include "qmgr.h"
122 
123  /*
124   * A bunch of call-back routines.
125   */
126 static void qmgr_active_done_2_bounce_flush(int, void *);
127 static void qmgr_active_done_2_generic(QMGR_MESSAGE *);
128 static void qmgr_active_done_25_trace_flush(int, void *);
129 static void qmgr_active_done_25_generic(QMGR_MESSAGE *);
130 static void qmgr_active_done_3_defer_flush(int, void *);
131 static void qmgr_active_done_3_defer_warn(int, void *);
132 static void qmgr_active_done_3_generic(QMGR_MESSAGE *);
133 
134 /* qmgr_active_corrupt - move corrupted file out of the way */
135 
qmgr_active_corrupt(const char * queue_id)136 static void qmgr_active_corrupt(const char *queue_id)
137 {
138     const char *myname = "qmgr_active_corrupt";
139 
140     if (mail_queue_rename(queue_id, MAIL_QUEUE_ACTIVE, MAIL_QUEUE_CORRUPT)) {
141 	if (errno != ENOENT)
142 	    msg_fatal("%s: save corrupt file queue %s id %s: %m",
143 		      myname, MAIL_QUEUE_ACTIVE, queue_id);
144     } else {
145 	msg_warn("saving corrupt file \"%s\" from queue \"%s\" to queue \"%s\"",
146 		 queue_id, MAIL_QUEUE_ACTIVE, MAIL_QUEUE_CORRUPT);
147     }
148 }
149 
150 /* qmgr_active_defer - defer queue file */
151 
qmgr_active_defer(const char * queue_name,const char * queue_id,const char * dest_queue,int delay)152 static void qmgr_active_defer(const char *queue_name, const char *queue_id,
153 			              const char *dest_queue, int delay)
154 {
155     const char *myname = "qmgr_active_defer";
156     const char *path;
157     struct utimbuf tbuf;
158 
159     if (msg_verbose)
160 	msg_info("wakeup %s after %ld secs", queue_id, (long) delay);
161 
162     tbuf.actime = tbuf.modtime = event_time() + delay;
163     path = mail_queue_path((VSTRING *) 0, queue_name, queue_id);
164     if (utime(path, &tbuf) < 0 && errno != ENOENT)
165 	msg_fatal("%s: update %s time stamps: %m", myname, path);
166     if (mail_queue_rename(queue_id, queue_name, dest_queue)) {
167 	if (errno != ENOENT)
168 	    msg_fatal("%s: rename %s from %s to %s: %m", myname,
169 		      queue_id, queue_name, dest_queue);
170 	msg_warn("%s: rename %s from %s to %s: %m", myname,
171 		 queue_id, queue_name, dest_queue);
172     } else if (msg_verbose) {
173 	msg_info("%s: defer %s", myname, queue_id);
174     }
175 }
176 
177 /* qmgr_active_feed - feed one message into active queue */
178 
qmgr_active_feed(QMGR_SCAN * scan_info,const char * queue_id)179 int     qmgr_active_feed(QMGR_SCAN *scan_info, const char *queue_id)
180 {
181     const char *myname = "qmgr_active_feed";
182     QMGR_MESSAGE *message;
183     struct stat st;
184     const char *path;
185 
186     if (strcmp(scan_info->queue, MAIL_QUEUE_ACTIVE) == 0)
187 	msg_panic("%s: bad queue %s", myname, scan_info->queue);
188     if (msg_verbose)
189 	msg_info("%s: queue %s", myname, scan_info->queue);
190 
191     /*
192      * Make sure this is something we are willing to open.
193      */
194     if (mail_open_ok(scan_info->queue, queue_id, &st, &path) == MAIL_OPEN_NO)
195 	return (0);
196 
197     if (msg_verbose)
198 	msg_info("%s: %s", myname, path);
199 
200     /*
201      * Skip files that have time stamps into the future. They need to cool
202      * down. Incoming and deferred files can have future time stamps.
203      */
204     if ((scan_info->flags & QMGR_SCAN_ALL) == 0
205 	&& st.st_mtime > time((time_t *) 0) + 1) {
206 	if (msg_verbose)
207 	    msg_info("%s: skip %s (%ld seconds)", myname, queue_id,
208 		     (long) (st.st_mtime - event_time()));
209 	return (0);
210     }
211 
212     /*
213      * Move the message to the active queue. File access errors are fatal.
214      */
215     if (mail_queue_rename(queue_id, scan_info->queue, MAIL_QUEUE_ACTIVE)) {
216 	if (errno != ENOENT)
217 	    msg_fatal("%s: %s: rename from %s to %s: %m", myname,
218 		      queue_id, scan_info->queue, MAIL_QUEUE_ACTIVE);
219 	msg_warn("%s: %s: rename from %s to %s: %m", myname,
220 		 queue_id, scan_info->queue, MAIL_QUEUE_ACTIVE);
221 	return (0);
222     }
223 
224     /*
225      * Extract envelope information: sender and recipients. At this point,
226      * mail addresses have been processed by the cleanup service so they
227      * should be in canonical form. Generate requests to deliver this
228      * message.
229      *
230      * Throwing away queue files seems bad, especially when they made it this
231      * far into the mail system. Therefore we save bad files to a separate
232      * directory for further inspection.
233      *
234      * After queue manager restart it is possible that a queue file is still
235      * being delivered. In that case (the file is locked), defer delivery by
236      * a minimal amount of time.
237      */
238 #define QMGR_FLUSH_AFTER	(QMGR_FLUSH_EACH | QMGR_FLUSH_DFXP)
239 #define MAYBE_FLUSH_AFTER(mode) \
240 	(((mode) & MAIL_QUEUE_STAT_UNTHROTTLE) ? QMGR_FLUSH_AFTER : 0)
241 #define MAYBE_FORCE_EXPIRE(mode) \
242 	(((mode) & MAIL_QUEUE_STAT_EXPIRE) ? QMGR_FORCE_EXPIRE : 0)
243 #define MAYBE_UPDATE_MODE(mode) \
244 	(((mode) & MAIL_QUEUE_STAT_UNTHROTTLE) ? \
245 	(mode) & ~MAIL_QUEUE_STAT_UNTHROTTLE : 0)
246 
247     if ((message = qmgr_message_alloc(MAIL_QUEUE_ACTIVE, queue_id,
248 				      scan_info->flags
249 				      | MAYBE_FLUSH_AFTER(st.st_mode)
250 				      | MAYBE_FORCE_EXPIRE(st.st_mode),
251 				      MAYBE_UPDATE_MODE(st.st_mode))) == 0) {
252 	qmgr_active_corrupt(queue_id);
253 	return (0);
254     } else if (message == QMGR_MESSAGE_LOCKED) {
255 	qmgr_active_defer(MAIL_QUEUE_ACTIVE, queue_id, MAIL_QUEUE_INCOMING, 60);
256 	return (0);
257     } else {
258 
259 	/*
260 	 * Special case if all recipients were already delivered. Send any
261 	 * bounces and clean up.
262 	 */
263 	if (message->refcount == 0)
264 	    qmgr_active_done(message);
265 	return (1);
266     }
267 }
268 
269 /* qmgr_active_done - dispose of message after recipients have been tried */
270 
qmgr_active_done(QMGR_MESSAGE * message)271 void    qmgr_active_done(QMGR_MESSAGE *message)
272 {
273     const char *myname = "qmgr_active_done";
274     struct stat st;
275 
276     if (msg_verbose)
277 	msg_info("%s: %s", myname, message->queue_id);
278 
279     /*
280      * During a previous iteration, an attempt to bounce this message may
281      * have failed, so there may still be a bounce log lying around. XXX By
282      * groping around in the bounce queue, we're trespassing on the bounce
283      * service's territory. But doing so is more robust than depending on the
284      * bounce daemon to do the lookup for us, and for us to do the deleting
285      * after we have received a successful status from the bounce service.
286      * The bounce queue directory blocks are most likely in memory anyway. If
287      * these lookups become a performance problem we will have to build an
288      * in-core cache into the bounce daemon.
289      *
290      * Don't bounce when the bounce log is empty. The bounce process obviously
291      * failed, and the delivery agent will have requested that the message be
292      * deferred.
293      *
294      * Bounces are sent asynchronously to avoid stalling while the cleanup
295      * daemon waits for the qmgr to accept the "new mail" trigger.
296      *
297      * See also code in cleanup_bounce.c.
298      */
299     if (stat(mail_queue_path((VSTRING *) 0, MAIL_QUEUE_BOUNCE, message->queue_id), &st) == 0) {
300 	if (st.st_size == 0) {
301 	    if (mail_queue_remove(MAIL_QUEUE_BOUNCE, message->queue_id))
302 		msg_fatal("remove %s %s: %m",
303 			  MAIL_QUEUE_BOUNCE, message->queue_id);
304 	} else {
305 	    if (msg_verbose)
306 		msg_info("%s: bounce %s", myname, message->queue_id);
307 	    if (message->verp_delims == 0 || var_verp_bounce_off)
308 		abounce_flush(BOUNCE_FLAG_KEEP,
309 			      message->queue_name,
310 			      message->queue_id,
311 			      message->encoding,
312 			      message->smtputf8,
313 			      message->sender,
314 			      message->dsn_envid,
315 			      message->dsn_ret,
316 			      qmgr_active_done_2_bounce_flush,
317 			      (void *) message);
318 	    else
319 		abounce_flush_verp(BOUNCE_FLAG_KEEP,
320 				   message->queue_name,
321 				   message->queue_id,
322 				   message->encoding,
323 				   message->smtputf8,
324 				   message->sender,
325 				   message->dsn_envid,
326 				   message->dsn_ret,
327 				   message->verp_delims,
328 				   qmgr_active_done_2_bounce_flush,
329 				   (void *) message);
330 	    return;
331 	}
332     }
333 
334     /*
335      * Asynchronous processing does not reach this point.
336      */
337     qmgr_active_done_2_generic(message);
338 }
339 
340 /* qmgr_active_done_2_bounce_flush - process abounce_flush() status */
341 
qmgr_active_done_2_bounce_flush(int status,void * context)342 static void qmgr_active_done_2_bounce_flush(int status, void *context)
343 {
344     QMGR_MESSAGE *message = (QMGR_MESSAGE *) context;
345 
346     /*
347      * Process abounce_flush() status and continue processing.
348      */
349     message->flags |= status;
350     qmgr_active_done_2_generic(message);
351 }
352 
353 /* qmgr_active_done_2_generic - continue processing */
354 
qmgr_active_done_2_generic(QMGR_MESSAGE * message)355 static void qmgr_active_done_2_generic(QMGR_MESSAGE *message)
356 {
357     const char *path;
358     struct stat st;
359 
360     /*
361      * A delivery agent marks a queue file as corrupt by changing its
362      * attributes, and by pretending that delivery was deferred.
363      */
364     if (message->flags
365 	&& mail_open_ok(MAIL_QUEUE_ACTIVE, message->queue_id, &st, &path) == MAIL_OPEN_NO) {
366 	qmgr_active_corrupt(message->queue_id);
367 	qmgr_message_free(message);
368 	return;
369     }
370 
371     /*
372      * If we did not read all recipients from this file, go read some more,
373      * but remember whether some recipients have to be tried again.
374      *
375      * Throwing away queue files seems bad, especially when they made it this
376      * far into the mail system. Therefore we save bad files to a separate
377      * directory for further inspection by a human being.
378      */
379     if (message->rcpt_offset > 0) {
380 	if (qmgr_message_realloc(message) == 0) {
381 	    qmgr_active_corrupt(message->queue_id);
382 	    qmgr_message_free(message);
383 	} else {
384 	    if (message->refcount == 0)
385 		qmgr_active_done(message);	/* recurse for consistency */
386 	}
387 	return;
388     }
389 
390     /*
391      * XXX With multi-recipient mail, some recipients may have NOTIFY=SUCCESS
392      * and others not. Depending on what subset of recipients are delivered,
393      * a trace file may or may not be created. Even when the last partial
394      * delivery attempt had no NOTIFY=SUCCESS recipients, a trace file may
395      * still exist from a previous partial delivery attempt. So as long as
396      * any recipient has NOTIFY=SUCCESS we have to always look for the trace
397      * file and be prepared for the file not to exist.
398      *
399      * See also comments in bounce/bounce_notify_util.c.
400      */
401     if ((message->tflags & (DEL_REQ_FLAG_USR_VRFY | DEL_REQ_FLAG_RECORD
402 			    | DEL_REQ_FLAG_REC_DLY_SENT))
403 	|| (message->rflags & QMGR_READ_FLAG_NOTIFY_SUCCESS)) {
404 	atrace_flush(message->tflags,
405 		     message->queue_name,
406 		     message->queue_id,
407 		     message->encoding,
408 		     message->smtputf8,
409 		     message->sender,
410 		     message->dsn_envid,
411 		     message->dsn_ret,
412 		     qmgr_active_done_25_trace_flush,
413 		     (void *) message);
414 	return;
415     }
416 
417     /*
418      * Asynchronous processing does not reach this point.
419      */
420     qmgr_active_done_25_generic(message);
421 }
422 
423 /* qmgr_active_done_25_trace_flush - continue after atrace_flush() completion */
424 
qmgr_active_done_25_trace_flush(int status,void * context)425 static void qmgr_active_done_25_trace_flush(int status, void *context)
426 {
427     QMGR_MESSAGE *message = (QMGR_MESSAGE *) context;
428 
429     /*
430      * Process atrace_flush() status and continue processing.
431      */
432     if (status == 0 && message->tflags_offset)
433 	qmgr_message_kill_record(message, message->tflags_offset);
434     message->flags |= status;
435     qmgr_active_done_25_generic(message);
436 }
437 
438 /* qmgr_active_done_25_generic - continue processing */
439 
qmgr_active_done_25_generic(QMGR_MESSAGE * message)440 static void qmgr_active_done_25_generic(QMGR_MESSAGE *message)
441 {
442     const char *myname = "qmgr_active_done_25_generic";
443     const char *expire_status = 0;
444 
445     /*
446      * If we get to this point we have tried all recipients for this message.
447      * If the message is too old, try to bounce it.
448      *
449      * Bounces are sent asynchronously to avoid stalling while the cleanup
450      * daemon waits for the qmgr to accept the "new mail" trigger.
451      */
452     if (message->flags) {
453 	if ((message->qflags & QMGR_FORCE_EXPIRE) != 0) {
454 	    expire_status = "force-expired";
455 	} else if (event_time() >= message->create_time +
456 	     (*message->sender ? var_max_queue_time : var_dsn_queue_time)) {
457 	    expire_status = "expired";
458 	} else {
459 	    expire_status = 0;
460 	}
461 	if (expire_status != 0) {
462 	    msg_info("%s: from=<%s>, status=%s, returned to sender",
463 	      message->queue_id, info_log_addr_form_sender(message->sender),
464 		     expire_status);
465 	    if (message->verp_delims == 0 || var_verp_bounce_off)
466 		adefer_flush(BOUNCE_FLAG_KEEP,
467 			     message->queue_name,
468 			     message->queue_id,
469 			     message->encoding,
470 			     message->smtputf8,
471 			     message->sender,
472 			     message->dsn_envid,
473 			     message->dsn_ret,
474 			     qmgr_active_done_3_defer_flush,
475 			     (void *) message);
476 	    else
477 		adefer_flush_verp(BOUNCE_FLAG_KEEP,
478 				  message->queue_name,
479 				  message->queue_id,
480 				  message->encoding,
481 				  message->smtputf8,
482 				  message->sender,
483 				  message->dsn_envid,
484 				  message->dsn_ret,
485 				  message->verp_delims,
486 				  qmgr_active_done_3_defer_flush,
487 				  (void *) message);
488 	    return;
489 	} else if (message->warn_time > 0
490 		   && event_time() >= message->warn_time - 1) {
491 	    if (msg_verbose)
492 		msg_info("%s: sending defer warning for %s", myname, message->queue_id);
493 	    adefer_warn(BOUNCE_FLAG_KEEP,
494 			message->queue_name,
495 			message->queue_id,
496 			message->encoding,
497 			message->smtputf8,
498 			message->sender,
499 			message->dsn_envid,
500 			message->dsn_ret,
501 			qmgr_active_done_3_defer_warn,
502 			(void *) message);
503 	    return;
504 	}
505     }
506 
507     /*
508      * Asynchronous processing does not reach this point.
509      */
510     qmgr_active_done_3_generic(message);
511 }
512 
513 /* qmgr_active_done_3_defer_warn - continue after adefer_warn() completion */
514 
qmgr_active_done_3_defer_warn(int status,void * context)515 static void qmgr_active_done_3_defer_warn(int status, void *context)
516 {
517     QMGR_MESSAGE *message = (QMGR_MESSAGE *) context;
518 
519     /*
520      * Process adefer_warn() completion status and continue processing.
521      */
522     if (status == 0)
523 	qmgr_message_update_warn(message);
524     qmgr_active_done_3_generic(message);
525 }
526 
527 /* qmgr_active_done_3_defer_flush - continue after adefer_flush() completion */
528 
qmgr_active_done_3_defer_flush(int status,void * context)529 static void qmgr_active_done_3_defer_flush(int status, void *context)
530 {
531     QMGR_MESSAGE *message = (QMGR_MESSAGE *) context;
532 
533     /*
534      * Process adefer_flush() status and continue processing.
535      */
536     message->flags = status;
537     qmgr_active_done_3_generic(message);
538 }
539 
540 /* qmgr_active_done_3_generic - continue processing */
541 
qmgr_active_done_3_generic(QMGR_MESSAGE * message)542 static void qmgr_active_done_3_generic(QMGR_MESSAGE *message)
543 {
544     const char *myname = "qmgr_active_done_3_generic";
545     int     delay;
546 
547     /*
548      * Some recipients need to be tried again. Move the queue file time
549      * stamps into the future by the amount of time that the message is
550      * delayed, and move the message to the deferred queue. Impose minimal
551      * and maximal backoff times.
552      *
553      * Since we look at actual time in queue, not time since last delivery
554      * attempt, backoff times will be distributed. However, we can still see
555      * spikes in delivery activity because the interval between deferred
556      * queue scans is finite.
557      */
558     if (message->flags) {
559 	if (message->create_time > 0) {
560 	    delay = event_time() - message->create_time;
561 	    if (delay > var_max_backoff_time)
562 		delay = var_max_backoff_time;
563 	    if (delay < var_min_backoff_time)
564 		delay = var_min_backoff_time;
565 	} else {
566 	    delay = var_min_backoff_time;
567 	}
568 	qmgr_active_defer(message->queue_name, message->queue_id,
569 			  MAIL_QUEUE_DEFERRED, delay);
570     }
571 
572     /*
573      * All recipients done. Remove the queue file.
574      */
575     else {
576 	if (mail_queue_remove(message->queue_name, message->queue_id)) {
577 	    if (errno != ENOENT)
578 		msg_fatal("%s: remove %s from %s: %m", myname,
579 			  message->queue_id, message->queue_name);
580 	    msg_warn("%s: remove %s from %s: %m", myname,
581 		     message->queue_id, message->queue_name);
582 	} else {
583 	    /* Same format as logged by postsuper. */
584 	    msg_info("%s: removed", message->queue_id);
585 	}
586     }
587 
588     /*
589      * Finally, delete the in-core message structure.
590      */
591     qmgr_message_free(message);
592 }
593 
594 /* qmgr_active_drain - drain active queue by allocating a delivery process */
595 
qmgr_active_drain(void)596 void    qmgr_active_drain(void)
597 {
598     QMGR_TRANSPORT *transport;
599 
600     /*
601      * Allocate one delivery process for every transport with pending mail.
602      * The process allocation completes asynchronously.
603      */
604     while ((transport = qmgr_transport_select()) != 0) {
605 	if (msg_verbose)
606 	    msg_info("qmgr_active_drain: allocate %s", transport->name);
607 	qmgr_transport_alloc(transport, qmgr_deliver);
608     }
609 }
610