xref: /netbsd-src/external/ibm-public/postfix/dist/src/qmgr/qmgr_deliver.c (revision c38e7cc395b1472a774ff828e46123de44c628e9)
1 /*	$NetBSD: qmgr_deliver.c,v 1.2 2017/02/14 01:16:47 christos Exp $	*/
2 
3 /*++
4 /* NAME
5 /*	qmgr_deliver 3
6 /* SUMMARY
7 /*	deliver one per-site queue entry to that site
8 /* SYNOPSIS
9 /*	#include "qmgr.h"
10 /*
11 /*	int	qmgr_deliver_concurrency;
12 /*
13 /*	int	qmgr_deliver(transport, fp)
14 /*	QMGR_TRANSPORT *transport;
15 /*	VSTREAM	*fp;
16 /* DESCRIPTION
17 /*	This module implements the client side of the `queue manager
18 /*	to delivery agent' protocol. The queue manager uses
19 /*	asynchronous I/O so that it can drive multiple delivery
20 /*	agents in parallel. Depending on the outcome of a delivery
21 /*	attempt, the status of messages, queues and transports is
22 /*	updated.
23 /*
24 /*	qmgr_deliver_concurrency is a global counter that says how
25 /*	many delivery processes are in use. This can be used, for
26 /*	example, to control the size of the `active' message queue.
27 /*
28 /*	qmgr_deliver() executes when a delivery process announces its
29 /*	availability for the named transport. It arranges for delivery
30 /*	of a suitable queue entry.  The \fIfp\fR argument specifies a
31 /*	stream that is connected to a delivery process, or a null
32 /*	pointer if the transport accepts no connection. Upon completion
33 /*	of delivery (successful or not), the stream is closed, so that the
34 /*	delivery process is released.
35 /* DIAGNOSTICS
36 /* LICENSE
37 /* .ad
38 /* .fi
39 /*	The Secure Mailer license must be distributed with this software.
40 /* AUTHOR(S)
41 /*	Wietse Venema
42 /*	IBM T.J. Watson Research
43 /*	P.O. Box 704
44 /*	Yorktown Heights, NY 10598, USA
45 /*
46 /*	Preemptive scheduler enhancements:
47 /*	Patrik Rak
48 /*	Modra 6
49 /*	155 00, Prague, Czech Republic
50 /*
51 /*	Wietse Venema
52 /*	Google, Inc.
53 /*	111 8th Avenue
54 /*	New York, NY 10011, USA
55 /*--*/
56 
57 /* System library. */
58 
59 #include <sys_defs.h>
60 #include <time.h>
61 #include <string.h>
62 
63 /* Utility library. */
64 
65 #include <msg.h>
66 #include <vstring.h>
67 #include <vstream.h>
68 #include <vstring_vstream.h>
69 #include <events.h>
70 #include <iostuff.h>
71 #include <stringops.h>
72 #include <mymalloc.h>
73 
74 /* Global library. */
75 
76 #include <mail_queue.h>
77 #include <mail_proto.h>
78 #include <recipient_list.h>
79 #include <mail_params.h>
80 #include <deliver_request.h>
81 #include <verp_sender.h>
82 #include <dsn_util.h>
83 #include <dsn_buf.h>
84 #include <dsb_scan.h>
85 #include <rcpt_print.h>
86 #include <smtputf8.h>
87 
88 /* Application-specific. */
89 
90 #include "qmgr.h"
91 
92  /*
93   * Important note on the _transport_rate_delay implementation: after
94   * qmgr_transport_alloc() sets the QMGR_TRANSPORT_STAT_RATE_LOCK flag, all
95   * code paths must directly or indirectly invoke qmgr_transport_unthrottle()
96   * or qmgr_transport_throttle(). Otherwise, transports with non-zero
97   * _transport_rate_delay will become stuck.
98   */
99 
100 int     qmgr_deliver_concurrency;
101 
102  /*
103   * Message delivery status codes.
104   */
105 #define DELIVER_STAT_OK		0	/* all recipients delivered */
106 #define DELIVER_STAT_DEFER	1	/* try some recipients later */
107 #define DELIVER_STAT_CRASH	2	/* mailer internal problem */
108 
109 /* qmgr_deliver_initial_reply - retrieve initial delivery process response */
110 
111 static int qmgr_deliver_initial_reply(VSTREAM *stream)
112 {
113     int     stat;
114 
115     if (peekfd(vstream_fileno(stream)) < 0) {
116 	msg_warn("%s: premature disconnect", VSTREAM_PATH(stream));
117 	return (DELIVER_STAT_CRASH);
118     } else if (attr_scan(stream, ATTR_FLAG_STRICT,
119 			 RECV_ATTR_INT(MAIL_ATTR_STATUS, &stat),
120 			 ATTR_TYPE_END) != 1) {
121 	msg_warn("%s: malformed response", VSTREAM_PATH(stream));
122 	return (DELIVER_STAT_CRASH);
123     } else {
124 	return (stat ? DELIVER_STAT_DEFER : 0);
125     }
126 }
127 
128 /* qmgr_deliver_final_reply - retrieve final delivery process response */
129 
130 static int qmgr_deliver_final_reply(VSTREAM *stream, DSN_BUF *dsb)
131 {
132     int     stat;
133 
134     if (peekfd(vstream_fileno(stream)) < 0) {
135 	msg_warn("%s: premature disconnect", VSTREAM_PATH(stream));
136 	return (DELIVER_STAT_CRASH);
137     } else if (attr_scan(stream, ATTR_FLAG_STRICT,
138 			 RECV_ATTR_FUNC(dsb_scan, (void *) dsb),
139 			 RECV_ATTR_INT(MAIL_ATTR_STATUS, &stat),
140 			 ATTR_TYPE_END) != 2) {
141 	msg_warn("%s: malformed response", VSTREAM_PATH(stream));
142 	return (DELIVER_STAT_CRASH);
143     } else {
144 	return (stat ? DELIVER_STAT_DEFER : 0);
145     }
146 }
147 
148 /* qmgr_deliver_send_request - send delivery request to delivery process */
149 
150 static int qmgr_deliver_send_request(QMGR_ENTRY *entry, VSTREAM *stream)
151 {
152     RECIPIENT_LIST list = entry->rcpt_list;
153     RECIPIENT *recipient;
154     QMGR_MESSAGE *message = entry->message;
155     VSTRING *sender_buf = 0;
156     MSG_STATS stats;
157     char   *sender;
158     int     flags;
159     int     smtputf8 = message->smtputf8;
160     const char *addr;
161 
162     /*
163      * Todo: integrate with code up-stream that builds the delivery request.
164      */
165     for (recipient = list.info; recipient < list.info + list.len; recipient++)
166 	if (var_smtputf8_enable && (addr = recipient->address)[0]
167 	    && !allascii(addr) && valid_utf8_string(addr, strlen(addr))) {
168 	    smtputf8 |= SMTPUTF8_FLAG_RECIPIENT;
169 	    if (message->verp_delims)
170 		smtputf8 |= SMTPUTF8_FLAG_SENDER;
171 	}
172 
173     /*
174      * If variable envelope return path is requested, change prefix+@origin
175      * into prefix+user=domain@origin. Note that with VERP there is only one
176      * recipient per delivery.
177      */
178     if (message->verp_delims == 0) {
179 	sender = message->sender;
180     } else {
181 	sender_buf = vstring_alloc(100);
182 	verp_sender(sender_buf, message->verp_delims,
183 		    message->sender, list.info);
184 	sender = vstring_str(sender_buf);
185     }
186 
187     flags = message->tflags
188 	| entry->queue->dflags
189 	| (message->inspect_xport ? DEL_REQ_FLAG_BOUNCE : DEL_REQ_FLAG_DEFLT);
190     (void) QMGR_MSG_STATS(&stats, message);
191     attr_print(stream, ATTR_FLAG_NONE,
192 	       SEND_ATTR_INT(MAIL_ATTR_FLAGS, flags),
193 	       SEND_ATTR_STR(MAIL_ATTR_QUEUE, message->queue_name),
194 	       SEND_ATTR_STR(MAIL_ATTR_QUEUEID, message->queue_id),
195 	       SEND_ATTR_LONG(MAIL_ATTR_OFFSET, message->data_offset),
196 	       SEND_ATTR_LONG(MAIL_ATTR_SIZE, message->cont_length),
197 	       SEND_ATTR_STR(MAIL_ATTR_NEXTHOP, entry->queue->nexthop),
198 	       SEND_ATTR_STR(MAIL_ATTR_ENCODING, message->encoding),
199 	       SEND_ATTR_INT(MAIL_ATTR_SMTPUTF8, smtputf8),
200 	       SEND_ATTR_STR(MAIL_ATTR_SENDER, sender),
201 	       SEND_ATTR_STR(MAIL_ATTR_DSN_ENVID, message->dsn_envid),
202 	       SEND_ATTR_INT(MAIL_ATTR_DSN_RET, message->dsn_ret),
203 	       SEND_ATTR_FUNC(msg_stats_print, (void *) &stats),
204     /* XXX Should be encapsulated with ATTR_TYPE_FUNC. */
205 	     SEND_ATTR_STR(MAIL_ATTR_LOG_CLIENT_NAME, message->client_name),
206 	     SEND_ATTR_STR(MAIL_ATTR_LOG_CLIENT_ADDR, message->client_addr),
207 	     SEND_ATTR_STR(MAIL_ATTR_LOG_CLIENT_PORT, message->client_port),
208 	     SEND_ATTR_STR(MAIL_ATTR_LOG_PROTO_NAME, message->client_proto),
209 	       SEND_ATTR_STR(MAIL_ATTR_LOG_HELO_NAME, message->client_helo),
210     /* XXX Should be encapsulated with ATTR_TYPE_FUNC. */
211 	       SEND_ATTR_STR(MAIL_ATTR_SASL_METHOD, message->sasl_method),
212 	     SEND_ATTR_STR(MAIL_ATTR_SASL_USERNAME, message->sasl_username),
213 	       SEND_ATTR_STR(MAIL_ATTR_SASL_SENDER, message->sasl_sender),
214     /* XXX Ditto if we want to pass TLS certificate info. */
215 	       SEND_ATTR_STR(MAIL_ATTR_LOG_IDENT, message->log_ident),
216 	     SEND_ATTR_STR(MAIL_ATTR_RWR_CONTEXT, message->rewrite_context),
217 	       SEND_ATTR_INT(MAIL_ATTR_RCPT_COUNT, list.len),
218 	       ATTR_TYPE_END);
219     if (sender_buf != 0)
220 	vstring_free(sender_buf);
221     for (recipient = list.info; recipient < list.info + list.len; recipient++)
222 	attr_print(stream, ATTR_FLAG_NONE,
223 		   SEND_ATTR_FUNC(rcpt_print, (void *) recipient),
224 		   ATTR_TYPE_END);
225     if (vstream_fflush(stream) != 0) {
226 	msg_warn("write to process (%s): %m", entry->queue->transport->name);
227 	return (-1);
228     } else {
229 	if (msg_verbose)
230 	    msg_info("qmgr_deliver: site `%s'", entry->queue->name);
231 	return (0);
232     }
233 }
234 
235 /* qmgr_deliver_abort - transport response watchdog */
236 
237 static void qmgr_deliver_abort(int unused_event, void *context)
238 {
239     QMGR_ENTRY *entry = (QMGR_ENTRY *) context;
240     QMGR_QUEUE *queue = entry->queue;
241     QMGR_TRANSPORT *transport = queue->transport;
242     QMGR_MESSAGE *message = entry->message;
243 
244     msg_fatal("%s: timeout receiving delivery status from transport: %s",
245 	      message->queue_id, transport->name);
246 }
247 
248 /* qmgr_deliver_update - process delivery status report */
249 
250 static void qmgr_deliver_update(int unused_event, void *context)
251 {
252     QMGR_ENTRY *entry = (QMGR_ENTRY *) context;
253     QMGR_QUEUE *queue = entry->queue;
254     QMGR_TRANSPORT *transport = queue->transport;
255     QMGR_MESSAGE *message = entry->message;
256     static DSN_BUF *dsb;
257     int     status;
258 
259     /*
260      * Release the delivery agent from a "hot" queue entry.
261      */
262 #define QMGR_DELIVER_RELEASE_AGENT(entry) do { \
263 	event_disable_readwrite(vstream_fileno(entry->stream)); \
264 	(void) vstream_fclose(entry->stream); \
265 	entry->stream = 0; \
266 	qmgr_deliver_concurrency--; \
267     } while (0)
268 
269     if (dsb == 0)
270 	dsb = dsb_create();
271 
272     /*
273      * The message transport has responded. Stop the watchdog timer.
274      */
275     event_cancel_timer(qmgr_deliver_abort, context);
276 
277     /*
278      * Retrieve the delivery agent status report. The numerical status code
279      * indicates if delivery should be tried again. The reason text is sent
280      * only when a site should be avoided for a while, so that the queue
281      * manager can log why it does not even try to schedule delivery to the
282      * affected recipients.
283      */
284     status = qmgr_deliver_final_reply(entry->stream, dsb);
285 
286     /*
287      * The mail delivery process failed for some reason (although delivery
288      * may have been successful). Back off with this transport type for a
289      * while. Dispose of queue entries for this transport that await
290      * selection (the todo lists). Stay away from queue entries that have
291      * been selected (the busy lists), or we would have dangling pointers.
292      * The queue itself won't go away before we dispose of the current queue
293      * entry.
294      */
295     if (status == DELIVER_STAT_CRASH) {
296 	message->flags |= DELIVER_STAT_DEFER;
297 #if 0
298 	whatsup = concatenate("unknown ", transport->name,
299 			      " mail transport error", (char *) 0);
300 	qmgr_transport_throttle(transport,
301 				DSN_SIMPLE(&dsb->dsn, "4.3.0", whatsup));
302 	myfree(whatsup);
303 #else
304 	qmgr_transport_throttle(transport,
305 				DSN_SIMPLE(&dsb->dsn, "4.3.0",
306 					   "unknown mail transport error"));
307 #endif
308 	msg_warn("transport %s failure -- see a previous warning/fatal/panic logfile record for the problem description",
309 		 transport->name);
310 
311 	/*
312 	 * Assume the worst and write a defer logfile record for each
313 	 * recipient. This omission was already present in the first queue
314 	 * manager implementation of 199703, and was fixed 200511.
315 	 *
316 	 * To avoid the synchronous qmgr_defer_recipient() operation for each
317 	 * recipient of this queue entry, release the delivery process and
318 	 * move the entry back to the todo queue. Let qmgr_defer_transport()
319 	 * log the recipient asynchronously if possible, and get out of here.
320 	 * Note: if asynchronous logging is not possible,
321 	 * qmgr_defer_transport() eventually invokes qmgr_entry_done() and
322 	 * the entry becomes a dangling pointer.
323 	 */
324 	QMGR_DELIVER_RELEASE_AGENT(entry);
325 	qmgr_entry_unselect(entry);
326 	qmgr_defer_transport(transport, &dsb->dsn);
327 	return;
328     }
329 
330     /*
331      * This message must be tried again.
332      *
333      * If we have a problem talking to this site, back off with this site for a
334      * while; dispose of queue entries for this site that await selection
335      * (the todo list); stay away from queue entries that have been selected
336      * (the busy list), or we would have dangling pointers. The queue itself
337      * won't go away before we dispose of the current queue entry.
338      *
339      * XXX Caution: DSN_COPY() will panic on empty status or reason.
340      */
341 #define SUSPENDED	"delivery temporarily suspended: "
342 
343     if (status == DELIVER_STAT_DEFER) {
344 	message->flags |= DELIVER_STAT_DEFER;
345 	if (VSTRING_LEN(dsb->status)) {
346 	    /* Sanitize the DSN status/reason from the delivery agent. */
347 	    if (!dsn_valid(vstring_str(dsb->status)))
348 		vstring_strcpy(dsb->status, "4.0.0");
349 	    if (VSTRING_LEN(dsb->reason) == 0)
350 		vstring_strcpy(dsb->reason, "unknown error");
351 	    vstring_prepend(dsb->reason, SUSPENDED, sizeof(SUSPENDED) - 1);
352 	    if (QMGR_QUEUE_READY(queue)) {
353 		qmgr_queue_throttle(queue, DSN_FROM_DSN_BUF(dsb));
354 		if (QMGR_QUEUE_THROTTLED(queue))
355 		    qmgr_defer_todo(queue, &dsb->dsn);
356 	    }
357 	}
358     }
359 
360     /*
361      * No problems detected. Mark the transport and queue as alive. The queue
362      * itself won't go away before we dispose of the current queue entry.
363      */
364     if (status != DELIVER_STAT_CRASH) {
365 	qmgr_transport_unthrottle(transport);
366 	if (VSTRING_LEN(dsb->reason) == 0)
367 	    qmgr_queue_unthrottle(queue);
368     }
369 
370     /*
371      * Release the delivery process, and give some other queue entry a chance
372      * to be delivered. When all recipients for a message have been tried,
373      * decide what to do next with this message: defer, bounce, delete.
374      */
375     QMGR_DELIVER_RELEASE_AGENT(entry);
376     qmgr_entry_done(entry, QMGR_QUEUE_BUSY);
377 }
378 
379 /* qmgr_deliver - deliver one per-site queue entry */
380 
381 void    qmgr_deliver(QMGR_TRANSPORT *transport, VSTREAM *stream)
382 {
383     QMGR_ENTRY *entry;
384     DSN     dsn;
385 
386     /*
387      * Find out if this delivery process is really available. Once elected,
388      * the delivery process is supposed to express its happiness. If there is
389      * a problem, wipe the pending deliveries for this transport. This
390      * routine runs in response to an external event, so it does not run
391      * while some other queue manipulation is happening.
392      */
393     if (stream == 0 || qmgr_deliver_initial_reply(stream) != 0) {
394 #if 0
395 	whatsup = concatenate(transport->name,
396 			      " mail transport unavailable", (char *) 0);
397 	qmgr_transport_throttle(transport,
398 				DSN_SIMPLE(&dsn, "4.3.0", whatsup));
399 	myfree(whatsup);
400 #else
401 	qmgr_transport_throttle(transport,
402 				DSN_SIMPLE(&dsn, "4.3.0",
403 					   "mail transport unavailable"));
404 #endif
405 	qmgr_defer_transport(transport, &dsn);
406 	if (stream)
407 	    (void) vstream_fclose(stream);
408 	return;
409     }
410 
411     /*
412      * Find a suitable queue entry. Things may have changed since this
413      * transport was allocated. If no suitable entry is found,
414      * unceremoniously disconnect from the delivery process. The delivery
415      * agent request reading routine is prepared for the queue manager to
416      * change its mind for no apparent reason.
417      */
418     if ((entry = qmgr_job_entry_select(transport)) == 0) {
419 	(void) vstream_fclose(stream);
420 	return;
421     }
422 
423     /*
424      * Send the queue file info and recipient info to the delivery process.
425      * If there is a problem, wipe the pending deliveries for this transport.
426      * This routine runs in response to an external event, so it does not run
427      * while some other queue manipulation is happening.
428      */
429     if (qmgr_deliver_send_request(entry, stream) < 0) {
430 	qmgr_entry_unselect(entry);
431 #if 0
432 	whatsup = concatenate(transport->name,
433 			      " mail transport unavailable", (char *) 0);
434 	qmgr_transport_throttle(transport,
435 				DSN_SIMPLE(&dsn, "4.3.0", whatsup));
436 	myfree(whatsup);
437 #else
438 	qmgr_transport_throttle(transport,
439 				DSN_SIMPLE(&dsn, "4.3.0",
440 					   "mail transport unavailable"));
441 #endif
442 	qmgr_defer_transport(transport, &dsn);
443 	/* warning: entry may be a dangling pointer here */
444 	(void) vstream_fclose(stream);
445 	return;
446     }
447 
448     /*
449      * If we get this far, go wait for the delivery status report.
450      */
451     qmgr_deliver_concurrency++;
452     entry->stream = stream;
453     event_enable_read(vstream_fileno(stream),
454 		      qmgr_deliver_update, (void *) entry);
455 
456     /*
457      * Guard against broken systems.
458      */
459     event_request_timer(qmgr_deliver_abort, (void *) entry, var_daemon_timeout);
460 }
461