xref: /netbsd-src/external/ibm-public/postfix/dist/src/oqmgr/qmgr_deliver.c (revision 67b9b338a7386232ac596b5fd0cd5a9cc8a03c71)
1 /*	$NetBSD: qmgr_deliver.c,v 1.3 2022/10/08 16:12:46 christos Exp $	*/
2 
3 /*++
4 /* NAME
5 /*	qmgr_deliver 3
6 /* SUMMARY
7 /*	deliver one per-site queue entry to that site
8 /* SYNOPSIS
9 /*	#include "qmgr.h"
10 /*
11 /*	int	qmgr_deliver_concurrency;
12 /*
13 /*	void	qmgr_deliver(transport, fp)
14 /*	QMGR_TRANSPORT *transport;
15 /*	VSTREAM	*fp;
16 /* DESCRIPTION
17 /*	This module implements the client side of the `queue manager
18 /*	to delivery agent' protocol. The queue manager uses
19 /*	asynchronous I/O so that it can drive multiple delivery
20 /*	agents in parallel. Depending on the outcome of a delivery
21 /*	attempt, the status of messages, queues and transports is
22 /*	updated.
23 /*
24 /*	qmgr_deliver_concurrency is a global counter that says how
25 /*	many delivery processes are in use. This can be used, for
26 /*	example, to control the size of the `active' message queue.
27 /*
28 /*	qmgr_deliver() executes when a delivery process announces its
29 /*	availability for the named transport. It arranges for delivery
30 /*	of a suitable queue entry.  The \fIfp\fR argument specifies a
31 /*	stream that is connected to the delivery process, or a null
32 /*	pointer if the transport accepts no connection. Upon completion
33 /*	of delivery (successful or not), the stream is closed, so that the
34 /*	delivery process is released.
35 /* DIAGNOSTICS
36 /* LICENSE
37 /* .ad
38 /* .fi
39 /*	The Secure Mailer license must be distributed with this software.
40 /* AUTHOR(S)
41 /*	Wietse Venema
42 /*	IBM T.J. Watson Research
43 /*	P.O. Box 704
44 /*	Yorktown Heights, NY 10598, USA
45 /*
46 /*	Wietse Venema
47 /*	Google, Inc.
48 /*	111 8th Avenue
49 /*	New York, NY 10011, USA
50 /*--*/
51 
52 /* System library. */
53 
54 #include <sys_defs.h>
55 #include <time.h>
56 #include <string.h>
57 
58 /* Utility library. */
59 
60 #include <msg.h>
61 #include <vstring.h>
62 #include <vstream.h>
63 #include <vstring_vstream.h>
64 #include <events.h>
65 #include <iostuff.h>
66 #include <stringops.h>
67 #include <mymalloc.h>
68 
69 /* Global library. */
70 
71 #include <mail_queue.h>
72 #include <mail_proto.h>
73 #include <recipient_list.h>
74 #include <mail_params.h>
75 #include <deliver_request.h>
76 #include <verp_sender.h>
77 #include <dsn_util.h>
78 #include <dsn_buf.h>
79 #include <dsb_scan.h>
80 #include <rcpt_print.h>
81 #include <smtputf8.h>
82 
83 /* Application-specific. */
84 
85 #include "qmgr.h"
86 
87  /*
88   * Important note on the _transport_rate_delay implementation: after
89   * qmgr_transport_alloc() sets the QMGR_TRANSPORT_STAT_RATE_LOCK flag, all
90   * code paths must directly or indirectly invoke qmgr_transport_unthrottle()
91   * or qmgr_transport_throttle(). Otherwise, transports with non-zero
92   * _transport_rate_delay will become stuck.
93   */
94 
/*
 * Number of delivery agent connections currently in use. qmgr_deliver()
 * increments it after a delivery request was sent successfully, and the
 * QMGR_DELIVER_RELEASE_AGENT() macro in qmgr_deliver_update() decrements it
 * when the agent's stream is closed.
 */
95 int     qmgr_deliver_concurrency;
96 
97  /*
98   * Message delivery status codes.
  *
  * qmgr_deliver_final_reply() returns DELIVER_STAT_CRASH when the agent
  * disconnects early or sends a malformed reply, DELIVER_STAT_DEFER when
  * the agent reports a non-zero status, and 0 otherwise.
  * qmgr_deliver_update() additionally ORs DELIVER_STAT_DEFER into
  * message->flags when delivery must be retried.
99   */
100 #define DELIVER_STAT_OK		0	/* all recipients delivered */
101 #define DELIVER_STAT_DEFER	1	/* try some recipients later */
102 #define DELIVER_STAT_CRASH	2	/* mailer internal problem */
103 
104 /* qmgr_deliver_initial_reply - retrieve initial delivery process response */
105 
qmgr_deliver_initial_reply(VSTREAM * stream)106 static int qmgr_deliver_initial_reply(VSTREAM *stream)
107 {
108     if (peekfd(vstream_fileno(stream)) < 0) {
109 	msg_warn("%s: premature disconnect", VSTREAM_PATH(stream));
110 	return (DELIVER_STAT_CRASH);
111     } else if (attr_scan(stream, ATTR_FLAG_STRICT,
112 		  RECV_ATTR_STREQ(MAIL_ATTR_PROTO, MAIL_ATTR_PROTO_DELIVER),
113 			 ATTR_TYPE_END) != 0) {
114 	msg_warn("%s: malformed response", VSTREAM_PATH(stream));
115 	return (DELIVER_STAT_DEFER);
116     } else {
117 	return (0);
118     }
119 }
120 
121 /* qmgr_deliver_final_reply - retrieve final delivery process response */
122 
qmgr_deliver_final_reply(VSTREAM * stream,DSN_BUF * dsb)123 static int qmgr_deliver_final_reply(VSTREAM *stream, DSN_BUF *dsb)
124 {
125     int     stat;
126 
127     if (peekfd(vstream_fileno(stream)) < 0) {
128 	msg_warn("%s: premature disconnect", VSTREAM_PATH(stream));
129 	return (DELIVER_STAT_CRASH);
130     } else if (attr_scan(stream, ATTR_FLAG_STRICT,
131 			 RECV_ATTR_FUNC(dsb_scan, (void *) dsb),
132 			 RECV_ATTR_INT(MAIL_ATTR_STATUS, &stat),
133 			 ATTR_TYPE_END) != 2) {
134 	msg_warn("%s: malformed response", VSTREAM_PATH(stream));
135 	return (DELIVER_STAT_CRASH);
136     } else {
137 	return (stat ? DELIVER_STAT_DEFER : 0);
138     }
139 }
140 
141 /* qmgr_deliver_send_request - send delivery request to delivery process */
142 
qmgr_deliver_send_request(QMGR_ENTRY * entry,VSTREAM * stream)143 static int qmgr_deliver_send_request(QMGR_ENTRY *entry, VSTREAM *stream)
144 {
145     RECIPIENT_LIST list = entry->rcpt_list;
146     RECIPIENT *recipient;
147     QMGR_MESSAGE *message = entry->message;
148     VSTRING *sender_buf = 0;
149     MSG_STATS stats;
150     char   *sender;
151     int     flags;
152     int     smtputf8 = message->smtputf8;
153     const char *addr;
154 
155     /*
156      * Todo: integrate with code up-stream that builds the delivery request.
157      */
158     for (recipient = list.info; recipient < list.info + list.len; recipient++)
159 	if (var_smtputf8_enable && (addr = recipient->address)[0]
160 	    && !allascii(addr) && valid_utf8_string(addr, strlen(addr))) {
161 	    smtputf8 |= SMTPUTF8_FLAG_RECIPIENT;
162 	    if (message->verp_delims)
163 		smtputf8 |= SMTPUTF8_FLAG_SENDER;
164 	}
165 
166     /*
167      * If variable envelope return path is requested, change prefix+@origin
168      * into prefix+user=domain@origin. Note that with VERP there is only one
169      * recipient per delivery.
170      */
171     if (message->verp_delims == 0) {
172 	sender = message->sender;
173     } else {
174 	sender_buf = vstring_alloc(100);
175 	verp_sender(sender_buf, message->verp_delims,
176 		    message->sender, list.info);
177 	sender = vstring_str(sender_buf);
178     }
179 
180     flags = message->tflags
181 	| entry->queue->dflags
182 	| (message->inspect_xport ? DEL_REQ_FLAG_BOUNCE : DEL_REQ_FLAG_DEFLT);
183     (void) QMGR_MSG_STATS(&stats, message);
184     attr_print(stream, ATTR_FLAG_NONE,
185 	       SEND_ATTR_INT(MAIL_ATTR_FLAGS, flags),
186 	       SEND_ATTR_STR(MAIL_ATTR_QUEUE, message->queue_name),
187 	       SEND_ATTR_STR(MAIL_ATTR_QUEUEID, message->queue_id),
188 	       SEND_ATTR_LONG(MAIL_ATTR_OFFSET, message->data_offset),
189 	       SEND_ATTR_LONG(MAIL_ATTR_SIZE, message->cont_length),
190 	       SEND_ATTR_STR(MAIL_ATTR_NEXTHOP, entry->queue->nexthop),
191 	       SEND_ATTR_STR(MAIL_ATTR_ENCODING, message->encoding),
192 	       SEND_ATTR_INT(MAIL_ATTR_SMTPUTF8, smtputf8),
193 	       SEND_ATTR_STR(MAIL_ATTR_SENDER, sender),
194 	       SEND_ATTR_STR(MAIL_ATTR_DSN_ENVID, message->dsn_envid),
195 	       SEND_ATTR_INT(MAIL_ATTR_DSN_RET, message->dsn_ret),
196 	       SEND_ATTR_FUNC(msg_stats_print, (const void *) &stats),
197     /* XXX Should be encapsulated with ATTR_TYPE_FUNC. */
198 	     SEND_ATTR_STR(MAIL_ATTR_LOG_CLIENT_NAME, message->client_name),
199 	     SEND_ATTR_STR(MAIL_ATTR_LOG_CLIENT_ADDR, message->client_addr),
200 	     SEND_ATTR_STR(MAIL_ATTR_LOG_CLIENT_PORT, message->client_port),
201 	     SEND_ATTR_STR(MAIL_ATTR_LOG_PROTO_NAME, message->client_proto),
202 	       SEND_ATTR_STR(MAIL_ATTR_LOG_HELO_NAME, message->client_helo),
203     /* XXX Should be encapsulated with ATTR_TYPE_FUNC. */
204 	       SEND_ATTR_STR(MAIL_ATTR_SASL_METHOD, message->sasl_method),
205 	     SEND_ATTR_STR(MAIL_ATTR_SASL_USERNAME, message->sasl_username),
206 	       SEND_ATTR_STR(MAIL_ATTR_SASL_SENDER, message->sasl_sender),
207     /* XXX Ditto if we want to pass TLS certificate info. */
208 	       SEND_ATTR_STR(MAIL_ATTR_LOG_IDENT, message->log_ident),
209 	     SEND_ATTR_STR(MAIL_ATTR_RWR_CONTEXT, message->rewrite_context),
210 	       SEND_ATTR_INT(MAIL_ATTR_RCPT_COUNT, list.len),
211 	       ATTR_TYPE_END);
212     if (sender_buf != 0)
213 	vstring_free(sender_buf);
214     for (recipient = list.info; recipient < list.info + list.len; recipient++)
215 	attr_print(stream, ATTR_FLAG_NONE,
216 		   SEND_ATTR_FUNC(rcpt_print, (const void *) recipient),
217 		   ATTR_TYPE_END);
218     if (vstream_fflush(stream) != 0) {
219 	msg_warn("write to process (%s): %m", entry->queue->transport->name);
220 	return (-1);
221     } else {
222 	if (msg_verbose)
223 	    msg_info("qmgr_deliver: site `%s'", entry->queue->name);
224 	return (0);
225     }
226 }
227 
228 /* qmgr_deliver_abort - transport response watchdog */
229 
qmgr_deliver_abort(int unused_event,void * context)230 static void qmgr_deliver_abort(int unused_event, void *context)
231 {
232     QMGR_ENTRY *entry = (QMGR_ENTRY *) context;
233     QMGR_QUEUE *queue = entry->queue;
234     QMGR_TRANSPORT *transport = queue->transport;
235     QMGR_MESSAGE *message = entry->message;
236 
237     msg_fatal("%s: timeout receiving delivery status from transport: %s",
238 	      message->queue_id, transport->name);
239 }
240 
241 /* qmgr_deliver_update - process delivery status report */
242 
qmgr_deliver_update(int unused_event,void * context)243 static void qmgr_deliver_update(int unused_event, void *context)
244 {
245     QMGR_ENTRY *entry = (QMGR_ENTRY *) context;
246     QMGR_QUEUE *queue = entry->queue;
247     QMGR_TRANSPORT *transport = queue->transport;
248     QMGR_MESSAGE *message = entry->message;
249     static DSN_BUF *dsb;
250     int     status;
251 
252     /*
253      * Release the delivery agent from a "hot" queue entry.
254      */
255 #define QMGR_DELIVER_RELEASE_AGENT(entry) do { \
256 	event_disable_readwrite(vstream_fileno(entry->stream)); \
257 	(void) vstream_fclose(entry->stream); \
258 	entry->stream = 0; \
259 	qmgr_deliver_concurrency--; \
260     } while (0)
261 
262     if (dsb == 0)
263 	dsb = dsb_create();
264 
265     /*
266      * The message transport has responded. Stop the watchdog timer.
267      */
268     event_cancel_timer(qmgr_deliver_abort, context);
269 
270     /*
271      * Retrieve the delivery agent status report. The numerical status code
272      * indicates if delivery should be tried again. The reason text is sent
273      * only when a site should be avoided for a while, so that the queue
274      * manager can log why it does not even try to schedule delivery to the
275      * affected recipients.
276      */
277     status = qmgr_deliver_final_reply(entry->stream, dsb);
278 
279     /*
280      * The mail delivery process failed for some reason (although delivery
281      * may have been successful). Back off with this transport type for a
282      * while. Dispose of queue entries for this transport that await
283      * selection (the todo lists). Stay away from queue entries that have
284      * been selected (the busy lists), or we would have dangling pointers.
285      * The queue itself won't go away before we dispose of the current queue
286      * entry.
287      */
288     if (status == DELIVER_STAT_CRASH) {
289 	message->flags |= DELIVER_STAT_DEFER;
290 #if 0
291 	whatsup = concatenate("unknown ", transport->name,
292 			      " mail transport error", (char *) 0);
293 	qmgr_transport_throttle(transport,
294 				DSN_SIMPLE(&dsb->dsn, "4.3.0", whatsup));
295 	myfree(whatsup);
296 #else
297 	qmgr_transport_throttle(transport,
298 				DSN_SIMPLE(&dsb->dsn, "4.3.0",
299 					   "unknown mail transport error"));
300 #endif
301 	msg_warn("transport %s failure -- see a previous warning/fatal/panic logfile record for the problem description",
302 		 transport->name);
303 
304 	/*
305 	 * Assume the worst and write a defer logfile record for each
306 	 * recipient. This omission was already present in the first queue
307 	 * manager implementation of 199703, and was fixed 200511.
308 	 *
309 	 * To avoid the synchronous qmgr_defer_recipient() operation for each
310 	 * recipient of this queue entry, release the delivery process and
311 	 * move the entry back to the todo queue. Let qmgr_defer_transport()
312 	 * log the recipient asynchronously if possible, and get out of here.
313 	 * Note: if asynchronous logging is not possible,
314 	 * qmgr_defer_transport() eventually invokes qmgr_entry_done() and
315 	 * the entry becomes a dangling pointer.
316 	 */
317 	QMGR_DELIVER_RELEASE_AGENT(entry);
318 	qmgr_entry_unselect(queue, entry);
319 	qmgr_defer_transport(transport, &dsb->dsn);
320 	return;
321     }
322 
323     /*
324      * This message must be tried again.
325      *
326      * If we have a problem talking to this site, back off with this site for a
327      * while; dispose of queue entries for this site that await selection
328      * (the todo list); stay away from queue entries that have been selected
329      * (the busy list), or we would have dangling pointers. The queue itself
330      * won't go away before we dispose of the current queue entry.
331      *
332      * XXX Caution: DSN_COPY() will panic on empty status or reason.
333      */
334 #define SUSPENDED	"delivery temporarily suspended: "
335 
336     if (status == DELIVER_STAT_DEFER) {
337 	message->flags |= DELIVER_STAT_DEFER;
338 	if (VSTRING_LEN(dsb->status)) {
339 	    /* Sanitize the DSN status/reason from the delivery agent. */
340 	    if (!dsn_valid(vstring_str(dsb->status)))
341 		vstring_strcpy(dsb->status, "4.0.0");
342 	    if (VSTRING_LEN(dsb->reason) == 0)
343 		vstring_strcpy(dsb->reason, "unknown error");
344 	    vstring_prepend(dsb->reason, SUSPENDED, sizeof(SUSPENDED) - 1);
345 	    if (QMGR_QUEUE_READY(queue)) {
346 		qmgr_queue_throttle(queue, DSN_FROM_DSN_BUF(dsb));
347 		if (QMGR_QUEUE_THROTTLED(queue))
348 		    qmgr_defer_todo(queue, &dsb->dsn);
349 	    }
350 	}
351     }
352 
353     /*
354      * No problems detected. Mark the transport and queue as alive. The queue
355      * itself won't go away before we dispose of the current queue entry.
356      */
357     if (status != DELIVER_STAT_CRASH) {
358 	qmgr_transport_unthrottle(transport);
359 	if (VSTRING_LEN(dsb->reason) == 0)
360 	    qmgr_queue_unthrottle(queue);
361     }
362 
363     /*
364      * Release the delivery process, and give some other queue entry a chance
365      * to be delivered. When all recipients for a message have been tried,
366      * decide what to do next with this message: defer, bounce, delete.
367      */
368     QMGR_DELIVER_RELEASE_AGENT(entry);
369     qmgr_entry_done(entry, QMGR_QUEUE_BUSY);
370 }
371 
372 /* qmgr_deliver - deliver one per-site queue entry */
373 
qmgr_deliver(QMGR_TRANSPORT * transport,VSTREAM * stream)374 void    qmgr_deliver(QMGR_TRANSPORT *transport, VSTREAM *stream)
375 {
376     QMGR_QUEUE *queue;
377     QMGR_ENTRY *entry;
378     DSN     dsn;
379 
380     /*
381      * Find out if this delivery process is really available. Once elected,
382      * the delivery process is supposed to express its happiness. If there is
383      * a problem, wipe the pending deliveries for this transport. This
384      * routine runs in response to an external event, so it does not run
385      * while some other queue manipulation is happening.
386      */
387     if (stream == 0 || qmgr_deliver_initial_reply(stream) != 0) {
388 #if 0
389 	whatsup = concatenate(transport->name,
390 			      " mail transport unavailable", (char *) 0);
391 	qmgr_transport_throttle(transport,
392 				DSN_SIMPLE(&dsn, "4.3.0", whatsup));
393 	myfree(whatsup);
394 #else
395 	qmgr_transport_throttle(transport,
396 				DSN_SIMPLE(&dsn, "4.3.0",
397 					   "mail transport unavailable"));
398 #endif
399 	qmgr_defer_transport(transport, &dsn);
400 	if (stream)
401 	    (void) vstream_fclose(stream);
402 	return;
403     }
404 
405     /*
406      * Find a suitable queue entry. Things may have changed since this
407      * transport was allocated. If no suitable entry is found,
408      * unceremoniously disconnect from the delivery process. The delivery
409      * agent request reading routine is prepared for the queue manager to
410      * change its mind for no apparent reason.
411      */
412     if ((queue = qmgr_queue_select(transport)) == 0
413 	|| (entry = qmgr_entry_select(queue)) == 0) {
414 	(void) vstream_fclose(stream);
415 	return;
416     }
417 
418     /*
419      * Send the queue file info and recipient info to the delivery process.
420      * If there is a problem, wipe the pending deliveries for this transport.
421      * This routine runs in response to an external event, so it does not run
422      * while some other queue manipulation is happening.
423      */
424     if (qmgr_deliver_send_request(entry, stream) < 0) {
425 	qmgr_entry_unselect(queue, entry);
426 #if 0
427 	whatsup = concatenate(transport->name,
428 			      " mail transport unavailable", (char *) 0);
429 	qmgr_transport_throttle(transport,
430 				DSN_SIMPLE(&dsn, "4.3.0", whatsup));
431 	myfree(whatsup);
432 #else
433 	qmgr_transport_throttle(transport,
434 				DSN_SIMPLE(&dsn, "4.3.0",
435 					   "mail transport unavailable"));
436 #endif
437 	qmgr_defer_transport(transport, &dsn);
438 	/* warning: entry and queue may be dangling pointers here */
439 	(void) vstream_fclose(stream);
440 	return;
441     }
442 
443     /*
444      * If we get this far, go wait for the delivery status report.
445      */
446     qmgr_deliver_concurrency++;
447     entry->stream = stream;
448     event_enable_read(vstream_fileno(stream),
449 		      qmgr_deliver_update, (void *) entry);
450 
451     /*
452      * Guard against broken systems.
453      */
454     event_request_timer(qmgr_deliver_abort, (void *) entry, var_daemon_timeout);
455 }
456