1 /* $NetBSD: qmgr_deliver.c,v 1.3 2022/10/08 16:12:46 christos Exp $ */
2
3 /*++
4 /* NAME
5 /* qmgr_deliver 3
6 /* SUMMARY
7 /* deliver one per-site queue entry to that site
8 /* SYNOPSIS
9 /* #include "qmgr.h"
10 /*
11 /* int qmgr_deliver_concurrency;
12 /*
/* void qmgr_deliver(transport, fp)
14 /* QMGR_TRANSPORT *transport;
15 /* VSTREAM *fp;
16 /* DESCRIPTION
17 /* This module implements the client side of the `queue manager
18 /* to delivery agent' protocol. The queue manager uses
19 /* asynchronous I/O so that it can drive multiple delivery
20 /* agents in parallel. Depending on the outcome of a delivery
21 /* attempt, the status of messages, queues and transports is
22 /* updated.
23 /*
24 /* qmgr_deliver_concurrency is a global counter that says how
25 /* many delivery processes are in use. This can be used, for
26 /* example, to control the size of the `active' message queue.
27 /*
28 /* qmgr_deliver() executes when a delivery process announces its
29 /* availability for the named transport. It arranges for delivery
30 /* of a suitable queue entry. The \fIfp\fR argument specifies a
31 /* stream that is connected to the delivery process, or a null
32 /* pointer if the transport accepts no connection. Upon completion
33 /* of delivery (successful or not), the stream is closed, so that the
34 /* delivery process is released.
35 /* DIAGNOSTICS
36 /* LICENSE
37 /* .ad
38 /* .fi
39 /* The Secure Mailer license must be distributed with this software.
40 /* AUTHOR(S)
41 /* Wietse Venema
42 /* IBM T.J. Watson Research
43 /* P.O. Box 704
44 /* Yorktown Heights, NY 10598, USA
45 /*
46 /* Wietse Venema
47 /* Google, Inc.
48 /* 111 8th Avenue
49 /* New York, NY 10011, USA
50 /*--*/
51
52 /* System library. */
53
54 #include <sys_defs.h>
55 #include <time.h>
56 #include <string.h>
57
58 /* Utility library. */
59
60 #include <msg.h>
61 #include <vstring.h>
62 #include <vstream.h>
63 #include <vstring_vstream.h>
64 #include <events.h>
65 #include <iostuff.h>
66 #include <stringops.h>
67 #include <mymalloc.h>
68
69 /* Global library. */
70
71 #include <mail_queue.h>
72 #include <mail_proto.h>
73 #include <recipient_list.h>
74 #include <mail_params.h>
75 #include <deliver_request.h>
76 #include <verp_sender.h>
77 #include <dsn_util.h>
78 #include <dsn_buf.h>
79 #include <dsb_scan.h>
80 #include <rcpt_print.h>
81 #include <smtputf8.h>
82
83 /* Application-specific. */
84
85 #include "qmgr.h"
86
87 /*
88 * Important note on the _transport_rate_delay implementation: after
89 * qmgr_transport_alloc() sets the QMGR_TRANSPORT_STAT_RATE_LOCK flag, all
90 * code paths must directly or indirectly invoke qmgr_transport_unthrottle()
91 * or qmgr_transport_throttle(). Otherwise, transports with non-zero
92 * _transport_rate_delay will become stuck.
93 */
94
95 int qmgr_deliver_concurrency;
96
97 /*
98 * Message delivery status codes.
99 */
100 #define DELIVER_STAT_OK 0 /* all recipients delivered */
101 #define DELIVER_STAT_DEFER 1 /* try some recipients later */
102 #define DELIVER_STAT_CRASH 2 /* mailer internal problem */
103
104 /* qmgr_deliver_initial_reply - retrieve initial delivery process response */
105
static int qmgr_deliver_initial_reply(VSTREAM *stream)
{

    /*
     * Returns 0 when the delivery process sent an acceptable initial
     * response, and a non-zero DELIVER_STAT_XXX code otherwise.
     * 
     * A negative peekfd() result is logged as a premature disconnect.
     */
    if (peekfd(vstream_fileno(stream)) < 0) {
        msg_warn("%s: premature disconnect", VSTREAM_PATH(stream));
        return (DELIVER_STAT_CRASH);
    }

    /*
     * The response must carry a MAIL_ATTR_PROTO attribute that matches
     * MAIL_ATTR_PROTO_DELIVER; any attr_scan() result other than 0 is logged
     * as a malformed response.
     */
    if (attr_scan(stream, ATTR_FLAG_STRICT,
                  RECV_ATTR_STREQ(MAIL_ATTR_PROTO, MAIL_ATTR_PROTO_DELIVER),
                  ATTR_TYPE_END) != 0) {
        msg_warn("%s: malformed response", VSTREAM_PATH(stream));
        return (DELIVER_STAT_DEFER);
    }
    return (0);
}
120
121 /* qmgr_deliver_final_reply - retrieve final delivery process response */
122
static int qmgr_deliver_final_reply(VSTREAM *stream, DSN_BUF *dsb)
{
    int     agent_status;

    /*
     * Returns 0 when the delivery process reported a zero status,
     * DELIVER_STAT_DEFER when it reported a non-zero status, and
     * DELIVER_STAT_CRASH when no usable report could be read. The DSN
     * information from the report is stored into the caller's dsb.
     * 
     * A negative peekfd() result is logged as a premature disconnect.
     */
    if (peekfd(vstream_fileno(stream)) < 0) {
        msg_warn("%s: premature disconnect", VSTREAM_PATH(stream));
        return (DELIVER_STAT_CRASH);
    }

    /*
     * Two items are requested (the DSN buffer and the numerical status);
     * any attr_scan() result other than 2 is logged as a malformed response.
     */
    if (attr_scan(stream, ATTR_FLAG_STRICT,
                  RECV_ATTR_FUNC(dsb_scan, (void *) dsb),
                  RECV_ATTR_INT(MAIL_ATTR_STATUS, &agent_status),
                  ATTR_TYPE_END) != 2) {
        msg_warn("%s: malformed response", VSTREAM_PATH(stream));
        return (DELIVER_STAT_CRASH);
    }
    if (agent_status != 0)
        return (DELIVER_STAT_DEFER);
    return (0);
}
140
141 /* qmgr_deliver_send_request - send delivery request to delivery process */
142
static int qmgr_deliver_send_request(QMGR_ENTRY *entry, VSTREAM *stream)
{
    QMGR_MESSAGE *message = entry->message;
    RECIPIENT_LIST list = entry->rcpt_list;
    RECIPIENT *rcpt_end = list.info + list.len;
    RECIPIENT *rcpt;
    VSTRING *verp_buf = 0;
    MSG_STATS stats;
    const char *addr;
    char   *sender;
    int     smtputf8 = message->smtputf8;
    int     flags;

    /*
     * Writes one delivery request (message attributes followed by one
     * record per recipient) to the delivery process. Returns 0 when the
     * stream was flushed without error, -1 otherwise.
     * 
     * Todo: integrate with code up-stream that builds the delivery request.
     * 
     * When SMTPUTF8 support is enabled, a non-empty recipient address that is
     * not all-ASCII but is valid UTF-8 turns on the recipient flag, and with
     * VERP also the sender flag.
     */
    for (rcpt = list.info; rcpt < rcpt_end; rcpt++) {
        if (!var_smtputf8_enable)
            continue;
        addr = rcpt->address;
        if (addr[0] == 0 || allascii(addr)
            || !valid_utf8_string(addr, strlen(addr)))
            continue;
        smtputf8 |= SMTPUTF8_FLAG_RECIPIENT;
        if (message->verp_delims)
            smtputf8 |= SMTPUTF8_FLAG_SENDER;
    }

    /*
     * With variable envelope return path, prefix+@origin becomes
     * prefix+user=domain@origin, built from the first recipient in the
     * list. Note that with VERP there is only one recipient per delivery.
     * Without VERP the message sender is used unchanged.
     */
    if (message->verp_delims != 0) {
        verp_buf = vstring_alloc(100);
        verp_sender(verp_buf, message->verp_delims,
                    message->sender, list.info);
        sender = vstring_str(verp_buf);
    } else {
        sender = message->sender;
    }

    /*
     * Request flags: per-message flags, per-queue flags, and either the
     * bounce or the default request flag depending on inspect_xport.
     */
    flags = message->tflags;
    flags |= entry->queue->dflags;
    if (message->inspect_xport)
        flags |= DEL_REQ_FLAG_BOUNCE;
    else
        flags |= DEL_REQ_FLAG_DEFLT;

    (void) QMGR_MSG_STATS(&stats, message);

    /*
     * The attribute order below is part of the request format and must not
     * be changed independently of the receiving side.
     */
    attr_print(stream, ATTR_FLAG_NONE,
               SEND_ATTR_INT(MAIL_ATTR_FLAGS, flags),
               SEND_ATTR_STR(MAIL_ATTR_QUEUE, message->queue_name),
               SEND_ATTR_STR(MAIL_ATTR_QUEUEID, message->queue_id),
               SEND_ATTR_LONG(MAIL_ATTR_OFFSET, message->data_offset),
               SEND_ATTR_LONG(MAIL_ATTR_SIZE, message->cont_length),
               SEND_ATTR_STR(MAIL_ATTR_NEXTHOP, entry->queue->nexthop),
               SEND_ATTR_STR(MAIL_ATTR_ENCODING, message->encoding),
               SEND_ATTR_INT(MAIL_ATTR_SMTPUTF8, smtputf8),
               SEND_ATTR_STR(MAIL_ATTR_SENDER, sender),
               SEND_ATTR_STR(MAIL_ATTR_DSN_ENVID, message->dsn_envid),
               SEND_ATTR_INT(MAIL_ATTR_DSN_RET, message->dsn_ret),
               SEND_ATTR_FUNC(msg_stats_print, (const void *) &stats),
    /* XXX Should be encapsulated with ATTR_TYPE_FUNC. */
               SEND_ATTR_STR(MAIL_ATTR_LOG_CLIENT_NAME, message->client_name),
               SEND_ATTR_STR(MAIL_ATTR_LOG_CLIENT_ADDR, message->client_addr),
               SEND_ATTR_STR(MAIL_ATTR_LOG_CLIENT_PORT, message->client_port),
               SEND_ATTR_STR(MAIL_ATTR_LOG_PROTO_NAME, message->client_proto),
               SEND_ATTR_STR(MAIL_ATTR_LOG_HELO_NAME, message->client_helo),
    /* XXX Should be encapsulated with ATTR_TYPE_FUNC. */
               SEND_ATTR_STR(MAIL_ATTR_SASL_METHOD, message->sasl_method),
               SEND_ATTR_STR(MAIL_ATTR_SASL_USERNAME, message->sasl_username),
               SEND_ATTR_STR(MAIL_ATTR_SASL_SENDER, message->sasl_sender),
    /* XXX Ditto if we want to pass TLS certificate info. */
               SEND_ATTR_STR(MAIL_ATTR_LOG_IDENT, message->log_ident),
               SEND_ATTR_STR(MAIL_ATTR_RWR_CONTEXT, message->rewrite_context),
               SEND_ATTR_INT(MAIL_ATTR_RCPT_COUNT, list.len),
               ATTR_TYPE_END);

    /* The VERP sender has been written; its buffer is no longer needed. */
    if (verp_buf != 0)
        vstring_free(verp_buf);

    /* One record per recipient, in list order. */
    for (rcpt = list.info; rcpt < rcpt_end; rcpt++) {
        attr_print(stream, ATTR_FLAG_NONE,
                   SEND_ATTR_FUNC(rcpt_print, (const void *) rcpt),
                   ATTR_TYPE_END);
    }

    /* Write errors are detected only here, when the stream is flushed. */
    if (vstream_fflush(stream) != 0) {
        msg_warn("write to process (%s): %m", entry->queue->transport->name);
        return (-1);
    }
    if (msg_verbose)
        msg_info("qmgr_deliver: site `%s'", entry->queue->name);
    return (0);
}
227
228 /* qmgr_deliver_abort - transport response watchdog */
229
qmgr_deliver_abort(int unused_event,void * context)230 static void qmgr_deliver_abort(int unused_event, void *context)
231 {
232 QMGR_ENTRY *entry = (QMGR_ENTRY *) context;
233 QMGR_QUEUE *queue = entry->queue;
234 QMGR_TRANSPORT *transport = queue->transport;
235 QMGR_MESSAGE *message = entry->message;
236
237 msg_fatal("%s: timeout receiving delivery status from transport: %s",
238 message->queue_id, transport->name);
239 }
240
241 /* qmgr_deliver_update - process delivery status report */
242
qmgr_deliver_update(int unused_event,void * context)243 static void qmgr_deliver_update(int unused_event, void *context)
244 {
245 QMGR_ENTRY *entry = (QMGR_ENTRY *) context;
246 QMGR_QUEUE *queue = entry->queue;
247 QMGR_TRANSPORT *transport = queue->transport;
248 QMGR_MESSAGE *message = entry->message;
249 static DSN_BUF *dsb;
250 int status;
251
252 /*
253 * Release the delivery agent from a "hot" queue entry.
254 */
255 #define QMGR_DELIVER_RELEASE_AGENT(entry) do { \
256 event_disable_readwrite(vstream_fileno(entry->stream)); \
257 (void) vstream_fclose(entry->stream); \
258 entry->stream = 0; \
259 qmgr_deliver_concurrency--; \
260 } while (0)
261
262 if (dsb == 0)
263 dsb = dsb_create();
264
265 /*
266 * The message transport has responded. Stop the watchdog timer.
267 */
268 event_cancel_timer(qmgr_deliver_abort, context);
269
270 /*
271 * Retrieve the delivery agent status report. The numerical status code
272 * indicates if delivery should be tried again. The reason text is sent
273 * only when a site should be avoided for a while, so that the queue
274 * manager can log why it does not even try to schedule delivery to the
275 * affected recipients.
276 */
277 status = qmgr_deliver_final_reply(entry->stream, dsb);
278
279 /*
280 * The mail delivery process failed for some reason (although delivery
281 * may have been successful). Back off with this transport type for a
282 * while. Dispose of queue entries for this transport that await
283 * selection (the todo lists). Stay away from queue entries that have
284 * been selected (the busy lists), or we would have dangling pointers.
285 * The queue itself won't go away before we dispose of the current queue
286 * entry.
287 */
288 if (status == DELIVER_STAT_CRASH) {
289 message->flags |= DELIVER_STAT_DEFER;
290 #if 0
291 whatsup = concatenate("unknown ", transport->name,
292 " mail transport error", (char *) 0);
293 qmgr_transport_throttle(transport,
294 DSN_SIMPLE(&dsb->dsn, "4.3.0", whatsup));
295 myfree(whatsup);
296 #else
297 qmgr_transport_throttle(transport,
298 DSN_SIMPLE(&dsb->dsn, "4.3.0",
299 "unknown mail transport error"));
300 #endif
301 msg_warn("transport %s failure -- see a previous warning/fatal/panic logfile record for the problem description",
302 transport->name);
303
304 /*
305 * Assume the worst and write a defer logfile record for each
306 * recipient. This omission was already present in the first queue
307 * manager implementation of 199703, and was fixed 200511.
308 *
309 * To avoid the synchronous qmgr_defer_recipient() operation for each
310 * recipient of this queue entry, release the delivery process and
311 * move the entry back to the todo queue. Let qmgr_defer_transport()
312 * log the recipient asynchronously if possible, and get out of here.
313 * Note: if asynchronous logging is not possible,
314 * qmgr_defer_transport() eventually invokes qmgr_entry_done() and
315 * the entry becomes a dangling pointer.
316 */
317 QMGR_DELIVER_RELEASE_AGENT(entry);
318 qmgr_entry_unselect(queue, entry);
319 qmgr_defer_transport(transport, &dsb->dsn);
320 return;
321 }
322
323 /*
324 * This message must be tried again.
325 *
326 * If we have a problem talking to this site, back off with this site for a
327 * while; dispose of queue entries for this site that await selection
328 * (the todo list); stay away from queue entries that have been selected
329 * (the busy list), or we would have dangling pointers. The queue itself
330 * won't go away before we dispose of the current queue entry.
331 *
332 * XXX Caution: DSN_COPY() will panic on empty status or reason.
333 */
334 #define SUSPENDED "delivery temporarily suspended: "
335
336 if (status == DELIVER_STAT_DEFER) {
337 message->flags |= DELIVER_STAT_DEFER;
338 if (VSTRING_LEN(dsb->status)) {
339 /* Sanitize the DSN status/reason from the delivery agent. */
340 if (!dsn_valid(vstring_str(dsb->status)))
341 vstring_strcpy(dsb->status, "4.0.0");
342 if (VSTRING_LEN(dsb->reason) == 0)
343 vstring_strcpy(dsb->reason, "unknown error");
344 vstring_prepend(dsb->reason, SUSPENDED, sizeof(SUSPENDED) - 1);
345 if (QMGR_QUEUE_READY(queue)) {
346 qmgr_queue_throttle(queue, DSN_FROM_DSN_BUF(dsb));
347 if (QMGR_QUEUE_THROTTLED(queue))
348 qmgr_defer_todo(queue, &dsb->dsn);
349 }
350 }
351 }
352
353 /*
354 * No problems detected. Mark the transport and queue as alive. The queue
355 * itself won't go away before we dispose of the current queue entry.
356 */
357 if (status != DELIVER_STAT_CRASH) {
358 qmgr_transport_unthrottle(transport);
359 if (VSTRING_LEN(dsb->reason) == 0)
360 qmgr_queue_unthrottle(queue);
361 }
362
363 /*
364 * Release the delivery process, and give some other queue entry a chance
365 * to be delivered. When all recipients for a message have been tried,
366 * decide what to do next with this message: defer, bounce, delete.
367 */
368 QMGR_DELIVER_RELEASE_AGENT(entry);
369 qmgr_entry_done(entry, QMGR_QUEUE_BUSY);
370 }
371
372 /* qmgr_deliver - deliver one per-site queue entry */
373
void    qmgr_deliver(QMGR_TRANSPORT *transport, VSTREAM *stream)
{
    QMGR_QUEUE *queue;
    QMGR_ENTRY *entry;
    DSN     dsn;

    /*
     * Find out if this delivery process is really available. A null stream
     * or an unacceptable initial response means the transport cannot be
     * used right now. This routine runs in response to an external event,
     * so it does not run while some other queue manipulation is happening.
     */
    if (stream == 0 || qmgr_deliver_initial_reply(stream) != 0)
        goto transport_unavailable;

    /*
     * Find a suitable queue entry. Things may have changed since this
     * transport was allocated. If no suitable entry is found,
     * unceremoniously disconnect from the delivery process. The delivery
     * agent request reading routine is prepared for the queue manager to
     * change its mind for no apparent reason.
     * 
     * NOTE(review): this path calls neither qmgr_transport_throttle() nor
     * qmgr_transport_unthrottle(); confirm that the transport rate lock
     * described at the top of this file is released some other way.
     */
    queue = qmgr_queue_select(transport);
    entry = (queue != 0 ? qmgr_entry_select(queue) : 0);
    if (entry == 0) {
        (void) vstream_fclose(stream);
        return;
    }

    /*
     * Send the queue file info and recipient info to the delivery process.
     * On failure the entry is put back before the transport is throttled.
     */
    if (qmgr_deliver_send_request(entry, stream) < 0) {
        qmgr_entry_unselect(queue, entry);
        goto transport_unavailable;
    }

    /*
     * If we get this far, go wait for the delivery status report, and guard
     * against broken systems with a watchdog timer.
     */
    qmgr_deliver_concurrency++;
    entry->stream = stream;
    event_enable_read(vstream_fileno(stream),
                      qmgr_deliver_update, (void *) entry);
    event_request_timer(qmgr_deliver_abort, (void *) entry, var_daemon_timeout);
    return;

    /*
     * Common failure exit: throttle the transport, wipe the pending
     * deliveries for it, and disconnect from the delivery process if there
     * is one.
     * 
     * Warning: entry and queue may be uninitialized or dangling pointers
     * here, so they must not be used below this label.
     */
transport_unavailable:
    qmgr_transport_throttle(transport,
                            DSN_SIMPLE(&dsn, "4.3.0",
                                       "mail transport unavailable"));
    qmgr_defer_transport(transport, &dsn);
    if (stream != 0)
        (void) vstream_fclose(stream);
}
456