1 /* $OpenBSD: queue.c,v 1.178 2016/05/28 21:21:20 eric Exp $ */ 2 3 /* 4 * Copyright (c) 2008 Gilles Chehade <gilles@poolp.org> 5 * Copyright (c) 2008 Pierre-Yves Ritschard <pyr@openbsd.org> 6 * Copyright (c) 2012 Eric Faurot <eric@openbsd.org> 7 * 8 * Permission to use, copy, modify, and distribute this software for any 9 * purpose with or without fee is hereby granted, provided that the above 10 * copyright notice and this permission notice appear in all copies. 11 * 12 * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES 13 * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF 14 * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR 15 * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES 16 * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN 17 * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF 18 * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. 19 */ 20 21 #include <sys/types.h> 22 #include <sys/queue.h> 23 #include <sys/tree.h> 24 #include <sys/socket.h> 25 #include <sys/stat.h> 26 27 #include <err.h> 28 #include <event.h> 29 #include <imsg.h> 30 #include <inttypes.h> 31 #include <libgen.h> 32 #include <pwd.h> 33 #include <signal.h> 34 #include <stdio.h> 35 #include <stdlib.h> 36 #include <string.h> 37 #include <time.h> 38 #include <unistd.h> 39 #include <limits.h> 40 41 #include "smtpd.h" 42 #include "log.h" 43 44 static void queue_imsg(struct mproc *, struct imsg *); 45 static void queue_timeout(int, short, void *); 46 static void queue_bounce(struct envelope *, struct delivery_bounce *); 47 static void queue_shutdown(void); 48 static void queue_sig_handler(int, short, void *); 49 static void queue_log(const struct envelope *, const char *, const char *); 50 static void queue_msgid_walk(int, short, void *); 51 52 static size_t flow_agent_hiwat = 10 * 1024 * 1024; 53 static size_t flow_agent_lowat = 1 * 1024 * 1024; 54 static size_t flow_scheduler_hiwat = 10 * 1024 * 1024; 55 static size_t flow_scheduler_lowat = 1 * 1024 * 1024; 56 57 #define LIMIT_AGENT 0x01 58 #define LIMIT_SCHEDULER 0x02 59 60 static int limit = 0; 61 62 static void 63 queue_imsg(struct mproc *p, struct imsg *imsg) 64 { 65 struct delivery_bounce bounce; 66 struct msg_walkinfo *wi; 67 struct timeval tv; 68 struct bounce_req_msg *req_bounce; 69 struct envelope evp; 70 struct msg m; 71 const char *reason; 72 uint64_t reqid, evpid, holdq; 73 uint32_t msgid; 74 time_t nexttry; 75 size_t n_evp; 76 int fd, mta_ext, ret, v, flags, code; 77 78 memset(&bounce, 0, sizeof(struct delivery_bounce)); 79 if (p->proc == PROC_PONY) { 80 81 switch (imsg->hdr.type) { 82 case IMSG_SMTP_MESSAGE_CREATE: 83 m_msg(&m, imsg); 84 m_get_id(&m, &reqid); 85 m_end(&m); 86 87 ret = queue_message_create(&msgid); 88 89 m_create(p, IMSG_SMTP_MESSAGE_CREATE, 0, 0, -1); 90 m_add_id(p, reqid); 91 if (ret == 0) 92 m_add_int(p, 0); 93 else { 94 m_add_int(p, 1); 95 m_add_msgid(p, msgid); 96 } 97 m_close(p); 98 return; 99 100 case IMSG_SMTP_MESSAGE_ROLLBACK: 101 m_msg(&m, imsg); 102 m_get_msgid(&m, &msgid); 103 m_end(&m); 104 105 queue_message_delete(msgid); 106 107 m_create(p_scheduler, IMSG_QUEUE_MESSAGE_ROLLBACK, 108 0, 0, -1); 109 m_add_msgid(p_scheduler, msgid); 110 m_close(p_scheduler); 111 return; 112 113 case IMSG_SMTP_MESSAGE_COMMIT: 114 m_msg(&m, imsg); 115 m_get_id(&m, &reqid); 116 m_get_msgid(&m, &msgid); 117 m_end(&m); 118 119 ret = queue_message_commit(msgid); 120 121 m_create(p, IMSG_SMTP_MESSAGE_COMMIT, 0, 0, -1); 122 m_add_id(p, reqid); 123 m_add_int(p, (ret == 0) ? 0 : 1); 124 m_close(p); 125 126 if (ret) { 127 m_create(p_scheduler, IMSG_QUEUE_MESSAGE_COMMIT, 128 0, 0, -1); 129 m_add_msgid(p_scheduler, msgid); 130 m_close(p_scheduler); 131 } 132 return; 133 134 case IMSG_SMTP_MESSAGE_OPEN: 135 m_msg(&m, imsg); 136 m_get_id(&m, &reqid); 137 m_get_msgid(&m, &msgid); 138 m_end(&m); 139 140 fd = queue_message_fd_rw(msgid); 141 142 m_create(p, IMSG_SMTP_MESSAGE_OPEN, 0, 0, fd); 143 m_add_id(p, reqid); 144 m_add_int(p, (fd == -1) ? 0 : 1); 145 m_close(p); 146 return; 147 148 case IMSG_QUEUE_SMTP_SESSION: 149 bounce_fd(imsg->fd); 150 return; 151 } 152 } 153 154 if (p->proc == PROC_LKA) { 155 switch (imsg->hdr.type) { 156 case IMSG_LKA_ENVELOPE_SUBMIT: 157 m_msg(&m, imsg); 158 m_get_id(&m, &reqid); 159 m_get_envelope(&m, &evp); 160 m_end(&m); 161 162 if (evp.id == 0) 163 log_warnx("warn: imsg_queue_submit_envelope: evpid=0"); 164 if (evpid_to_msgid(evp.id) == 0) 165 log_warnx("warn: imsg_queue_submit_envelope: msgid=0, " 166 "evpid=%016"PRIx64, evp.id); 167 ret = queue_envelope_create(&evp); 168 m_create(p_pony, IMSG_QUEUE_ENVELOPE_SUBMIT, 0, 0, -1); 169 m_add_id(p_pony, reqid); 170 if (ret == 0) 171 m_add_int(p_pony, 0); 172 else { 173 m_add_int(p_pony, 1); 174 m_add_evpid(p_pony, evp.id); 175 } 176 m_close(p_pony); 177 if (ret) { 178 m_create(p_scheduler, 179 IMSG_QUEUE_ENVELOPE_SUBMIT, 0, 0, -1); 180 m_add_envelope(p_scheduler, &evp); 181 m_close(p_scheduler); 182 183 } 184 return; 185 186 case IMSG_LKA_ENVELOPE_COMMIT: 187 m_msg(&m, imsg); 188 m_get_id(&m, &reqid); 189 m_end(&m); 190 m_create(p_pony, IMSG_QUEUE_ENVELOPE_COMMIT, 0, 0, -1); 191 m_add_id(p_pony, reqid); 192 m_add_int(p_pony, 1); 193 m_close(p_pony); 194 return; 195 } 196 } 197 198 if (p->proc == PROC_SCHEDULER) { 199 switch (imsg->hdr.type) { 200 case IMSG_SCHED_ENVELOPE_REMOVE: 201 m_msg(&m, imsg); 202 m_get_evpid(&m, &evpid); 203 m_end(&m); 204 205 m_create(p_scheduler, IMSG_QUEUE_ENVELOPE_ACK, 0, 0, -1); 206 m_add_evpid(p_scheduler, evpid); 207 m_close(p_scheduler); 208 209 /* already removed by scheduler */ 210 if (queue_envelope_load(evpid, &evp) == 0) 211 return; 212 213 queue_log(&evp, "Remove", "Removed by administrator"); 214 queue_envelope_delete(evpid); 215 return; 216 217 case IMSG_SCHED_ENVELOPE_EXPIRE: 218 m_msg(&m, imsg); 219 m_get_evpid(&m, &evpid); 220 m_end(&m); 221 222 m_create(p_scheduler, IMSG_QUEUE_ENVELOPE_ACK, 0, 0, -1); 223 m_add_evpid(p_scheduler, evpid); 224 m_close(p_scheduler); 225 226 /* already removed by scheduler*/ 227 if (queue_envelope_load(evpid, &evp) == 0) 228 return; 229 230 bounce.type = B_ERROR; 231 envelope_set_errormsg(&evp, "Envelope expired"); 232 envelope_set_esc_class(&evp, ESC_STATUS_TEMPFAIL); 233 envelope_set_esc_code(&evp, ESC_DELIVERY_TIME_EXPIRED); 234 queue_bounce(&evp, &bounce); 235 queue_log(&evp, "Expire", "Envelope expired"); 236 queue_envelope_delete(evpid); 237 return; 238 239 case IMSG_SCHED_ENVELOPE_BOUNCE: 240 CHECK_IMSG_DATA_SIZE(imsg, sizeof *req_bounce); 241 req_bounce = imsg->data; 242 evpid = req_bounce->evpid; 243 244 if (queue_envelope_load(evpid, &evp) == 0) { 245 log_warnx("queue: bounce: failed to load envelope"); 246 m_create(p_scheduler, IMSG_QUEUE_ENVELOPE_REMOVE, 0, 0, -1); 247 m_add_evpid(p_scheduler, evpid); 248 m_add_u32(p_scheduler, 0); /* not in-flight */ 249 m_close(p_scheduler); 250 return; 251 } 252 queue_bounce(&evp, &req_bounce->bounce); 253 evp.lastbounce = req_bounce->timestamp; 254 if (!queue_envelope_update(&evp)) 255 log_warnx("warn: could not update envelope %016"PRIx64, evpid); 256 return; 257 258 case IMSG_SCHED_ENVELOPE_DELIVER: 259 m_msg(&m, imsg); 260 m_get_evpid(&m, &evpid); 261 m_end(&m); 262 if (queue_envelope_load(evpid, &evp) == 0) { 263 log_warnx("queue: deliver: failed to load envelope"); 264 m_create(p_scheduler, IMSG_QUEUE_ENVELOPE_REMOVE, 0, 0, -1); 265 m_add_evpid(p_scheduler, evpid); 266 m_add_u32(p_scheduler, 1); /* in-flight */ 267 m_close(p_scheduler); 268 return; 269 } 270 evp.lasttry = time(NULL); 271 m_create(p_pony, IMSG_QUEUE_DELIVER, 0, 0, -1); 272 m_add_envelope(p_pony, &evp); 273 m_close(p_pony); 274 return; 275 276 case IMSG_SCHED_ENVELOPE_INJECT: 277 m_msg(&m, imsg); 278 m_get_evpid(&m, &evpid); 279 m_end(&m); 280 bounce_add(evpid); 281 return; 282 283 case IMSG_SCHED_ENVELOPE_TRANSFER: 284 m_msg(&m, imsg); 285 m_get_evpid(&m, &evpid); 286 m_end(&m); 287 if (queue_envelope_load(evpid, &evp) == 0) { 288 log_warnx("queue: failed to load envelope"); 289 m_create(p_scheduler, IMSG_QUEUE_ENVELOPE_REMOVE, 0, 0, -1); 290 m_add_evpid(p_scheduler, evpid); 291 m_add_u32(p_scheduler, 1); /* in-flight */ 292 m_close(p_scheduler); 293 return; 294 } 295 evp.lasttry = time(NULL); 296 m_create(p_pony, IMSG_QUEUE_TRANSFER, 0, 0, -1); 297 m_add_envelope(p_pony, &evp); 298 m_close(p_pony); 299 return; 300 301 case IMSG_CTL_LIST_ENVELOPES: 302 if (imsg->hdr.len == sizeof imsg->hdr) { 303 m_forward(p_control, imsg); 304 return; 305 } 306 307 m_msg(&m, imsg); 308 m_get_evpid(&m, &evpid); 309 m_get_int(&m, &flags); 310 m_get_time(&m, &nexttry); 311 m_end(&m); 312 313 if (queue_envelope_load(evpid, &evp) == 0) 314 return; /* Envelope is gone, drop it */ 315 316 /* 317 * XXX consistency: The envelope might already be on 318 * its way back to the scheduler. We need to detect 319 * this properly and report that state. 320 */ 321 if (flags & EF_INFLIGHT) { 322 /* 323 * Not exactly correct but pretty close: The 324 * value is not recorded on the envelope unless 325 * a tempfail occurs. 326 */ 327 evp.lasttry = nexttry; 328 } 329 330 m_create(p_control, IMSG_CTL_LIST_ENVELOPES, 331 imsg->hdr.peerid, 0, -1); 332 m_add_int(p_control, flags); 333 m_add_time(p_control, nexttry); 334 m_add_envelope(p_control, &evp); 335 m_close(p_control); 336 return; 337 } 338 } 339 340 if (p->proc == PROC_PONY) { 341 switch (imsg->hdr.type) { 342 case IMSG_MDA_OPEN_MESSAGE: 343 case IMSG_MTA_OPEN_MESSAGE: 344 m_msg(&m, imsg); 345 m_get_id(&m, &reqid); 346 m_get_msgid(&m, &msgid); 347 m_end(&m); 348 fd = queue_message_fd_r(msgid); 349 m_create(p, imsg->hdr.type, 0, 0, fd); 350 m_add_id(p, reqid); 351 m_close(p); 352 return; 353 354 case IMSG_MDA_DELIVERY_OK: 355 case IMSG_MTA_DELIVERY_OK: 356 m_msg(&m, imsg); 357 m_get_evpid(&m, &evpid); 358 if (imsg->hdr.type == IMSG_MTA_DELIVERY_OK) 359 m_get_int(&m, &mta_ext); 360 m_end(&m); 361 if (queue_envelope_load(evpid, &evp) == 0) { 362 log_warn("queue: dsn: failed to load envelope"); 363 return; 364 } 365 if (evp.dsn_notify & DSN_SUCCESS) { 366 bounce.type = B_DSN; 367 bounce.dsn_ret = evp.dsn_ret; 368 envelope_set_esc_class(&evp, ESC_STATUS_OK); 369 if (imsg->hdr.type == IMSG_MDA_DELIVERY_OK) 370 queue_bounce(&evp, &bounce); 371 else if (imsg->hdr.type == IMSG_MTA_DELIVERY_OK && 372 (mta_ext & MTA_EXT_DSN) == 0) { 373 bounce.mta_without_dsn = 1; 374 queue_bounce(&evp, &bounce); 375 } 376 } 377 queue_envelope_delete(evpid); 378 m_create(p_scheduler, IMSG_QUEUE_DELIVERY_OK, 0, 0, -1); 379 m_add_evpid(p_scheduler, evpid); 380 m_close(p_scheduler); 381 return; 382 383 case IMSG_MDA_DELIVERY_TEMPFAIL: 384 case IMSG_MTA_DELIVERY_TEMPFAIL: 385 m_msg(&m, imsg); 386 m_get_evpid(&m, &evpid); 387 m_get_string(&m, &reason); 388 m_get_int(&m, &code); 389 m_end(&m); 390 if (queue_envelope_load(evpid, &evp) == 0) { 391 log_warnx("queue: tempfail: failed to load envelope"); 392 m_create(p_scheduler, IMSG_QUEUE_ENVELOPE_REMOVE, 0, 0, -1); 393 m_add_evpid(p_scheduler, evpid); 394 m_add_u32(p_scheduler, 1); /* in-flight */ 395 m_close(p_scheduler); 396 return; 397 } 398 envelope_set_errormsg(&evp, "%s", reason); 399 envelope_set_esc_class(&evp, ESC_STATUS_TEMPFAIL); 400 envelope_set_esc_code(&evp, code); 401 evp.retry++; 402 if (!queue_envelope_update(&evp)) 403 log_warnx("warn: could not update envelope %016"PRIx64, evpid); 404 m_create(p_scheduler, IMSG_QUEUE_DELIVERY_TEMPFAIL, 0, 0, -1); 405 m_add_envelope(p_scheduler, &evp); 406 m_close(p_scheduler); 407 return; 408 409 case IMSG_MDA_DELIVERY_PERMFAIL: 410 case IMSG_MTA_DELIVERY_PERMFAIL: 411 m_msg(&m, imsg); 412 m_get_evpid(&m, &evpid); 413 m_get_string(&m, &reason); 414 m_get_int(&m, &code); 415 m_end(&m); 416 if (queue_envelope_load(evpid, &evp) == 0) { 417 log_warnx("queue: permfail: failed to load envelope"); 418 m_create(p_scheduler, IMSG_QUEUE_ENVELOPE_REMOVE, 0, 0, -1); 419 m_add_evpid(p_scheduler, evpid); 420 m_add_u32(p_scheduler, 1); /* in-flight */ 421 m_close(p_scheduler); 422 return; 423 } 424 bounce.type = B_ERROR; 425 envelope_set_errormsg(&evp, "%s", reason); 426 envelope_set_esc_class(&evp, ESC_STATUS_PERMFAIL); 427 envelope_set_esc_code(&evp, code); 428 queue_bounce(&evp, &bounce); 429 queue_envelope_delete(evpid); 430 m_create(p_scheduler, IMSG_QUEUE_DELIVERY_PERMFAIL, 0, 0, -1); 431 m_add_evpid(p_scheduler, evpid); 432 m_close(p_scheduler); 433 return; 434 435 case IMSG_MDA_DELIVERY_LOOP: 436 case IMSG_MTA_DELIVERY_LOOP: 437 m_msg(&m, imsg); 438 m_get_evpid(&m, &evpid); 439 m_end(&m); 440 if (queue_envelope_load(evpid, &evp) == 0) { 441 log_warnx("queue: loop: failed to load envelope"); 442 m_create(p_scheduler, IMSG_QUEUE_ENVELOPE_REMOVE, 0, 0, -1); 443 m_add_evpid(p_scheduler, evpid); 444 m_add_u32(p_scheduler, 1); /* in-flight */ 445 m_close(p_scheduler); 446 return; 447 } 448 envelope_set_errormsg(&evp, "%s", "Loop detected"); 449 envelope_set_esc_class(&evp, ESC_STATUS_TEMPFAIL); 450 envelope_set_esc_code(&evp, ESC_ROUTING_LOOP_DETECTED); 451 bounce.type = B_ERROR; 452 queue_bounce(&evp, &bounce); 453 queue_envelope_delete(evp.id); 454 m_create(p_scheduler, IMSG_QUEUE_DELIVERY_LOOP, 0, 0, -1); 455 m_add_evpid(p_scheduler, evp.id); 456 m_close(p_scheduler); 457 return; 458 459 case IMSG_MTA_DELIVERY_HOLD: 460 case IMSG_MDA_DELIVERY_HOLD: 461 imsg->hdr.type = IMSG_QUEUE_HOLDQ_HOLD; 462 m_forward(p_scheduler, imsg); 463 return; 464 465 case IMSG_MTA_SCHEDULE: 466 imsg->hdr.type = IMSG_QUEUE_ENVELOPE_SCHEDULE; 467 m_forward(p_scheduler, imsg); 468 return; 469 470 case IMSG_MTA_HOLDQ_RELEASE: 471 case IMSG_MDA_HOLDQ_RELEASE: 472 m_msg(&m, imsg); 473 m_get_id(&m, &holdq); 474 m_get_int(&m, &v); 475 m_end(&m); 476 m_create(p_scheduler, IMSG_QUEUE_HOLDQ_RELEASE, 0, 0, -1); 477 if (imsg->hdr.type == IMSG_MTA_HOLDQ_RELEASE) 478 m_add_int(p_scheduler, D_MTA); 479 else 480 m_add_int(p_scheduler, D_MDA); 481 m_add_id(p_scheduler, holdq); 482 m_add_int(p_scheduler, v); 483 m_close(p_scheduler); 484 return; 485 } 486 } 487 488 if (p->proc == PROC_CONTROL) { 489 switch (imsg->hdr.type) { 490 case IMSG_CTL_PAUSE_MDA: 491 case IMSG_CTL_PAUSE_MTA: 492 case IMSG_CTL_RESUME_MDA: 493 case IMSG_CTL_RESUME_MTA: 494 m_forward(p_scheduler, imsg); 495 return; 496 497 case IMSG_CTL_VERBOSE: 498 m_msg(&m, imsg); 499 m_get_int(&m, &v); 500 m_end(&m); 501 log_verbose(v); 502 return; 503 504 case IMSG_CTL_PROFILE: 505 m_msg(&m, imsg); 506 m_get_int(&m, &v); 507 m_end(&m); 508 profiling = v; 509 return; 510 511 case IMSG_CTL_DISCOVER_EVPID: 512 m_msg(&m, imsg); 513 m_get_evpid(&m, &evpid); 514 m_end(&m); 515 if (queue_envelope_load(evpid, &evp) == 0) { 516 log_warnx("queue: discover: failed to load " 517 "envelope %016" PRIx64, evpid); 518 n_evp = 0; 519 m_compose(p_control, imsg->hdr.type, 520 imsg->hdr.peerid, 0, -1, 521 &n_evp, sizeof n_evp); 522 return; 523 } 524 525 m_create(p_scheduler, IMSG_QUEUE_DISCOVER_EVPID, 526 0, 0, -1); 527 m_add_envelope(p_scheduler, &evp); 528 m_close(p_scheduler); 529 530 m_create(p_scheduler, IMSG_QUEUE_DISCOVER_MSGID, 531 0, 0, -1); 532 m_add_msgid(p_scheduler, evpid_to_msgid(evpid)); 533 m_close(p_scheduler); 534 n_evp = 1; 535 m_compose(p_control, imsg->hdr.type, imsg->hdr.peerid, 536 0, -1, &n_evp, sizeof n_evp); 537 return; 538 539 case IMSG_CTL_DISCOVER_MSGID: 540 m_msg(&m, imsg); 541 m_get_msgid(&m, &msgid); 542 m_end(&m); 543 /* handle concurrent walk requests */ 544 wi = xcalloc(1, sizeof *wi, "queu_imsg"); 545 wi->msgid = msgid; 546 wi->peerid = imsg->hdr.peerid; 547 evtimer_set(&wi->ev, queue_msgid_walk, wi); 548 tv.tv_sec = 0; 549 tv.tv_usec = 10; 550 evtimer_add(&wi->ev, &tv); 551 return; 552 553 case IMSG_CTL_UNCORRUPT_MSGID: 554 m_msg(&m, imsg); 555 m_get_msgid(&m, &msgid); 556 m_end(&m); 557 ret = queue_message_uncorrupt(msgid); 558 m_compose(p_control, imsg->hdr.type, imsg->hdr.peerid, 559 0, -1, &ret, sizeof ret); 560 return; 561 } 562 } 563 564 errx(1, "queue_imsg: unexpected %s imsg", imsg_to_str(imsg->hdr.type)); 565 } 566 567 static void 568 queue_msgid_walk(int fd, short event, void *arg) 569 { 570 struct envelope evp; 571 struct timeval tv; 572 struct msg_walkinfo *wi = arg; 573 int r; 574 575 r = queue_message_walk(&evp, wi->msgid, &wi->done, &wi->data); 576 if (r == -1) { 577 if (wi->n_evp) { 578 m_create(p_scheduler, IMSG_QUEUE_DISCOVER_MSGID, 579 0, 0, -1); 580 m_add_msgid(p_scheduler, wi->msgid); 581 m_close(p_scheduler); 582 } 583 584 m_compose(p_control, IMSG_CTL_DISCOVER_MSGID, wi->peerid, 0, -1, 585 &wi->n_evp, sizeof wi->n_evp); 586 evtimer_del(&wi->ev); 587 free(wi); 588 return; 589 } 590 591 if (r) { 592 m_create(p_scheduler, IMSG_QUEUE_DISCOVER_EVPID, 0, 0, -1); 593 m_add_envelope(p_scheduler, &evp); 594 m_close(p_scheduler); 595 wi->n_evp += 1; 596 } 597 598 tv.tv_sec = 0; 599 tv.tv_usec = 10; 600 evtimer_set(&wi->ev, queue_msgid_walk, wi); 601 evtimer_add(&wi->ev, &tv); 602 } 603 604 static void 605 queue_bounce(struct envelope *e, struct delivery_bounce *d) 606 { 607 struct envelope b; 608 609 b = *e; 610 b.type = D_BOUNCE; 611 b.agent.bounce = *d; 612 b.retry = 0; 613 b.lasttry = 0; 614 b.creation = time(NULL); 615 b.expire = 3600 * 24 * 7; 616 617 if (e->dsn_notify & DSN_NEVER) 618 return; 619 620 if (b.id == 0) 621 log_warnx("warn: queue_bounce: evpid=0"); 622 if (evpid_to_msgid(b.id) == 0) 623 log_warnx("warn: queue_bounce: msgid=0, evpid=%016"PRIx64, 624 b.id); 625 if (e->type == D_BOUNCE) { 626 log_warnx("warn: queue: double bounce!"); 627 } else if (e->sender.user[0] == '\0') { 628 log_warnx("warn: queue: no return path!"); 629 } else if (!queue_envelope_create(&b)) { 630 log_warnx("warn: queue: cannot bounce!"); 631 } else { 632 log_debug("debug: queue: bouncing evp:%016" PRIx64 633 " as evp:%016" PRIx64, e->id, b.id); 634 635 m_create(p_scheduler, IMSG_QUEUE_ENVELOPE_SUBMIT, 0, 0, -1); 636 m_add_envelope(p_scheduler, &b); 637 m_close(p_scheduler); 638 639 m_create(p_scheduler, IMSG_QUEUE_MESSAGE_COMMIT, 0, 0, -1); 640 m_add_msgid(p_scheduler, evpid_to_msgid(b.id)); 641 m_close(p_scheduler); 642 643 stat_increment("queue.bounce", 1); 644 } 645 } 646 647 static void 648 queue_sig_handler(int sig, short event, void *p) 649 { 650 switch (sig) { 651 case SIGINT: 652 case SIGTERM: 653 queue_shutdown(); 654 break; 655 default: 656 fatalx("queue_sig_handler: unexpected signal"); 657 } 658 } 659 660 static void 661 queue_shutdown(void) 662 { 663 log_info("info: queue handler exiting"); 664 queue_close(); 665 _exit(0); 666 } 667 668 int 669 queue(void) 670 { 671 struct passwd *pw; 672 struct timeval tv; 673 struct event ev_qload; 674 struct event ev_sigint; 675 struct event ev_sigterm; 676 677 purge_config(PURGE_EVERYTHING); 678 679 if ((pw = getpwnam(SMTPD_QUEUE_USER)) == NULL) 680 if ((pw = getpwnam(SMTPD_USER)) == NULL) 681 fatalx("unknown user " SMTPD_USER); 682 683 env->sc_queue_flags |= QUEUE_EVPCACHE; 684 env->sc_queue_evpcache_size = 1024; 685 686 if (chroot(PATH_SPOOL) == -1) 687 fatal("queue: chroot"); 688 if (chdir("/") == -1) 689 fatal("queue: chdir(\"/\")"); 690 691 config_process(PROC_QUEUE); 692 693 if (env->sc_queue_flags & QUEUE_COMPRESSION) 694 log_info("queue: queue compression enabled"); 695 696 if (env->sc_queue_key) { 697 if (!crypto_setup(env->sc_queue_key, strlen(env->sc_queue_key))) 698 fatalx("crypto_setup: invalid key for queue encryption"); 699 log_info("queue: queue encryption enabled"); 700 } 701 702 if (setgroups(1, &pw->pw_gid) || 703 setresgid(pw->pw_gid, pw->pw_gid, pw->pw_gid) || 704 setresuid(pw->pw_uid, pw->pw_uid, pw->pw_uid)) 705 fatal("queue: cannot drop privileges"); 706 707 imsg_callback = queue_imsg; 708 event_init(); 709 710 signal_set(&ev_sigint, SIGINT, queue_sig_handler, NULL); 711 signal_set(&ev_sigterm, SIGTERM, queue_sig_handler, NULL); 712 signal_add(&ev_sigint, NULL); 713 signal_add(&ev_sigterm, NULL); 714 signal(SIGPIPE, SIG_IGN); 715 signal(SIGHUP, SIG_IGN); 716 717 config_peer(PROC_PARENT); 718 config_peer(PROC_CONTROL); 719 config_peer(PROC_LKA); 720 config_peer(PROC_SCHEDULER); 721 config_peer(PROC_PONY); 722 config_done(); 723 724 /* setup queue loading task */ 725 evtimer_set(&ev_qload, queue_timeout, &ev_qload); 726 tv.tv_sec = 0; 727 tv.tv_usec = 10; 728 evtimer_add(&ev_qload, &tv); 729 730 if (pledge("stdio rpath wpath cpath flock recvfd sendfd", NULL) == -1) 731 err(1, "pledge"); 732 733 if (event_dispatch() < 0) 734 fatal("event_dispatch"); 735 queue_shutdown(); 736 737 return (0); 738 } 739 740 static void 741 queue_timeout(int fd, short event, void *p) 742 { 743 static uint32_t msgid = 0; 744 struct envelope evp; 745 struct event *ev = p; 746 struct timeval tv; 747 int r; 748 749 r = queue_envelope_walk(&evp); 750 if (r == -1) { 751 if (msgid) { 752 m_create(p_scheduler, IMSG_QUEUE_MESSAGE_COMMIT, 753 0, 0, -1); 754 m_add_msgid(p_scheduler, msgid); 755 m_close(p_scheduler); 756 } 757 log_debug("debug: queue: done loading queue into scheduler"); 758 return; 759 } 760 761 if (r) { 762 if (msgid && evpid_to_msgid(evp.id) != msgid) { 763 m_create(p_scheduler, IMSG_QUEUE_MESSAGE_COMMIT, 764 0, 0, -1); 765 m_add_msgid(p_scheduler, msgid); 766 m_close(p_scheduler); 767 } 768 msgid = evpid_to_msgid(evp.id); 769 m_create(p_scheduler, IMSG_QUEUE_ENVELOPE_SUBMIT, 0, 0, -1); 770 m_add_envelope(p_scheduler, &evp); 771 m_close(p_scheduler); 772 } 773 774 tv.tv_sec = 0; 775 tv.tv_usec = 10; 776 evtimer_add(ev, &tv); 777 } 778 779 static void 780 queue_log(const struct envelope *e, const char *prefix, const char *status) 781 { 782 char rcpt[LINE_MAX]; 783 784 (void)strlcpy(rcpt, "-", sizeof rcpt); 785 if (strcmp(e->rcpt.user, e->dest.user) || 786 strcmp(e->rcpt.domain, e->dest.domain)) 787 (void)snprintf(rcpt, sizeof rcpt, "%s@%s", 788 e->rcpt.user, e->rcpt.domain); 789 790 log_info("%s: %s for %016" PRIx64 ": from=<%s@%s>, to=<%s@%s>, " 791 "rcpt=<%s>, delay=%s, stat=%s", 792 e->type == D_MDA ? "delivery" : "relay", 793 prefix, 794 e->id, e->sender.user, e->sender.domain, 795 e->dest.user, e->dest.domain, 796 rcpt, 797 duration_to_text(time(NULL) - e->creation), 798 status); 799 } 800 801 void 802 queue_flow_control(void) 803 { 804 size_t bufsz; 805 int oldlimit = limit; 806 int set, unset; 807 808 bufsz = p_pony->bytes_queued; 809 if (bufsz <= flow_agent_lowat) 810 limit &= ~LIMIT_AGENT; 811 else if (bufsz > flow_agent_hiwat) 812 limit |= LIMIT_AGENT; 813 814 if (p_scheduler->bytes_queued <= flow_scheduler_lowat) 815 limit &= ~LIMIT_SCHEDULER; 816 else if (p_scheduler->bytes_queued > flow_scheduler_hiwat) 817 limit |= LIMIT_SCHEDULER; 818 819 set = limit & (limit ^ oldlimit); 820 unset = oldlimit & (limit ^ oldlimit); 821 822 if (set & LIMIT_SCHEDULER) { 823 log_warnx("warn: queue: Hiwat reached on scheduler buffer: " 824 "suspending transfer, delivery and lookup input"); 825 mproc_disable(p_pony); 826 mproc_disable(p_lka); 827 } 828 else if (unset & LIMIT_SCHEDULER) { 829 log_warnx("warn: queue: Down to lowat on scheduler buffer: " 830 "resuming transfer, delivery and lookup input"); 831 mproc_enable(p_pony); 832 mproc_enable(p_lka); 833 } 834 835 if (set & LIMIT_AGENT) { 836 log_warnx("warn: queue: Hiwat reached on transfer and delivery " 837 "buffers: suspending scheduler input"); 838 mproc_disable(p_scheduler); 839 } 840 else if (unset & LIMIT_AGENT) { 841 log_warnx("warn: queue: Down to lowat on transfer and delivery " 842 "buffers: resuming scheduler input"); 843 mproc_enable(p_scheduler); 844 } 845 } 846