1 /* $OpenBSD: scheduler.c,v 1.47 2014/07/10 14:45:02 eric Exp $ */ 2 3 /* 4 * Copyright (c) 2008 Gilles Chehade <gilles@poolp.org> 5 * Copyright (c) 2008 Pierre-Yves Ritschard <pyr@openbsd.org> 6 * Copyright (c) 2008-2009 Jacek Masiulaniec <jacekm@dobremiasto.net> 7 * Copyright (c) 2012 Eric Faurot <eric@openbsd.org> 8 * 9 * Permission to use, copy, modify, and distribute this software for any 10 * purpose with or without fee is hereby granted, provided that the above 11 * copyright notice and this permission notice appear in all copies. 12 * 13 * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES 14 * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF 15 * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR 16 * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES 17 * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN 18 * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF 19 * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. 20 */ 21 22 #include <sys/types.h> 23 #include <sys/queue.h> 24 #include <sys/tree.h> 25 #include <sys/socket.h> 26 #include <sys/stat.h> 27 28 #include <ctype.h> 29 #include <dirent.h> 30 #include <err.h> 31 #include <errno.h> 32 #include <event.h> 33 #include <imsg.h> 34 #include <inttypes.h> 35 #include <libgen.h> 36 #include <pwd.h> 37 #include <signal.h> 38 #include <stdio.h> 39 #include <stdlib.h> 40 #include <string.h> 41 #include <time.h> 42 #include <unistd.h> 43 44 #include "smtpd.h" 45 #include "log.h" 46 47 static void scheduler_imsg(struct mproc *, struct imsg *); 48 static void scheduler_shutdown(void); 49 static void scheduler_sig_handler(int, short, void *); 50 static void scheduler_reset_events(void); 51 static void scheduler_timeout(int, short, void *); 52 53 static struct scheduler_backend *backend = NULL; 54 static struct event ev; 55 static size_t ninflight = 0; 56 static int *types; 57 static uint64_t *evpids; 58 static uint32_t *msgids; 59 static struct evpstate *state; 60 61 extern const char *backend_scheduler; 62 63 void 64 scheduler_imsg(struct mproc *p, struct imsg *imsg) 65 { 66 struct bounce_req_msg req; 67 struct envelope evp; 68 struct scheduler_info si; 69 struct msg m; 70 uint64_t evpid, id, holdq; 71 uint32_t msgid; 72 uint32_t inflight; 73 size_t n, i; 74 time_t timestamp; 75 int v, r, type; 76 77 switch (imsg->hdr.type) { 78 79 case IMSG_QUEUE_ENVELOPE_SUBMIT: 80 m_msg(&m, imsg); 81 m_get_envelope(&m, &evp); 82 m_end(&m); 83 log_trace(TRACE_SCHEDULER, 84 "scheduler: inserting evp:%016" PRIx64, evp.id); 85 scheduler_info(&si, &evp); 86 stat_increment("scheduler.envelope.incoming", 1); 87 backend->insert(&si); 88 return; 89 90 case IMSG_QUEUE_MESSAGE_COMMIT: 91 m_msg(&m, imsg); 92 m_get_msgid(&m, &msgid); 93 m_end(&m); 94 log_trace(TRACE_SCHEDULER, 95 "scheduler: committing msg:%08" PRIx32, msgid); 96 n = backend->commit(msgid); 97 stat_decrement("scheduler.envelope.incoming", n); 98 stat_increment("scheduler.envelope", n); 99 scheduler_reset_events(); 100 return; 101 102 case IMSG_QUEUE_MESSAGE_ROLLBACK: 103 m_msg(&m, imsg); 104 m_get_msgid(&m, &msgid); 105 m_end(&m); 106 log_trace(TRACE_SCHEDULER, "scheduler: aborting msg:%08" PRIx32, 107 msgid); 108 n = backend->rollback(msgid); 109 stat_decrement("scheduler.envelope.incoming", n); 110 scheduler_reset_events(); 111 return; 112 113 case IMSG_QUEUE_ENVELOPE_REMOVE: 114 m_msg(&m, imsg); 115 m_get_evpid(&m, &evpid); 116 m_get_u32(&m, &inflight); 117 m_end(&m); 118 log_trace(TRACE_SCHEDULER, 119 "scheduler: queue requested removal of evp:%016" PRIx64, 120 evpid); 121 stat_decrement("scheduler.envelope", 1); 122 if (! inflight) 123 backend->remove(evpid); 124 else { 125 backend->delete(evpid); 126 ninflight -= 1; 127 stat_decrement("scheduler.envelope.inflight", 1); 128 } 129 130 scheduler_reset_events(); 131 return; 132 133 case IMSG_QUEUE_ENVELOPE_ACK: 134 m_msg(&m, imsg); 135 m_get_evpid(&m, &evpid); 136 m_end(&m); 137 log_trace(TRACE_SCHEDULER, 138 "scheduler: queue ack removal of evp:%016" PRIx64, 139 evpid); 140 ninflight -= 1; 141 stat_decrement("scheduler.envelope.inflight", 1); 142 scheduler_reset_events(); 143 return; 144 145 case IMSG_QUEUE_DELIVERY_OK: 146 m_msg(&m, imsg); 147 m_get_evpid(&m, &evpid); 148 m_end(&m); 149 log_trace(TRACE_SCHEDULER, 150 "scheduler: deleting evp:%016" PRIx64 " (ok)", evpid); 151 backend->delete(evpid); 152 ninflight -= 1; 153 stat_increment("scheduler.delivery.ok", 1); 154 stat_decrement("scheduler.envelope.inflight", 1); 155 stat_decrement("scheduler.envelope", 1); 156 scheduler_reset_events(); 157 return; 158 159 case IMSG_QUEUE_DELIVERY_TEMPFAIL: 160 m_msg(&m, imsg); 161 m_get_envelope(&m, &evp); 162 m_end(&m); 163 log_trace(TRACE_SCHEDULER, 164 "scheduler: updating evp:%016" PRIx64, evp.id); 165 scheduler_info(&si, &evp); 166 backend->update(&si); 167 ninflight -= 1; 168 stat_increment("scheduler.delivery.tempfail", 1); 169 stat_decrement("scheduler.envelope.inflight", 1); 170 171 for (i = 0; i < MAX_BOUNCE_WARN; i++) { 172 if (env->sc_bounce_warn[i] == 0) 173 break; 174 timestamp = si.creation + env->sc_bounce_warn[i]; 175 if (si.nexttry >= timestamp && 176 si.lastbounce < timestamp) { 177 req.evpid = evp.id; 178 req.timestamp = timestamp; 179 req.bounce.type = B_WARNING; 180 req.bounce.delay = env->sc_bounce_warn[i]; 181 req.bounce.expire = si.expire; 182 m_compose(p, IMSG_SCHED_ENVELOPE_BOUNCE, 0, 0, -1, 183 &req, sizeof req); 184 break; 185 } 186 } 187 scheduler_reset_events(); 188 return; 189 190 case IMSG_QUEUE_DELIVERY_PERMFAIL: 191 m_msg(&m, imsg); 192 m_get_evpid(&m, &evpid); 193 m_end(&m); 194 log_trace(TRACE_SCHEDULER, 195 "scheduler: deleting evp:%016" PRIx64 " (fail)", evpid); 196 backend->delete(evpid); 197 ninflight -= 1; 198 stat_increment("scheduler.delivery.permfail", 1); 199 stat_decrement("scheduler.envelope.inflight", 1); 200 stat_decrement("scheduler.envelope", 1); 201 scheduler_reset_events(); 202 return; 203 204 case IMSG_QUEUE_DELIVERY_LOOP: 205 m_msg(&m, imsg); 206 m_get_evpid(&m, &evpid); 207 m_end(&m); 208 log_trace(TRACE_SCHEDULER, 209 "scheduler: deleting evp:%016" PRIx64 " (loop)", evpid); 210 backend->delete(evpid); 211 ninflight -= 1; 212 stat_increment("scheduler.delivery.loop", 1); 213 stat_decrement("scheduler.envelope.inflight", 1); 214 stat_decrement("scheduler.envelope", 1); 215 scheduler_reset_events(); 216 return; 217 218 case IMSG_QUEUE_HOLDQ_HOLD: 219 m_msg(&m, imsg); 220 m_get_evpid(&m, &evpid); 221 m_get_id(&m, &holdq); 222 m_end(&m); 223 log_trace(TRACE_SCHEDULER, 224 "scheduler: holding evp:%016" PRIx64 " on %016" PRIx64, 225 evpid, holdq); 226 backend->hold(evpid, holdq); 227 ninflight -= 1; 228 stat_decrement("scheduler.envelope.inflight", 1); 229 scheduler_reset_events(); 230 return; 231 232 case IMSG_QUEUE_HOLDQ_RELEASE: 233 m_msg(&m, imsg); 234 m_get_int(&m, &type); 235 m_get_id(&m, &holdq); 236 m_get_int(&m, &r); 237 m_end(&m); 238 log_trace(TRACE_SCHEDULER, 239 "scheduler: releasing %d on holdq (%d, %016" PRIx64 ")", 240 r, type, holdq); 241 backend->release(type, holdq, r); 242 scheduler_reset_events(); 243 return; 244 245 case IMSG_CTL_PAUSE_MDA: 246 log_trace(TRACE_SCHEDULER, "scheduler: pausing mda"); 247 env->sc_flags |= SMTPD_MDA_PAUSED; 248 return; 249 250 case IMSG_CTL_RESUME_MDA: 251 log_trace(TRACE_SCHEDULER, "scheduler: resuming mda"); 252 env->sc_flags &= ~SMTPD_MDA_PAUSED; 253 scheduler_reset_events(); 254 return; 255 256 case IMSG_CTL_PAUSE_MTA: 257 log_trace(TRACE_SCHEDULER, "scheduler: pausing mta"); 258 env->sc_flags |= SMTPD_MTA_PAUSED; 259 return; 260 261 case IMSG_CTL_RESUME_MTA: 262 log_trace(TRACE_SCHEDULER, "scheduler: resuming mta"); 263 env->sc_flags &= ~SMTPD_MTA_PAUSED; 264 scheduler_reset_events(); 265 return; 266 267 case IMSG_CTL_VERBOSE: 268 m_msg(&m, imsg); 269 m_get_int(&m, &v); 270 m_end(&m); 271 log_verbose(v); 272 return; 273 274 case IMSG_CTL_PROFILE: 275 m_msg(&m, imsg); 276 m_get_int(&m, &v); 277 m_end(&m); 278 profiling = v; 279 return; 280 281 case IMSG_CTL_LIST_MESSAGES: 282 msgid = *(uint32_t *)(imsg->data); 283 n = backend->messages(msgid, msgids, env->sc_scheduler_max_msg_batch_size); 284 m_compose(p, IMSG_CTL_LIST_MESSAGES, imsg->hdr.peerid, 0, -1, 285 msgids, n * sizeof (*msgids)); 286 return; 287 288 case IMSG_CTL_LIST_ENVELOPES: 289 id = *(uint64_t *)(imsg->data); 290 n = backend->envelopes(id, state, env->sc_scheduler_max_evp_batch_size); 291 for (i = 0; i < n; i++) { 292 m_create(p_queue, IMSG_CTL_LIST_ENVELOPES, 293 imsg->hdr.peerid, 0, -1); 294 m_add_evpid(p_queue, state[i].evpid); 295 m_add_int(p_queue, state[i].flags); 296 m_add_time(p_queue, state[i].time); 297 m_close(p_queue); 298 } 299 m_compose(p_queue, IMSG_CTL_LIST_ENVELOPES, 300 imsg->hdr.peerid, 0, -1, NULL, 0); 301 return; 302 303 case IMSG_CTL_SCHEDULE: 304 id = *(uint64_t *)(imsg->data); 305 if (id <= 0xffffffffL) 306 log_debug("debug: scheduler: " 307 "scheduling msg:%08" PRIx64, id); 308 else 309 log_debug("debug: scheduler: " 310 "scheduling evp:%016" PRIx64, id); 311 r = backend->schedule(id); 312 scheduler_reset_events(); 313 m_compose(p, r ? IMSG_CTL_OK : IMSG_CTL_FAIL, imsg->hdr.peerid, 314 0, -1, NULL, 0); 315 return; 316 317 case IMSG_QUEUE_ENVELOPE_SCHEDULE: 318 id = *(uint64_t *)(imsg->data); 319 backend->schedule(id); 320 scheduler_reset_events(); 321 return; 322 323 case IMSG_CTL_REMOVE: 324 id = *(uint64_t *)(imsg->data); 325 if (id <= 0xffffffffL) 326 log_debug("debug: scheduler: " 327 "removing msg:%08" PRIx64, id); 328 else 329 log_debug("debug: scheduler: " 330 "removing evp:%016" PRIx64, id); 331 r = backend->remove(id); 332 scheduler_reset_events(); 333 m_compose(p, r ? IMSG_CTL_OK : IMSG_CTL_FAIL, imsg->hdr.peerid, 334 0, -1, NULL, 0); 335 return; 336 337 case IMSG_CTL_PAUSE_EVP: 338 id = *(uint64_t *)(imsg->data); 339 if (id <= 0xffffffffL) 340 log_debug("debug: scheduler: " 341 "suspending msg:%08" PRIx64, id); 342 else 343 log_debug("debug: scheduler: " 344 "suspending evp:%016" PRIx64, id); 345 r = backend->suspend(id); 346 scheduler_reset_events(); 347 m_compose(p, r ? IMSG_CTL_OK : IMSG_CTL_FAIL, imsg->hdr.peerid, 348 0, -1, NULL, 0); 349 return; 350 351 case IMSG_CTL_RESUME_EVP: 352 id = *(uint64_t *)(imsg->data); 353 if (id <= 0xffffffffL) 354 log_debug("debug: scheduler: " 355 "resuming msg:%08" PRIx64, id); 356 else 357 log_debug("debug: scheduler: " 358 "resuming evp:%016" PRIx64, id); 359 r = backend->resume(id); 360 scheduler_reset_events(); 361 m_compose(p, r ? IMSG_CTL_OK : IMSG_CTL_FAIL, imsg->hdr.peerid, 362 0, -1, NULL, 0); 363 return; 364 } 365 366 errx(1, "scheduler_imsg: unexpected %s imsg", 367 imsg_to_str(imsg->hdr.type)); 368 } 369 370 static void 371 scheduler_sig_handler(int sig, short event, void *p) 372 { 373 switch (sig) { 374 case SIGINT: 375 case SIGTERM: 376 scheduler_shutdown(); 377 break; 378 default: 379 fatalx("scheduler_sig_handler: unexpected signal"); 380 } 381 } 382 383 static void 384 scheduler_shutdown(void) 385 { 386 log_info("info: scheduler handler exiting"); 387 _exit(0); 388 } 389 390 static void 391 scheduler_reset_events(void) 392 { 393 struct timeval tv; 394 395 evtimer_del(&ev); 396 tv.tv_sec = 0; 397 tv.tv_usec = 0; 398 evtimer_add(&ev, &tv); 399 } 400 401 pid_t 402 scheduler(void) 403 { 404 pid_t pid; 405 struct passwd *pw; 406 struct event ev_sigint; 407 struct event ev_sigterm; 408 409 backend = scheduler_backend_lookup(backend_scheduler); 410 if (backend == NULL) 411 errx(1, "cannot find scheduler backend \"%s\"", 412 backend_scheduler); 413 414 switch (pid = fork()) { 415 case -1: 416 fatal("scheduler: cannot fork"); 417 case 0: 418 post_fork(PROC_SCHEDULER); 419 break; 420 default: 421 return (pid); 422 } 423 424 purge_config(PURGE_EVERYTHING); 425 426 if ((pw = getpwnam(SMTPD_USER)) == NULL) 427 fatalx("unknown user " SMTPD_USER); 428 429 config_process(PROC_SCHEDULER); 430 431 backend->init(backend_scheduler); 432 433 if (chroot(PATH_CHROOT) == -1) 434 fatal("scheduler: chroot"); 435 if (chdir("/") == -1) 436 fatal("scheduler: chdir(\"/\")"); 437 438 if (setgroups(1, &pw->pw_gid) || 439 setresgid(pw->pw_gid, pw->pw_gid, pw->pw_gid) || 440 setresuid(pw->pw_uid, pw->pw_uid, pw->pw_uid)) 441 fatal("scheduler: cannot drop privileges"); 442 443 evpids = xcalloc(env->sc_scheduler_max_schedule, sizeof *evpids, "scheduler: init evpids"); 444 types = xcalloc(env->sc_scheduler_max_schedule, sizeof *types, "scheduler: init types"); 445 msgids = xcalloc(env->sc_scheduler_max_msg_batch_size, sizeof *msgids, "scheduler: list msg"); 446 state = xcalloc(env->sc_scheduler_max_evp_batch_size, sizeof *state, "scheduler: list evp"); 447 448 imsg_callback = scheduler_imsg; 449 event_init(); 450 451 signal_set(&ev_sigint, SIGINT, scheduler_sig_handler, NULL); 452 signal_set(&ev_sigterm, SIGTERM, scheduler_sig_handler, NULL); 453 signal_add(&ev_sigint, NULL); 454 signal_add(&ev_sigterm, NULL); 455 signal(SIGPIPE, SIG_IGN); 456 signal(SIGHUP, SIG_IGN); 457 458 config_peer(PROC_CONTROL); 459 config_peer(PROC_QUEUE); 460 config_done(); 461 462 evtimer_set(&ev, scheduler_timeout, NULL); 463 scheduler_reset_events(); 464 if (event_dispatch() < 0) 465 fatal("event_dispatch"); 466 scheduler_shutdown(); 467 468 return (0); 469 } 470 471 static void 472 scheduler_timeout(int fd, short event, void *p) 473 { 474 struct timeval tv; 475 size_t i; 476 size_t d_inflight; 477 size_t d_envelope; 478 size_t d_removed; 479 size_t d_expired; 480 size_t d_updated; 481 size_t count; 482 int mask, r, delay; 483 484 tv.tv_sec = 0; 485 tv.tv_usec = 0; 486 487 mask = SCHED_UPDATE; 488 489 if (ninflight < env->sc_scheduler_max_inflight) { 490 mask |= SCHED_EXPIRE | SCHED_REMOVE | SCHED_BOUNCE; 491 if (!(env->sc_flags & SMTPD_MDA_PAUSED)) 492 mask |= SCHED_MDA; 493 if (!(env->sc_flags & SMTPD_MTA_PAUSED)) 494 mask |= SCHED_MTA; 495 } 496 497 count = env->sc_scheduler_max_schedule; 498 499 log_trace(TRACE_SCHEDULER, "scheduler: getting batch: mask=0x%x, count=%zu", mask, count); 500 501 r = backend->batch(mask, &delay, &count, evpids, types); 502 503 log_trace(TRACE_SCHEDULER, "scheduler: got r=%i, delay=%i, count=%zu", r, delay, count); 504 505 if (r < 0) 506 fatalx("scheduler: error in batch handler"); 507 508 if (r == 0) { 509 510 if (delay < -1) 511 fatalx("scheduler: invalid delay %d", delay); 512 513 if (delay == -1) { 514 log_trace(TRACE_SCHEDULER, "scheduler: sleeping"); 515 return; 516 } 517 518 tv.tv_sec = delay; 519 tv.tv_usec = 0; 520 log_trace(TRACE_SCHEDULER, 521 "scheduler: waiting for %s", duration_to_text(tv.tv_sec)); 522 evtimer_add(&ev, &tv); 523 return; 524 } 525 526 d_inflight = 0; 527 d_envelope = 0; 528 d_removed = 0; 529 d_expired = 0; 530 d_updated = 0; 531 532 for (i = 0; i < count; i++) { 533 switch(types[i]) { 534 case SCHED_REMOVE: 535 log_debug("debug: scheduler: evp:%016" PRIx64 536 " removed", evpids[i]); 537 m_create(p_queue, IMSG_SCHED_ENVELOPE_REMOVE, 0, 0, -1); 538 m_add_evpid(p_queue, evpids[i]); 539 m_close(p_queue); 540 d_envelope += 1; 541 d_removed += 1; 542 d_inflight += 1; 543 break; 544 545 case SCHED_EXPIRE: 546 log_debug("debug: scheduler: evp:%016" PRIx64 547 " expired", evpids[i]); 548 m_create(p_queue, IMSG_SCHED_ENVELOPE_EXPIRE, 0, 0, -1); 549 m_add_evpid(p_queue, evpids[i]); 550 m_close(p_queue); 551 d_envelope += 1; 552 d_expired += 1; 553 d_inflight += 1; 554 break; 555 556 case SCHED_UPDATE: 557 log_debug("debug: scheduler: evp:%016" PRIx64 558 " scheduled (update)", evpids[i]); 559 d_updated += 1; 560 break; 561 562 case SCHED_BOUNCE: 563 log_debug("debug: scheduler: evp:%016" PRIx64 564 " scheduled (bounce)", evpids[i]); 565 m_create(p_queue, IMSG_SCHED_ENVELOPE_INJECT, 0, 0, -1); 566 m_add_evpid(p_queue, evpids[i]); 567 m_close(p_queue); 568 d_inflight += 1; 569 break; 570 571 case SCHED_MDA: 572 log_debug("debug: scheduler: evp:%016" PRIx64 573 " scheduled (mda)", evpids[i]); 574 m_create(p_queue, IMSG_SCHED_ENVELOPE_DELIVER, 0, 0, -1); 575 m_add_evpid(p_queue, evpids[i]); 576 m_close(p_queue); 577 d_inflight += 1; 578 break; 579 580 case SCHED_MTA: 581 log_debug("debug: scheduler: evp:%016" PRIx64 582 " scheduled (mta)", evpids[i]); 583 m_create(p_queue, IMSG_SCHED_ENVELOPE_TRANSFER, 0, 0, -1); 584 m_add_evpid(p_queue, evpids[i]); 585 m_close(p_queue); 586 d_inflight += 1; 587 break; 588 } 589 } 590 591 stat_decrement("scheduler.envelope", d_envelope); 592 stat_increment("scheduler.envelope.inflight", d_inflight); 593 stat_increment("scheduler.envelope.expired", d_expired); 594 stat_increment("scheduler.envelope.removed", d_removed); 595 stat_increment("scheduler.envelope.updated", d_updated); 596 597 ninflight += d_inflight; 598 599 tv.tv_sec = 0; 600 tv.tv_usec = 0; 601 evtimer_add(&ev, &tv); 602 } 603