1 /* $NetBSD: qmgr_deliver.c,v 1.2 2017/02/14 01:16:47 christos Exp $ */ 2 3 /*++ 4 /* NAME 5 /* qmgr_deliver 3 6 /* SUMMARY 7 /* deliver one per-site queue entry to that site 8 /* SYNOPSIS 9 /* #include "qmgr.h" 10 /* 11 /* int qmgr_deliver_concurrency; 12 /* 13 /* int qmgr_deliver(transport, fp) 14 /* QMGR_TRANSPORT *transport; 15 /* VSTREAM *fp; 16 /* DESCRIPTION 17 /* This module implements the client side of the `queue manager 18 /* to delivery agent' protocol. The queue manager uses 19 /* asynchronous I/O so that it can drive multiple delivery 20 /* agents in parallel. Depending on the outcome of a delivery 21 /* attempt, the status of messages, queues and transports is 22 /* updated. 23 /* 24 /* qmgr_deliver_concurrency is a global counter that says how 25 /* many delivery processes are in use. This can be used, for 26 /* example, to control the size of the `active' message queue. 27 /* 28 /* qmgr_deliver() executes when a delivery process announces its 29 /* availability for the named transport. It arranges for delivery 30 /* of a suitable queue entry. The \fIfp\fR argument specifies a 31 /* stream that is connected to a delivery process, or a null 32 /* pointer if the transport accepts no connection. Upon completion 33 /* of delivery (successful or not), the stream is closed, so that the 34 /* delivery process is released. 35 /* DIAGNOSTICS 36 /* LICENSE 37 /* .ad 38 /* .fi 39 /* The Secure Mailer license must be distributed with this software. 40 /* AUTHOR(S) 41 /* Wietse Venema 42 /* IBM T.J. Watson Research 43 /* P.O. Box 704 44 /* Yorktown Heights, NY 10598, USA 45 /* 46 /* Preemptive scheduler enhancements: 47 /* Patrik Rak 48 /* Modra 6 49 /* 155 00, Prague, Czech Republic 50 /* 51 /* Wietse Venema 52 /* Google, Inc. 53 /* 111 8th Avenue 54 /* New York, NY 10011, USA 55 /*--*/ 56 57 /* System library. */ 58 59 #include <sys_defs.h> 60 #include <time.h> 61 #include <string.h> 62 63 /* Utility library. */ 64 65 #include <msg.h> 66 #include <vstring.h> 67 #include <vstream.h> 68 #include <vstring_vstream.h> 69 #include <events.h> 70 #include <iostuff.h> 71 #include <stringops.h> 72 #include <mymalloc.h> 73 74 /* Global library. */ 75 76 #include <mail_queue.h> 77 #include <mail_proto.h> 78 #include <recipient_list.h> 79 #include <mail_params.h> 80 #include <deliver_request.h> 81 #include <verp_sender.h> 82 #include <dsn_util.h> 83 #include <dsn_buf.h> 84 #include <dsb_scan.h> 85 #include <rcpt_print.h> 86 #include <smtputf8.h> 87 88 /* Application-specific. */ 89 90 #include "qmgr.h" 91 92 /* 93 * Important note on the _transport_rate_delay implementation: after 94 * qmgr_transport_alloc() sets the QMGR_TRANSPORT_STAT_RATE_LOCK flag, all 95 * code paths must directly or indirectly invoke qmgr_transport_unthrottle() 96 * or qmgr_transport_throttle(). Otherwise, transports with non-zero 97 * _transport_rate_delay will become stuck. 98 */ 99 100 int qmgr_deliver_concurrency; 101 102 /* 103 * Message delivery status codes. 104 */ 105 #define DELIVER_STAT_OK 0 /* all recipients delivered */ 106 #define DELIVER_STAT_DEFER 1 /* try some recipients later */ 107 #define DELIVER_STAT_CRASH 2 /* mailer internal problem */ 108 109 /* qmgr_deliver_initial_reply - retrieve initial delivery process response */ 110 111 static int qmgr_deliver_initial_reply(VSTREAM *stream) 112 { 113 int stat; 114 115 if (peekfd(vstream_fileno(stream)) < 0) { 116 msg_warn("%s: premature disconnect", VSTREAM_PATH(stream)); 117 return (DELIVER_STAT_CRASH); 118 } else if (attr_scan(stream, ATTR_FLAG_STRICT, 119 RECV_ATTR_INT(MAIL_ATTR_STATUS, &stat), 120 ATTR_TYPE_END) != 1) { 121 msg_warn("%s: malformed response", VSTREAM_PATH(stream)); 122 return (DELIVER_STAT_CRASH); 123 } else { 124 return (stat ? DELIVER_STAT_DEFER : 0); 125 } 126 } 127 128 /* qmgr_deliver_final_reply - retrieve final delivery process response */ 129 130 static int qmgr_deliver_final_reply(VSTREAM *stream, DSN_BUF *dsb) 131 { 132 int stat; 133 134 if (peekfd(vstream_fileno(stream)) < 0) { 135 msg_warn("%s: premature disconnect", VSTREAM_PATH(stream)); 136 return (DELIVER_STAT_CRASH); 137 } else if (attr_scan(stream, ATTR_FLAG_STRICT, 138 RECV_ATTR_FUNC(dsb_scan, (void *) dsb), 139 RECV_ATTR_INT(MAIL_ATTR_STATUS, &stat), 140 ATTR_TYPE_END) != 2) { 141 msg_warn("%s: malformed response", VSTREAM_PATH(stream)); 142 return (DELIVER_STAT_CRASH); 143 } else { 144 return (stat ? DELIVER_STAT_DEFER : 0); 145 } 146 } 147 148 /* qmgr_deliver_send_request - send delivery request to delivery process */ 149 150 static int qmgr_deliver_send_request(QMGR_ENTRY *entry, VSTREAM *stream) 151 { 152 RECIPIENT_LIST list = entry->rcpt_list; 153 RECIPIENT *recipient; 154 QMGR_MESSAGE *message = entry->message; 155 VSTRING *sender_buf = 0; 156 MSG_STATS stats; 157 char *sender; 158 int flags; 159 int smtputf8 = message->smtputf8; 160 const char *addr; 161 162 /* 163 * Todo: integrate with code up-stream that builds the delivery request. 164 */ 165 for (recipient = list.info; recipient < list.info + list.len; recipient++) 166 if (var_smtputf8_enable && (addr = recipient->address)[0] 167 && !allascii(addr) && valid_utf8_string(addr, strlen(addr))) { 168 smtputf8 |= SMTPUTF8_FLAG_RECIPIENT; 169 if (message->verp_delims) 170 smtputf8 |= SMTPUTF8_FLAG_SENDER; 171 } 172 173 /* 174 * If variable envelope return path is requested, change prefix+@origin 175 * into prefix+user=domain@origin. Note that with VERP there is only one 176 * recipient per delivery. 177 */ 178 if (message->verp_delims == 0) { 179 sender = message->sender; 180 } else { 181 sender_buf = vstring_alloc(100); 182 verp_sender(sender_buf, message->verp_delims, 183 message->sender, list.info); 184 sender = vstring_str(sender_buf); 185 } 186 187 flags = message->tflags 188 | entry->queue->dflags 189 | (message->inspect_xport ? DEL_REQ_FLAG_BOUNCE : DEL_REQ_FLAG_DEFLT); 190 (void) QMGR_MSG_STATS(&stats, message); 191 attr_print(stream, ATTR_FLAG_NONE, 192 SEND_ATTR_INT(MAIL_ATTR_FLAGS, flags), 193 SEND_ATTR_STR(MAIL_ATTR_QUEUE, message->queue_name), 194 SEND_ATTR_STR(MAIL_ATTR_QUEUEID, message->queue_id), 195 SEND_ATTR_LONG(MAIL_ATTR_OFFSET, message->data_offset), 196 SEND_ATTR_LONG(MAIL_ATTR_SIZE, message->cont_length), 197 SEND_ATTR_STR(MAIL_ATTR_NEXTHOP, entry->queue->nexthop), 198 SEND_ATTR_STR(MAIL_ATTR_ENCODING, message->encoding), 199 SEND_ATTR_INT(MAIL_ATTR_SMTPUTF8, smtputf8), 200 SEND_ATTR_STR(MAIL_ATTR_SENDER, sender), 201 SEND_ATTR_STR(MAIL_ATTR_DSN_ENVID, message->dsn_envid), 202 SEND_ATTR_INT(MAIL_ATTR_DSN_RET, message->dsn_ret), 203 SEND_ATTR_FUNC(msg_stats_print, (void *) &stats), 204 /* XXX Should be encapsulated with ATTR_TYPE_FUNC. */ 205 SEND_ATTR_STR(MAIL_ATTR_LOG_CLIENT_NAME, message->client_name), 206 SEND_ATTR_STR(MAIL_ATTR_LOG_CLIENT_ADDR, message->client_addr), 207 SEND_ATTR_STR(MAIL_ATTR_LOG_CLIENT_PORT, message->client_port), 208 SEND_ATTR_STR(MAIL_ATTR_LOG_PROTO_NAME, message->client_proto), 209 SEND_ATTR_STR(MAIL_ATTR_LOG_HELO_NAME, message->client_helo), 210 /* XXX Should be encapsulated with ATTR_TYPE_FUNC. */ 211 SEND_ATTR_STR(MAIL_ATTR_SASL_METHOD, message->sasl_method), 212 SEND_ATTR_STR(MAIL_ATTR_SASL_USERNAME, message->sasl_username), 213 SEND_ATTR_STR(MAIL_ATTR_SASL_SENDER, message->sasl_sender), 214 /* XXX Ditto if we want to pass TLS certificate info. */ 215 SEND_ATTR_STR(MAIL_ATTR_LOG_IDENT, message->log_ident), 216 SEND_ATTR_STR(MAIL_ATTR_RWR_CONTEXT, message->rewrite_context), 217 SEND_ATTR_INT(MAIL_ATTR_RCPT_COUNT, list.len), 218 ATTR_TYPE_END); 219 if (sender_buf != 0) 220 vstring_free(sender_buf); 221 for (recipient = list.info; recipient < list.info + list.len; recipient++) 222 attr_print(stream, ATTR_FLAG_NONE, 223 SEND_ATTR_FUNC(rcpt_print, (void *) recipient), 224 ATTR_TYPE_END); 225 if (vstream_fflush(stream) != 0) { 226 msg_warn("write to process (%s): %m", entry->queue->transport->name); 227 return (-1); 228 } else { 229 if (msg_verbose) 230 msg_info("qmgr_deliver: site `%s'", entry->queue->name); 231 return (0); 232 } 233 } 234 235 /* qmgr_deliver_abort - transport response watchdog */ 236 237 static void qmgr_deliver_abort(int unused_event, void *context) 238 { 239 QMGR_ENTRY *entry = (QMGR_ENTRY *) context; 240 QMGR_QUEUE *queue = entry->queue; 241 QMGR_TRANSPORT *transport = queue->transport; 242 QMGR_MESSAGE *message = entry->message; 243 244 msg_fatal("%s: timeout receiving delivery status from transport: %s", 245 message->queue_id, transport->name); 246 } 247 248 /* qmgr_deliver_update - process delivery status report */ 249 250 static void qmgr_deliver_update(int unused_event, void *context) 251 { 252 QMGR_ENTRY *entry = (QMGR_ENTRY *) context; 253 QMGR_QUEUE *queue = entry->queue; 254 QMGR_TRANSPORT *transport = queue->transport; 255 QMGR_MESSAGE *message = entry->message; 256 static DSN_BUF *dsb; 257 int status; 258 259 /* 260 * Release the delivery agent from a "hot" queue entry. 261 */ 262 #define QMGR_DELIVER_RELEASE_AGENT(entry) do { \ 263 event_disable_readwrite(vstream_fileno(entry->stream)); \ 264 (void) vstream_fclose(entry->stream); \ 265 entry->stream = 0; \ 266 qmgr_deliver_concurrency--; \ 267 } while (0) 268 269 if (dsb == 0) 270 dsb = dsb_create(); 271 272 /* 273 * The message transport has responded. Stop the watchdog timer. 274 */ 275 event_cancel_timer(qmgr_deliver_abort, context); 276 277 /* 278 * Retrieve the delivery agent status report. The numerical status code 279 * indicates if delivery should be tried again. The reason text is sent 280 * only when a site should be avoided for a while, so that the queue 281 * manager can log why it does not even try to schedule delivery to the 282 * affected recipients. 283 */ 284 status = qmgr_deliver_final_reply(entry->stream, dsb); 285 286 /* 287 * The mail delivery process failed for some reason (although delivery 288 * may have been successful). Back off with this transport type for a 289 * while. Dispose of queue entries for this transport that await 290 * selection (the todo lists). Stay away from queue entries that have 291 * been selected (the busy lists), or we would have dangling pointers. 292 * The queue itself won't go away before we dispose of the current queue 293 * entry. 294 */ 295 if (status == DELIVER_STAT_CRASH) { 296 message->flags |= DELIVER_STAT_DEFER; 297 #if 0 298 whatsup = concatenate("unknown ", transport->name, 299 " mail transport error", (char *) 0); 300 qmgr_transport_throttle(transport, 301 DSN_SIMPLE(&dsb->dsn, "4.3.0", whatsup)); 302 myfree(whatsup); 303 #else 304 qmgr_transport_throttle(transport, 305 DSN_SIMPLE(&dsb->dsn, "4.3.0", 306 "unknown mail transport error")); 307 #endif 308 msg_warn("transport %s failure -- see a previous warning/fatal/panic logfile record for the problem description", 309 transport->name); 310 311 /* 312 * Assume the worst and write a defer logfile record for each 313 * recipient. This omission was already present in the first queue 314 * manager implementation of 199703, and was fixed 200511. 315 * 316 * To avoid the synchronous qmgr_defer_recipient() operation for each 317 * recipient of this queue entry, release the delivery process and 318 * move the entry back to the todo queue. Let qmgr_defer_transport() 319 * log the recipient asynchronously if possible, and get out of here. 320 * Note: if asynchronous logging is not possible, 321 * qmgr_defer_transport() eventually invokes qmgr_entry_done() and 322 * the entry becomes a dangling pointer. 323 */ 324 QMGR_DELIVER_RELEASE_AGENT(entry); 325 qmgr_entry_unselect(entry); 326 qmgr_defer_transport(transport, &dsb->dsn); 327 return; 328 } 329 330 /* 331 * This message must be tried again. 332 * 333 * If we have a problem talking to this site, back off with this site for a 334 * while; dispose of queue entries for this site that await selection 335 * (the todo list); stay away from queue entries that have been selected 336 * (the busy list), or we would have dangling pointers. The queue itself 337 * won't go away before we dispose of the current queue entry. 338 * 339 * XXX Caution: DSN_COPY() will panic on empty status or reason. 340 */ 341 #define SUSPENDED "delivery temporarily suspended: " 342 343 if (status == DELIVER_STAT_DEFER) { 344 message->flags |= DELIVER_STAT_DEFER; 345 if (VSTRING_LEN(dsb->status)) { 346 /* Sanitize the DSN status/reason from the delivery agent. */ 347 if (!dsn_valid(vstring_str(dsb->status))) 348 vstring_strcpy(dsb->status, "4.0.0"); 349 if (VSTRING_LEN(dsb->reason) == 0) 350 vstring_strcpy(dsb->reason, "unknown error"); 351 vstring_prepend(dsb->reason, SUSPENDED, sizeof(SUSPENDED) - 1); 352 if (QMGR_QUEUE_READY(queue)) { 353 qmgr_queue_throttle(queue, DSN_FROM_DSN_BUF(dsb)); 354 if (QMGR_QUEUE_THROTTLED(queue)) 355 qmgr_defer_todo(queue, &dsb->dsn); 356 } 357 } 358 } 359 360 /* 361 * No problems detected. Mark the transport and queue as alive. The queue 362 * itself won't go away before we dispose of the current queue entry. 363 */ 364 if (status != DELIVER_STAT_CRASH) { 365 qmgr_transport_unthrottle(transport); 366 if (VSTRING_LEN(dsb->reason) == 0) 367 qmgr_queue_unthrottle(queue); 368 } 369 370 /* 371 * Release the delivery process, and give some other queue entry a chance 372 * to be delivered. When all recipients for a message have been tried, 373 * decide what to do next with this message: defer, bounce, delete. 374 */ 375 QMGR_DELIVER_RELEASE_AGENT(entry); 376 qmgr_entry_done(entry, QMGR_QUEUE_BUSY); 377 } 378 379 /* qmgr_deliver - deliver one per-site queue entry */ 380 381 void qmgr_deliver(QMGR_TRANSPORT *transport, VSTREAM *stream) 382 { 383 QMGR_ENTRY *entry; 384 DSN dsn; 385 386 /* 387 * Find out if this delivery process is really available. Once elected, 388 * the delivery process is supposed to express its happiness. If there is 389 * a problem, wipe the pending deliveries for this transport. This 390 * routine runs in response to an external event, so it does not run 391 * while some other queue manipulation is happening. 392 */ 393 if (stream == 0 || qmgr_deliver_initial_reply(stream) != 0) { 394 #if 0 395 whatsup = concatenate(transport->name, 396 " mail transport unavailable", (char *) 0); 397 qmgr_transport_throttle(transport, 398 DSN_SIMPLE(&dsn, "4.3.0", whatsup)); 399 myfree(whatsup); 400 #else 401 qmgr_transport_throttle(transport, 402 DSN_SIMPLE(&dsn, "4.3.0", 403 "mail transport unavailable")); 404 #endif 405 qmgr_defer_transport(transport, &dsn); 406 if (stream) 407 (void) vstream_fclose(stream); 408 return; 409 } 410 411 /* 412 * Find a suitable queue entry. Things may have changed since this 413 * transport was allocated. If no suitable entry is found, 414 * unceremoniously disconnect from the delivery process. The delivery 415 * agent request reading routine is prepared for the queue manager to 416 * change its mind for no apparent reason. 417 */ 418 if ((entry = qmgr_job_entry_select(transport)) == 0) { 419 (void) vstream_fclose(stream); 420 return; 421 } 422 423 /* 424 * Send the queue file info and recipient info to the delivery process. 425 * If there is a problem, wipe the pending deliveries for this transport. 426 * This routine runs in response to an external event, so it does not run 427 * while some other queue manipulation is happening. 428 */ 429 if (qmgr_deliver_send_request(entry, stream) < 0) { 430 qmgr_entry_unselect(entry); 431 #if 0 432 whatsup = concatenate(transport->name, 433 " mail transport unavailable", (char *) 0); 434 qmgr_transport_throttle(transport, 435 DSN_SIMPLE(&dsn, "4.3.0", whatsup)); 436 myfree(whatsup); 437 #else 438 qmgr_transport_throttle(transport, 439 DSN_SIMPLE(&dsn, "4.3.0", 440 "mail transport unavailable")); 441 #endif 442 qmgr_defer_transport(transport, &dsn); 443 /* warning: entry may be a dangling pointer here */ 444 (void) vstream_fclose(stream); 445 return; 446 } 447 448 /* 449 * If we get this far, go wait for the delivery status report. 450 */ 451 qmgr_deliver_concurrency++; 452 entry->stream = stream; 453 event_enable_read(vstream_fileno(stream), 454 qmgr_deliver_update, (void *) entry); 455 456 /* 457 * Guard against broken systems. 458 */ 459 event_request_timer(qmgr_deliver_abort, (void *) entry, var_daemon_timeout); 460 } 461