1 /* $NetBSD: qmgr_active.c,v 1.3 2020/03/18 19:05:17 christos Exp $ */
2
3 /*++
4 /* NAME
5 /* qmgr_active 3
6 /* SUMMARY
7 /* active queue management
8 /* SYNOPSIS
9 /* #include "qmgr.h"
10 /*
/* int qmgr_active_feed(scan_info, queue_id)
12 /* QMGR_SCAN *scan_info;
13 /* const char *queue_id;
14 /*
15 /* void qmgr_active_drain()
16 /*
/* void qmgr_active_done(message)
18 /* QMGR_MESSAGE *message;
19 /* DESCRIPTION
20 /* These functions maintain the active message queue: the set
21 /* of messages that the queue manager is actually working on.
22 /* The active queue is limited in size. Messages are drained
23 /* from the active queue by allocating a delivery process and
24 /* by delivering mail via that process. Messages leak into the
25 /* active queue only when the active queue is small enough.
26 /* Damaged message files are saved to the "corrupt" directory.
27 /*
28 /* qmgr_active_feed() inserts the named message file into
29 /* the active queue. Message files with the wrong name or
30 /* with other wrong properties are skipped but not removed.
31 /* The following queue flags are recognized, other flags being
32 /* ignored:
33 /* .IP QMGR_SCAN_ALL
34 /* Examine all queue files. Normally, deferred queue files with
35 /* future time stamps are ignored, and incoming queue files with
36 /* future time stamps are frowned upon.
37 /* .PP
38 /* qmgr_active_drain() allocates one delivery process.
39 /* Process allocation is asynchronous. Once the delivery
40 /* process is available, an attempt is made to deliver
41 /* a message via it. Message delivery is asynchronous, too.
42 /*
43 /* qmgr_active_done() deals with a message after delivery
44 /* has been tried for all in-core recipients. If the message
45 /* was bounced, a bounce message is sent to the sender, or
46 /* to the Errors-To: address if one was specified.
47 /* If there are more on-file recipients, a new batch of
48 /* in-core recipients is read from the queue file. Otherwise,
49 /* if a delivery agent marked the queue file as corrupt,
50 /* the queue file is moved to the "corrupt" queue (surprise);
51 /* if at least one delivery failed, the message is moved
52 /* to the deferred queue. The time stamps of a deferred queue
53 /* file are set to the nearest wakeup time of its recipient
54 /* sites (if delivery failed due to a problem with a next-hop
55 /* host), are set into the future by the amount of time the
56 /* message was queued (per-message exponential backoff), or are set
57 /* into the future by a minimal backoff time, whichever is more.
58 /* The minimal_backoff_time parameter specifies the minimal
59 /* amount of time between delivery attempts; maximal_backoff_time
60 /* specifies an upper limit.
61 /* DIAGNOSTICS
62 /* Fatal: queue file access failures, out of memory.
63 /* Panic: interface violations, internal consistency errors.
64 /* Warnings: corrupt message file. A corrupt message is saved
65 /* to the "corrupt" queue for further inspection.
66 /* LICENSE
67 /* .ad
68 /* .fi
69 /* The Secure Mailer license must be distributed with this software.
70 /* AUTHOR(S)
71 /* Wietse Venema
72 /* IBM T.J. Watson Research
73 /* P.O. Box 704
74 /* Yorktown Heights, NY 10598, USA
75 /*
76 /* Wietse Venema
77 /* Google, Inc.
78 /* 111 8th Avenue
79 /* New York, NY 10011, USA
80 /*--*/
81
82 /* System library. */
83
84 #include <sys_defs.h>
85 #include <sys/stat.h>
86 #include <dirent.h>
87 #include <stdlib.h>
88 #include <unistd.h>
89 #include <string.h>
90 #include <utime.h>
91 #include <errno.h>
92
93 #ifndef S_IRWXU /* What? no POSIX system? */
94 #define S_IRWXU 0700
95 #endif
96
97 /* Utility library. */
98
99 #include <msg.h>
100 #include <events.h>
101 #include <mymalloc.h>
102 #include <vstream.h>
103 #include <warn_stat.h>
104
105 /* Global library. */
106
107 #include <mail_params.h>
108 #include <mail_open_ok.h>
109 #include <mail_queue.h>
110 #include <recipient_list.h>
111 #include <bounce.h>
112 #include <defer.h>
113 #include <trace.h>
114 #include <abounce.h>
115 #include <rec_type.h>
116 #include <qmgr_user.h>
117 #include <info_log_addr_form.h>
118
119 /* Application-specific. */
120
121 #include "qmgr.h"
122
123 /*
124 * A bunch of call-back routines.
125 */
126 static void qmgr_active_done_2_bounce_flush(int, void *);
127 static void qmgr_active_done_2_generic(QMGR_MESSAGE *);
128 static void qmgr_active_done_25_trace_flush(int, void *);
129 static void qmgr_active_done_25_generic(QMGR_MESSAGE *);
130 static void qmgr_active_done_3_defer_flush(int, void *);
131 static void qmgr_active_done_3_defer_warn(int, void *);
132 static void qmgr_active_done_3_generic(QMGR_MESSAGE *);
133
134 /* qmgr_active_corrupt - move corrupted file out of the way */
135
qmgr_active_corrupt(const char * queue_id)136 static void qmgr_active_corrupt(const char *queue_id)
137 {
138 const char *myname = "qmgr_active_corrupt";
139
140 if (mail_queue_rename(queue_id, MAIL_QUEUE_ACTIVE, MAIL_QUEUE_CORRUPT)) {
141 if (errno != ENOENT)
142 msg_fatal("%s: save corrupt file queue %s id %s: %m",
143 myname, MAIL_QUEUE_ACTIVE, queue_id);
144 } else {
145 msg_warn("saving corrupt file \"%s\" from queue \"%s\" to queue \"%s\"",
146 queue_id, MAIL_QUEUE_ACTIVE, MAIL_QUEUE_CORRUPT);
147 }
148 }
149
150 /* qmgr_active_defer - defer queue file */
151
qmgr_active_defer(const char * queue_name,const char * queue_id,const char * dest_queue,int delay)152 static void qmgr_active_defer(const char *queue_name, const char *queue_id,
153 const char *dest_queue, int delay)
154 {
155 const char *myname = "qmgr_active_defer";
156 const char *path;
157 struct utimbuf tbuf;
158
159 if (msg_verbose)
160 msg_info("wakeup %s after %ld secs", queue_id, (long) delay);
161
162 tbuf.actime = tbuf.modtime = event_time() + delay;
163 path = mail_queue_path((VSTRING *) 0, queue_name, queue_id);
164 if (utime(path, &tbuf) < 0 && errno != ENOENT)
165 msg_fatal("%s: update %s time stamps: %m", myname, path);
166 if (mail_queue_rename(queue_id, queue_name, dest_queue)) {
167 if (errno != ENOENT)
168 msg_fatal("%s: rename %s from %s to %s: %m", myname,
169 queue_id, queue_name, dest_queue);
170 msg_warn("%s: rename %s from %s to %s: %m", myname,
171 queue_id, queue_name, dest_queue);
172 } else if (msg_verbose) {
173 msg_info("%s: defer %s", myname, queue_id);
174 }
175 }
176
177 /* qmgr_active_feed - feed one message into active queue */
178
qmgr_active_feed(QMGR_SCAN * scan_info,const char * queue_id)179 int qmgr_active_feed(QMGR_SCAN *scan_info, const char *queue_id)
180 {
181 const char *myname = "qmgr_active_feed";
182 QMGR_MESSAGE *message;
183 struct stat st;
184 const char *path;
185
186 if (strcmp(scan_info->queue, MAIL_QUEUE_ACTIVE) == 0)
187 msg_panic("%s: bad queue %s", myname, scan_info->queue);
188 if (msg_verbose)
189 msg_info("%s: queue %s", myname, scan_info->queue);
190
191 /*
192 * Make sure this is something we are willing to open.
193 */
194 if (mail_open_ok(scan_info->queue, queue_id, &st, &path) == MAIL_OPEN_NO)
195 return (0);
196
197 if (msg_verbose)
198 msg_info("%s: %s", myname, path);
199
200 /*
201 * Skip files that have time stamps into the future. They need to cool
202 * down. Incoming and deferred files can have future time stamps.
203 */
204 if ((scan_info->flags & QMGR_SCAN_ALL) == 0
205 && st.st_mtime > time((time_t *) 0) + 1) {
206 if (msg_verbose)
207 msg_info("%s: skip %s (%ld seconds)", myname, queue_id,
208 (long) (st.st_mtime - event_time()));
209 return (0);
210 }
211
212 /*
213 * Move the message to the active queue. File access errors are fatal.
214 */
215 if (mail_queue_rename(queue_id, scan_info->queue, MAIL_QUEUE_ACTIVE)) {
216 if (errno != ENOENT)
217 msg_fatal("%s: %s: rename from %s to %s: %m", myname,
218 queue_id, scan_info->queue, MAIL_QUEUE_ACTIVE);
219 msg_warn("%s: %s: rename from %s to %s: %m", myname,
220 queue_id, scan_info->queue, MAIL_QUEUE_ACTIVE);
221 return (0);
222 }
223
224 /*
225 * Extract envelope information: sender and recipients. At this point,
226 * mail addresses have been processed by the cleanup service so they
227 * should be in canonical form. Generate requests to deliver this
228 * message.
229 *
230 * Throwing away queue files seems bad, especially when they made it this
231 * far into the mail system. Therefore we save bad files to a separate
232 * directory for further inspection.
233 *
234 * After queue manager restart it is possible that a queue file is still
235 * being delivered. In that case (the file is locked), defer delivery by
236 * a minimal amount of time.
237 */
238 #define QMGR_FLUSH_AFTER (QMGR_FLUSH_EACH | QMGR_FLUSH_DFXP)
239 #define MAYBE_FLUSH_AFTER(mode) \
240 (((mode) & MAIL_QUEUE_STAT_UNTHROTTLE) ? QMGR_FLUSH_AFTER : 0)
241 #define MAYBE_FORCE_EXPIRE(mode) \
242 (((mode) & MAIL_QUEUE_STAT_EXPIRE) ? QMGR_FORCE_EXPIRE : 0)
243 #define MAYBE_UPDATE_MODE(mode) \
244 (((mode) & MAIL_QUEUE_STAT_UNTHROTTLE) ? \
245 (mode) & ~MAIL_QUEUE_STAT_UNTHROTTLE : 0)
246
247 if ((message = qmgr_message_alloc(MAIL_QUEUE_ACTIVE, queue_id,
248 scan_info->flags
249 | MAYBE_FLUSH_AFTER(st.st_mode)
250 | MAYBE_FORCE_EXPIRE(st.st_mode),
251 MAYBE_UPDATE_MODE(st.st_mode))) == 0) {
252 qmgr_active_corrupt(queue_id);
253 return (0);
254 } else if (message == QMGR_MESSAGE_LOCKED) {
255 qmgr_active_defer(MAIL_QUEUE_ACTIVE, queue_id, MAIL_QUEUE_INCOMING, 60);
256 return (0);
257 } else {
258
259 /*
260 * Special case if all recipients were already delivered. Send any
261 * bounces and clean up.
262 */
263 if (message->refcount == 0)
264 qmgr_active_done(message);
265 return (1);
266 }
267 }
268
269 /* qmgr_active_done - dispose of message after recipients have been tried */
270
qmgr_active_done(QMGR_MESSAGE * message)271 void qmgr_active_done(QMGR_MESSAGE *message)
272 {
273 const char *myname = "qmgr_active_done";
274 struct stat st;
275
276 if (msg_verbose)
277 msg_info("%s: %s", myname, message->queue_id);
278
279 /*
280 * During a previous iteration, an attempt to bounce this message may
281 * have failed, so there may still be a bounce log lying around. XXX By
282 * groping around in the bounce queue, we're trespassing on the bounce
283 * service's territory. But doing so is more robust than depending on the
284 * bounce daemon to do the lookup for us, and for us to do the deleting
285 * after we have received a successful status from the bounce service.
286 * The bounce queue directory blocks are most likely in memory anyway. If
287 * these lookups become a performance problem we will have to build an
288 * in-core cache into the bounce daemon.
289 *
290 * Don't bounce when the bounce log is empty. The bounce process obviously
291 * failed, and the delivery agent will have requested that the message be
292 * deferred.
293 *
294 * Bounces are sent asynchronously to avoid stalling while the cleanup
295 * daemon waits for the qmgr to accept the "new mail" trigger.
296 *
297 * See also code in cleanup_bounce.c.
298 */
299 if (stat(mail_queue_path((VSTRING *) 0, MAIL_QUEUE_BOUNCE, message->queue_id), &st) == 0) {
300 if (st.st_size == 0) {
301 if (mail_queue_remove(MAIL_QUEUE_BOUNCE, message->queue_id))
302 msg_fatal("remove %s %s: %m",
303 MAIL_QUEUE_BOUNCE, message->queue_id);
304 } else {
305 if (msg_verbose)
306 msg_info("%s: bounce %s", myname, message->queue_id);
307 if (message->verp_delims == 0 || var_verp_bounce_off)
308 abounce_flush(BOUNCE_FLAG_KEEP,
309 message->queue_name,
310 message->queue_id,
311 message->encoding,
312 message->smtputf8,
313 message->sender,
314 message->dsn_envid,
315 message->dsn_ret,
316 qmgr_active_done_2_bounce_flush,
317 (void *) message);
318 else
319 abounce_flush_verp(BOUNCE_FLAG_KEEP,
320 message->queue_name,
321 message->queue_id,
322 message->encoding,
323 message->smtputf8,
324 message->sender,
325 message->dsn_envid,
326 message->dsn_ret,
327 message->verp_delims,
328 qmgr_active_done_2_bounce_flush,
329 (void *) message);
330 return;
331 }
332 }
333
334 /*
335 * Asynchronous processing does not reach this point.
336 */
337 qmgr_active_done_2_generic(message);
338 }
339
340 /* qmgr_active_done_2_bounce_flush - process abounce_flush() status */
341
qmgr_active_done_2_bounce_flush(int status,void * context)342 static void qmgr_active_done_2_bounce_flush(int status, void *context)
343 {
344 QMGR_MESSAGE *message = (QMGR_MESSAGE *) context;
345
346 /*
347 * Process abounce_flush() status and continue processing.
348 */
349 message->flags |= status;
350 qmgr_active_done_2_generic(message);
351 }
352
353 /* qmgr_active_done_2_generic - continue processing */
354
qmgr_active_done_2_generic(QMGR_MESSAGE * message)355 static void qmgr_active_done_2_generic(QMGR_MESSAGE *message)
356 {
357 const char *path;
358 struct stat st;
359
360 /*
361 * A delivery agent marks a queue file as corrupt by changing its
362 * attributes, and by pretending that delivery was deferred.
363 */
364 if (message->flags
365 && mail_open_ok(MAIL_QUEUE_ACTIVE, message->queue_id, &st, &path) == MAIL_OPEN_NO) {
366 qmgr_active_corrupt(message->queue_id);
367 qmgr_message_free(message);
368 return;
369 }
370
371 /*
372 * If we did not read all recipients from this file, go read some more,
373 * but remember whether some recipients have to be tried again.
374 *
375 * Throwing away queue files seems bad, especially when they made it this
376 * far into the mail system. Therefore we save bad files to a separate
377 * directory for further inspection by a human being.
378 */
379 if (message->rcpt_offset > 0) {
380 if (qmgr_message_realloc(message) == 0) {
381 qmgr_active_corrupt(message->queue_id);
382 qmgr_message_free(message);
383 } else {
384 if (message->refcount == 0)
385 qmgr_active_done(message); /* recurse for consistency */
386 }
387 return;
388 }
389
390 /*
391 * XXX With multi-recipient mail, some recipients may have NOTIFY=SUCCESS
392 * and others not. Depending on what subset of recipients are delivered,
393 * a trace file may or may not be created. Even when the last partial
394 * delivery attempt had no NOTIFY=SUCCESS recipients, a trace file may
395 * still exist from a previous partial delivery attempt. So as long as
396 * any recipient has NOTIFY=SUCCESS we have to always look for the trace
397 * file and be prepared for the file not to exist.
398 *
399 * See also comments in bounce/bounce_notify_util.c.
400 */
401 if ((message->tflags & (DEL_REQ_FLAG_USR_VRFY | DEL_REQ_FLAG_RECORD
402 | DEL_REQ_FLAG_REC_DLY_SENT))
403 || (message->rflags & QMGR_READ_FLAG_NOTIFY_SUCCESS)) {
404 atrace_flush(message->tflags,
405 message->queue_name,
406 message->queue_id,
407 message->encoding,
408 message->smtputf8,
409 message->sender,
410 message->dsn_envid,
411 message->dsn_ret,
412 qmgr_active_done_25_trace_flush,
413 (void *) message);
414 return;
415 }
416
417 /*
418 * Asynchronous processing does not reach this point.
419 */
420 qmgr_active_done_25_generic(message);
421 }
422
423 /* qmgr_active_done_25_trace_flush - continue after atrace_flush() completion */
424
qmgr_active_done_25_trace_flush(int status,void * context)425 static void qmgr_active_done_25_trace_flush(int status, void *context)
426 {
427 QMGR_MESSAGE *message = (QMGR_MESSAGE *) context;
428
429 /*
430 * Process atrace_flush() status and continue processing.
431 */
432 if (status == 0 && message->tflags_offset)
433 qmgr_message_kill_record(message, message->tflags_offset);
434 message->flags |= status;
435 qmgr_active_done_25_generic(message);
436 }
437
438 /* qmgr_active_done_25_generic - continue processing */
439
qmgr_active_done_25_generic(QMGR_MESSAGE * message)440 static void qmgr_active_done_25_generic(QMGR_MESSAGE *message)
441 {
442 const char *myname = "qmgr_active_done_25_generic";
443 const char *expire_status = 0;
444
445 /*
446 * If we get to this point we have tried all recipients for this message.
447 * If the message is too old, try to bounce it.
448 *
449 * Bounces are sent asynchronously to avoid stalling while the cleanup
450 * daemon waits for the qmgr to accept the "new mail" trigger.
451 */
452 if (message->flags) {
453 if ((message->qflags & QMGR_FORCE_EXPIRE) != 0) {
454 expire_status = "force-expired";
455 } else if (event_time() >= message->create_time +
456 (*message->sender ? var_max_queue_time : var_dsn_queue_time)) {
457 expire_status = "expired";
458 } else {
459 expire_status = 0;
460 }
461 if (expire_status != 0) {
462 msg_info("%s: from=<%s>, status=%s, returned to sender",
463 message->queue_id, info_log_addr_form_sender(message->sender),
464 expire_status);
465 if (message->verp_delims == 0 || var_verp_bounce_off)
466 adefer_flush(BOUNCE_FLAG_KEEP,
467 message->queue_name,
468 message->queue_id,
469 message->encoding,
470 message->smtputf8,
471 message->sender,
472 message->dsn_envid,
473 message->dsn_ret,
474 qmgr_active_done_3_defer_flush,
475 (void *) message);
476 else
477 adefer_flush_verp(BOUNCE_FLAG_KEEP,
478 message->queue_name,
479 message->queue_id,
480 message->encoding,
481 message->smtputf8,
482 message->sender,
483 message->dsn_envid,
484 message->dsn_ret,
485 message->verp_delims,
486 qmgr_active_done_3_defer_flush,
487 (void *) message);
488 return;
489 } else if (message->warn_time > 0
490 && event_time() >= message->warn_time - 1) {
491 if (msg_verbose)
492 msg_info("%s: sending defer warning for %s", myname, message->queue_id);
493 adefer_warn(BOUNCE_FLAG_KEEP,
494 message->queue_name,
495 message->queue_id,
496 message->encoding,
497 message->smtputf8,
498 message->sender,
499 message->dsn_envid,
500 message->dsn_ret,
501 qmgr_active_done_3_defer_warn,
502 (void *) message);
503 return;
504 }
505 }
506
507 /*
508 * Asynchronous processing does not reach this point.
509 */
510 qmgr_active_done_3_generic(message);
511 }
512
513 /* qmgr_active_done_3_defer_warn - continue after adefer_warn() completion */
514
qmgr_active_done_3_defer_warn(int status,void * context)515 static void qmgr_active_done_3_defer_warn(int status, void *context)
516 {
517 QMGR_MESSAGE *message = (QMGR_MESSAGE *) context;
518
519 /*
520 * Process adefer_warn() completion status and continue processing.
521 */
522 if (status == 0)
523 qmgr_message_update_warn(message);
524 qmgr_active_done_3_generic(message);
525 }
526
527 /* qmgr_active_done_3_defer_flush - continue after adefer_flush() completion */
528
qmgr_active_done_3_defer_flush(int status,void * context)529 static void qmgr_active_done_3_defer_flush(int status, void *context)
530 {
531 QMGR_MESSAGE *message = (QMGR_MESSAGE *) context;
532
533 /*
534 * Process adefer_flush() status and continue processing.
535 */
536 message->flags = status;
537 qmgr_active_done_3_generic(message);
538 }
539
540 /* qmgr_active_done_3_generic - continue processing */
541
qmgr_active_done_3_generic(QMGR_MESSAGE * message)542 static void qmgr_active_done_3_generic(QMGR_MESSAGE *message)
543 {
544 const char *myname = "qmgr_active_done_3_generic";
545 int delay;
546
547 /*
548 * Some recipients need to be tried again. Move the queue file time
549 * stamps into the future by the amount of time that the message is
550 * delayed, and move the message to the deferred queue. Impose minimal
551 * and maximal backoff times.
552 *
553 * Since we look at actual time in queue, not time since last delivery
554 * attempt, backoff times will be distributed. However, we can still see
555 * spikes in delivery activity because the interval between deferred
556 * queue scans is finite.
557 */
558 if (message->flags) {
559 if (message->create_time > 0) {
560 delay = event_time() - message->create_time;
561 if (delay > var_max_backoff_time)
562 delay = var_max_backoff_time;
563 if (delay < var_min_backoff_time)
564 delay = var_min_backoff_time;
565 } else {
566 delay = var_min_backoff_time;
567 }
568 qmgr_active_defer(message->queue_name, message->queue_id,
569 MAIL_QUEUE_DEFERRED, delay);
570 }
571
572 /*
573 * All recipients done. Remove the queue file.
574 */
575 else {
576 if (mail_queue_remove(message->queue_name, message->queue_id)) {
577 if (errno != ENOENT)
578 msg_fatal("%s: remove %s from %s: %m", myname,
579 message->queue_id, message->queue_name);
580 msg_warn("%s: remove %s from %s: %m", myname,
581 message->queue_id, message->queue_name);
582 } else {
583 /* Same format as logged by postsuper. */
584 msg_info("%s: removed", message->queue_id);
585 }
586 }
587
588 /*
589 * Finally, delete the in-core message structure.
590 */
591 qmgr_message_free(message);
592 }
593
594 /* qmgr_active_drain - drain active queue by allocating a delivery process */
595
qmgr_active_drain(void)596 void qmgr_active_drain(void)
597 {
598 QMGR_TRANSPORT *transport;
599
600 /*
601 * Allocate one delivery process for every transport with pending mail.
602 * The process allocation completes asynchronously.
603 */
604 while ((transport = qmgr_transport_select()) != 0) {
605 if (msg_verbose)
606 msg_info("qmgr_active_drain: allocate %s", transport->name);
607 qmgr_transport_alloc(transport, qmgr_deliver);
608 }
609 }
610