xref: /netbsd-src/external/ibm-public/postfix/dist/src/qmgr/qmgr_deliver.c (revision a4ddc2c8fb9af816efe3b1c375a5530aef0e89e9)
1 /*	$NetBSD: qmgr_deliver.c,v 1.1.1.2 2011/03/02 19:32:29 tron Exp $	*/
2 
3 /*++
4 /* NAME
5 /*	qmgr_deliver 3
6 /* SUMMARY
7 /*	deliver one per-site queue entry to that site
8 /* SYNOPSIS
9 /*	#include "qmgr.h"
10 /*
11 /*	int	qmgr_deliver_concurrency;
12 /*
13 /*	void	qmgr_deliver(transport, fp)
14 /*	QMGR_TRANSPORT *transport;
15 /*	VSTREAM	*fp;
16 /* DESCRIPTION
17 /*	This module implements the client side of the `queue manager
18 /*	to delivery agent' protocol. The queue manager uses
19 /*	asynchronous I/O so that it can drive multiple delivery
20 /*	agents in parallel. Depending on the outcome of a delivery
21 /*	attempt, the status of messages, queues and transports is
22 /*	updated.
23 /*
24 /*	qmgr_deliver_concurrency is a global counter that says how
25 /*	many delivery processes are in use. This can be used, for
26 /*	example, to control the size of the `active' message queue.
27 /*
28 /*	qmgr_deliver() executes when a delivery process announces its
29 /*	availability for the named transport. It arranges for delivery
30 /*	of a suitable queue entry.  The \fIfp\fR argument specifies a
31 /*	stream that is connected to a delivery process, or a null
32 /*	pointer if the transport accepts no connection. Upon completion
33 /*	of delivery (successful or not), the stream is closed, so that the
34 /*	delivery process is released.
35 /* DIAGNOSTICS
36 /* LICENSE
37 /* .ad
38 /* .fi
39 /*	The Secure Mailer license must be distributed with this software.
40 /* AUTHOR(S)
41 /*	Wietse Venema
42 /*	IBM T.J. Watson Research
43 /*	P.O. Box 704
44 /*	Yorktown Heights, NY 10598, USA
45 /*
46 /*	Preemptive scheduler enhancements:
47 /*	Patrik Rak
48 /*	Modra 6
49 /*	155 00, Prague, Czech Republic
50 /*--*/
51 
52 /* System library. */
53 
54 #include <sys_defs.h>
55 #include <time.h>
56 #include <string.h>
57 
58 /* Utility library. */
59 
60 #include <msg.h>
61 #include <vstring.h>
62 #include <vstream.h>
63 #include <vstring_vstream.h>
64 #include <events.h>
65 #include <iostuff.h>
66 #include <stringops.h>
67 #include <mymalloc.h>
68 
69 /* Global library. */
70 
71 #include <mail_queue.h>
72 #include <mail_proto.h>
73 #include <recipient_list.h>
74 #include <mail_params.h>
75 #include <deliver_request.h>
76 #include <verp_sender.h>
77 #include <dsn_util.h>
78 #include <dsn_buf.h>
79 #include <dsb_scan.h>
80 #include <rcpt_print.h>
81 
82 /* Application-specific. */
83 
84 #include "qmgr.h"
85 
86 int     qmgr_deliver_concurrency;	/* delivery processes in use; ++ in qmgr_deliver(), -- when an agent is released */
87 
88  /*
89   * Message delivery status codes. These are the result values produced by the reply-reading routines in this file.
90   */
91 #define DELIVER_STAT_OK		0	/* all recipients delivered */
92 #define DELIVER_STAT_DEFER	1	/* try some recipients later */
93 #define DELIVER_STAT_CRASH	2	/* mailer internal problem */
94 
95 /* qmgr_deliver_initial_reply - retrieve initial delivery process response */
96 
97 static int qmgr_deliver_initial_reply(VSTREAM *stream)
98 {
99     int     stat;
100 
101     if (peekfd(vstream_fileno(stream)) < 0) {
102 	msg_warn("%s: premature disconnect", VSTREAM_PATH(stream));
103 	return (DELIVER_STAT_CRASH);
104     } else if (attr_scan(stream, ATTR_FLAG_STRICT,
105 			 ATTR_TYPE_INT, MAIL_ATTR_STATUS, &stat,
106 			 ATTR_TYPE_END) != 1) {
107 	msg_warn("%s: malformed response", VSTREAM_PATH(stream));
108 	return (DELIVER_STAT_CRASH);
109     } else {
110 	return (stat ? DELIVER_STAT_DEFER : 0);
111     }
112 }
113 
114 /* qmgr_deliver_final_reply - retrieve final delivery process response */
115 
116 static int qmgr_deliver_final_reply(VSTREAM *stream, DSN_BUF *dsb)
117 {
118     int     stat;
119 
120     if (peekfd(vstream_fileno(stream)) < 0) {
121 	msg_warn("%s: premature disconnect", VSTREAM_PATH(stream));
122 	return (DELIVER_STAT_CRASH);
123     } else if (attr_scan(stream, ATTR_FLAG_STRICT,
124 			 ATTR_TYPE_FUNC, dsb_scan, (void *) dsb,
125 			 ATTR_TYPE_INT, MAIL_ATTR_STATUS, &stat,
126 			 ATTR_TYPE_END) != 2) {
127 	msg_warn("%s: malformed response", VSTREAM_PATH(stream));
128 	return (DELIVER_STAT_CRASH);
129     } else {
130 	return (stat ? DELIVER_STAT_DEFER : 0);
131     }
132 }
133 
134 /* qmgr_deliver_send_request - send delivery request to delivery process */
135 
136 static int qmgr_deliver_send_request(QMGR_ENTRY *entry, VSTREAM *stream)
137 {
138     RECIPIENT_LIST list = entry->rcpt_list;
139     RECIPIENT *recipient;
140     QMGR_MESSAGE *message = entry->message;
141     VSTRING *sender_buf = 0;
142     MSG_STATS stats;
143     char   *sender;
144     int     flags;
145 
146     /*
147      * If variable envelope return path is requested, change prefix+@origin
148      * into prefix+user=domain@origin. Note that with VERP there is only one
149      * recipient per delivery.
150      */
151     if (message->verp_delims == 0) {
152 	sender = message->sender;
153     } else {
154 	sender_buf = vstring_alloc(100);
155 	verp_sender(sender_buf, message->verp_delims,
156 		    message->sender, list.info);
157 	sender = vstring_str(sender_buf);
158     }
159 
160     flags = message->tflags
161 	| entry->queue->dflags
162 	| (message->inspect_xport ? DEL_REQ_FLAG_BOUNCE : DEL_REQ_FLAG_DEFLT);
163     (void) QMGR_MSG_STATS(&stats, message);
164     attr_print(stream, ATTR_FLAG_NONE,
165 	       ATTR_TYPE_INT, MAIL_ATTR_FLAGS, flags,
166 	       ATTR_TYPE_STR, MAIL_ATTR_QUEUE, message->queue_name,
167 	       ATTR_TYPE_STR, MAIL_ATTR_QUEUEID, message->queue_id,
168 	       ATTR_TYPE_LONG, MAIL_ATTR_OFFSET, message->data_offset,
169 	       ATTR_TYPE_LONG, MAIL_ATTR_SIZE, message->cont_length,
170 	       ATTR_TYPE_STR, MAIL_ATTR_NEXTHOP, entry->queue->nexthop,
171 	       ATTR_TYPE_STR, MAIL_ATTR_ENCODING, message->encoding,
172 	       ATTR_TYPE_STR, MAIL_ATTR_SENDER, sender,
173 	       ATTR_TYPE_STR, MAIL_ATTR_DSN_ENVID, message->dsn_envid,
174 	       ATTR_TYPE_INT, MAIL_ATTR_DSN_RET, message->dsn_ret,
175 	       ATTR_TYPE_FUNC, msg_stats_print, (void *) &stats,
176     /* XXX Should be encapsulated with ATTR_TYPE_FUNC. */
177 	     ATTR_TYPE_STR, MAIL_ATTR_LOG_CLIENT_NAME, message->client_name,
178 	     ATTR_TYPE_STR, MAIL_ATTR_LOG_CLIENT_ADDR, message->client_addr,
179 	     ATTR_TYPE_STR, MAIL_ATTR_LOG_CLIENT_PORT, message->client_port,
180 	     ATTR_TYPE_STR, MAIL_ATTR_LOG_PROTO_NAME, message->client_proto,
181 	       ATTR_TYPE_STR, MAIL_ATTR_LOG_HELO_NAME, message->client_helo,
182     /* XXX Should be encapsulated with ATTR_TYPE_FUNC. */
183 	       ATTR_TYPE_STR, MAIL_ATTR_SASL_METHOD, message->sasl_method,
184 	     ATTR_TYPE_STR, MAIL_ATTR_SASL_USERNAME, message->sasl_username,
185 	       ATTR_TYPE_STR, MAIL_ATTR_SASL_SENDER, message->sasl_sender,
186     /* XXX Ditto if we want to pass TLS certificate info. */
187 	       ATTR_TYPE_STR, MAIL_ATTR_LOG_IDENT, message->log_ident,
188 	     ATTR_TYPE_STR, MAIL_ATTR_RWR_CONTEXT, message->rewrite_context,
189 	       ATTR_TYPE_INT, MAIL_ATTR_RCPT_COUNT, list.len,
190 	       ATTR_TYPE_END);
191     if (sender_buf != 0)
192 	vstring_free(sender_buf);
193     for (recipient = list.info; recipient < list.info + list.len; recipient++)
194 	attr_print(stream, ATTR_FLAG_NONE,
195 		   ATTR_TYPE_FUNC, rcpt_print, (void *) recipient,
196 		   ATTR_TYPE_END);
197     if (vstream_fflush(stream) != 0) {
198 	msg_warn("write to process (%s): %m", entry->queue->transport->name);
199 	return (-1);
200     } else {
201 	if (msg_verbose)
202 	    msg_info("qmgr_deliver: site `%s'", entry->queue->name);
203 	return (0);
204     }
205 }
206 
207 /* qmgr_deliver_abort - transport response watchdog */
208 
209 static void qmgr_deliver_abort(int unused_event, char *context)
210 {
211     QMGR_ENTRY *entry = (QMGR_ENTRY *) context;
212     QMGR_QUEUE *queue = entry->queue;
213     QMGR_TRANSPORT *transport = queue->transport;
214     QMGR_MESSAGE *message = entry->message;
215 
216     msg_fatal("%s: timeout receiving delivery status from transport: %s",
217 	      message->queue_id, transport->name);
218 }
219 
220 /* qmgr_deliver_update - process delivery status report */
221 
222 static void qmgr_deliver_update(int unused_event, char *context)
223 {
224     QMGR_ENTRY *entry = (QMGR_ENTRY *) context;
225     QMGR_QUEUE *queue = entry->queue;
226     QMGR_TRANSPORT *transport = queue->transport;
227     QMGR_MESSAGE *message = entry->message;
228     static DSN_BUF *dsb;
229     int     status;
230 
231     /*
232      * Release the delivery agent from a "hot" queue entry.
233      */
234 #define QMGR_DELIVER_RELEASE_AGENT(entry) do { \
235 	event_disable_readwrite(vstream_fileno(entry->stream)); \
236 	(void) vstream_fclose(entry->stream); \
237 	entry->stream = 0; \
238 	qmgr_deliver_concurrency--; \
239     } while (0)
240 
241     if (dsb == 0)
242 	dsb = dsb_create();
243 
244     /*
245      * The message transport has responded. Stop the watchdog timer.
246      */
247     event_cancel_timer(qmgr_deliver_abort, context);
248 
249     /*
250      * Retrieve the delivery agent status report. The numerical status code
251      * indicates if delivery should be tried again. The reason text is sent
252      * only when a site should be avoided for a while, so that the queue
253      * manager can log why it does not even try to schedule delivery to the
254      * affected recipients.
255      */
256     status = qmgr_deliver_final_reply(entry->stream, dsb);
257 
258     /*
259      * The mail delivery process failed for some reason (although delivery
260      * may have been successful). Back off with this transport type for a
261      * while. Dispose of queue entries for this transport that await
262      * selection (the todo lists). Stay away from queue entries that have
263      * been selected (the busy lists), or we would have dangling pointers.
264      * The queue itself won't go away before we dispose of the current queue
265      * entry.
266      */
267     if (status == DELIVER_STAT_CRASH) {
268 	message->flags |= DELIVER_STAT_DEFER;
269 #if 0
270 	whatsup = concatenate("unknown ", transport->name,
271 			      " mail transport error", (char *) 0);
272 	qmgr_transport_throttle(transport,
273 				DSN_SIMPLE(&dsb->dsn, "4.3.0", whatsup));
274 	myfree(whatsup);
275 #else
276 	qmgr_transport_throttle(transport,
277 				DSN_SIMPLE(&dsb->dsn, "4.3.0",
278 					   "unknown mail transport error"));
279 #endif
280 	msg_warn("transport %s failure -- see a previous warning/fatal/panic logfile record for the problem description",
281 		 transport->name);
282 
283 	/*
284 	 * Assume the worst and write a defer logfile record for each
285 	 * recipient. This omission was already present in the first queue
286 	 * manager implementation of 199703, and was fixed 200511.
287 	 *
288 	 * To avoid the synchronous qmgr_defer_recipient() operation for each
289 	 * recipient of this queue entry, release the delivery process and
290 	 * move the entry back to the todo queue. Let qmgr_defer_transport()
291 	 * log the recipient asynchronously if possible, and get out of here.
292 	 * Note: if asynchronous logging is not possible,
293 	 * qmgr_defer_transport() eventually invokes qmgr_entry_done() and
294 	 * the entry becomes a dangling pointer.
295 	 */
296 	QMGR_DELIVER_RELEASE_AGENT(entry);
297 	qmgr_entry_unselect(entry);
298 	qmgr_defer_transport(transport, &dsb->dsn);
299 	return;
300     }
301 
302     /*
303      * This message must be tried again.
304      *
305      * If we have a problem talking to this site, back off with this site for a
306      * while; dispose of queue entries for this site that await selection
307      * (the todo list); stay away from queue entries that have been selected
308      * (the busy list), or we would have dangling pointers. The queue itself
309      * won't go away before we dispose of the current queue entry.
310      *
311      * XXX Caution: DSN_COPY() will panic on empty status or reason.
312      */
313 #define SUSPENDED	"delivery temporarily suspended: "
314 
315     if (status == DELIVER_STAT_DEFER) {
316 	message->flags |= DELIVER_STAT_DEFER;
317 	if (VSTRING_LEN(dsb->status)) {
318 	    /* Sanitize the DSN status/reason from the delivery agent. */
319 	    if (!dsn_valid(vstring_str(dsb->status)))
320 		vstring_strcpy(dsb->status, "4.0.0");
321 	    if (VSTRING_LEN(dsb->reason) == 0)
322 		vstring_strcpy(dsb->reason, "unknown error");
323 	    vstring_prepend(dsb->reason, SUSPENDED, sizeof(SUSPENDED) - 1);
324 	    if (QMGR_QUEUE_READY(queue)) {
325 		qmgr_queue_throttle(queue, DSN_FROM_DSN_BUF(dsb));
326 		if (QMGR_QUEUE_THROTTLED(queue))
327 		    qmgr_defer_todo(queue, &dsb->dsn);
328 	    }
329 	}
330     }
331 
332     /*
333      * No problems detected. Mark the transport and queue as alive. The queue
334      * itself won't go away before we dispose of the current queue entry.
335      */
336     if (status != DELIVER_STAT_CRASH && VSTRING_LEN(dsb->reason) == 0) {
337 	qmgr_transport_unthrottle(transport);
338 	qmgr_queue_unthrottle(queue);
339     }
340 
341     /*
342      * Release the delivery process, and give some other queue entry a chance
343      * to be delivered. When all recipients for a message have been tried,
344      * decide what to do next with this message: defer, bounce, delete.
345      */
346     QMGR_DELIVER_RELEASE_AGENT(entry);
347     qmgr_entry_done(entry, QMGR_QUEUE_BUSY);
348 }
349 
350 /* qmgr_deliver - deliver one per-site queue entry */
351 
352 void    qmgr_deliver(QMGR_TRANSPORT *transport, VSTREAM *stream)
353 {
354     QMGR_ENTRY *entry;
355     DSN     dsn;
356 
357     /*
358      * Find out if this delivery process is really available. Once elected,
359      * the delivery process is supposed to express its happiness. If there is
360      * a problem, wipe the pending deliveries for this transport. This
361      * routine runs in response to an external event, so it does not run
362      * while some other queue manipulation is happening.
363      */
364     if (stream == 0 || qmgr_deliver_initial_reply(stream) != 0) {
365 #if 0
366 	whatsup = concatenate(transport->name,
367 			      " mail transport unavailable", (char *) 0);
368 	qmgr_transport_throttle(transport,
369 				DSN_SIMPLE(&dsn, "4.3.0", whatsup));
370 	myfree(whatsup);
371 #else
372 	qmgr_transport_throttle(transport,
373 				DSN_SIMPLE(&dsn, "4.3.0",
374 					   "mail transport unavailable"));
375 #endif
376 	qmgr_defer_transport(transport, &dsn);
377 	if (stream)
378 	    (void) vstream_fclose(stream);
379 	return;
380     }
381 
382     /*
383      * Find a suitable queue entry. Things may have changed since this
384      * transport was allocated. If no suitable entry is found,
385      * unceremoniously disconnect from the delivery process. The delivery
386      * agent request reading routine is prepared for the queue manager to
387      * change its mind for no apparent reason.
388      */
389     if ((entry = qmgr_job_entry_select(transport)) == 0) {
390 	(void) vstream_fclose(stream);
391 	return;
392     }
393 
394     /*
395      * Send the queue file info and recipient info to the delivery process.
396      * If there is a problem, wipe the pending deliveries for this transport.
397      * This routine runs in response to an external event, so it does not run
398      * while some other queue manipulation is happening.
399      */
400     if (qmgr_deliver_send_request(entry, stream) < 0) {
401 	qmgr_entry_unselect(entry);
402 #if 0
403 	whatsup = concatenate(transport->name,
404 			      " mail transport unavailable", (char *) 0);
405 	qmgr_transport_throttle(transport,
406 				DSN_SIMPLE(&dsn, "4.3.0", whatsup));
407 	myfree(whatsup);
408 #else
409 	qmgr_transport_throttle(transport,
410 				DSN_SIMPLE(&dsn, "4.3.0",
411 					   "mail transport unavailable"));
412 #endif
413 	qmgr_defer_transport(transport, &dsn);
414 	/* warning: entry may be a dangling pointer here */
415 	(void) vstream_fclose(stream);
416 	return;
417     }
418 
419     /*
420      * If we get this far, go wait for the delivery status report.
421      */
422     qmgr_deliver_concurrency++;
423     entry->stream = stream;
424     event_enable_read(vstream_fileno(stream),
425 		      qmgr_deliver_update, (char *) entry);
426 
427     /*
428      * Guard against broken systems.
429      */
430     event_request_timer(qmgr_deliver_abort, (char *) entry, var_daemon_timeout);
431 }
432