/*	$OpenBSD: queue.c,v 1.165 2014/07/10 15:54:55 eric Exp $	*/

/*
 * Copyright (c) 2008 Gilles Chehade <gilles@poolp.org>
 * Copyright (c) 2008 Pierre-Yves Ritschard <pyr@openbsd.org>
 * Copyright (c) 2012 Eric Faurot <eric@openbsd.org>
 *
 * Permission to use, copy, modify, and distribute this software for any
 * purpose with or without fee is hereby granted, provided that the above
 * copyright notice and this permission notice appear in all copies.
 *
 * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
 * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
 * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
 * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
 * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
 * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
 * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
 */

#include <sys/types.h>
#include <sys/queue.h>
#include <sys/tree.h>
#include <sys/socket.h>
#include <sys/stat.h>

#include <err.h>
#include <event.h>
#include <imsg.h>
#include <inttypes.h>
#include <libgen.h>
#include <pwd.h>
#include <signal.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>
#include <unistd.h>

#include "smtpd.h"
#include "log.h"

static void queue_imsg(struct mproc *, struct imsg *);
static void queue_timeout(int, short, void *);
static void queue_bounce(struct envelope *, struct delivery_bounce *);
static void queue_shutdown(void);
static void queue_sig_handler(int, short, void *);
static void queue_log(const struct envelope *, const char *, const char *);

static size_t	flow_agent_hiwat = 10 * 1024 * 1024;
static size_t	flow_agent_lowat = 1 * 1024 * 1024;
static size_t	flow_scheduler_hiwat = 10 * 1024 * 1024;
static size_t	flow_scheduler_lowat = 1 * 1024 * 1024;

#define LIMIT_AGENT	0x01
#define LIMIT_SCHEDULER	0x02

static int limit = 0;

static void
queue_imsg(struct mproc *p, struct imsg *imsg)
{
	struct delivery_bounce	 bounce;
	struct bounce_req_msg	*req_bounce;
	struct envelope		 evp;
	struct msg		 m;
	const char		*reason;
	uint64_t		 reqid, evpid, holdq;
	uint32_t		 msgid;
	time_t			 nexttry;
	int			 fd, mta_ext, ret, v, flags, code;

	memset(&bounce, 0, sizeof(struct delivery_bounce));
	if (p->proc == PROC_PONY) {

		switch (imsg->hdr.type) {
		case IMSG_SMTP_MESSAGE_CREATE:
			m_msg(&m, imsg);
			m_get_id(&m, &reqid);
			m_end(&m);

			ret = queue_message_create(&msgid);

			m_create(p, IMSG_SMTP_MESSAGE_CREATE, 0, 0, -1);
			m_add_id(p, reqid);
			if (ret == 0)
				m_add_int(p, 0);
			else {
				m_add_int(p, 1);
				m_add_msgid(p, msgid);
			}
			m_close(p);
			return;

		case IMSG_SMTP_MESSAGE_ROLLBACK:
			m_msg(&m, imsg);
			m_get_msgid(&m, &msgid);
			m_end(&m);

			queue_message_delete(msgid);

			m_create(p_scheduler, IMSG_QUEUE_MESSAGE_ROLLBACK,
			    0, 0, -1);
			m_add_msgid(p_scheduler, msgid);
			m_close(p_scheduler);
			return;

		case IMSG_SMTP_MESSAGE_COMMIT:
			m_msg(&m, imsg);
			m_get_id(&m, &reqid);
			m_get_msgid(&m, &msgid);
			m_end(&m);

			ret = queue_message_commit(msgid);

			m_create(p, IMSG_SMTP_MESSAGE_COMMIT, 0, 0, -1);
			m_add_id(p, reqid);
			m_add_int(p, (ret == 0) ? 0 : 1);
			m_close(p);

			if (ret) {
				m_create(p_scheduler, IMSG_QUEUE_MESSAGE_COMMIT,
				    0, 0, -1);
				m_add_msgid(p_scheduler, msgid);
				m_close(p_scheduler);
			}
			return;

		case IMSG_SMTP_MESSAGE_OPEN:
			m_msg(&m, imsg);
			m_get_id(&m, &reqid);
			m_get_msgid(&m, &msgid);
			m_end(&m);

			fd = queue_message_fd_rw(msgid);

			m_create(p, IMSG_SMTP_MESSAGE_OPEN, 0, 0, fd);
			m_add_id(p, reqid);
			m_add_int(p, (fd == -1) ? 0 : 1);
			m_close(p);
			return;

		case IMSG_QUEUE_SMTP_SESSION:
			bounce_fd(imsg->fd);
			return;
		}
	}

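	/*
	 * Envelopes submitted by the lookup agent: create them in the
	 * queue, report the result to the pony process, and pass
	 * accepted envelopes on to the scheduler.
	 */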
	if (p->proc == PROC_LKA) {
		switch (imsg->hdr.type) {
		case IMSG_LKA_ENVELOPE_SUBMIT:
			m_msg(&m, imsg);
			m_get_id(&m, &reqid);
			m_get_envelope(&m, &evp);
			m_end(&m);

			if (evp.id == 0)
				log_warnx("warn: imsg_queue_submit_envelope: evpid=0");
			if (evpid_to_msgid(evp.id) == 0)
				log_warnx("warn: imsg_queue_submit_envelope: msgid=0, "
				    "evpid=%016"PRIx64, evp.id);
			ret = queue_envelope_create(&evp);
			m_create(p_pony, IMSG_QUEUE_ENVELOPE_SUBMIT, 0, 0, -1);
			m_add_id(p_pony, reqid);
			if (ret == 0)
				m_add_int(p_pony, 0);
			else {
				m_add_int(p_pony, 1);
				m_add_evpid(p_pony, evp.id);
			}
			m_close(p_pony);
			if (ret) {
				m_create(p_scheduler,
				    IMSG_QUEUE_ENVELOPE_SUBMIT, 0, 0, -1);
				m_add_envelope(p_scheduler, &evp);
				m_close(p_scheduler);

			}
			return;

		case IMSG_LKA_ENVELOPE_COMMIT:
			m_msg(&m, imsg);
			m_get_id(&m, &reqid);
			m_end(&m);
			m_create(p_pony, IMSG_QUEUE_ENVELOPE_COMMIT, 0, 0, -1);
			m_add_id(p_pony, reqid);
			m_add_int(p_pony, 1);
			m_close(p_pony);
			return;
		}
	}

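	/*
	 * Requests from the scheduler: envelope removal and expiration,
	 * bounce generation, handing envelopes to the delivery and
	 * transfer agents, and envelope listing for the control process.
	 */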
	if (p->proc == PROC_SCHEDULER) {
		switch (imsg->hdr.type) {
		case IMSG_SCHED_ENVELOPE_REMOVE:
			m_msg(&m, imsg);
			m_get_evpid(&m, &evpid);
			m_end(&m);

			m_create(p_scheduler, IMSG_QUEUE_ENVELOPE_ACK, 0, 0, -1);
			m_add_evpid(p_scheduler, evpid);
			m_close(p_scheduler);

			/* already removed by scheduler */
			if (queue_envelope_load(evpid, &evp) == 0)
				return;

			queue_log(&evp, "Remove", "Removed by administrator");
			queue_envelope_delete(evpid);
			return;

		case IMSG_SCHED_ENVELOPE_EXPIRE:
			m_msg(&m, imsg);
			m_get_evpid(&m, &evpid);
			m_end(&m);

			m_create(p_scheduler, IMSG_QUEUE_ENVELOPE_ACK, 0, 0, -1);
			m_add_evpid(p_scheduler, evpid);
			m_close(p_scheduler);

			/* already removed by scheduler */
			if (queue_envelope_load(evpid, &evp) == 0)
				return;

			bounce.type = B_ERROR;
			envelope_set_errormsg(&evp, "Envelope expired");
			envelope_set_esc_class(&evp, ESC_STATUS_TEMPFAIL);
			envelope_set_esc_code(&evp, ESC_DELIVERY_TIME_EXPIRED);
			queue_bounce(&evp, &bounce);
			queue_log(&evp, "Expire", "Envelope expired");
			queue_envelope_delete(evpid);
			return;

		case IMSG_SCHED_ENVELOPE_BOUNCE:
			req_bounce = imsg->data;
			evpid = req_bounce->evpid;

			if (queue_envelope_load(evpid, &evp) == 0) {
				log_warnx("queue: bounce: failed to load envelope");
				m_create(p_scheduler, IMSG_QUEUE_ENVELOPE_REMOVE, 0, 0, -1);
				m_add_evpid(p_scheduler, evpid);
				m_add_u32(p_scheduler, 0); /* not in-flight */
				m_close(p_scheduler);
				return;
			}
			queue_bounce(&evp, &req_bounce->bounce);
			evp.lastbounce = req_bounce->timestamp;
			if (!queue_envelope_update(&evp))
				log_warnx("warn: could not update envelope %016"PRIx64, evpid);
			return;

		case IMSG_SCHED_ENVELOPE_DELIVER:
			m_msg(&m, imsg);
			m_get_evpid(&m, &evpid);
			m_end(&m);
			if (queue_envelope_load(evpid, &evp) == 0) {
				log_warnx("queue: deliver: failed to load envelope");
				m_create(p_scheduler, IMSG_QUEUE_ENVELOPE_REMOVE, 0, 0, -1);
				m_add_evpid(p_scheduler, evpid);
				m_add_u32(p_scheduler, 1); /* in-flight */
				m_close(p_scheduler);
				return;
			}
			evp.lasttry = time(NULL);
			m_create(p_pony, IMSG_QUEUE_DELIVER, 0, 0, -1);
			m_add_envelope(p_pony, &evp);
			m_close(p_pony);
			return;

		case IMSG_SCHED_ENVELOPE_INJECT:
			m_msg(&m, imsg);
			m_get_evpid(&m, &evpid);
			m_end(&m);
			bounce_add(evpid);
			return;

		case IMSG_SCHED_ENVELOPE_TRANSFER:
			m_msg(&m, imsg);
			m_get_evpid(&m, &evpid);
			m_end(&m);
			if (queue_envelope_load(evpid, &evp) == 0) {
				log_warnx("queue: failed to load envelope");
				m_create(p_scheduler, IMSG_QUEUE_ENVELOPE_REMOVE, 0, 0, -1);
				m_add_evpid(p_scheduler, evpid);
				m_add_u32(p_scheduler, 1); /* in-flight */
				m_close(p_scheduler);
				return;
			}
			evp.lasttry = time(NULL);
			m_create(p_pony, IMSG_QUEUE_TRANSFER, 0, 0, -1);
			m_add_envelope(p_pony, &evp);
			m_close(p_pony);
			return;

		case IMSG_CTL_LIST_ENVELOPES:
			if (imsg->hdr.len == sizeof imsg->hdr) {
				m_forward(p_control, imsg);
				return;
			}

			m_msg(&m, imsg);
			m_get_evpid(&m, &evpid);
			m_get_int(&m, &flags);
			m_get_time(&m, &nexttry);
			m_end(&m);

			if (queue_envelope_load(evpid, &evp) == 0)
				return; /* Envelope is gone, drop it */

			/*
			 * XXX consistency: The envelope might already be on
			 * its way back to the scheduler. We need to detect
			 * this properly and report that state.
			 */
			evp.flags |= flags;
			/* In the past if running or runnable */
			evp.nexttry = nexttry;
			if (flags & EF_INFLIGHT) {
				/*
				 * Not exactly correct but pretty close: The
				 * value is not recorded on the envelope unless
				 * a tempfail occurs.
				 */
				evp.lasttry = nexttry;
			}
			m_compose(p_control, IMSG_CTL_LIST_ENVELOPES,
			    imsg->hdr.peerid, 0, -1, &evp, sizeof evp);
			return;
		}
	}

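	/*
	 * Messages from the transfer (mta) and delivery (mda) agents:
	 * open message files, process delivery results (generating DSNs
	 * and bounces as needed), and relay holdq and scheduling
	 * requests to the scheduler.
	 */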
	if (p->proc == PROC_PONY) {
		switch (imsg->hdr.type) {
		case IMSG_MDA_OPEN_MESSAGE:
		case IMSG_MTA_OPEN_MESSAGE:
			m_msg(&m, imsg);
			m_get_id(&m, &reqid);
			m_get_msgid(&m, &msgid);
			m_end(&m);
			fd = queue_message_fd_r(msgid);
			m_create(p, imsg->hdr.type, 0, 0, fd);
			m_add_id(p, reqid);
			m_close(p);
			return;

		case IMSG_MDA_DELIVERY_OK:
		case IMSG_MTA_DELIVERY_OK:
			m_msg(&m, imsg);
			m_get_evpid(&m, &evpid);
			if (imsg->hdr.type == IMSG_MTA_DELIVERY_OK)
				m_get_int(&m, &mta_ext);
			m_end(&m);
			if (queue_envelope_load(evpid, &evp) == 0) {
				log_warn("queue: dsn: failed to load envelope");
				return;
			}
			if (evp.dsn_notify & DSN_SUCCESS) {
				bounce.type = B_DSN;
				bounce.dsn_ret = evp.dsn_ret;

				if (imsg->hdr.type == IMSG_MDA_DELIVERY_OK)
					queue_bounce(&evp, &bounce);
				else if (imsg->hdr.type == IMSG_MTA_DELIVERY_OK &&
				    (mta_ext & MTA_EXT_DSN) == 0) {
					bounce.mta_without_dsn = 1;
					queue_bounce(&evp, &bounce);
				}
			}
			queue_envelope_delete(evpid);
			m_create(p_scheduler, IMSG_QUEUE_DELIVERY_OK, 0, 0, -1);
			m_add_evpid(p_scheduler, evpid);
			m_close(p_scheduler);
			return;

		case IMSG_MDA_DELIVERY_TEMPFAIL:
		case IMSG_MTA_DELIVERY_TEMPFAIL:
			m_msg(&m, imsg);
			m_get_evpid(&m, &evpid);
			m_get_string(&m, &reason);
			m_get_int(&m, &code);
			m_end(&m);
			if (queue_envelope_load(evpid, &evp) == 0) {
				log_warnx("queue: tempfail: failed to load envelope");
				m_create(p_scheduler, IMSG_QUEUE_ENVELOPE_REMOVE, 0, 0, -1);
				m_add_evpid(p_scheduler, evpid);
				m_add_u32(p_scheduler, 1); /* in-flight */
				m_close(p_scheduler);
				return;
			}
			envelope_set_errormsg(&evp, "%s", reason);
			envelope_set_esc_class(&evp, ESC_STATUS_TEMPFAIL);
			envelope_set_esc_code(&evp, code);
			evp.retry++;
			if (!queue_envelope_update(&evp))
				log_warnx("warn: could not update envelope %016"PRIx64, evpid);
			m_create(p_scheduler, IMSG_QUEUE_DELIVERY_TEMPFAIL, 0, 0, -1);
			m_add_envelope(p_scheduler, &evp);
			m_close(p_scheduler);
			return;

		case IMSG_MDA_DELIVERY_PERMFAIL:
		case IMSG_MTA_DELIVERY_PERMFAIL:
			m_msg(&m, imsg);
			m_get_evpid(&m, &evpid);
			m_get_string(&m, &reason);
			m_get_int(&m, &code);
			m_end(&m);
			if (queue_envelope_load(evpid, &evp) == 0) {
				log_warnx("queue: permfail: failed to load envelope");
				m_create(p_scheduler, IMSG_QUEUE_ENVELOPE_REMOVE, 0, 0, -1);
				m_add_evpid(p_scheduler, evpid);
				m_add_u32(p_scheduler, 1); /* in-flight */
				m_close(p_scheduler);
				return;
			}
			bounce.type = B_ERROR;
			envelope_set_errormsg(&evp, "%s", reason);
			envelope_set_esc_class(&evp, ESC_STATUS_PERMFAIL);
			envelope_set_esc_code(&evp, code);
			queue_bounce(&evp, &bounce);
			queue_envelope_delete(evpid);
			m_create(p_scheduler, IMSG_QUEUE_DELIVERY_PERMFAIL, 0, 0, -1);
			m_add_evpid(p_scheduler, evpid);
			m_close(p_scheduler);
			return;

		case IMSG_MDA_DELIVERY_LOOP:
		case IMSG_MTA_DELIVERY_LOOP:
			m_msg(&m, imsg);
			m_get_evpid(&m, &evpid);
			m_end(&m);
			if (queue_envelope_load(evpid, &evp) == 0) {
				log_warnx("queue: loop: failed to load envelope");
				m_create(p_scheduler, IMSG_QUEUE_ENVELOPE_REMOVE, 0, 0, -1);
				m_add_evpid(p_scheduler, evpid);
				m_add_u32(p_scheduler, 1); /* in-flight */
				m_close(p_scheduler);
				return;
			}
			envelope_set_errormsg(&evp, "%s", "Loop detected");
			envelope_set_esc_class(&evp, ESC_STATUS_TEMPFAIL);
			envelope_set_esc_code(&evp, ESC_ROUTING_LOOP_DETECTED);
			bounce.type = B_ERROR;
			queue_bounce(&evp, &bounce);
			queue_envelope_delete(evp.id);
			m_create(p_scheduler, IMSG_QUEUE_DELIVERY_LOOP, 0, 0, -1);
			m_add_evpid(p_scheduler, evp.id);
			m_close(p_scheduler);
			return;

		case IMSG_MTA_DELIVERY_HOLD:
		case IMSG_MDA_DELIVERY_HOLD:
			imsg->hdr.type = IMSG_QUEUE_HOLDQ_HOLD;
			m_forward(p_scheduler, imsg);
			return;

		case IMSG_MTA_SCHEDULE:
			imsg->hdr.type = IMSG_QUEUE_ENVELOPE_SCHEDULE;
			m_forward(p_scheduler, imsg);
			return;

		case IMSG_MTA_HOLDQ_RELEASE:
		case IMSG_MDA_HOLDQ_RELEASE:
			m_msg(&m, imsg);
			m_get_id(&m, &holdq);
			m_get_int(&m, &v);
			m_end(&m);
			m_create(p_scheduler, IMSG_QUEUE_HOLDQ_RELEASE, 0, 0, -1);
			if (imsg->hdr.type == IMSG_MTA_HOLDQ_RELEASE)
				m_add_int(p_scheduler, D_MTA);
			else
				m_add_int(p_scheduler, D_MDA);
			m_add_id(p_scheduler, holdq);
			m_add_int(p_scheduler, v);
			m_close(p_scheduler);
			return;
		}
	}

	if (p->proc == PROC_CONTROL) {
		switch (imsg->hdr.type) {
		case IMSG_CTL_PAUSE_MDA:
		case IMSG_CTL_PAUSE_MTA:
		case IMSG_CTL_RESUME_MDA:
		case IMSG_CTL_RESUME_MTA:
			m_forward(p_scheduler, imsg);
			return;

		case IMSG_CTL_VERBOSE:
			m_msg(&m, imsg);
			m_get_int(&m, &v);
			m_end(&m);
			log_verbose(v);
			return;

		case IMSG_CTL_PROFILE:
			m_msg(&m, imsg);
			m_get_int(&m, &v);
			m_end(&m);
			profiling = v;
			return;
		}
	}

	errx(1, "queue_imsg: unexpected %s imsg", imsg_to_str(imsg->hdr.type));
}

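/*
 * Build a bounce envelope from "e" carrying the bounce description "d"
 * and submit it to the scheduler, unless the sender requested no DSN.
 * Double bounces and envelopes without a return path are only logged.
 */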
static void
queue_bounce(struct envelope *e, struct delivery_bounce *d)
{
	struct envelope	b;

	b = *e;
	b.type = D_BOUNCE;
	b.agent.bounce = *d;
	b.retry = 0;
	b.lasttry = 0;
	b.creation = time(NULL);
	b.expire = 3600 * 24 * 7;

	if (e->dsn_notify & DSN_NEVER)
		return;

	if (b.id == 0)
		log_warnx("warn: queue_bounce: evpid=0");
	if (evpid_to_msgid(b.id) == 0)
		log_warnx("warn: queue_bounce: msgid=0, evpid=%016"PRIx64,
		    b.id);
	if (e->type == D_BOUNCE) {
		log_warnx("warn: queue: double bounce!");
	} else if (e->sender.user[0] == '\0') {
		log_warnx("warn: queue: no return path!");
	} else if (!queue_envelope_create(&b)) {
		log_warnx("warn: queue: cannot bounce!");
	} else {
		log_debug("debug: queue: bouncing evp:%016" PRIx64
		    " as evp:%016" PRIx64, e->id, b.id);

		m_create(p_scheduler, IMSG_QUEUE_ENVELOPE_SUBMIT, 0, 0, -1);
		m_add_envelope(p_scheduler, &b);
		m_close(p_scheduler);

		m_create(p_scheduler, IMSG_QUEUE_MESSAGE_COMMIT, 0, 0, -1);
		m_add_msgid(p_scheduler, evpid_to_msgid(b.id));
		m_close(p_scheduler);

		stat_increment("queue.bounce", 1);
	}
}

static void
queue_sig_handler(int sig, short event, void *p)
{
	switch (sig) {
	case SIGINT:
	case SIGTERM:
		queue_shutdown();
		break;
	default:
		fatalx("queue_sig_handler: unexpected signal");
	}
}

static void
queue_shutdown(void)
{
	log_info("info: queue handler exiting");
	queue_close();
	_exit(0);
}

pid_t
queue(void)
{
	pid_t		 pid;
	struct passwd	*pw;
	struct timeval	 tv;
	struct event	 ev_qload;
	struct event	 ev_sigint;
	struct event	 ev_sigterm;

	switch (pid = fork()) {
	case -1:
		fatal("queue: cannot fork");
	case 0:
		post_fork(PROC_QUEUE);
		break;
	default:
		return (pid);
	}

	purge_config(PURGE_EVERYTHING);

	if ((pw = getpwnam(SMTPD_QUEUE_USER)) == NULL)
		if ((pw = getpwnam(SMTPD_USER)) == NULL)
			fatalx("unknown user " SMTPD_USER);

	env->sc_queue_flags |= QUEUE_EVPCACHE;
	env->sc_queue_evpcache_size = 1024;

	if (chroot(PATH_SPOOL) == -1)
		fatal("queue: chroot");
	if (chdir("/") == -1)
		fatal("queue: chdir(\"/\")");

	config_process(PROC_QUEUE);

	if (env->sc_queue_flags & QUEUE_COMPRESSION)
		log_info("queue: queue compression enabled");

	if (env->sc_queue_key) {
		if (! crypto_setup(env->sc_queue_key, strlen(env->sc_queue_key)))
			fatalx("crypto_setup: invalid key for queue encryption");
		log_info("queue: queue encryption enabled");
	}

	if (setgroups(1, &pw->pw_gid) ||
	    setresgid(pw->pw_gid, pw->pw_gid, pw->pw_gid) ||
	    setresuid(pw->pw_uid, pw->pw_uid, pw->pw_uid))
		fatal("queue: cannot drop privileges");

	imsg_callback = queue_imsg;
	event_init();

	signal_set(&ev_sigint, SIGINT, queue_sig_handler, NULL);
	signal_set(&ev_sigterm, SIGTERM, queue_sig_handler, NULL);
	signal_add(&ev_sigint, NULL);
	signal_add(&ev_sigterm, NULL);
	signal(SIGPIPE, SIG_IGN);
	signal(SIGHUP, SIG_IGN);

	config_peer(PROC_PARENT);
	config_peer(PROC_CONTROL);
	config_peer(PROC_LKA);
	config_peer(PROC_SCHEDULER);
	config_peer(PROC_PONY);
	config_done();

	/* setup queue loading task */
	evtimer_set(&ev_qload, queue_timeout, &ev_qload);
	tv.tv_sec = 0;
	tv.tv_usec = 10;
	evtimer_add(&ev_qload, &tv);

	if (event_dispatch() < 0)
		fatal("event_dispatch");
	queue_shutdown();

	return (0);
}

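/*
 * Queue loading task, armed at startup: walk the existing queue and
 * feed each envelope to the scheduler, committing a message once all
 * of its envelopes have been submitted.
 */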
static void
queue_timeout(int fd, short event, void *p)
{
	static uint32_t	 msgid = 0;
	struct envelope	 evp;
	struct event	*ev = p;
	struct timeval	 tv;
	int		 r;

	r = queue_envelope_walk(&evp);
	if (r == -1) {
		if (msgid) {
			m_create(p_scheduler, IMSG_QUEUE_MESSAGE_COMMIT,
			    0, 0, -1);
			m_add_msgid(p_scheduler, msgid);
			m_close(p_scheduler);
		}
		log_debug("debug: queue: done loading queue into scheduler");
		return;
	}

	if (r) {
		if (msgid && evpid_to_msgid(evp.id) != msgid) {
			m_create(p_scheduler, IMSG_QUEUE_MESSAGE_COMMIT,
			    0, 0, -1);
			m_add_msgid(p_scheduler, msgid);
			m_close(p_scheduler);
		}
		msgid = evpid_to_msgid(evp.id);
		m_create(p_scheduler, IMSG_QUEUE_ENVELOPE_SUBMIT, 0, 0, -1);
		m_add_envelope(p_scheduler, &evp);
		m_close(p_scheduler);
	}

	tv.tv_sec = 0;
	tv.tv_usec = 10;
	evtimer_add(ev, &tv);
}

static void
queue_log(const struct envelope *e, const char *prefix, const char *status)
{
	char rcpt[SMTPD_MAXLINESIZE];

	(void)strlcpy(rcpt, "-", sizeof rcpt);
	if (strcmp(e->rcpt.user, e->dest.user) ||
	    strcmp(e->rcpt.domain, e->dest.domain))
		(void)snprintf(rcpt, sizeof rcpt, "%s@%s",
		    e->rcpt.user, e->rcpt.domain);

	log_info("%s: %s for %016" PRIx64 ": from=<%s@%s>, to=<%s@%s>, "
	    "rcpt=<%s>, delay=%s, stat=%s",
	    e->type == D_MDA ? "delivery" : "relay",
	    prefix,
	    e->id, e->sender.user, e->sender.domain,
	    e->dest.user, e->dest.domain,
	    rcpt,
	    duration_to_text(time(NULL) - e->creation),
	    status);
}

/*
 * Watermark-based flow control: when the bytes queued towards the
 * scheduler or towards the agents cross the high watermark, input from
 * the peers feeding that buffer is suspended until it drains below the
 * low watermark.
 */
void
queue_flow_control(void)
{
	size_t	bufsz;
	int	oldlimit = limit;
	int	set, unset;

	bufsz = p_pony->bytes_queued;
	if (bufsz <= flow_agent_lowat)
		limit &= ~LIMIT_AGENT;
	else if (bufsz > flow_agent_hiwat)
		limit |= LIMIT_AGENT;

	if (p_scheduler->bytes_queued <= flow_scheduler_lowat)
		limit &= ~LIMIT_SCHEDULER;
	else if (p_scheduler->bytes_queued > flow_scheduler_hiwat)
		limit |= LIMIT_SCHEDULER;

	set = limit & (limit ^ oldlimit);
	unset = oldlimit & (limit ^ oldlimit);

	if (set & LIMIT_SCHEDULER) {
		log_warnx("warn: queue: Hiwat reached on scheduler buffer: "
		    "suspending transfer, delivery and lookup input");
		mproc_disable(p_pony);
		mproc_disable(p_lka);
	}
	else if (unset & LIMIT_SCHEDULER) {
		log_warnx("warn: queue: Down to lowat on scheduler buffer: "
		    "resuming transfer, delivery and lookup input");
		mproc_enable(p_pony);
		mproc_enable(p_lka);
	}

	if (set & LIMIT_AGENT) {
		log_warnx("warn: queue: Hiwat reached on transfer and delivery "
		    "buffers: suspending scheduler input");
		mproc_disable(p_scheduler);
	}
	else if (unset & LIMIT_AGENT) {
		log_warnx("warn: queue: Down to lowat on transfer and delivery "
		    "buffers: resuming scheduler input");
		mproc_enable(p_scheduler);
	}
}