/*	$OpenBSD: scheduler.c,v 1.55 2016/09/08 12:06:43 eric Exp $	*/

/*
 * Copyright (c) 2008 Gilles Chehade <gilles@poolp.org>
 * Copyright (c) 2008 Pierre-Yves Ritschard <pyr@openbsd.org>
 * Copyright (c) 2008-2009 Jacek Masiulaniec <jacekm@dobremiasto.net>
 * Copyright (c) 2012 Eric Faurot <eric@openbsd.org>
 *
 * Permission to use, copy, modify, and distribute this software for any
 * purpose with or without fee is hereby granted, provided that the above
 * copyright notice and this permission notice appear in all copies.
 *
 * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
 * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
 * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
 * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
 * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
 * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
 * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
 */

#include <sys/types.h>
#include <sys/queue.h>
#include <sys/tree.h>
#include <sys/socket.h>
#include <sys/stat.h>

#include <ctype.h>
#include <dirent.h>
#include <err.h>
#include <errno.h>
#include <event.h>
#include <imsg.h>
#include <inttypes.h>
#include <libgen.h>
#include <pwd.h>
#include <signal.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>
#include <unistd.h>
#include <limits.h>

#include "smtpd.h"
#include "log.h"

static void scheduler_imsg(struct mproc *, struct imsg *);
static void scheduler_shutdown(void);
static void scheduler_reset_events(void);
static void scheduler_timeout(int, short, void *);

static struct scheduler_backend	*backend = NULL;
static struct event		 ev;
static size_t			 ninflight = 0;
static int			*types;
static uint64_t			*evpids;
static uint32_t			*msgids;
static struct evpstate		*state;

extern const char *backend_scheduler;

void
scheduler_imsg(struct mproc *p, struct imsg *imsg)
{
	struct bounce_req_msg	 req;
	struct envelope		 evp;
	struct scheduler_info	 si;
	struct msg		 m;
	uint64_t		 evpid, id, holdq;
	uint32_t		 msgid;
	uint32_t		 inflight;
	size_t			 n, i;
	time_t			 timestamp;
	int			 v, r, type;

	if (imsg == NULL)
		scheduler_shutdown();

	switch (imsg->hdr.type) {

	case IMSG_QUEUE_ENVELOPE_SUBMIT:
		m_msg(&m, imsg);
		m_get_envelope(&m, &evp);
		m_end(&m);
		log_trace(TRACE_SCHEDULER,
		    "scheduler: inserting evp:%016" PRIx64, evp.id);
		scheduler_info(&si, &evp);
		stat_increment("scheduler.envelope.incoming", 1);
		backend->insert(&si);
		return;

	case IMSG_QUEUE_MESSAGE_COMMIT:
		m_msg(&m, imsg);
		m_get_msgid(&m, &msgid);
		m_end(&m);
		log_trace(TRACE_SCHEDULER,
		    "scheduler: committing msg:%08" PRIx32, msgid);
		n = backend->commit(msgid);
		stat_decrement("scheduler.envelope.incoming", n);
		stat_increment("scheduler.envelope", n);
		scheduler_reset_events();
		return;
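	/*
	 * The "discover" imsgs below are received when the queue hands
	 * the scheduler envelopes it already holds (e.g. when existing
	 * queue content is walked at startup).  The backend is queried
	 * first so that an envelope or message that is already known is
	 * not scheduled twice.
	 */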
	case IMSG_QUEUE_DISCOVER_EVPID:
		m_msg(&m, imsg);
		m_get_envelope(&m, &evp);
		m_end(&m);
		r = backend->query(evp.id);
		if (r) {
			log_debug("debug: scheduler: evp:%016" PRIx64
			    " already scheduled", evp.id);
			return;
		}
		log_trace(TRACE_SCHEDULER,
		    "scheduler: discovering evp:%016" PRIx64, evp.id);
		scheduler_info(&si, &evp);
		stat_increment("scheduler.envelope.incoming", 1);
		backend->insert(&si);
		return;

	case IMSG_QUEUE_DISCOVER_MSGID:
		m_msg(&m, imsg);
		m_get_msgid(&m, &msgid);
		m_end(&m);
		r = backend->query(msgid);
		if (r) {
			log_debug("debug: scheduler: msgid:%08" PRIx32
			    " already scheduled", msgid);
			return;
		}
		log_trace(TRACE_SCHEDULER,
		    "scheduler: committing msg:%08" PRIx32, msgid);
		n = backend->commit(msgid);
		stat_decrement("scheduler.envelope.incoming", n);
		stat_increment("scheduler.envelope", n);
		scheduler_reset_events();
		return;

	case IMSG_QUEUE_MESSAGE_ROLLBACK:
		m_msg(&m, imsg);
		m_get_msgid(&m, &msgid);
		m_end(&m);
		log_trace(TRACE_SCHEDULER, "scheduler: aborting msg:%08" PRIx32,
		    msgid);
		n = backend->rollback(msgid);
		stat_decrement("scheduler.envelope.incoming", n);
		scheduler_reset_events();
		return;

	case IMSG_QUEUE_ENVELOPE_REMOVE:
		m_msg(&m, imsg);
		m_get_evpid(&m, &evpid);
		m_get_u32(&m, &inflight);
		m_end(&m);
		log_trace(TRACE_SCHEDULER,
		    "scheduler: queue requested removal of evp:%016" PRIx64,
		    evpid);
		stat_decrement("scheduler.envelope", 1);
		if (!inflight)
			backend->remove(evpid);
		else {
			backend->delete(evpid);
			ninflight -= 1;
			stat_decrement("scheduler.envelope.inflight", 1);
		}

		scheduler_reset_events();
		return;

	case IMSG_QUEUE_ENVELOPE_ACK:
		m_msg(&m, imsg);
		m_get_evpid(&m, &evpid);
		m_end(&m);
		log_trace(TRACE_SCHEDULER,
		    "scheduler: queue ack removal of evp:%016" PRIx64,
		    evpid);
		ninflight -= 1;
		stat_decrement("scheduler.envelope.inflight", 1);
		scheduler_reset_events();
		return;

	case IMSG_QUEUE_DELIVERY_OK:
		m_msg(&m, imsg);
		m_get_evpid(&m, &evpid);
		m_end(&m);
		log_trace(TRACE_SCHEDULER,
		    "scheduler: deleting evp:%016" PRIx64 " (ok)", evpid);
		backend->delete(evpid);
		ninflight -= 1;
		stat_increment("scheduler.delivery.ok", 1);
		stat_decrement("scheduler.envelope.inflight", 1);
		stat_decrement("scheduler.envelope", 1);
		scheduler_reset_events();
		return;

	case IMSG_QUEUE_DELIVERY_TEMPFAIL:
		m_msg(&m, imsg);
		m_get_envelope(&m, &evp);
		m_end(&m);
		log_trace(TRACE_SCHEDULER,
		    "scheduler: updating evp:%016" PRIx64, evp.id);
		scheduler_info(&si, &evp);
		backend->update(&si);
		ninflight -= 1;
		stat_increment("scheduler.delivery.tempfail", 1);
		stat_decrement("scheduler.envelope.inflight", 1);

		for (i = 0; i < MAX_BOUNCE_WARN; i++) {
			if (env->sc_bounce_warn[i] == 0)
				break;
			timestamp = si.creation + env->sc_bounce_warn[i];
			if (si.nexttry >= timestamp &&
			    si.lastbounce < timestamp) {
				req.evpid = evp.id;
				req.timestamp = timestamp;
				req.bounce.type = B_WARNING;
				req.bounce.delay = env->sc_bounce_warn[i];
				req.bounce.expire = si.expire;
				m_compose(p, IMSG_SCHED_ENVELOPE_BOUNCE, 0, 0, -1,
				    &req, sizeof req);
				break;
			}
		}
		scheduler_reset_events();
		return;

	case IMSG_QUEUE_DELIVERY_PERMFAIL:
		m_msg(&m, imsg);
		m_get_evpid(&m, &evpid);
		m_end(&m);
		log_trace(TRACE_SCHEDULER,
		    "scheduler: deleting evp:%016" PRIx64 " (fail)", evpid);
		backend->delete(evpid);
		ninflight -= 1;
		stat_increment("scheduler.delivery.permfail", 1);
		stat_decrement("scheduler.envelope.inflight", 1);
		stat_decrement("scheduler.envelope", 1);
		scheduler_reset_events();
		return;

	case IMSG_QUEUE_DELIVERY_LOOP:
		m_msg(&m, imsg);
		m_get_evpid(&m, &evpid);
		m_end(&m);
		log_trace(TRACE_SCHEDULER,
		    "scheduler: deleting evp:%016" PRIx64 " (loop)", evpid);
		backend->delete(evpid);
		ninflight -= 1;
		stat_increment("scheduler.delivery.loop", 1);
		stat_decrement("scheduler.envelope.inflight", 1);
		stat_decrement("scheduler.envelope", 1);
		scheduler_reset_events();
		return;
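	/*
	 * Hold-queue handling: an inflight envelope can be parked on a
	 * hold queue identified by a 64-bit id, and envelopes are later
	 * released from that queue in batches of a requested size.
	 */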
	case IMSG_QUEUE_HOLDQ_HOLD:
		m_msg(&m, imsg);
		m_get_evpid(&m, &evpid);
		m_get_id(&m, &holdq);
		m_end(&m);
		log_trace(TRACE_SCHEDULER,
		    "scheduler: holding evp:%016" PRIx64 " on %016" PRIx64,
		    evpid, holdq);
		backend->hold(evpid, holdq);
		ninflight -= 1;
		stat_decrement("scheduler.envelope.inflight", 1);
		scheduler_reset_events();
		return;

	case IMSG_QUEUE_HOLDQ_RELEASE:
		m_msg(&m, imsg);
		m_get_int(&m, &type);
		m_get_id(&m, &holdq);
		m_get_int(&m, &r);
		m_end(&m);
		log_trace(TRACE_SCHEDULER,
		    "scheduler: releasing %d on holdq (%d, %016" PRIx64 ")",
		    r, type, holdq);
		backend->release(type, holdq, r);
		scheduler_reset_events();
		return;
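	/*
	 * Control requests: pause/resume the delivery agents, adjust
	 * verbosity and profiling, list or act on individual messages
	 * and envelopes.  For the commands taking an id, a value that
	 * fits in 32 bits designates a whole message, a larger one a
	 * single envelope.
	 */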
	case IMSG_CTL_PAUSE_MDA:
		log_trace(TRACE_SCHEDULER, "scheduler: pausing mda");
		env->sc_flags |= SMTPD_MDA_PAUSED;
		return;

	case IMSG_CTL_RESUME_MDA:
		log_trace(TRACE_SCHEDULER, "scheduler: resuming mda");
		env->sc_flags &= ~SMTPD_MDA_PAUSED;
		scheduler_reset_events();
		return;

	case IMSG_CTL_PAUSE_MTA:
		log_trace(TRACE_SCHEDULER, "scheduler: pausing mta");
		env->sc_flags |= SMTPD_MTA_PAUSED;
		return;

	case IMSG_CTL_RESUME_MTA:
		log_trace(TRACE_SCHEDULER, "scheduler: resuming mta");
		env->sc_flags &= ~SMTPD_MTA_PAUSED;
		scheduler_reset_events();
		return;

	case IMSG_CTL_VERBOSE:
		m_msg(&m, imsg);
		m_get_int(&m, &v);
		m_end(&m);
		log_verbose(v);
		return;

	case IMSG_CTL_PROFILE:
		m_msg(&m, imsg);
		m_get_int(&m, &v);
		m_end(&m);
		profiling = v;
		return;

	case IMSG_CTL_LIST_MESSAGES:
		msgid = *(uint32_t *)(imsg->data);
		n = backend->messages(msgid, msgids,
		    env->sc_scheduler_max_msg_batch_size);
		m_compose(p, IMSG_CTL_LIST_MESSAGES, imsg->hdr.peerid, 0, -1,
		    msgids, n * sizeof (*msgids));
		return;

	case IMSG_CTL_LIST_ENVELOPES:
		id = *(uint64_t *)(imsg->data);
		n = backend->envelopes(id, state,
		    env->sc_scheduler_max_evp_batch_size);
		for (i = 0; i < n; i++) {
			m_create(p_queue, IMSG_CTL_LIST_ENVELOPES,
			    imsg->hdr.peerid, 0, -1);
			m_add_evpid(p_queue, state[i].evpid);
			m_add_int(p_queue, state[i].flags);
			m_add_time(p_queue, state[i].time);
			m_close(p_queue);
		}
		m_compose(p_queue, IMSG_CTL_LIST_ENVELOPES,
		    imsg->hdr.peerid, 0, -1, NULL, 0);
		return;

	case IMSG_CTL_SCHEDULE:
		id = *(uint64_t *)(imsg->data);
		if (id <= 0xffffffffL)
			log_debug("debug: scheduler: "
			    "scheduling msg:%08" PRIx64, id);
		else
			log_debug("debug: scheduler: "
			    "scheduling evp:%016" PRIx64, id);
		r = backend->schedule(id);
		scheduler_reset_events();
		m_compose(p, r ? IMSG_CTL_OK : IMSG_CTL_FAIL, imsg->hdr.peerid,
		    0, -1, NULL, 0);
		return;

	case IMSG_QUEUE_ENVELOPE_SCHEDULE:
		id = *(uint64_t *)(imsg->data);
		backend->schedule(id);
		scheduler_reset_events();
		return;

	case IMSG_CTL_REMOVE:
		id = *(uint64_t *)(imsg->data);
		if (id <= 0xffffffffL)
			log_debug("debug: scheduler: "
			    "removing msg:%08" PRIx64, id);
		else
			log_debug("debug: scheduler: "
			    "removing evp:%016" PRIx64, id);
		r = backend->remove(id);
		scheduler_reset_events();
		m_compose(p, r ? IMSG_CTL_OK : IMSG_CTL_FAIL, imsg->hdr.peerid,
		    0, -1, NULL, 0);
		return;

	case IMSG_CTL_PAUSE_EVP:
		id = *(uint64_t *)(imsg->data);
		if (id <= 0xffffffffL)
			log_debug("debug: scheduler: "
			    "suspending msg:%08" PRIx64, id);
		else
			log_debug("debug: scheduler: "
			    "suspending evp:%016" PRIx64, id);
		r = backend->suspend(id);
		scheduler_reset_events();
		m_compose(p, r ? IMSG_CTL_OK : IMSG_CTL_FAIL, imsg->hdr.peerid,
		    0, -1, NULL, 0);
		return;

	case IMSG_CTL_RESUME_EVP:
		id = *(uint64_t *)(imsg->data);
		if (id <= 0xffffffffL)
			log_debug("debug: scheduler: "
			    "resuming msg:%08" PRIx64, id);
		else
			log_debug("debug: scheduler: "
			    "resuming evp:%016" PRIx64, id);
		r = backend->resume(id);
		scheduler_reset_events();
		m_compose(p, r ? IMSG_CTL_OK : IMSG_CTL_FAIL, imsg->hdr.peerid,
		    0, -1, NULL, 0);
		return;
	}

	errx(1, "scheduler_imsg: unexpected %s imsg",
	    imsg_to_str(imsg->hdr.type));
}

static void
scheduler_shutdown(void)
{
	log_debug("debug: scheduler agent exiting");
	_exit(0);
}

static void
scheduler_reset_events(void)
{
	struct timeval	 tv;

	evtimer_del(&ev);
	tv.tv_sec = 0;
	tv.tv_usec = 0;
	evtimer_add(&ev, &tv);
}

int
scheduler(void)
{
	struct passwd	*pw;

	backend = scheduler_backend_lookup(backend_scheduler);
	if (backend == NULL)
		errx(1, "cannot find scheduler backend \"%s\"",
		    backend_scheduler);

	purge_config(PURGE_EVERYTHING);

	if ((pw = getpwnam(SMTPD_USER)) == NULL)
		fatalx("unknown user " SMTPD_USER);

	config_process(PROC_SCHEDULER);

	backend->init(backend_scheduler);

	if (chroot(PATH_CHROOT) == -1)
		fatal("scheduler: chroot");
	if (chdir("/") == -1)
		fatal("scheduler: chdir(\"/\")");

	if (setgroups(1, &pw->pw_gid) ||
	    setresgid(pw->pw_gid, pw->pw_gid, pw->pw_gid) ||
	    setresuid(pw->pw_uid, pw->pw_uid, pw->pw_uid))
		fatal("scheduler: cannot drop privileges");

	evpids = xcalloc(env->sc_scheduler_max_schedule, sizeof *evpids,
	    "scheduler: init evpids");
	types = xcalloc(env->sc_scheduler_max_schedule, sizeof *types,
	    "scheduler: init types");
	msgids = xcalloc(env->sc_scheduler_max_msg_batch_size, sizeof *msgids,
	    "scheduler: list msg");
	state = xcalloc(env->sc_scheduler_max_evp_batch_size, sizeof *state,
	    "scheduler: list evp");

	imsg_callback = scheduler_imsg;
	event_init();

	signal(SIGINT, SIG_IGN);
	signal(SIGTERM, SIG_IGN);
	signal(SIGPIPE, SIG_IGN);
	signal(SIGHUP, SIG_IGN);

	config_peer(PROC_CONTROL);
	config_peer(PROC_QUEUE);

	evtimer_set(&ev, scheduler_timeout, NULL);
	scheduler_reset_events();

	if (pledge("stdio", NULL) == -1)
		err(1, "pledge");

	event_dispatch();
	fatalx("exited event loop");

	return (0);
}
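/*
 * Timer callback: ask the backend for a batch of envelopes that are due
 * for action, dispatch each one to the queue process according to its
 * type, update the statistics and re-arm the timer.  The mask restricts
 * what the backend may return: deliveries are only requested while the
 * number of inflight envelopes is below the configured limit and the
 * corresponding agent is not paused.
 */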
static void
scheduler_timeout(int fd, short event, void *p)
{
	struct timeval	 tv;
	size_t		 i;
	size_t		 d_inflight;
	size_t		 d_envelope;
	size_t		 d_removed;
	size_t		 d_expired;
	size_t		 d_updated;
	size_t		 count;
	int		 mask, r, delay;

	tv.tv_sec = 0;
	tv.tv_usec = 0;

	mask = SCHED_UPDATE;

	if (ninflight < env->sc_scheduler_max_inflight) {
		mask |= SCHED_EXPIRE | SCHED_REMOVE | SCHED_BOUNCE;
		if (!(env->sc_flags & SMTPD_MDA_PAUSED))
			mask |= SCHED_MDA;
		if (!(env->sc_flags & SMTPD_MTA_PAUSED))
			mask |= SCHED_MTA;
	}

	count = env->sc_scheduler_max_schedule;

	log_trace(TRACE_SCHEDULER, "scheduler: getting batch: mask=0x%x, "
	    "count=%zu", mask, count);

	r = backend->batch(mask, &delay, &count, evpids, types);

	log_trace(TRACE_SCHEDULER, "scheduler: got r=%i, delay=%i, count=%zu",
	    r, delay, count);

	if (r < 0)
		fatalx("scheduler: error in batch handler");

	if (r == 0) {

		if (delay < -1)
			fatalx("scheduler: invalid delay %d", delay);

		if (delay == -1) {
			log_trace(TRACE_SCHEDULER, "scheduler: sleeping");
			return;
		}

		tv.tv_sec = delay;
		tv.tv_usec = 0;
		log_trace(TRACE_SCHEDULER,
		    "scheduler: waiting for %s", duration_to_text(tv.tv_sec));
		evtimer_add(&ev, &tv);
		return;
	}

	d_inflight = 0;
	d_envelope = 0;
	d_removed = 0;
	d_expired = 0;
	d_updated = 0;

	for (i = 0; i < count; i++) {
		switch (types[i]) {
		case SCHED_REMOVE:
			log_debug("debug: scheduler: evp:%016" PRIx64
			    " removed", evpids[i]);
			m_create(p_queue, IMSG_SCHED_ENVELOPE_REMOVE, 0, 0, -1);
			m_add_evpid(p_queue, evpids[i]);
			m_close(p_queue);
			d_envelope += 1;
			d_removed += 1;
			d_inflight += 1;
			break;

		case SCHED_EXPIRE:
			log_debug("debug: scheduler: evp:%016" PRIx64
			    " expired", evpids[i]);
			m_create(p_queue, IMSG_SCHED_ENVELOPE_EXPIRE, 0, 0, -1);
			m_add_evpid(p_queue, evpids[i]);
			m_close(p_queue);
			d_envelope += 1;
			d_expired += 1;
			d_inflight += 1;
			break;

		case SCHED_UPDATE:
			log_debug("debug: scheduler: evp:%016" PRIx64
			    " scheduled (update)", evpids[i]);
			d_updated += 1;
			break;

		case SCHED_BOUNCE:
			log_debug("debug: scheduler: evp:%016" PRIx64
			    " scheduled (bounce)", evpids[i]);
			m_create(p_queue, IMSG_SCHED_ENVELOPE_INJECT, 0, 0, -1);
			m_add_evpid(p_queue, evpids[i]);
			m_close(p_queue);
			d_inflight += 1;
			break;

		case SCHED_MDA:
			log_debug("debug: scheduler: evp:%016" PRIx64
			    " scheduled (mda)", evpids[i]);
			m_create(p_queue, IMSG_SCHED_ENVELOPE_DELIVER, 0, 0, -1);
			m_add_evpid(p_queue, evpids[i]);
			m_close(p_queue);
			d_inflight += 1;
			break;

		case SCHED_MTA:
			log_debug("debug: scheduler: evp:%016" PRIx64
			    " scheduled (mta)", evpids[i]);
			m_create(p_queue, IMSG_SCHED_ENVELOPE_TRANSFER, 0, 0, -1);
			m_add_evpid(p_queue, evpids[i]);
			m_close(p_queue);
			d_inflight += 1;
			break;
		}
	}

	stat_decrement("scheduler.envelope", d_envelope);
	stat_increment("scheduler.envelope.inflight", d_inflight);
	stat_increment("scheduler.envelope.expired", d_expired);
	stat_increment("scheduler.envelope.removed", d_removed);
	stat_increment("scheduler.envelope.updated", d_updated);

	ninflight += d_inflight;

	tv.tv_sec = 0;
	tv.tv_usec = 0;
	evtimer_add(&ev, &tv);
}