1 /* $OpenBSD: queue.c,v 1.184 2017/11/21 12:20:34 eric Exp $ */ 2 3 /* 4 * Copyright (c) 2008 Gilles Chehade <gilles@poolp.org> 5 * Copyright (c) 2008 Pierre-Yves Ritschard <pyr@openbsd.org> 6 * Copyright (c) 2012 Eric Faurot <eric@openbsd.org> 7 * 8 * Permission to use, copy, modify, and distribute this software for any 9 * purpose with or without fee is hereby granted, provided that the above 10 * copyright notice and this permission notice appear in all copies. 11 * 12 * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES 13 * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF 14 * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR 15 * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES 16 * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN 17 * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF 18 * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. 19 */ 20 21 #include <sys/types.h> 22 #include <sys/queue.h> 23 #include <sys/tree.h> 24 #include <sys/socket.h> 25 #include <sys/stat.h> 26 27 #include <err.h> 28 #include <event.h> 29 #include <imsg.h> 30 #include <inttypes.h> 31 #include <libgen.h> 32 #include <pwd.h> 33 #include <signal.h> 34 #include <stdio.h> 35 #include <stdlib.h> 36 #include <string.h> 37 #include <time.h> 38 #include <unistd.h> 39 #include <limits.h> 40 41 #include "smtpd.h" 42 #include "log.h" 43 44 static void queue_imsg(struct mproc *, struct imsg *); 45 static void queue_timeout(int, short, void *); 46 static void queue_bounce(struct envelope *, struct delivery_bounce *); 47 static void queue_shutdown(void); 48 static void queue_log(const struct envelope *, const char *, const char *); 49 static void queue_msgid_walk(int, short, void *); 50 51 52 static void 53 queue_imsg(struct mproc *p, struct imsg *imsg) 54 { 55 struct delivery_bounce bounce; 56 struct msg_walkinfo *wi; 57 struct timeval tv; 58 struct bounce_req_msg *req_bounce; 59 struct envelope evp; 60 struct msg m; 61 const char *reason; 62 uint64_t reqid, evpid, holdq; 63 uint32_t msgid; 64 time_t nexttry; 65 size_t n_evp; 66 int fd, mta_ext, ret, v, flags, code; 67 68 if (imsg == NULL) 69 queue_shutdown(); 70 71 memset(&bounce, 0, sizeof(struct delivery_bounce)); 72 73 switch (imsg->hdr.type) { 74 case IMSG_SMTP_MESSAGE_CREATE: 75 m_msg(&m, imsg); 76 m_get_id(&m, &reqid); 77 m_end(&m); 78 79 ret = queue_message_create(&msgid); 80 81 m_create(p, IMSG_SMTP_MESSAGE_CREATE, 0, 0, -1); 82 m_add_id(p, reqid); 83 if (ret == 0) 84 m_add_int(p, 0); 85 else { 86 m_add_int(p, 1); 87 m_add_msgid(p, msgid); 88 } 89 m_close(p); 90 return; 91 92 case IMSG_SMTP_MESSAGE_ROLLBACK: 93 m_msg(&m, imsg); 94 m_get_msgid(&m, &msgid); 95 m_end(&m); 96 97 queue_message_delete(msgid); 98 99 m_create(p_scheduler, IMSG_QUEUE_MESSAGE_ROLLBACK, 100 0, 0, -1); 101 m_add_msgid(p_scheduler, msgid); 102 m_close(p_scheduler); 103 return; 104 105 case IMSG_SMTP_MESSAGE_COMMIT: 106 m_msg(&m, imsg); 107 m_get_id(&m, &reqid); 108 m_get_msgid(&m, &msgid); 109 m_end(&m); 110 111 ret = queue_message_commit(msgid); 112 113 m_create(p, IMSG_SMTP_MESSAGE_COMMIT, 0, 0, -1); 114 m_add_id(p, reqid); 115 m_add_int(p, (ret == 0) ? 0 : 1); 116 m_close(p); 117 118 if (ret) { 119 m_create(p_scheduler, IMSG_QUEUE_MESSAGE_COMMIT, 120 0, 0, -1); 121 m_add_msgid(p_scheduler, msgid); 122 m_close(p_scheduler); 123 } 124 return; 125 126 case IMSG_SMTP_MESSAGE_OPEN: 127 m_msg(&m, imsg); 128 m_get_id(&m, &reqid); 129 m_get_msgid(&m, &msgid); 130 m_end(&m); 131 132 fd = queue_message_fd_rw(msgid); 133 134 m_create(p, IMSG_SMTP_MESSAGE_OPEN, 0, 0, fd); 135 m_add_id(p, reqid); 136 m_add_int(p, (fd == -1) ? 0 : 1); 137 m_close(p); 138 return; 139 140 case IMSG_QUEUE_SMTP_SESSION: 141 bounce_fd(imsg->fd); 142 return; 143 144 case IMSG_LKA_ENVELOPE_SUBMIT: 145 m_msg(&m, imsg); 146 m_get_id(&m, &reqid); 147 m_get_envelope(&m, &evp); 148 m_end(&m); 149 150 if (evp.id == 0) 151 log_warnx("warn: imsg_queue_submit_envelope: evpid=0"); 152 if (evpid_to_msgid(evp.id) == 0) 153 log_warnx("warn: imsg_queue_submit_envelope: msgid=0, " 154 "evpid=%016"PRIx64, evp.id); 155 ret = queue_envelope_create(&evp); 156 m_create(p_pony, IMSG_QUEUE_ENVELOPE_SUBMIT, 0, 0, -1); 157 m_add_id(p_pony, reqid); 158 if (ret == 0) 159 m_add_int(p_pony, 0); 160 else { 161 m_add_int(p_pony, 1); 162 m_add_evpid(p_pony, evp.id); 163 } 164 m_close(p_pony); 165 if (ret) { 166 m_create(p_scheduler, 167 IMSG_QUEUE_ENVELOPE_SUBMIT, 0, 0, -1); 168 m_add_envelope(p_scheduler, &evp); 169 m_close(p_scheduler); 170 } 171 return; 172 173 case IMSG_LKA_ENVELOPE_COMMIT: 174 m_msg(&m, imsg); 175 m_get_id(&m, &reqid); 176 m_end(&m); 177 m_create(p_pony, IMSG_QUEUE_ENVELOPE_COMMIT, 0, 0, -1); 178 m_add_id(p_pony, reqid); 179 m_add_int(p_pony, 1); 180 m_close(p_pony); 181 return; 182 183 case IMSG_SCHED_ENVELOPE_REMOVE: 184 m_msg(&m, imsg); 185 m_get_evpid(&m, &evpid); 186 m_end(&m); 187 188 m_create(p_scheduler, IMSG_QUEUE_ENVELOPE_ACK, 0, 0, -1); 189 m_add_evpid(p_scheduler, evpid); 190 m_close(p_scheduler); 191 192 /* already removed by scheduler */ 193 if (queue_envelope_load(evpid, &evp) == 0) 194 return; 195 196 queue_log(&evp, "Remove", "Removed by administrator"); 197 queue_envelope_delete(evpid); 198 return; 199 200 case IMSG_SCHED_ENVELOPE_EXPIRE: 201 m_msg(&m, imsg); 202 m_get_evpid(&m, &evpid); 203 m_end(&m); 204 205 m_create(p_scheduler, IMSG_QUEUE_ENVELOPE_ACK, 0, 0, -1); 206 m_add_evpid(p_scheduler, evpid); 207 m_close(p_scheduler); 208 209 /* already removed by scheduler*/ 210 if (queue_envelope_load(evpid, &evp) == 0) 211 return; 212 213 bounce.type = B_ERROR; 214 envelope_set_errormsg(&evp, "Envelope expired"); 215 envelope_set_esc_class(&evp, ESC_STATUS_TEMPFAIL); 216 envelope_set_esc_code(&evp, ESC_DELIVERY_TIME_EXPIRED); 217 queue_bounce(&evp, &bounce); 218 queue_log(&evp, "Expire", "Envelope expired"); 219 queue_envelope_delete(evpid); 220 return; 221 222 case IMSG_SCHED_ENVELOPE_BOUNCE: 223 CHECK_IMSG_DATA_SIZE(imsg, sizeof *req_bounce); 224 req_bounce = imsg->data; 225 evpid = req_bounce->evpid; 226 227 if (queue_envelope_load(evpid, &evp) == 0) { 228 log_warnx("queue: bounce: failed to load envelope"); 229 m_create(p_scheduler, IMSG_QUEUE_ENVELOPE_REMOVE, 0, 0, -1); 230 m_add_evpid(p_scheduler, evpid); 231 m_add_u32(p_scheduler, 0); /* not in-flight */ 232 m_close(p_scheduler); 233 return; 234 } 235 queue_bounce(&evp, &req_bounce->bounce); 236 evp.lastbounce = req_bounce->timestamp; 237 if (!queue_envelope_update(&evp)) 238 log_warnx("warn: could not update envelope %016"PRIx64, evpid); 239 return; 240 241 case IMSG_SCHED_ENVELOPE_DELIVER: 242 m_msg(&m, imsg); 243 m_get_evpid(&m, &evpid); 244 m_end(&m); 245 if (queue_envelope_load(evpid, &evp) == 0) { 246 log_warnx("queue: deliver: failed to load envelope"); 247 m_create(p_scheduler, IMSG_QUEUE_ENVELOPE_REMOVE, 0, 0, -1); 248 m_add_evpid(p_scheduler, evpid); 249 m_add_u32(p_scheduler, 1); /* in-flight */ 250 m_close(p_scheduler); 251 return; 252 } 253 evp.lasttry = time(NULL); 254 m_create(p_pony, IMSG_QUEUE_DELIVER, 0, 0, -1); 255 m_add_envelope(p_pony, &evp); 256 m_close(p_pony); 257 return; 258 259 case IMSG_SCHED_ENVELOPE_INJECT: 260 m_msg(&m, imsg); 261 m_get_evpid(&m, &evpid); 262 m_end(&m); 263 bounce_add(evpid); 264 return; 265 266 case IMSG_SCHED_ENVELOPE_TRANSFER: 267 m_msg(&m, imsg); 268 m_get_evpid(&m, &evpid); 269 m_end(&m); 270 if (queue_envelope_load(evpid, &evp) == 0) { 271 log_warnx("queue: failed to load envelope"); 272 m_create(p_scheduler, IMSG_QUEUE_ENVELOPE_REMOVE, 0, 0, -1); 273 m_add_evpid(p_scheduler, evpid); 274 m_add_u32(p_scheduler, 1); /* in-flight */ 275 m_close(p_scheduler); 276 return; 277 } 278 evp.lasttry = time(NULL); 279 m_create(p_pony, IMSG_QUEUE_TRANSFER, 0, 0, -1); 280 m_add_envelope(p_pony, &evp); 281 m_close(p_pony); 282 return; 283 284 case IMSG_CTL_LIST_ENVELOPES: 285 if (imsg->hdr.len == sizeof imsg->hdr) { 286 m_forward(p_control, imsg); 287 return; 288 } 289 290 m_msg(&m, imsg); 291 m_get_evpid(&m, &evpid); 292 m_get_int(&m, &flags); 293 m_get_time(&m, &nexttry); 294 m_end(&m); 295 296 if (queue_envelope_load(evpid, &evp) == 0) 297 return; /* Envelope is gone, drop it */ 298 299 /* 300 * XXX consistency: The envelope might already be on 301 * its way back to the scheduler. We need to detect 302 * this properly and report that state. 303 */ 304 if (flags & EF_INFLIGHT) { 305 /* 306 * Not exactly correct but pretty close: The 307 * value is not recorded on the envelope unless 308 * a tempfail occurs. 309 */ 310 evp.lasttry = nexttry; 311 } 312 313 m_create(p_control, IMSG_CTL_LIST_ENVELOPES, 314 imsg->hdr.peerid, 0, -1); 315 m_add_int(p_control, flags); 316 m_add_time(p_control, nexttry); 317 m_add_envelope(p_control, &evp); 318 m_close(p_control); 319 return; 320 321 case IMSG_MDA_OPEN_MESSAGE: 322 case IMSG_MTA_OPEN_MESSAGE: 323 m_msg(&m, imsg); 324 m_get_id(&m, &reqid); 325 m_get_msgid(&m, &msgid); 326 m_end(&m); 327 fd = queue_message_fd_r(msgid); 328 m_create(p, imsg->hdr.type, 0, 0, fd); 329 m_add_id(p, reqid); 330 m_close(p); 331 return; 332 333 case IMSG_MDA_DELIVERY_OK: 334 case IMSG_MTA_DELIVERY_OK: 335 m_msg(&m, imsg); 336 m_get_evpid(&m, &evpid); 337 if (imsg->hdr.type == IMSG_MTA_DELIVERY_OK) 338 m_get_int(&m, &mta_ext); 339 m_end(&m); 340 if (queue_envelope_load(evpid, &evp) == 0) { 341 log_warn("queue: dsn: failed to load envelope"); 342 return; 343 } 344 if (evp.dsn_notify & DSN_SUCCESS) { 345 bounce.type = B_DSN; 346 bounce.dsn_ret = evp.dsn_ret; 347 envelope_set_esc_class(&evp, ESC_STATUS_OK); 348 if (imsg->hdr.type == IMSG_MDA_DELIVERY_OK) 349 queue_bounce(&evp, &bounce); 350 else if (imsg->hdr.type == IMSG_MTA_DELIVERY_OK && 351 (mta_ext & MTA_EXT_DSN) == 0) { 352 bounce.mta_without_dsn = 1; 353 queue_bounce(&evp, &bounce); 354 } 355 } 356 queue_envelope_delete(evpid); 357 m_create(p_scheduler, IMSG_QUEUE_DELIVERY_OK, 0, 0, -1); 358 m_add_evpid(p_scheduler, evpid); 359 m_close(p_scheduler); 360 return; 361 362 case IMSG_MDA_DELIVERY_TEMPFAIL: 363 case IMSG_MTA_DELIVERY_TEMPFAIL: 364 m_msg(&m, imsg); 365 m_get_evpid(&m, &evpid); 366 m_get_string(&m, &reason); 367 m_get_int(&m, &code); 368 m_end(&m); 369 if (queue_envelope_load(evpid, &evp) == 0) { 370 log_warnx("queue: tempfail: failed to load envelope"); 371 m_create(p_scheduler, IMSG_QUEUE_ENVELOPE_REMOVE, 0, 0, -1); 372 m_add_evpid(p_scheduler, evpid); 373 m_add_u32(p_scheduler, 1); /* in-flight */ 374 m_close(p_scheduler); 375 return; 376 } 377 envelope_set_errormsg(&evp, "%s", reason); 378 envelope_set_esc_class(&evp, ESC_STATUS_TEMPFAIL); 379 envelope_set_esc_code(&evp, code); 380 evp.retry++; 381 if (!queue_envelope_update(&evp)) 382 log_warnx("warn: could not update envelope %016"PRIx64, evpid); 383 m_create(p_scheduler, IMSG_QUEUE_DELIVERY_TEMPFAIL, 0, 0, -1); 384 m_add_envelope(p_scheduler, &evp); 385 m_close(p_scheduler); 386 return; 387 388 case IMSG_MDA_DELIVERY_PERMFAIL: 389 case IMSG_MTA_DELIVERY_PERMFAIL: 390 m_msg(&m, imsg); 391 m_get_evpid(&m, &evpid); 392 m_get_string(&m, &reason); 393 m_get_int(&m, &code); 394 m_end(&m); 395 if (queue_envelope_load(evpid, &evp) == 0) { 396 log_warnx("queue: permfail: failed to load envelope"); 397 m_create(p_scheduler, IMSG_QUEUE_ENVELOPE_REMOVE, 0, 0, -1); 398 m_add_evpid(p_scheduler, evpid); 399 m_add_u32(p_scheduler, 1); /* in-flight */ 400 m_close(p_scheduler); 401 return; 402 } 403 bounce.type = B_ERROR; 404 envelope_set_errormsg(&evp, "%s", reason); 405 envelope_set_esc_class(&evp, ESC_STATUS_PERMFAIL); 406 envelope_set_esc_code(&evp, code); 407 queue_bounce(&evp, &bounce); 408 queue_envelope_delete(evpid); 409 m_create(p_scheduler, IMSG_QUEUE_DELIVERY_PERMFAIL, 0, 0, -1); 410 m_add_evpid(p_scheduler, evpid); 411 m_close(p_scheduler); 412 return; 413 414 case IMSG_MDA_DELIVERY_LOOP: 415 case IMSG_MTA_DELIVERY_LOOP: 416 m_msg(&m, imsg); 417 m_get_evpid(&m, &evpid); 418 m_end(&m); 419 if (queue_envelope_load(evpid, &evp) == 0) { 420 log_warnx("queue: loop: failed to load envelope"); 421 m_create(p_scheduler, IMSG_QUEUE_ENVELOPE_REMOVE, 0, 0, -1); 422 m_add_evpid(p_scheduler, evpid); 423 m_add_u32(p_scheduler, 1); /* in-flight */ 424 m_close(p_scheduler); 425 return; 426 } 427 envelope_set_errormsg(&evp, "%s", "Loop detected"); 428 envelope_set_esc_class(&evp, ESC_STATUS_TEMPFAIL); 429 envelope_set_esc_code(&evp, ESC_ROUTING_LOOP_DETECTED); 430 bounce.type = B_ERROR; 431 queue_bounce(&evp, &bounce); 432 queue_envelope_delete(evp.id); 433 m_create(p_scheduler, IMSG_QUEUE_DELIVERY_LOOP, 0, 0, -1); 434 m_add_evpid(p_scheduler, evp.id); 435 m_close(p_scheduler); 436 return; 437 438 case IMSG_MTA_DELIVERY_HOLD: 439 case IMSG_MDA_DELIVERY_HOLD: 440 imsg->hdr.type = IMSG_QUEUE_HOLDQ_HOLD; 441 m_forward(p_scheduler, imsg); 442 return; 443 444 case IMSG_MTA_SCHEDULE: 445 imsg->hdr.type = IMSG_QUEUE_ENVELOPE_SCHEDULE; 446 m_forward(p_scheduler, imsg); 447 return; 448 449 case IMSG_MTA_HOLDQ_RELEASE: 450 case IMSG_MDA_HOLDQ_RELEASE: 451 m_msg(&m, imsg); 452 m_get_id(&m, &holdq); 453 m_get_int(&m, &v); 454 m_end(&m); 455 m_create(p_scheduler, IMSG_QUEUE_HOLDQ_RELEASE, 0, 0, -1); 456 if (imsg->hdr.type == IMSG_MTA_HOLDQ_RELEASE) 457 m_add_int(p_scheduler, D_MTA); 458 else 459 m_add_int(p_scheduler, D_MDA); 460 m_add_id(p_scheduler, holdq); 461 m_add_int(p_scheduler, v); 462 m_close(p_scheduler); 463 return; 464 465 case IMSG_CTL_PAUSE_MDA: 466 case IMSG_CTL_PAUSE_MTA: 467 case IMSG_CTL_RESUME_MDA: 468 case IMSG_CTL_RESUME_MTA: 469 m_forward(p_scheduler, imsg); 470 return; 471 472 case IMSG_CTL_VERBOSE: 473 m_msg(&m, imsg); 474 m_get_int(&m, &v); 475 m_end(&m); 476 log_trace_verbose(v); 477 return; 478 479 case IMSG_CTL_PROFILE: 480 m_msg(&m, imsg); 481 m_get_int(&m, &v); 482 m_end(&m); 483 profiling = v; 484 return; 485 486 case IMSG_CTL_DISCOVER_EVPID: 487 m_msg(&m, imsg); 488 m_get_evpid(&m, &evpid); 489 m_end(&m); 490 if (queue_envelope_load(evpid, &evp) == 0) { 491 log_warnx("queue: discover: failed to load " 492 "envelope %016" PRIx64, evpid); 493 n_evp = 0; 494 m_compose(p_control, imsg->hdr.type, 495 imsg->hdr.peerid, 0, -1, 496 &n_evp, sizeof n_evp); 497 return; 498 } 499 500 m_create(p_scheduler, IMSG_QUEUE_DISCOVER_EVPID, 501 0, 0, -1); 502 m_add_envelope(p_scheduler, &evp); 503 m_close(p_scheduler); 504 505 m_create(p_scheduler, IMSG_QUEUE_DISCOVER_MSGID, 506 0, 0, -1); 507 m_add_msgid(p_scheduler, evpid_to_msgid(evpid)); 508 m_close(p_scheduler); 509 n_evp = 1; 510 m_compose(p_control, imsg->hdr.type, imsg->hdr.peerid, 511 0, -1, &n_evp, sizeof n_evp); 512 return; 513 514 case IMSG_CTL_DISCOVER_MSGID: 515 m_msg(&m, imsg); 516 m_get_msgid(&m, &msgid); 517 m_end(&m); 518 /* handle concurrent walk requests */ 519 wi = xcalloc(1, sizeof *wi, "queu_imsg"); 520 wi->msgid = msgid; 521 wi->peerid = imsg->hdr.peerid; 522 evtimer_set(&wi->ev, queue_msgid_walk, wi); 523 tv.tv_sec = 0; 524 tv.tv_usec = 10; 525 evtimer_add(&wi->ev, &tv); 526 return; 527 528 case IMSG_CTL_UNCORRUPT_MSGID: 529 m_msg(&m, imsg); 530 m_get_msgid(&m, &msgid); 531 m_end(&m); 532 ret = queue_message_uncorrupt(msgid); 533 m_compose(p_control, imsg->hdr.type, imsg->hdr.peerid, 534 0, -1, &ret, sizeof ret); 535 return; 536 } 537 538 errx(1, "queue_imsg: unexpected %s imsg", imsg_to_str(imsg->hdr.type)); 539 } 540 541 static void 542 queue_msgid_walk(int fd, short event, void *arg) 543 { 544 struct envelope evp; 545 struct timeval tv; 546 struct msg_walkinfo *wi = arg; 547 int r; 548 549 r = queue_message_walk(&evp, wi->msgid, &wi->done, &wi->data); 550 if (r == -1) { 551 if (wi->n_evp) { 552 m_create(p_scheduler, IMSG_QUEUE_DISCOVER_MSGID, 553 0, 0, -1); 554 m_add_msgid(p_scheduler, wi->msgid); 555 m_close(p_scheduler); 556 } 557 558 m_compose(p_control, IMSG_CTL_DISCOVER_MSGID, wi->peerid, 0, -1, 559 &wi->n_evp, sizeof wi->n_evp); 560 evtimer_del(&wi->ev); 561 free(wi); 562 return; 563 } 564 565 if (r) { 566 m_create(p_scheduler, IMSG_QUEUE_DISCOVER_EVPID, 0, 0, -1); 567 m_add_envelope(p_scheduler, &evp); 568 m_close(p_scheduler); 569 wi->n_evp += 1; 570 } 571 572 tv.tv_sec = 0; 573 tv.tv_usec = 10; 574 evtimer_set(&wi->ev, queue_msgid_walk, wi); 575 evtimer_add(&wi->ev, &tv); 576 } 577 578 static void 579 queue_bounce(struct envelope *e, struct delivery_bounce *d) 580 { 581 struct envelope b; 582 583 b = *e; 584 b.type = D_BOUNCE; 585 b.agent.bounce = *d; 586 b.retry = 0; 587 b.lasttry = 0; 588 b.creation = time(NULL); 589 b.expire = 3600 * 24 * 7; 590 591 if (e->dsn_notify & DSN_NEVER) 592 return; 593 594 if (b.id == 0) 595 log_warnx("warn: queue_bounce: evpid=0"); 596 if (evpid_to_msgid(b.id) == 0) 597 log_warnx("warn: queue_bounce: msgid=0, evpid=%016"PRIx64, 598 b.id); 599 if (e->type == D_BOUNCE) { 600 log_warnx("warn: queue: double bounce!"); 601 } else if (e->sender.user[0] == '\0') { 602 log_warnx("warn: queue: no return path!"); 603 } else if (!queue_envelope_create(&b)) { 604 log_warnx("warn: queue: cannot bounce!"); 605 } else { 606 log_debug("debug: queue: bouncing evp:%016" PRIx64 607 " as evp:%016" PRIx64, e->id, b.id); 608 609 m_create(p_scheduler, IMSG_QUEUE_ENVELOPE_SUBMIT, 0, 0, -1); 610 m_add_envelope(p_scheduler, &b); 611 m_close(p_scheduler); 612 613 m_create(p_scheduler, IMSG_QUEUE_MESSAGE_COMMIT, 0, 0, -1); 614 m_add_msgid(p_scheduler, evpid_to_msgid(b.id)); 615 m_close(p_scheduler); 616 617 stat_increment("queue.bounce", 1); 618 } 619 } 620 621 static void 622 queue_shutdown(void) 623 { 624 log_debug("debug: queue agent exiting"); 625 queue_close(); 626 _exit(0); 627 } 628 629 int 630 queue(void) 631 { 632 struct passwd *pw; 633 struct timeval tv; 634 struct event ev_qload; 635 636 purge_config(PURGE_EVERYTHING); 637 638 if ((pw = getpwnam(SMTPD_QUEUE_USER)) == NULL) 639 if ((pw = getpwnam(SMTPD_USER)) == NULL) 640 fatalx("unknown user " SMTPD_USER); 641 642 env->sc_queue_flags |= QUEUE_EVPCACHE; 643 env->sc_queue_evpcache_size = 1024; 644 645 if (chroot(PATH_SPOOL) == -1) 646 fatal("queue: chroot"); 647 if (chdir("/") == -1) 648 fatal("queue: chdir(\"/\")"); 649 650 config_process(PROC_QUEUE); 651 652 if (env->sc_queue_flags & QUEUE_COMPRESSION) 653 log_info("queue: queue compression enabled"); 654 655 if (env->sc_queue_key) { 656 if (!crypto_setup(env->sc_queue_key, strlen(env->sc_queue_key))) 657 fatalx("crypto_setup: invalid key for queue encryption"); 658 log_info("queue: queue encryption enabled"); 659 } 660 661 if (setgroups(1, &pw->pw_gid) || 662 setresgid(pw->pw_gid, pw->pw_gid, pw->pw_gid) || 663 setresuid(pw->pw_uid, pw->pw_uid, pw->pw_uid)) 664 fatal("queue: cannot drop privileges"); 665 666 imsg_callback = queue_imsg; 667 event_init(); 668 669 signal(SIGINT, SIG_IGN); 670 signal(SIGTERM, SIG_IGN); 671 signal(SIGPIPE, SIG_IGN); 672 signal(SIGHUP, SIG_IGN); 673 674 config_peer(PROC_PARENT); 675 config_peer(PROC_CONTROL); 676 config_peer(PROC_LKA); 677 config_peer(PROC_SCHEDULER); 678 config_peer(PROC_PONY); 679 680 /* setup queue loading task */ 681 evtimer_set(&ev_qload, queue_timeout, &ev_qload); 682 tv.tv_sec = 0; 683 tv.tv_usec = 10; 684 evtimer_add(&ev_qload, &tv); 685 686 if (pledge("stdio rpath wpath cpath flock recvfd sendfd", NULL) == -1) 687 err(1, "pledge"); 688 689 event_dispatch(); 690 fatalx("exited event loop"); 691 692 return (0); 693 } 694 695 static void 696 queue_timeout(int fd, short event, void *p) 697 { 698 static uint32_t msgid = 0; 699 struct envelope evp; 700 struct event *ev = p; 701 struct timeval tv; 702 int r; 703 704 r = queue_envelope_walk(&evp); 705 if (r == -1) { 706 if (msgid) { 707 m_create(p_scheduler, IMSG_QUEUE_MESSAGE_COMMIT, 708 0, 0, -1); 709 m_add_msgid(p_scheduler, msgid); 710 m_close(p_scheduler); 711 } 712 log_debug("debug: queue: done loading queue into scheduler"); 713 return; 714 } 715 716 if (r) { 717 if (msgid && evpid_to_msgid(evp.id) != msgid) { 718 m_create(p_scheduler, IMSG_QUEUE_MESSAGE_COMMIT, 719 0, 0, -1); 720 m_add_msgid(p_scheduler, msgid); 721 m_close(p_scheduler); 722 } 723 msgid = evpid_to_msgid(evp.id); 724 m_create(p_scheduler, IMSG_QUEUE_ENVELOPE_SUBMIT, 0, 0, -1); 725 m_add_envelope(p_scheduler, &evp); 726 m_close(p_scheduler); 727 } 728 729 tv.tv_sec = 0; 730 tv.tv_usec = 10; 731 evtimer_add(ev, &tv); 732 } 733 734 static void 735 queue_log(const struct envelope *e, const char *prefix, const char *status) 736 { 737 char rcpt[LINE_MAX]; 738 739 (void)strlcpy(rcpt, "-", sizeof rcpt); 740 if (strcmp(e->rcpt.user, e->dest.user) || 741 strcmp(e->rcpt.domain, e->dest.domain)) 742 (void)snprintf(rcpt, sizeof rcpt, "%s@%s", 743 e->rcpt.user, e->rcpt.domain); 744 745 log_info("%s: %s for %016" PRIx64 ": from=<%s@%s>, to=<%s@%s>, " 746 "rcpt=<%s>, delay=%s, stat=%s", 747 e->type == D_MDA ? "delivery" : "relay", 748 prefix, 749 e->id, e->sender.user, e->sender.domain, 750 e->dest.user, e->dest.domain, 751 rcpt, 752 duration_to_text(time(NULL) - e->creation), 753 status); 754 } 755