1 /* $NetBSD: qmgr_deliver.c,v 1.1.1.2 2011/03/02 19:32:29 tron Exp $ */ 2 3 /*++ 4 /* NAME 5 /* qmgr_deliver 3 6 /* SUMMARY 7 /* deliver one per-site queue entry to that site 8 /* SYNOPSIS 9 /* #include "qmgr.h" 10 /* 11 /* int qmgr_deliver_concurrency; 12 /* 13 /* int qmgr_deliver(transport, fp) 14 /* QMGR_TRANSPORT *transport; 15 /* VSTREAM *fp; 16 /* DESCRIPTION 17 /* This module implements the client side of the `queue manager 18 /* to delivery agent' protocol. The queue manager uses 19 /* asynchronous I/O so that it can drive multiple delivery 20 /* agents in parallel. Depending on the outcome of a delivery 21 /* attempt, the status of messages, queues and transports is 22 /* updated. 23 /* 24 /* qmgr_deliver_concurrency is a global counter that says how 25 /* many delivery processes are in use. This can be used, for 26 /* example, to control the size of the `active' message queue. 27 /* 28 /* qmgr_deliver() executes when a delivery process announces its 29 /* availability for the named transport. It arranges for delivery 30 /* of a suitable queue entry. The \fIfp\fR argument specifies a 31 /* stream that is connected to a delivery process, or a null 32 /* pointer if the transport accepts no connection. Upon completion 33 /* of delivery (successful or not), the stream is closed, so that the 34 /* delivery process is released. 35 /* DIAGNOSTICS 36 /* LICENSE 37 /* .ad 38 /* .fi 39 /* The Secure Mailer license must be distributed with this software. 40 /* AUTHOR(S) 41 /* Wietse Venema 42 /* IBM T.J. Watson Research 43 /* P.O. Box 704 44 /* Yorktown Heights, NY 10598, USA 45 /* 46 /* Preemptive scheduler enhancements: 47 /* Patrik Rak 48 /* Modra 6 49 /* 155 00, Prague, Czech Republic 50 /*--*/ 51 52 /* System library. */ 53 54 #include <sys_defs.h> 55 #include <time.h> 56 #include <string.h> 57 58 /* Utility library. */ 59 60 #include <msg.h> 61 #include <vstring.h> 62 #include <vstream.h> 63 #include <vstring_vstream.h> 64 #include <events.h> 65 #include <iostuff.h> 66 #include <stringops.h> 67 #include <mymalloc.h> 68 69 /* Global library. */ 70 71 #include <mail_queue.h> 72 #include <mail_proto.h> 73 #include <recipient_list.h> 74 #include <mail_params.h> 75 #include <deliver_request.h> 76 #include <verp_sender.h> 77 #include <dsn_util.h> 78 #include <dsn_buf.h> 79 #include <dsb_scan.h> 80 #include <rcpt_print.h> 81 82 /* Application-specific. */ 83 84 #include "qmgr.h" 85 86 int qmgr_deliver_concurrency; 87 88 /* 89 * Message delivery status codes. 90 */ 91 #define DELIVER_STAT_OK 0 /* all recipients delivered */ 92 #define DELIVER_STAT_DEFER 1 /* try some recipients later */ 93 #define DELIVER_STAT_CRASH 2 /* mailer internal problem */ 94 95 /* qmgr_deliver_initial_reply - retrieve initial delivery process response */ 96 97 static int qmgr_deliver_initial_reply(VSTREAM *stream) 98 { 99 int stat; 100 101 if (peekfd(vstream_fileno(stream)) < 0) { 102 msg_warn("%s: premature disconnect", VSTREAM_PATH(stream)); 103 return (DELIVER_STAT_CRASH); 104 } else if (attr_scan(stream, ATTR_FLAG_STRICT, 105 ATTR_TYPE_INT, MAIL_ATTR_STATUS, &stat, 106 ATTR_TYPE_END) != 1) { 107 msg_warn("%s: malformed response", VSTREAM_PATH(stream)); 108 return (DELIVER_STAT_CRASH); 109 } else { 110 return (stat ? DELIVER_STAT_DEFER : 0); 111 } 112 } 113 114 /* qmgr_deliver_final_reply - retrieve final delivery process response */ 115 116 static int qmgr_deliver_final_reply(VSTREAM *stream, DSN_BUF *dsb) 117 { 118 int stat; 119 120 if (peekfd(vstream_fileno(stream)) < 0) { 121 msg_warn("%s: premature disconnect", VSTREAM_PATH(stream)); 122 return (DELIVER_STAT_CRASH); 123 } else if (attr_scan(stream, ATTR_FLAG_STRICT, 124 ATTR_TYPE_FUNC, dsb_scan, (void *) dsb, 125 ATTR_TYPE_INT, MAIL_ATTR_STATUS, &stat, 126 ATTR_TYPE_END) != 2) { 127 msg_warn("%s: malformed response", VSTREAM_PATH(stream)); 128 return (DELIVER_STAT_CRASH); 129 } else { 130 return (stat ? DELIVER_STAT_DEFER : 0); 131 } 132 } 133 134 /* qmgr_deliver_send_request - send delivery request to delivery process */ 135 136 static int qmgr_deliver_send_request(QMGR_ENTRY *entry, VSTREAM *stream) 137 { 138 RECIPIENT_LIST list = entry->rcpt_list; 139 RECIPIENT *recipient; 140 QMGR_MESSAGE *message = entry->message; 141 VSTRING *sender_buf = 0; 142 MSG_STATS stats; 143 char *sender; 144 int flags; 145 146 /* 147 * If variable envelope return path is requested, change prefix+@origin 148 * into prefix+user=domain@origin. Note that with VERP there is only one 149 * recipient per delivery. 150 */ 151 if (message->verp_delims == 0) { 152 sender = message->sender; 153 } else { 154 sender_buf = vstring_alloc(100); 155 verp_sender(sender_buf, message->verp_delims, 156 message->sender, list.info); 157 sender = vstring_str(sender_buf); 158 } 159 160 flags = message->tflags 161 | entry->queue->dflags 162 | (message->inspect_xport ? DEL_REQ_FLAG_BOUNCE : DEL_REQ_FLAG_DEFLT); 163 (void) QMGR_MSG_STATS(&stats, message); 164 attr_print(stream, ATTR_FLAG_NONE, 165 ATTR_TYPE_INT, MAIL_ATTR_FLAGS, flags, 166 ATTR_TYPE_STR, MAIL_ATTR_QUEUE, message->queue_name, 167 ATTR_TYPE_STR, MAIL_ATTR_QUEUEID, message->queue_id, 168 ATTR_TYPE_LONG, MAIL_ATTR_OFFSET, message->data_offset, 169 ATTR_TYPE_LONG, MAIL_ATTR_SIZE, message->cont_length, 170 ATTR_TYPE_STR, MAIL_ATTR_NEXTHOP, entry->queue->nexthop, 171 ATTR_TYPE_STR, MAIL_ATTR_ENCODING, message->encoding, 172 ATTR_TYPE_STR, MAIL_ATTR_SENDER, sender, 173 ATTR_TYPE_STR, MAIL_ATTR_DSN_ENVID, message->dsn_envid, 174 ATTR_TYPE_INT, MAIL_ATTR_DSN_RET, message->dsn_ret, 175 ATTR_TYPE_FUNC, msg_stats_print, (void *) &stats, 176 /* XXX Should be encapsulated with ATTR_TYPE_FUNC. */ 177 ATTR_TYPE_STR, MAIL_ATTR_LOG_CLIENT_NAME, message->client_name, 178 ATTR_TYPE_STR, MAIL_ATTR_LOG_CLIENT_ADDR, message->client_addr, 179 ATTR_TYPE_STR, MAIL_ATTR_LOG_CLIENT_PORT, message->client_port, 180 ATTR_TYPE_STR, MAIL_ATTR_LOG_PROTO_NAME, message->client_proto, 181 ATTR_TYPE_STR, MAIL_ATTR_LOG_HELO_NAME, message->client_helo, 182 /* XXX Should be encapsulated with ATTR_TYPE_FUNC. */ 183 ATTR_TYPE_STR, MAIL_ATTR_SASL_METHOD, message->sasl_method, 184 ATTR_TYPE_STR, MAIL_ATTR_SASL_USERNAME, message->sasl_username, 185 ATTR_TYPE_STR, MAIL_ATTR_SASL_SENDER, message->sasl_sender, 186 /* XXX Ditto if we want to pass TLS certificate info. */ 187 ATTR_TYPE_STR, MAIL_ATTR_LOG_IDENT, message->log_ident, 188 ATTR_TYPE_STR, MAIL_ATTR_RWR_CONTEXT, message->rewrite_context, 189 ATTR_TYPE_INT, MAIL_ATTR_RCPT_COUNT, list.len, 190 ATTR_TYPE_END); 191 if (sender_buf != 0) 192 vstring_free(sender_buf); 193 for (recipient = list.info; recipient < list.info + list.len; recipient++) 194 attr_print(stream, ATTR_FLAG_NONE, 195 ATTR_TYPE_FUNC, rcpt_print, (void *) recipient, 196 ATTR_TYPE_END); 197 if (vstream_fflush(stream) != 0) { 198 msg_warn("write to process (%s): %m", entry->queue->transport->name); 199 return (-1); 200 } else { 201 if (msg_verbose) 202 msg_info("qmgr_deliver: site `%s'", entry->queue->name); 203 return (0); 204 } 205 } 206 207 /* qmgr_deliver_abort - transport response watchdog */ 208 209 static void qmgr_deliver_abort(int unused_event, char *context) 210 { 211 QMGR_ENTRY *entry = (QMGR_ENTRY *) context; 212 QMGR_QUEUE *queue = entry->queue; 213 QMGR_TRANSPORT *transport = queue->transport; 214 QMGR_MESSAGE *message = entry->message; 215 216 msg_fatal("%s: timeout receiving delivery status from transport: %s", 217 message->queue_id, transport->name); 218 } 219 220 /* qmgr_deliver_update - process delivery status report */ 221 222 static void qmgr_deliver_update(int unused_event, char *context) 223 { 224 QMGR_ENTRY *entry = (QMGR_ENTRY *) context; 225 QMGR_QUEUE *queue = entry->queue; 226 QMGR_TRANSPORT *transport = queue->transport; 227 QMGR_MESSAGE *message = entry->message; 228 static DSN_BUF *dsb; 229 int status; 230 231 /* 232 * Release the delivery agent from a "hot" queue entry. 233 */ 234 #define QMGR_DELIVER_RELEASE_AGENT(entry) do { \ 235 event_disable_readwrite(vstream_fileno(entry->stream)); \ 236 (void) vstream_fclose(entry->stream); \ 237 entry->stream = 0; \ 238 qmgr_deliver_concurrency--; \ 239 } while (0) 240 241 if (dsb == 0) 242 dsb = dsb_create(); 243 244 /* 245 * The message transport has responded. Stop the watchdog timer. 246 */ 247 event_cancel_timer(qmgr_deliver_abort, context); 248 249 /* 250 * Retrieve the delivery agent status report. The numerical status code 251 * indicates if delivery should be tried again. The reason text is sent 252 * only when a site should be avoided for a while, so that the queue 253 * manager can log why it does not even try to schedule delivery to the 254 * affected recipients. 255 */ 256 status = qmgr_deliver_final_reply(entry->stream, dsb); 257 258 /* 259 * The mail delivery process failed for some reason (although delivery 260 * may have been successful). Back off with this transport type for a 261 * while. Dispose of queue entries for this transport that await 262 * selection (the todo lists). Stay away from queue entries that have 263 * been selected (the busy lists), or we would have dangling pointers. 264 * The queue itself won't go away before we dispose of the current queue 265 * entry. 266 */ 267 if (status == DELIVER_STAT_CRASH) { 268 message->flags |= DELIVER_STAT_DEFER; 269 #if 0 270 whatsup = concatenate("unknown ", transport->name, 271 " mail transport error", (char *) 0); 272 qmgr_transport_throttle(transport, 273 DSN_SIMPLE(&dsb->dsn, "4.3.0", whatsup)); 274 myfree(whatsup); 275 #else 276 qmgr_transport_throttle(transport, 277 DSN_SIMPLE(&dsb->dsn, "4.3.0", 278 "unknown mail transport error")); 279 #endif 280 msg_warn("transport %s failure -- see a previous warning/fatal/panic logfile record for the problem description", 281 transport->name); 282 283 /* 284 * Assume the worst and write a defer logfile record for each 285 * recipient. This omission was already present in the first queue 286 * manager implementation of 199703, and was fixed 200511. 287 * 288 * To avoid the synchronous qmgr_defer_recipient() operation for each 289 * recipient of this queue entry, release the delivery process and 290 * move the entry back to the todo queue. Let qmgr_defer_transport() 291 * log the recipient asynchronously if possible, and get out of here. 292 * Note: if asynchronous logging is not possible, 293 * qmgr_defer_transport() eventually invokes qmgr_entry_done() and 294 * the entry becomes a dangling pointer. 295 */ 296 QMGR_DELIVER_RELEASE_AGENT(entry); 297 qmgr_entry_unselect(entry); 298 qmgr_defer_transport(transport, &dsb->dsn); 299 return; 300 } 301 302 /* 303 * This message must be tried again. 304 * 305 * If we have a problem talking to this site, back off with this site for a 306 * while; dispose of queue entries for this site that await selection 307 * (the todo list); stay away from queue entries that have been selected 308 * (the busy list), or we would have dangling pointers. The queue itself 309 * won't go away before we dispose of the current queue entry. 310 * 311 * XXX Caution: DSN_COPY() will panic on empty status or reason. 312 */ 313 #define SUSPENDED "delivery temporarily suspended: " 314 315 if (status == DELIVER_STAT_DEFER) { 316 message->flags |= DELIVER_STAT_DEFER; 317 if (VSTRING_LEN(dsb->status)) { 318 /* Sanitize the DSN status/reason from the delivery agent. */ 319 if (!dsn_valid(vstring_str(dsb->status))) 320 vstring_strcpy(dsb->status, "4.0.0"); 321 if (VSTRING_LEN(dsb->reason) == 0) 322 vstring_strcpy(dsb->reason, "unknown error"); 323 vstring_prepend(dsb->reason, SUSPENDED, sizeof(SUSPENDED) - 1); 324 if (QMGR_QUEUE_READY(queue)) { 325 qmgr_queue_throttle(queue, DSN_FROM_DSN_BUF(dsb)); 326 if (QMGR_QUEUE_THROTTLED(queue)) 327 qmgr_defer_todo(queue, &dsb->dsn); 328 } 329 } 330 } 331 332 /* 333 * No problems detected. Mark the transport and queue as alive. The queue 334 * itself won't go away before we dispose of the current queue entry. 335 */ 336 if (status != DELIVER_STAT_CRASH && VSTRING_LEN(dsb->reason) == 0) { 337 qmgr_transport_unthrottle(transport); 338 qmgr_queue_unthrottle(queue); 339 } 340 341 /* 342 * Release the delivery process, and give some other queue entry a chance 343 * to be delivered. When all recipients for a message have been tried, 344 * decide what to do next with this message: defer, bounce, delete. 345 */ 346 QMGR_DELIVER_RELEASE_AGENT(entry); 347 qmgr_entry_done(entry, QMGR_QUEUE_BUSY); 348 } 349 350 /* qmgr_deliver - deliver one per-site queue entry */ 351 352 void qmgr_deliver(QMGR_TRANSPORT *transport, VSTREAM *stream) 353 { 354 QMGR_ENTRY *entry; 355 DSN dsn; 356 357 /* 358 * Find out if this delivery process is really available. Once elected, 359 * the delivery process is supposed to express its happiness. If there is 360 * a problem, wipe the pending deliveries for this transport. This 361 * routine runs in response to an external event, so it does not run 362 * while some other queue manipulation is happening. 363 */ 364 if (stream == 0 || qmgr_deliver_initial_reply(stream) != 0) { 365 #if 0 366 whatsup = concatenate(transport->name, 367 " mail transport unavailable", (char *) 0); 368 qmgr_transport_throttle(transport, 369 DSN_SIMPLE(&dsn, "4.3.0", whatsup)); 370 myfree(whatsup); 371 #else 372 qmgr_transport_throttle(transport, 373 DSN_SIMPLE(&dsn, "4.3.0", 374 "mail transport unavailable")); 375 #endif 376 qmgr_defer_transport(transport, &dsn); 377 if (stream) 378 (void) vstream_fclose(stream); 379 return; 380 } 381 382 /* 383 * Find a suitable queue entry. Things may have changed since this 384 * transport was allocated. If no suitable entry is found, 385 * unceremoniously disconnect from the delivery process. The delivery 386 * agent request reading routine is prepared for the queue manager to 387 * change its mind for no apparent reason. 388 */ 389 if ((entry = qmgr_job_entry_select(transport)) == 0) { 390 (void) vstream_fclose(stream); 391 return; 392 } 393 394 /* 395 * Send the queue file info and recipient info to the delivery process. 396 * If there is a problem, wipe the pending deliveries for this transport. 397 * This routine runs in response to an external event, so it does not run 398 * while some other queue manipulation is happening. 399 */ 400 if (qmgr_deliver_send_request(entry, stream) < 0) { 401 qmgr_entry_unselect(entry); 402 #if 0 403 whatsup = concatenate(transport->name, 404 " mail transport unavailable", (char *) 0); 405 qmgr_transport_throttle(transport, 406 DSN_SIMPLE(&dsn, "4.3.0", whatsup)); 407 myfree(whatsup); 408 #else 409 qmgr_transport_throttle(transport, 410 DSN_SIMPLE(&dsn, "4.3.0", 411 "mail transport unavailable")); 412 #endif 413 qmgr_defer_transport(transport, &dsn); 414 /* warning: entry may be a dangling pointer here */ 415 (void) vstream_fclose(stream); 416 return; 417 } 418 419 /* 420 * If we get this far, go wait for the delivery status report. 421 */ 422 qmgr_deliver_concurrency++; 423 entry->stream = stream; 424 event_enable_read(vstream_fileno(stream), 425 qmgr_deliver_update, (char *) entry); 426 427 /* 428 * Guard against broken systems. 429 */ 430 event_request_timer(qmgr_deliver_abort, (char *) entry, var_daemon_timeout); 431 } 432