1 /* $OpenBSD: queue.c,v 1.187 2018/05/31 21:06:12 gilles Exp $ */ 2 3 /* 4 * Copyright (c) 2008 Gilles Chehade <gilles@poolp.org> 5 * Copyright (c) 2008 Pierre-Yves Ritschard <pyr@openbsd.org> 6 * Copyright (c) 2012 Eric Faurot <eric@openbsd.org> 7 * 8 * Permission to use, copy, modify, and distribute this software for any 9 * purpose with or without fee is hereby granted, provided that the above 10 * copyright notice and this permission notice appear in all copies. 11 * 12 * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES 13 * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF 14 * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR 15 * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES 16 * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN 17 * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF 18 * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. 19 */ 20 21 #include <sys/types.h> 22 #include <sys/queue.h> 23 #include <sys/tree.h> 24 #include <sys/socket.h> 25 #include <sys/stat.h> 26 27 #include <err.h> 28 #include <event.h> 29 #include <imsg.h> 30 #include <inttypes.h> 31 #include <libgen.h> 32 #include <pwd.h> 33 #include <signal.h> 34 #include <stdio.h> 35 #include <stdlib.h> 36 #include <string.h> 37 #include <time.h> 38 #include <unistd.h> 39 #include <limits.h> 40 41 #include "smtpd.h" 42 #include "log.h" 43 44 static void queue_imsg(struct mproc *, struct imsg *); 45 static void queue_timeout(int, short, void *); 46 static void queue_bounce(struct envelope *, struct delivery_bounce *); 47 static void queue_shutdown(void); 48 static void queue_log(const struct envelope *, const char *, const char *); 49 static void queue_msgid_walk(int, short, void *); 50 51 52 static void 53 queue_imsg(struct mproc *p, struct imsg *imsg) 54 { 55 struct delivery_bounce bounce; 56 struct msg_walkinfo *wi; 57 struct timeval tv; 58 struct bounce_req_msg *req_bounce; 59 struct envelope evp; 60 struct msg m; 61 const char *reason; 62 uint64_t reqid, evpid, holdq; 63 uint32_t msgid; 64 time_t nexttry; 65 size_t n_evp; 66 int fd, mta_ext, ret, v, flags, code; 67 68 if (imsg == NULL) 69 queue_shutdown(); 70 71 memset(&bounce, 0, sizeof(struct delivery_bounce)); 72 73 switch (imsg->hdr.type) { 74 case IMSG_SMTP_MESSAGE_CREATE: 75 m_msg(&m, imsg); 76 m_get_id(&m, &reqid); 77 m_end(&m); 78 79 ret = queue_message_create(&msgid); 80 81 m_create(p, IMSG_SMTP_MESSAGE_CREATE, 0, 0, -1); 82 m_add_id(p, reqid); 83 if (ret == 0) 84 m_add_int(p, 0); 85 else { 86 m_add_int(p, 1); 87 m_add_msgid(p, msgid); 88 } 89 m_close(p); 90 return; 91 92 case IMSG_SMTP_MESSAGE_ROLLBACK: 93 m_msg(&m, imsg); 94 m_get_msgid(&m, &msgid); 95 m_end(&m); 96 97 queue_message_delete(msgid); 98 99 m_create(p_scheduler, IMSG_QUEUE_MESSAGE_ROLLBACK, 100 0, 0, -1); 101 m_add_msgid(p_scheduler, msgid); 102 m_close(p_scheduler); 103 return; 104 105 case IMSG_SMTP_MESSAGE_COMMIT: 106 m_msg(&m, imsg); 107 m_get_id(&m, &reqid); 108 m_get_msgid(&m, &msgid); 109 m_end(&m); 110 111 ret = queue_message_commit(msgid); 112 113 m_create(p, IMSG_SMTP_MESSAGE_COMMIT, 0, 0, -1); 114 m_add_id(p, reqid); 115 m_add_int(p, (ret == 0) ? 0 : 1); 116 m_close(p); 117 118 if (ret) { 119 m_create(p_scheduler, IMSG_QUEUE_MESSAGE_COMMIT, 120 0, 0, -1); 121 m_add_msgid(p_scheduler, msgid); 122 m_close(p_scheduler); 123 } 124 return; 125 126 case IMSG_SMTP_MESSAGE_OPEN: 127 m_msg(&m, imsg); 128 m_get_id(&m, &reqid); 129 m_get_msgid(&m, &msgid); 130 m_end(&m); 131 132 fd = queue_message_fd_rw(msgid); 133 134 m_create(p, IMSG_SMTP_MESSAGE_OPEN, 0, 0, fd); 135 m_add_id(p, reqid); 136 m_add_int(p, (fd == -1) ? 0 : 1); 137 m_close(p); 138 return; 139 140 case IMSG_QUEUE_SMTP_SESSION: 141 bounce_fd(imsg->fd); 142 return; 143 144 case IMSG_LKA_ENVELOPE_SUBMIT: 145 m_msg(&m, imsg); 146 m_get_id(&m, &reqid); 147 m_get_envelope(&m, &evp); 148 m_end(&m); 149 150 if (evp.id == 0) 151 log_warnx("warn: imsg_queue_submit_envelope: evpid=0"); 152 if (evpid_to_msgid(evp.id) == 0) 153 log_warnx("warn: imsg_queue_submit_envelope: msgid=0, " 154 "evpid=%016"PRIx64, evp.id); 155 ret = queue_envelope_create(&evp); 156 m_create(p_pony, IMSG_QUEUE_ENVELOPE_SUBMIT, 0, 0, -1); 157 m_add_id(p_pony, reqid); 158 if (ret == 0) 159 m_add_int(p_pony, 0); 160 else { 161 m_add_int(p_pony, 1); 162 m_add_evpid(p_pony, evp.id); 163 } 164 m_close(p_pony); 165 if (ret) { 166 m_create(p_scheduler, 167 IMSG_QUEUE_ENVELOPE_SUBMIT, 0, 0, -1); 168 m_add_envelope(p_scheduler, &evp); 169 m_close(p_scheduler); 170 } 171 return; 172 173 case IMSG_LKA_ENVELOPE_COMMIT: 174 m_msg(&m, imsg); 175 m_get_id(&m, &reqid); 176 m_end(&m); 177 m_create(p_pony, IMSG_QUEUE_ENVELOPE_COMMIT, 0, 0, -1); 178 m_add_id(p_pony, reqid); 179 m_add_int(p_pony, 1); 180 m_close(p_pony); 181 return; 182 183 case IMSG_SCHED_ENVELOPE_REMOVE: 184 m_msg(&m, imsg); 185 m_get_evpid(&m, &evpid); 186 m_end(&m); 187 188 m_create(p_scheduler, IMSG_QUEUE_ENVELOPE_ACK, 0, 0, -1); 189 m_add_evpid(p_scheduler, evpid); 190 m_close(p_scheduler); 191 192 /* already removed by scheduler */ 193 if (queue_envelope_load(evpid, &evp) == 0) 194 return; 195 196 queue_log(&evp, "Remove", "Removed by administrator"); 197 queue_envelope_delete(evpid); 198 return; 199 200 case IMSG_SCHED_ENVELOPE_EXPIRE: 201 m_msg(&m, imsg); 202 m_get_evpid(&m, &evpid); 203 m_end(&m); 204 205 m_create(p_scheduler, IMSG_QUEUE_ENVELOPE_ACK, 0, 0, -1); 206 m_add_evpid(p_scheduler, evpid); 207 m_close(p_scheduler); 208 209 /* already removed by scheduler*/ 210 if (queue_envelope_load(evpid, &evp) == 0) 211 return; 212 213 bounce.type = B_ERROR; 214 envelope_set_errormsg(&evp, "Envelope expired"); 215 envelope_set_esc_class(&evp, ESC_STATUS_TEMPFAIL); 216 envelope_set_esc_code(&evp, ESC_DELIVERY_TIME_EXPIRED); 217 queue_bounce(&evp, &bounce); 218 queue_log(&evp, "Expire", "Envelope expired"); 219 queue_envelope_delete(evpid); 220 return; 221 222 case IMSG_SCHED_ENVELOPE_BOUNCE: 223 CHECK_IMSG_DATA_SIZE(imsg, sizeof *req_bounce); 224 req_bounce = imsg->data; 225 evpid = req_bounce->evpid; 226 227 if (queue_envelope_load(evpid, &evp) == 0) { 228 log_warnx("queue: bounce: failed to load envelope"); 229 m_create(p_scheduler, IMSG_QUEUE_ENVELOPE_REMOVE, 0, 0, -1); 230 m_add_evpid(p_scheduler, evpid); 231 m_add_u32(p_scheduler, 0); /* not in-flight */ 232 m_close(p_scheduler); 233 return; 234 } 235 queue_bounce(&evp, &req_bounce->bounce); 236 evp.lastbounce = req_bounce->timestamp; 237 if (!queue_envelope_update(&evp)) 238 log_warnx("warn: could not update envelope %016"PRIx64, evpid); 239 return; 240 241 case IMSG_SCHED_ENVELOPE_DELIVER: 242 m_msg(&m, imsg); 243 m_get_evpid(&m, &evpid); 244 m_end(&m); 245 if (queue_envelope_load(evpid, &evp) == 0) { 246 log_warnx("queue: deliver: failed to load envelope"); 247 m_create(p_scheduler, IMSG_QUEUE_ENVELOPE_REMOVE, 0, 0, -1); 248 m_add_evpid(p_scheduler, evpid); 249 m_add_u32(p_scheduler, 1); /* in-flight */ 250 m_close(p_scheduler); 251 return; 252 } 253 evp.lasttry = time(NULL); 254 m_create(p_pony, IMSG_QUEUE_DELIVER, 0, 0, -1); 255 m_add_envelope(p_pony, &evp); 256 m_close(p_pony); 257 return; 258 259 case IMSG_SCHED_ENVELOPE_INJECT: 260 m_msg(&m, imsg); 261 m_get_evpid(&m, &evpid); 262 m_end(&m); 263 bounce_add(evpid); 264 return; 265 266 case IMSG_SCHED_ENVELOPE_TRANSFER: 267 m_msg(&m, imsg); 268 m_get_evpid(&m, &evpid); 269 m_end(&m); 270 if (queue_envelope_load(evpid, &evp) == 0) { 271 log_warnx("queue: failed to load envelope"); 272 m_create(p_scheduler, IMSG_QUEUE_ENVELOPE_REMOVE, 0, 0, -1); 273 m_add_evpid(p_scheduler, evpid); 274 m_add_u32(p_scheduler, 1); /* in-flight */ 275 m_close(p_scheduler); 276 return; 277 } 278 evp.lasttry = time(NULL); 279 m_create(p_pony, IMSG_QUEUE_TRANSFER, 0, 0, -1); 280 m_add_envelope(p_pony, &evp); 281 m_close(p_pony); 282 return; 283 284 case IMSG_CTL_LIST_ENVELOPES: 285 if (imsg->hdr.len == sizeof imsg->hdr) { 286 m_forward(p_control, imsg); 287 return; 288 } 289 290 m_msg(&m, imsg); 291 m_get_evpid(&m, &evpid); 292 m_get_int(&m, &flags); 293 m_get_time(&m, &nexttry); 294 m_end(&m); 295 296 if (queue_envelope_load(evpid, &evp) == 0) 297 return; /* Envelope is gone, drop it */ 298 299 /* 300 * XXX consistency: The envelope might already be on 301 * its way back to the scheduler. We need to detect 302 * this properly and report that state. 303 */ 304 if (flags & EF_INFLIGHT) { 305 /* 306 * Not exactly correct but pretty close: The 307 * value is not recorded on the envelope unless 308 * a tempfail occurs. 309 */ 310 evp.lasttry = nexttry; 311 } 312 313 m_create(p_control, IMSG_CTL_LIST_ENVELOPES, 314 imsg->hdr.peerid, 0, -1); 315 m_add_int(p_control, flags); 316 m_add_time(p_control, nexttry); 317 m_add_envelope(p_control, &evp); 318 m_close(p_control); 319 return; 320 321 case IMSG_MDA_OPEN_MESSAGE: 322 case IMSG_MTA_OPEN_MESSAGE: 323 m_msg(&m, imsg); 324 m_get_id(&m, &reqid); 325 m_get_msgid(&m, &msgid); 326 m_end(&m); 327 fd = queue_message_fd_r(msgid); 328 m_create(p, imsg->hdr.type, 0, 0, fd); 329 m_add_id(p, reqid); 330 m_close(p); 331 return; 332 333 case IMSG_MDA_DELIVERY_OK: 334 case IMSG_MTA_DELIVERY_OK: 335 m_msg(&m, imsg); 336 m_get_evpid(&m, &evpid); 337 if (imsg->hdr.type == IMSG_MTA_DELIVERY_OK) 338 m_get_int(&m, &mta_ext); 339 m_end(&m); 340 if (queue_envelope_load(evpid, &evp) == 0) { 341 log_warn("queue: dsn: failed to load envelope"); 342 return; 343 } 344 if (evp.dsn_notify & DSN_SUCCESS) { 345 bounce.type = B_DSN; 346 bounce.dsn_ret = evp.dsn_ret; 347 envelope_set_esc_class(&evp, ESC_STATUS_OK); 348 if (imsg->hdr.type == IMSG_MDA_DELIVERY_OK) 349 queue_bounce(&evp, &bounce); 350 else if (imsg->hdr.type == IMSG_MTA_DELIVERY_OK && 351 (mta_ext & MTA_EXT_DSN) == 0) { 352 bounce.mta_without_dsn = 1; 353 queue_bounce(&evp, &bounce); 354 } 355 } 356 queue_envelope_delete(evpid); 357 m_create(p_scheduler, IMSG_QUEUE_DELIVERY_OK, 0, 0, -1); 358 m_add_evpid(p_scheduler, evpid); 359 m_close(p_scheduler); 360 return; 361 362 case IMSG_MDA_DELIVERY_TEMPFAIL: 363 case IMSG_MTA_DELIVERY_TEMPFAIL: 364 m_msg(&m, imsg); 365 m_get_evpid(&m, &evpid); 366 m_get_string(&m, &reason); 367 m_get_int(&m, &code); 368 m_end(&m); 369 if (queue_envelope_load(evpid, &evp) == 0) { 370 log_warnx("queue: tempfail: failed to load envelope"); 371 m_create(p_scheduler, IMSG_QUEUE_ENVELOPE_REMOVE, 0, 0, -1); 372 m_add_evpid(p_scheduler, evpid); 373 m_add_u32(p_scheduler, 1); /* in-flight */ 374 m_close(p_scheduler); 375 return; 376 } 377 envelope_set_errormsg(&evp, "%s", reason); 378 envelope_set_esc_class(&evp, ESC_STATUS_TEMPFAIL); 379 envelope_set_esc_code(&evp, code); 380 evp.retry++; 381 if (!queue_envelope_update(&evp)) 382 log_warnx("warn: could not update envelope %016"PRIx64, evpid); 383 m_create(p_scheduler, IMSG_QUEUE_DELIVERY_TEMPFAIL, 0, 0, -1); 384 m_add_envelope(p_scheduler, &evp); 385 m_close(p_scheduler); 386 return; 387 388 case IMSG_MDA_DELIVERY_PERMFAIL: 389 case IMSG_MTA_DELIVERY_PERMFAIL: 390 m_msg(&m, imsg); 391 m_get_evpid(&m, &evpid); 392 m_get_string(&m, &reason); 393 m_get_int(&m, &code); 394 m_end(&m); 395 if (queue_envelope_load(evpid, &evp) == 0) { 396 log_warnx("queue: permfail: failed to load envelope"); 397 m_create(p_scheduler, IMSG_QUEUE_ENVELOPE_REMOVE, 0, 0, -1); 398 m_add_evpid(p_scheduler, evpid); 399 m_add_u32(p_scheduler, 1); /* in-flight */ 400 m_close(p_scheduler); 401 return; 402 } 403 bounce.type = B_ERROR; 404 envelope_set_errormsg(&evp, "%s", reason); 405 envelope_set_esc_class(&evp, ESC_STATUS_PERMFAIL); 406 envelope_set_esc_code(&evp, code); 407 queue_bounce(&evp, &bounce); 408 queue_envelope_delete(evpid); 409 m_create(p_scheduler, IMSG_QUEUE_DELIVERY_PERMFAIL, 0, 0, -1); 410 m_add_evpid(p_scheduler, evpid); 411 m_close(p_scheduler); 412 return; 413 414 case IMSG_MDA_DELIVERY_LOOP: 415 case IMSG_MTA_DELIVERY_LOOP: 416 m_msg(&m, imsg); 417 m_get_evpid(&m, &evpid); 418 m_end(&m); 419 if (queue_envelope_load(evpid, &evp) == 0) { 420 log_warnx("queue: loop: failed to load envelope"); 421 m_create(p_scheduler, IMSG_QUEUE_ENVELOPE_REMOVE, 0, 0, -1); 422 m_add_evpid(p_scheduler, evpid); 423 m_add_u32(p_scheduler, 1); /* in-flight */ 424 m_close(p_scheduler); 425 return; 426 } 427 envelope_set_errormsg(&evp, "%s", "Loop detected"); 428 envelope_set_esc_class(&evp, ESC_STATUS_TEMPFAIL); 429 envelope_set_esc_code(&evp, ESC_ROUTING_LOOP_DETECTED); 430 bounce.type = B_ERROR; 431 queue_bounce(&evp, &bounce); 432 queue_envelope_delete(evp.id); 433 m_create(p_scheduler, IMSG_QUEUE_DELIVERY_LOOP, 0, 0, -1); 434 m_add_evpid(p_scheduler, evp.id); 435 m_close(p_scheduler); 436 return; 437 438 case IMSG_MTA_DELIVERY_HOLD: 439 case IMSG_MDA_DELIVERY_HOLD: 440 imsg->hdr.type = IMSG_QUEUE_HOLDQ_HOLD; 441 m_forward(p_scheduler, imsg); 442 return; 443 444 case IMSG_MTA_SCHEDULE: 445 imsg->hdr.type = IMSG_QUEUE_ENVELOPE_SCHEDULE; 446 m_forward(p_scheduler, imsg); 447 return; 448 449 case IMSG_MTA_HOLDQ_RELEASE: 450 case IMSG_MDA_HOLDQ_RELEASE: 451 m_msg(&m, imsg); 452 m_get_id(&m, &holdq); 453 m_get_int(&m, &v); 454 m_end(&m); 455 m_create(p_scheduler, IMSG_QUEUE_HOLDQ_RELEASE, 0, 0, -1); 456 if (imsg->hdr.type == IMSG_MTA_HOLDQ_RELEASE) 457 m_add_int(p_scheduler, D_MTA); 458 else 459 m_add_int(p_scheduler, D_MDA); 460 m_add_id(p_scheduler, holdq); 461 m_add_int(p_scheduler, v); 462 m_close(p_scheduler); 463 return; 464 465 case IMSG_CTL_PAUSE_MDA: 466 case IMSG_CTL_PAUSE_MTA: 467 case IMSG_CTL_RESUME_MDA: 468 case IMSG_CTL_RESUME_MTA: 469 m_forward(p_scheduler, imsg); 470 return; 471 472 case IMSG_CTL_VERBOSE: 473 m_msg(&m, imsg); 474 m_get_int(&m, &v); 475 m_end(&m); 476 log_trace_verbose(v); 477 return; 478 479 case IMSG_CTL_PROFILE: 480 m_msg(&m, imsg); 481 m_get_int(&m, &v); 482 m_end(&m); 483 profiling = v; 484 return; 485 486 case IMSG_CTL_DISCOVER_EVPID: 487 m_msg(&m, imsg); 488 m_get_evpid(&m, &evpid); 489 m_end(&m); 490 if (queue_envelope_load(evpid, &evp) == 0) { 491 log_warnx("queue: discover: failed to load " 492 "envelope %016" PRIx64, evpid); 493 n_evp = 0; 494 m_compose(p_control, imsg->hdr.type, 495 imsg->hdr.peerid, 0, -1, 496 &n_evp, sizeof n_evp); 497 return; 498 } 499 500 m_create(p_scheduler, IMSG_QUEUE_DISCOVER_EVPID, 501 0, 0, -1); 502 m_add_envelope(p_scheduler, &evp); 503 m_close(p_scheduler); 504 505 m_create(p_scheduler, IMSG_QUEUE_DISCOVER_MSGID, 506 0, 0, -1); 507 m_add_msgid(p_scheduler, evpid_to_msgid(evpid)); 508 m_close(p_scheduler); 509 n_evp = 1; 510 m_compose(p_control, imsg->hdr.type, imsg->hdr.peerid, 511 0, -1, &n_evp, sizeof n_evp); 512 return; 513 514 case IMSG_CTL_DISCOVER_MSGID: 515 m_msg(&m, imsg); 516 m_get_msgid(&m, &msgid); 517 m_end(&m); 518 /* handle concurrent walk requests */ 519 wi = xcalloc(1, sizeof *wi); 520 wi->msgid = msgid; 521 wi->peerid = imsg->hdr.peerid; 522 evtimer_set(&wi->ev, queue_msgid_walk, wi); 523 tv.tv_sec = 0; 524 tv.tv_usec = 10; 525 evtimer_add(&wi->ev, &tv); 526 return; 527 } 528 529 errx(1, "queue_imsg: unexpected %s imsg", imsg_to_str(imsg->hdr.type)); 530 } 531 532 static void 533 queue_msgid_walk(int fd, short event, void *arg) 534 { 535 struct envelope evp; 536 struct timeval tv; 537 struct msg_walkinfo *wi = arg; 538 int r; 539 540 r = queue_message_walk(&evp, wi->msgid, &wi->done, &wi->data); 541 if (r == -1) { 542 if (wi->n_evp) { 543 m_create(p_scheduler, IMSG_QUEUE_DISCOVER_MSGID, 544 0, 0, -1); 545 m_add_msgid(p_scheduler, wi->msgid); 546 m_close(p_scheduler); 547 } 548 549 m_compose(p_control, IMSG_CTL_DISCOVER_MSGID, wi->peerid, 0, -1, 550 &wi->n_evp, sizeof wi->n_evp); 551 evtimer_del(&wi->ev); 552 free(wi); 553 return; 554 } 555 556 if (r) { 557 m_create(p_scheduler, IMSG_QUEUE_DISCOVER_EVPID, 0, 0, -1); 558 m_add_envelope(p_scheduler, &evp); 559 m_close(p_scheduler); 560 wi->n_evp += 1; 561 } 562 563 tv.tv_sec = 0; 564 tv.tv_usec = 10; 565 evtimer_set(&wi->ev, queue_msgid_walk, wi); 566 evtimer_add(&wi->ev, &tv); 567 } 568 569 static void 570 queue_bounce(struct envelope *e, struct delivery_bounce *d) 571 { 572 struct envelope b; 573 574 b = *e; 575 b.type = D_BOUNCE; 576 b.agent.bounce = *d; 577 b.retry = 0; 578 b.lasttry = 0; 579 b.creation = time(NULL); 580 b.ttl = 3600 * 24 * 7; 581 582 if (e->dsn_notify & DSN_NEVER) 583 return; 584 585 if (b.id == 0) 586 log_warnx("warn: queue_bounce: evpid=0"); 587 if (evpid_to_msgid(b.id) == 0) 588 log_warnx("warn: queue_bounce: msgid=0, evpid=%016"PRIx64, 589 b.id); 590 if (e->type == D_BOUNCE) { 591 log_warnx("warn: queue: double bounce!"); 592 } else if (e->sender.user[0] == '\0') { 593 log_warnx("warn: queue: no return path!"); 594 } else if (!queue_envelope_create(&b)) { 595 log_warnx("warn: queue: cannot bounce!"); 596 } else { 597 log_debug("debug: queue: bouncing evp:%016" PRIx64 598 " as evp:%016" PRIx64, e->id, b.id); 599 600 m_create(p_scheduler, IMSG_QUEUE_ENVELOPE_SUBMIT, 0, 0, -1); 601 m_add_envelope(p_scheduler, &b); 602 m_close(p_scheduler); 603 604 m_create(p_scheduler, IMSG_QUEUE_MESSAGE_COMMIT, 0, 0, -1); 605 m_add_msgid(p_scheduler, evpid_to_msgid(b.id)); 606 m_close(p_scheduler); 607 608 stat_increment("queue.bounce", 1); 609 } 610 } 611 612 static void 613 queue_shutdown(void) 614 { 615 log_debug("debug: queue agent exiting"); 616 queue_close(); 617 _exit(0); 618 } 619 620 int 621 queue(void) 622 { 623 struct passwd *pw; 624 struct timeval tv; 625 struct event ev_qload; 626 627 purge_config(PURGE_EVERYTHING & ~PURGE_DISPATCHERS); 628 629 if ((pw = getpwnam(SMTPD_QUEUE_USER)) == NULL) 630 if ((pw = getpwnam(SMTPD_USER)) == NULL) 631 fatalx("unknown user " SMTPD_USER); 632 633 env->sc_queue_flags |= QUEUE_EVPCACHE; 634 env->sc_queue_evpcache_size = 1024; 635 636 if (chroot(PATH_SPOOL) == -1) 637 fatal("queue: chroot"); 638 if (chdir("/") == -1) 639 fatal("queue: chdir(\"/\")"); 640 641 config_process(PROC_QUEUE); 642 643 if (env->sc_queue_flags & QUEUE_COMPRESSION) 644 log_info("queue: queue compression enabled"); 645 646 if (env->sc_queue_key) { 647 if (!crypto_setup(env->sc_queue_key, strlen(env->sc_queue_key))) 648 fatalx("crypto_setup: invalid key for queue encryption"); 649 log_info("queue: queue encryption enabled"); 650 } 651 652 if (setgroups(1, &pw->pw_gid) || 653 setresgid(pw->pw_gid, pw->pw_gid, pw->pw_gid) || 654 setresuid(pw->pw_uid, pw->pw_uid, pw->pw_uid)) 655 fatal("queue: cannot drop privileges"); 656 657 imsg_callback = queue_imsg; 658 event_init(); 659 660 signal(SIGINT, SIG_IGN); 661 signal(SIGTERM, SIG_IGN); 662 signal(SIGPIPE, SIG_IGN); 663 signal(SIGHUP, SIG_IGN); 664 665 config_peer(PROC_PARENT); 666 config_peer(PROC_CONTROL); 667 config_peer(PROC_LKA); 668 config_peer(PROC_SCHEDULER); 669 config_peer(PROC_PONY); 670 671 /* setup queue loading task */ 672 evtimer_set(&ev_qload, queue_timeout, &ev_qload); 673 tv.tv_sec = 0; 674 tv.tv_usec = 10; 675 evtimer_add(&ev_qload, &tv); 676 677 if (pledge("stdio rpath wpath cpath flock recvfd sendfd", NULL) == -1) 678 err(1, "pledge"); 679 680 event_dispatch(); 681 fatalx("exited event loop"); 682 683 return (0); 684 } 685 686 static void 687 queue_timeout(int fd, short event, void *p) 688 { 689 static uint32_t msgid = 0; 690 struct dispatcher *dsp; 691 struct envelope evp; 692 struct event *ev = p; 693 struct timeval tv; 694 int r; 695 696 r = queue_envelope_walk(&evp); 697 if (r == -1) { 698 if (msgid) { 699 m_create(p_scheduler, IMSG_QUEUE_MESSAGE_COMMIT, 700 0, 0, -1); 701 m_add_msgid(p_scheduler, msgid); 702 m_close(p_scheduler); 703 } 704 log_debug("debug: queue: done loading queue into scheduler"); 705 return; 706 } 707 708 if (r) { 709 dsp = dict_get(env->sc_dispatchers, evp.dispatcher); 710 if (dsp == NULL) { 711 log_warnx("warn: queue: missing dispatcher \"%s\"" 712 " for envelope %016"PRIx64", ignoring", 713 evp.dispatcher, evp.id); 714 goto reset; 715 } 716 if (msgid && evpid_to_msgid(evp.id) != msgid) { 717 m_create(p_scheduler, IMSG_QUEUE_MESSAGE_COMMIT, 718 0, 0, -1); 719 m_add_msgid(p_scheduler, msgid); 720 m_close(p_scheduler); 721 } 722 msgid = evpid_to_msgid(evp.id); 723 m_create(p_scheduler, IMSG_QUEUE_ENVELOPE_SUBMIT, 0, 0, -1); 724 m_add_envelope(p_scheduler, &evp); 725 m_close(p_scheduler); 726 } 727 728 reset: 729 tv.tv_sec = 0; 730 tv.tv_usec = 10; 731 evtimer_add(ev, &tv); 732 } 733 734 static void 735 queue_log(const struct envelope *e, const char *prefix, const char *status) 736 { 737 char rcpt[LINE_MAX]; 738 739 (void)strlcpy(rcpt, "-", sizeof rcpt); 740 if (strcmp(e->rcpt.user, e->dest.user) || 741 strcmp(e->rcpt.domain, e->dest.domain)) 742 (void)snprintf(rcpt, sizeof rcpt, "%s@%s", 743 e->rcpt.user, e->rcpt.domain); 744 745 log_info("%s: %s for %016" PRIx64 ": from=<%s@%s>, to=<%s@%s>, " 746 "rcpt=<%s>, delay=%s, stat=%s", 747 e->type == D_MDA ? "delivery" : "relay", 748 prefix, 749 e->id, e->sender.user, e->sender.domain, 750 e->dest.user, e->dest.domain, 751 rcpt, 752 duration_to_text(time(NULL) - e->creation), 753 status); 754 } 755