1 /* $OpenBSD: queue.c,v 1.182 2016/09/08 12:06:43 eric Exp $ */ 2 3 /* 4 * Copyright (c) 2008 Gilles Chehade <gilles@poolp.org> 5 * Copyright (c) 2008 Pierre-Yves Ritschard <pyr@openbsd.org> 6 * Copyright (c) 2012 Eric Faurot <eric@openbsd.org> 7 * 8 * Permission to use, copy, modify, and distribute this software for any 9 * purpose with or without fee is hereby granted, provided that the above 10 * copyright notice and this permission notice appear in all copies. 11 * 12 * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES 13 * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF 14 * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR 15 * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES 16 * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN 17 * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF 18 * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. 19 */ 20 21 #include <sys/types.h> 22 #include <sys/queue.h> 23 #include <sys/tree.h> 24 #include <sys/socket.h> 25 #include <sys/stat.h> 26 27 #include <err.h> 28 #include <event.h> 29 #include <imsg.h> 30 #include <inttypes.h> 31 #include <libgen.h> 32 #include <pwd.h> 33 #include <signal.h> 34 #include <stdio.h> 35 #include <stdlib.h> 36 #include <string.h> 37 #include <time.h> 38 #include <unistd.h> 39 #include <limits.h> 40 41 #include "smtpd.h" 42 #include "log.h" 43 44 static void queue_imsg(struct mproc *, struct imsg *); 45 static void queue_timeout(int, short, void *); 46 static void queue_bounce(struct envelope *, struct delivery_bounce *); 47 static void queue_shutdown(void); 48 static void queue_log(const struct envelope *, const char *, const char *); 49 static void queue_msgid_walk(int, short, void *); 50 51 52 static void 53 queue_imsg(struct mproc *p, struct imsg *imsg) 54 { 55 struct delivery_bounce bounce; 56 struct msg_walkinfo *wi; 57 struct timeval tv; 58 struct bounce_req_msg *req_bounce; 59 struct envelope evp; 60 struct msg m; 61 const char *reason; 62 uint64_t reqid, evpid, holdq; 63 uint32_t msgid; 64 time_t nexttry; 65 size_t n_evp; 66 int fd, mta_ext, ret, v, flags, code; 67 68 if (imsg == NULL) 69 queue_shutdown(); 70 71 memset(&bounce, 0, sizeof(struct delivery_bounce)); 72 if (p->proc == PROC_PONY) { 73 74 switch (imsg->hdr.type) { 75 case IMSG_SMTP_MESSAGE_CREATE: 76 m_msg(&m, imsg); 77 m_get_id(&m, &reqid); 78 m_end(&m); 79 80 ret = queue_message_create(&msgid); 81 82 m_create(p, IMSG_SMTP_MESSAGE_CREATE, 0, 0, -1); 83 m_add_id(p, reqid); 84 if (ret == 0) 85 m_add_int(p, 0); 86 else { 87 m_add_int(p, 1); 88 m_add_msgid(p, msgid); 89 } 90 m_close(p); 91 return; 92 93 case IMSG_SMTP_MESSAGE_ROLLBACK: 94 m_msg(&m, imsg); 95 m_get_msgid(&m, &msgid); 96 m_end(&m); 97 98 queue_message_delete(msgid); 99 100 m_create(p_scheduler, IMSG_QUEUE_MESSAGE_ROLLBACK, 101 0, 0, -1); 102 m_add_msgid(p_scheduler, msgid); 103 m_close(p_scheduler); 104 return; 105 106 case IMSG_SMTP_MESSAGE_COMMIT: 107 m_msg(&m, imsg); 108 m_get_id(&m, &reqid); 109 m_get_msgid(&m, &msgid); 110 m_end(&m); 111 112 ret = queue_message_commit(msgid); 113 114 m_create(p, IMSG_SMTP_MESSAGE_COMMIT, 0, 0, -1); 115 m_add_id(p, reqid); 116 m_add_int(p, (ret == 0) ? 0 : 1); 117 m_close(p); 118 119 if (ret) { 120 m_create(p_scheduler, IMSG_QUEUE_MESSAGE_COMMIT, 121 0, 0, -1); 122 m_add_msgid(p_scheduler, msgid); 123 m_close(p_scheduler); 124 } 125 return; 126 127 case IMSG_SMTP_MESSAGE_OPEN: 128 m_msg(&m, imsg); 129 m_get_id(&m, &reqid); 130 m_get_msgid(&m, &msgid); 131 m_end(&m); 132 133 fd = queue_message_fd_rw(msgid); 134 135 m_create(p, IMSG_SMTP_MESSAGE_OPEN, 0, 0, fd); 136 m_add_id(p, reqid); 137 m_add_int(p, (fd == -1) ? 0 : 1); 138 m_close(p); 139 return; 140 141 case IMSG_QUEUE_SMTP_SESSION: 142 bounce_fd(imsg->fd); 143 return; 144 } 145 } 146 147 if (p->proc == PROC_LKA) { 148 switch (imsg->hdr.type) { 149 case IMSG_LKA_ENVELOPE_SUBMIT: 150 m_msg(&m, imsg); 151 m_get_id(&m, &reqid); 152 m_get_envelope(&m, &evp); 153 m_end(&m); 154 155 if (evp.id == 0) 156 log_warnx("warn: imsg_queue_submit_envelope: evpid=0"); 157 if (evpid_to_msgid(evp.id) == 0) 158 log_warnx("warn: imsg_queue_submit_envelope: msgid=0, " 159 "evpid=%016"PRIx64, evp.id); 160 ret = queue_envelope_create(&evp); 161 m_create(p_pony, IMSG_QUEUE_ENVELOPE_SUBMIT, 0, 0, -1); 162 m_add_id(p_pony, reqid); 163 if (ret == 0) 164 m_add_int(p_pony, 0); 165 else { 166 m_add_int(p_pony, 1); 167 m_add_evpid(p_pony, evp.id); 168 } 169 m_close(p_pony); 170 if (ret) { 171 m_create(p_scheduler, 172 IMSG_QUEUE_ENVELOPE_SUBMIT, 0, 0, -1); 173 m_add_envelope(p_scheduler, &evp); 174 m_close(p_scheduler); 175 176 } 177 return; 178 179 case IMSG_LKA_ENVELOPE_COMMIT: 180 m_msg(&m, imsg); 181 m_get_id(&m, &reqid); 182 m_end(&m); 183 m_create(p_pony, IMSG_QUEUE_ENVELOPE_COMMIT, 0, 0, -1); 184 m_add_id(p_pony, reqid); 185 m_add_int(p_pony, 1); 186 m_close(p_pony); 187 return; 188 } 189 } 190 191 if (p->proc == PROC_SCHEDULER) { 192 switch (imsg->hdr.type) { 193 case IMSG_SCHED_ENVELOPE_REMOVE: 194 m_msg(&m, imsg); 195 m_get_evpid(&m, &evpid); 196 m_end(&m); 197 198 m_create(p_scheduler, IMSG_QUEUE_ENVELOPE_ACK, 0, 0, -1); 199 m_add_evpid(p_scheduler, evpid); 200 m_close(p_scheduler); 201 202 /* already removed by scheduler */ 203 if (queue_envelope_load(evpid, &evp) == 0) 204 return; 205 206 queue_log(&evp, "Remove", "Removed by administrator"); 207 queue_envelope_delete(evpid); 208 return; 209 210 case IMSG_SCHED_ENVELOPE_EXPIRE: 211 m_msg(&m, imsg); 212 m_get_evpid(&m, &evpid); 213 m_end(&m); 214 215 m_create(p_scheduler, IMSG_QUEUE_ENVELOPE_ACK, 0, 0, -1); 216 m_add_evpid(p_scheduler, evpid); 217 m_close(p_scheduler); 218 219 /* already removed by scheduler*/ 220 if (queue_envelope_load(evpid, &evp) == 0) 221 return; 222 223 bounce.type = B_ERROR; 224 envelope_set_errormsg(&evp, "Envelope expired"); 225 envelope_set_esc_class(&evp, ESC_STATUS_TEMPFAIL); 226 envelope_set_esc_code(&evp, ESC_DELIVERY_TIME_EXPIRED); 227 queue_bounce(&evp, &bounce); 228 queue_log(&evp, "Expire", "Envelope expired"); 229 queue_envelope_delete(evpid); 230 return; 231 232 case IMSG_SCHED_ENVELOPE_BOUNCE: 233 CHECK_IMSG_DATA_SIZE(imsg, sizeof *req_bounce); 234 req_bounce = imsg->data; 235 evpid = req_bounce->evpid; 236 237 if (queue_envelope_load(evpid, &evp) == 0) { 238 log_warnx("queue: bounce: failed to load envelope"); 239 m_create(p_scheduler, IMSG_QUEUE_ENVELOPE_REMOVE, 0, 0, -1); 240 m_add_evpid(p_scheduler, evpid); 241 m_add_u32(p_scheduler, 0); /* not in-flight */ 242 m_close(p_scheduler); 243 return; 244 } 245 queue_bounce(&evp, &req_bounce->bounce); 246 evp.lastbounce = req_bounce->timestamp; 247 if (!queue_envelope_update(&evp)) 248 log_warnx("warn: could not update envelope %016"PRIx64, evpid); 249 return; 250 251 case IMSG_SCHED_ENVELOPE_DELIVER: 252 m_msg(&m, imsg); 253 m_get_evpid(&m, &evpid); 254 m_end(&m); 255 if (queue_envelope_load(evpid, &evp) == 0) { 256 log_warnx("queue: deliver: failed to load envelope"); 257 m_create(p_scheduler, IMSG_QUEUE_ENVELOPE_REMOVE, 0, 0, -1); 258 m_add_evpid(p_scheduler, evpid); 259 m_add_u32(p_scheduler, 1); /* in-flight */ 260 m_close(p_scheduler); 261 return; 262 } 263 evp.lasttry = time(NULL); 264 m_create(p_pony, IMSG_QUEUE_DELIVER, 0, 0, -1); 265 m_add_envelope(p_pony, &evp); 266 m_close(p_pony); 267 return; 268 269 case IMSG_SCHED_ENVELOPE_INJECT: 270 m_msg(&m, imsg); 271 m_get_evpid(&m, &evpid); 272 m_end(&m); 273 bounce_add(evpid); 274 return; 275 276 case IMSG_SCHED_ENVELOPE_TRANSFER: 277 m_msg(&m, imsg); 278 m_get_evpid(&m, &evpid); 279 m_end(&m); 280 if (queue_envelope_load(evpid, &evp) == 0) { 281 log_warnx("queue: failed to load envelope"); 282 m_create(p_scheduler, IMSG_QUEUE_ENVELOPE_REMOVE, 0, 0, -1); 283 m_add_evpid(p_scheduler, evpid); 284 m_add_u32(p_scheduler, 1); /* in-flight */ 285 m_close(p_scheduler); 286 return; 287 } 288 evp.lasttry = time(NULL); 289 m_create(p_pony, IMSG_QUEUE_TRANSFER, 0, 0, -1); 290 m_add_envelope(p_pony, &evp); 291 m_close(p_pony); 292 return; 293 294 case IMSG_CTL_LIST_ENVELOPES: 295 if (imsg->hdr.len == sizeof imsg->hdr) { 296 m_forward(p_control, imsg); 297 return; 298 } 299 300 m_msg(&m, imsg); 301 m_get_evpid(&m, &evpid); 302 m_get_int(&m, &flags); 303 m_get_time(&m, &nexttry); 304 m_end(&m); 305 306 if (queue_envelope_load(evpid, &evp) == 0) 307 return; /* Envelope is gone, drop it */ 308 309 /* 310 * XXX consistency: The envelope might already be on 311 * its way back to the scheduler. We need to detect 312 * this properly and report that state. 313 */ 314 if (flags & EF_INFLIGHT) { 315 /* 316 * Not exactly correct but pretty close: The 317 * value is not recorded on the envelope unless 318 * a tempfail occurs. 319 */ 320 evp.lasttry = nexttry; 321 } 322 323 m_create(p_control, IMSG_CTL_LIST_ENVELOPES, 324 imsg->hdr.peerid, 0, -1); 325 m_add_int(p_control, flags); 326 m_add_time(p_control, nexttry); 327 m_add_envelope(p_control, &evp); 328 m_close(p_control); 329 return; 330 } 331 } 332 333 if (p->proc == PROC_PONY) { 334 switch (imsg->hdr.type) { 335 case IMSG_MDA_OPEN_MESSAGE: 336 case IMSG_MTA_OPEN_MESSAGE: 337 m_msg(&m, imsg); 338 m_get_id(&m, &reqid); 339 m_get_msgid(&m, &msgid); 340 m_end(&m); 341 fd = queue_message_fd_r(msgid); 342 m_create(p, imsg->hdr.type, 0, 0, fd); 343 m_add_id(p, reqid); 344 m_close(p); 345 return; 346 347 case IMSG_MDA_DELIVERY_OK: 348 case IMSG_MTA_DELIVERY_OK: 349 m_msg(&m, imsg); 350 m_get_evpid(&m, &evpid); 351 if (imsg->hdr.type == IMSG_MTA_DELIVERY_OK) 352 m_get_int(&m, &mta_ext); 353 m_end(&m); 354 if (queue_envelope_load(evpid, &evp) == 0) { 355 log_warn("queue: dsn: failed to load envelope"); 356 return; 357 } 358 if (evp.dsn_notify & DSN_SUCCESS) { 359 bounce.type = B_DSN; 360 bounce.dsn_ret = evp.dsn_ret; 361 envelope_set_esc_class(&evp, ESC_STATUS_OK); 362 if (imsg->hdr.type == IMSG_MDA_DELIVERY_OK) 363 queue_bounce(&evp, &bounce); 364 else if (imsg->hdr.type == IMSG_MTA_DELIVERY_OK && 365 (mta_ext & MTA_EXT_DSN) == 0) { 366 bounce.mta_without_dsn = 1; 367 queue_bounce(&evp, &bounce); 368 } 369 } 370 queue_envelope_delete(evpid); 371 m_create(p_scheduler, IMSG_QUEUE_DELIVERY_OK, 0, 0, -1); 372 m_add_evpid(p_scheduler, evpid); 373 m_close(p_scheduler); 374 return; 375 376 case IMSG_MDA_DELIVERY_TEMPFAIL: 377 case IMSG_MTA_DELIVERY_TEMPFAIL: 378 m_msg(&m, imsg); 379 m_get_evpid(&m, &evpid); 380 m_get_string(&m, &reason); 381 m_get_int(&m, &code); 382 m_end(&m); 383 if (queue_envelope_load(evpid, &evp) == 0) { 384 log_warnx("queue: tempfail: failed to load envelope"); 385 m_create(p_scheduler, IMSG_QUEUE_ENVELOPE_REMOVE, 0, 0, -1); 386 m_add_evpid(p_scheduler, evpid); 387 m_add_u32(p_scheduler, 1); /* in-flight */ 388 m_close(p_scheduler); 389 return; 390 } 391 envelope_set_errormsg(&evp, "%s", reason); 392 envelope_set_esc_class(&evp, ESC_STATUS_TEMPFAIL); 393 envelope_set_esc_code(&evp, code); 394 evp.retry++; 395 if (!queue_envelope_update(&evp)) 396 log_warnx("warn: could not update envelope %016"PRIx64, evpid); 397 m_create(p_scheduler, IMSG_QUEUE_DELIVERY_TEMPFAIL, 0, 0, -1); 398 m_add_envelope(p_scheduler, &evp); 399 m_close(p_scheduler); 400 return; 401 402 case IMSG_MDA_DELIVERY_PERMFAIL: 403 case IMSG_MTA_DELIVERY_PERMFAIL: 404 m_msg(&m, imsg); 405 m_get_evpid(&m, &evpid); 406 m_get_string(&m, &reason); 407 m_get_int(&m, &code); 408 m_end(&m); 409 if (queue_envelope_load(evpid, &evp) == 0) { 410 log_warnx("queue: permfail: failed to load envelope"); 411 m_create(p_scheduler, IMSG_QUEUE_ENVELOPE_REMOVE, 0, 0, -1); 412 m_add_evpid(p_scheduler, evpid); 413 m_add_u32(p_scheduler, 1); /* in-flight */ 414 m_close(p_scheduler); 415 return; 416 } 417 bounce.type = B_ERROR; 418 envelope_set_errormsg(&evp, "%s", reason); 419 envelope_set_esc_class(&evp, ESC_STATUS_PERMFAIL); 420 envelope_set_esc_code(&evp, code); 421 queue_bounce(&evp, &bounce); 422 queue_envelope_delete(evpid); 423 m_create(p_scheduler, IMSG_QUEUE_DELIVERY_PERMFAIL, 0, 0, -1); 424 m_add_evpid(p_scheduler, evpid); 425 m_close(p_scheduler); 426 return; 427 428 case IMSG_MDA_DELIVERY_LOOP: 429 case IMSG_MTA_DELIVERY_LOOP: 430 m_msg(&m, imsg); 431 m_get_evpid(&m, &evpid); 432 m_end(&m); 433 if (queue_envelope_load(evpid, &evp) == 0) { 434 log_warnx("queue: loop: failed to load envelope"); 435 m_create(p_scheduler, IMSG_QUEUE_ENVELOPE_REMOVE, 0, 0, -1); 436 m_add_evpid(p_scheduler, evpid); 437 m_add_u32(p_scheduler, 1); /* in-flight */ 438 m_close(p_scheduler); 439 return; 440 } 441 envelope_set_errormsg(&evp, "%s", "Loop detected"); 442 envelope_set_esc_class(&evp, ESC_STATUS_TEMPFAIL); 443 envelope_set_esc_code(&evp, ESC_ROUTING_LOOP_DETECTED); 444 bounce.type = B_ERROR; 445 queue_bounce(&evp, &bounce); 446 queue_envelope_delete(evp.id); 447 m_create(p_scheduler, IMSG_QUEUE_DELIVERY_LOOP, 0, 0, -1); 448 m_add_evpid(p_scheduler, evp.id); 449 m_close(p_scheduler); 450 return; 451 452 case IMSG_MTA_DELIVERY_HOLD: 453 case IMSG_MDA_DELIVERY_HOLD: 454 imsg->hdr.type = IMSG_QUEUE_HOLDQ_HOLD; 455 m_forward(p_scheduler, imsg); 456 return; 457 458 case IMSG_MTA_SCHEDULE: 459 imsg->hdr.type = IMSG_QUEUE_ENVELOPE_SCHEDULE; 460 m_forward(p_scheduler, imsg); 461 return; 462 463 case IMSG_MTA_HOLDQ_RELEASE: 464 case IMSG_MDA_HOLDQ_RELEASE: 465 m_msg(&m, imsg); 466 m_get_id(&m, &holdq); 467 m_get_int(&m, &v); 468 m_end(&m); 469 m_create(p_scheduler, IMSG_QUEUE_HOLDQ_RELEASE, 0, 0, -1); 470 if (imsg->hdr.type == IMSG_MTA_HOLDQ_RELEASE) 471 m_add_int(p_scheduler, D_MTA); 472 else 473 m_add_int(p_scheduler, D_MDA); 474 m_add_id(p_scheduler, holdq); 475 m_add_int(p_scheduler, v); 476 m_close(p_scheduler); 477 return; 478 } 479 } 480 481 if (p->proc == PROC_CONTROL) { 482 switch (imsg->hdr.type) { 483 case IMSG_CTL_PAUSE_MDA: 484 case IMSG_CTL_PAUSE_MTA: 485 case IMSG_CTL_RESUME_MDA: 486 case IMSG_CTL_RESUME_MTA: 487 m_forward(p_scheduler, imsg); 488 return; 489 490 case IMSG_CTL_VERBOSE: 491 m_msg(&m, imsg); 492 m_get_int(&m, &v); 493 m_end(&m); 494 log_verbose(v); 495 return; 496 497 case IMSG_CTL_PROFILE: 498 m_msg(&m, imsg); 499 m_get_int(&m, &v); 500 m_end(&m); 501 profiling = v; 502 return; 503 504 case IMSG_CTL_DISCOVER_EVPID: 505 m_msg(&m, imsg); 506 m_get_evpid(&m, &evpid); 507 m_end(&m); 508 if (queue_envelope_load(evpid, &evp) == 0) { 509 log_warnx("queue: discover: failed to load " 510 "envelope %016" PRIx64, evpid); 511 n_evp = 0; 512 m_compose(p_control, imsg->hdr.type, 513 imsg->hdr.peerid, 0, -1, 514 &n_evp, sizeof n_evp); 515 return; 516 } 517 518 m_create(p_scheduler, IMSG_QUEUE_DISCOVER_EVPID, 519 0, 0, -1); 520 m_add_envelope(p_scheduler, &evp); 521 m_close(p_scheduler); 522 523 m_create(p_scheduler, IMSG_QUEUE_DISCOVER_MSGID, 524 0, 0, -1); 525 m_add_msgid(p_scheduler, evpid_to_msgid(evpid)); 526 m_close(p_scheduler); 527 n_evp = 1; 528 m_compose(p_control, imsg->hdr.type, imsg->hdr.peerid, 529 0, -1, &n_evp, sizeof n_evp); 530 return; 531 532 case IMSG_CTL_DISCOVER_MSGID: 533 m_msg(&m, imsg); 534 m_get_msgid(&m, &msgid); 535 m_end(&m); 536 /* handle concurrent walk requests */ 537 wi = xcalloc(1, sizeof *wi, "queu_imsg"); 538 wi->msgid = msgid; 539 wi->peerid = imsg->hdr.peerid; 540 evtimer_set(&wi->ev, queue_msgid_walk, wi); 541 tv.tv_sec = 0; 542 tv.tv_usec = 10; 543 evtimer_add(&wi->ev, &tv); 544 return; 545 546 case IMSG_CTL_UNCORRUPT_MSGID: 547 m_msg(&m, imsg); 548 m_get_msgid(&m, &msgid); 549 m_end(&m); 550 ret = queue_message_uncorrupt(msgid); 551 m_compose(p_control, imsg->hdr.type, imsg->hdr.peerid, 552 0, -1, &ret, sizeof ret); 553 return; 554 } 555 } 556 557 errx(1, "queue_imsg: unexpected %s imsg", imsg_to_str(imsg->hdr.type)); 558 } 559 560 static void 561 queue_msgid_walk(int fd, short event, void *arg) 562 { 563 struct envelope evp; 564 struct timeval tv; 565 struct msg_walkinfo *wi = arg; 566 int r; 567 568 r = queue_message_walk(&evp, wi->msgid, &wi->done, &wi->data); 569 if (r == -1) { 570 if (wi->n_evp) { 571 m_create(p_scheduler, IMSG_QUEUE_DISCOVER_MSGID, 572 0, 0, -1); 573 m_add_msgid(p_scheduler, wi->msgid); 574 m_close(p_scheduler); 575 } 576 577 m_compose(p_control, IMSG_CTL_DISCOVER_MSGID, wi->peerid, 0, -1, 578 &wi->n_evp, sizeof wi->n_evp); 579 evtimer_del(&wi->ev); 580 free(wi); 581 return; 582 } 583 584 if (r) { 585 m_create(p_scheduler, IMSG_QUEUE_DISCOVER_EVPID, 0, 0, -1); 586 m_add_envelope(p_scheduler, &evp); 587 m_close(p_scheduler); 588 wi->n_evp += 1; 589 } 590 591 tv.tv_sec = 0; 592 tv.tv_usec = 10; 593 evtimer_set(&wi->ev, queue_msgid_walk, wi); 594 evtimer_add(&wi->ev, &tv); 595 } 596 597 static void 598 queue_bounce(struct envelope *e, struct delivery_bounce *d) 599 { 600 struct envelope b; 601 602 b = *e; 603 b.type = D_BOUNCE; 604 b.agent.bounce = *d; 605 b.retry = 0; 606 b.lasttry = 0; 607 b.creation = time(NULL); 608 b.expire = 3600 * 24 * 7; 609 610 if (e->dsn_notify & DSN_NEVER) 611 return; 612 613 if (b.id == 0) 614 log_warnx("warn: queue_bounce: evpid=0"); 615 if (evpid_to_msgid(b.id) == 0) 616 log_warnx("warn: queue_bounce: msgid=0, evpid=%016"PRIx64, 617 b.id); 618 if (e->type == D_BOUNCE) { 619 log_warnx("warn: queue: double bounce!"); 620 } else if (e->sender.user[0] == '\0') { 621 log_warnx("warn: queue: no return path!"); 622 } else if (!queue_envelope_create(&b)) { 623 log_warnx("warn: queue: cannot bounce!"); 624 } else { 625 log_debug("debug: queue: bouncing evp:%016" PRIx64 626 " as evp:%016" PRIx64, e->id, b.id); 627 628 m_create(p_scheduler, IMSG_QUEUE_ENVELOPE_SUBMIT, 0, 0, -1); 629 m_add_envelope(p_scheduler, &b); 630 m_close(p_scheduler); 631 632 m_create(p_scheduler, IMSG_QUEUE_MESSAGE_COMMIT, 0, 0, -1); 633 m_add_msgid(p_scheduler, evpid_to_msgid(b.id)); 634 m_close(p_scheduler); 635 636 stat_increment("queue.bounce", 1); 637 } 638 } 639 640 static void 641 queue_shutdown(void) 642 { 643 log_debug("debug: queue agent exiting"); 644 queue_close(); 645 _exit(0); 646 } 647 648 int 649 queue(void) 650 { 651 struct passwd *pw; 652 struct timeval tv; 653 struct event ev_qload; 654 655 purge_config(PURGE_EVERYTHING); 656 657 if ((pw = getpwnam(SMTPD_QUEUE_USER)) == NULL) 658 if ((pw = getpwnam(SMTPD_USER)) == NULL) 659 fatalx("unknown user " SMTPD_USER); 660 661 env->sc_queue_flags |= QUEUE_EVPCACHE; 662 env->sc_queue_evpcache_size = 1024; 663 664 if (chroot(PATH_SPOOL) == -1) 665 fatal("queue: chroot"); 666 if (chdir("/") == -1) 667 fatal("queue: chdir(\"/\")"); 668 669 config_process(PROC_QUEUE); 670 671 if (env->sc_queue_flags & QUEUE_COMPRESSION) 672 log_info("queue: queue compression enabled"); 673 674 if (env->sc_queue_key) { 675 if (!crypto_setup(env->sc_queue_key, strlen(env->sc_queue_key))) 676 fatalx("crypto_setup: invalid key for queue encryption"); 677 log_info("queue: queue encryption enabled"); 678 } 679 680 if (setgroups(1, &pw->pw_gid) || 681 setresgid(pw->pw_gid, pw->pw_gid, pw->pw_gid) || 682 setresuid(pw->pw_uid, pw->pw_uid, pw->pw_uid)) 683 fatal("queue: cannot drop privileges"); 684 685 imsg_callback = queue_imsg; 686 event_init(); 687 688 signal(SIGINT, SIG_IGN); 689 signal(SIGTERM, SIG_IGN); 690 signal(SIGPIPE, SIG_IGN); 691 signal(SIGHUP, SIG_IGN); 692 693 config_peer(PROC_PARENT); 694 config_peer(PROC_CONTROL); 695 config_peer(PROC_LKA); 696 config_peer(PROC_SCHEDULER); 697 config_peer(PROC_PONY); 698 699 /* setup queue loading task */ 700 evtimer_set(&ev_qload, queue_timeout, &ev_qload); 701 tv.tv_sec = 0; 702 tv.tv_usec = 10; 703 evtimer_add(&ev_qload, &tv); 704 705 if (pledge("stdio rpath wpath cpath flock recvfd sendfd", NULL) == -1) 706 err(1, "pledge"); 707 708 event_dispatch(); 709 fatalx("exited event loop"); 710 711 return (0); 712 } 713 714 static void 715 queue_timeout(int fd, short event, void *p) 716 { 717 static uint32_t msgid = 0; 718 struct envelope evp; 719 struct event *ev = p; 720 struct timeval tv; 721 int r; 722 723 r = queue_envelope_walk(&evp); 724 if (r == -1) { 725 if (msgid) { 726 m_create(p_scheduler, IMSG_QUEUE_MESSAGE_COMMIT, 727 0, 0, -1); 728 m_add_msgid(p_scheduler, msgid); 729 m_close(p_scheduler); 730 } 731 log_debug("debug: queue: done loading queue into scheduler"); 732 return; 733 } 734 735 if (r) { 736 if (msgid && evpid_to_msgid(evp.id) != msgid) { 737 m_create(p_scheduler, IMSG_QUEUE_MESSAGE_COMMIT, 738 0, 0, -1); 739 m_add_msgid(p_scheduler, msgid); 740 m_close(p_scheduler); 741 } 742 msgid = evpid_to_msgid(evp.id); 743 m_create(p_scheduler, IMSG_QUEUE_ENVELOPE_SUBMIT, 0, 0, -1); 744 m_add_envelope(p_scheduler, &evp); 745 m_close(p_scheduler); 746 } 747 748 tv.tv_sec = 0; 749 tv.tv_usec = 10; 750 evtimer_add(ev, &tv); 751 } 752 753 static void 754 queue_log(const struct envelope *e, const char *prefix, const char *status) 755 { 756 char rcpt[LINE_MAX]; 757 758 (void)strlcpy(rcpt, "-", sizeof rcpt); 759 if (strcmp(e->rcpt.user, e->dest.user) || 760 strcmp(e->rcpt.domain, e->dest.domain)) 761 (void)snprintf(rcpt, sizeof rcpt, "%s@%s", 762 e->rcpt.user, e->rcpt.domain); 763 764 log_info("%s: %s for %016" PRIx64 ": from=<%s@%s>, to=<%s@%s>, " 765 "rcpt=<%s>, delay=%s, stat=%s", 766 e->type == D_MDA ? "delivery" : "relay", 767 prefix, 768 e->id, e->sender.user, e->sender.domain, 769 e->dest.user, e->dest.domain, 770 rcpt, 771 duration_to_text(time(NULL) - e->creation), 772 status); 773 } 774