1 /* $OpenBSD: scheduler.c,v 1.6 2012/07/10 11:13:40 gilles Exp $ */ 2 3 /* 4 * Copyright (c) 2008 Gilles Chehade <gilles@openbsd.org> 5 * Copyright (c) 2008 Pierre-Yves Ritschard <pyr@openbsd.org> 6 * Copyright (c) 2008-2009 Jacek Masiulaniec <jacekm@dobremiasto.net> 7 * 8 * Permission to use, copy, modify, and distribute this software for any 9 * purpose with or without fee is hereby granted, provided that the above 10 * copyright notice and this permission notice appear in all copies. 11 * 12 * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES 13 * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF 14 * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR 15 * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES 16 * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN 17 * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF 18 * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. 19 */ 20 21 #include <sys/types.h> 22 #include <sys/queue.h> 23 #include <sys/tree.h> 24 #include <sys/param.h> 25 #include <sys/socket.h> 26 #include <sys/stat.h> 27 28 #include <ctype.h> 29 #include <dirent.h> 30 #include <err.h> 31 #include <errno.h> 32 #include <event.h> 33 #include <imsg.h> 34 #include <inttypes.h> 35 #include <libgen.h> 36 #include <pwd.h> 37 #include <signal.h> 38 #include <stdio.h> 39 #include <stdlib.h> 40 #include <string.h> 41 #include <time.h> 42 #include <unistd.h> 43 44 #include "smtpd.h" 45 #include "log.h" 46 47 static void scheduler_imsg(struct imsgev *, struct imsg *); 48 static void scheduler_shutdown(void); 49 static void scheduler_sig_handler(int, short, void *); 50 static void scheduler_setup_events(void); 51 static void scheduler_reset_events(void); 52 static void scheduler_disable_events(void); 53 static void scheduler_timeout(int, short, void *); 54 static void scheduler_remove(u_int64_t); 55 static void scheduler_remove_envelope(u_int64_t); 56 static int scheduler_process_envelope(u_int64_t); 57 static int scheduler_process_batch(enum delivery_type, u_int64_t); 58 static int scheduler_check_loop(struct envelope *); 59 static int scheduler_load_message(u_int32_t); 60 61 static struct scheduler_backend *backend = NULL; 62 63 extern const char *backend_scheduler; 64 65 void 66 scheduler_imsg(struct imsgev *iev, struct imsg *imsg) 67 { 68 struct envelope *e, bounce; 69 struct scheduler_info si; 70 71 log_imsg(PROC_SCHEDULER, iev->proc, imsg); 72 73 switch (imsg->hdr.type) { 74 case IMSG_QUEUE_COMMIT_MESSAGE: 75 e = imsg->data; 76 log_trace(TRACE_SCHEDULER, 77 "scheduler: IMSG_QUEUE_COMMIT_MESSAGE: %016"PRIx64, e->id); 78 scheduler_load_message(evpid_to_msgid(e->id)); 79 scheduler_reset_events(); 80 return; 81 82 case IMSG_QUEUE_DELIVERY_OK: 83 stat_decrement(STATS_SCHEDULER); 84 e = imsg->data; 85 log_trace(TRACE_SCHEDULER, 86 "scheduler: IMSG_QUEUE_DELIVERY_OK: %016"PRIx64, e->id); 87 backend->remove(e->id); 88 queue_envelope_delete(e); 89 return; 90 91 case IMSG_QUEUE_DELIVERY_TEMPFAIL: 92 stat_decrement(STATS_SCHEDULER); 93 e = imsg->data; 94 log_trace(TRACE_SCHEDULER, 95 "scheduler: IMSG_QUEUE_DELIVERY_TEMPFAIL: %016"PRIx64, e->id); 96 e->retry++; 97 queue_envelope_update(e); 98 scheduler_info(&si, e); 99 backend->insert(&si); 100 scheduler_reset_events(); 101 return; 102 103 case IMSG_QUEUE_DELIVERY_PERMFAIL: 104 stat_decrement(STATS_SCHEDULER); 105 e = imsg->data; 106 log_trace(TRACE_SCHEDULER, 107 "scheduler: IMSG_QUEUE_DELIVERY_PERMFAIL: %016"PRIx64, e->id); 108 if (e->type != D_BOUNCE && e->sender.user[0] != '\0') { 109 bounce_record_message(e, &bounce); 110 scheduler_info(&si, &bounce); 111 backend->insert(&si); 112 scheduler_reset_events(); 113 } 114 backend->remove(e->id); 115 queue_envelope_delete(e); 116 return; 117 118 case IMSG_MDA_SESS_NEW: 119 log_trace(TRACE_SCHEDULER, "scheduler: IMSG_MDA_SESS_NEW"); 120 stat_decrement(STATS_MDA_SESSION); 121 if (env->sc_maxconn - stat_get(STATS_MDA_SESSION, STAT_ACTIVE)) 122 env->sc_flags &= ~SMTPD_MDA_BUSY; 123 scheduler_reset_events(); 124 return; 125 126 case IMSG_BATCH_DONE: 127 log_trace(TRACE_SCHEDULER, "scheduler: IMSG_BATCH_DONE"); 128 stat_decrement(STATS_MTA_SESSION); 129 if (env->sc_maxconn - stat_get(STATS_MTA_SESSION, STAT_ACTIVE)) 130 env->sc_flags &= ~SMTPD_MTA_BUSY; 131 scheduler_reset_events(); 132 return; 133 134 case IMSG_SMTP_ENQUEUE: 135 e = imsg->data; 136 log_trace(TRACE_SCHEDULER, 137 "scheduler: IMSG_SMTP_ENQUEUE: %016"PRIx64, e->id); 138 if (imsg->fd < 0 || !bounce_session(imsg->fd, e)) { 139 queue_envelope_update(e); 140 scheduler_info(&si, e); 141 backend->insert(&si); 142 scheduler_reset_events(); 143 return; 144 } 145 return; 146 147 case IMSG_QUEUE_PAUSE_MDA: 148 log_trace(TRACE_SCHEDULER, "scheduler: IMSG_QUEUE_PAUSE_MDA"); 149 env->sc_flags |= SMTPD_MDA_PAUSED; 150 return; 151 152 case IMSG_QUEUE_RESUME_MDA: 153 log_trace(TRACE_SCHEDULER, "scheduler: IMSG_QUEUE_RESUME_MDA"); 154 env->sc_flags &= ~SMTPD_MDA_PAUSED; 155 scheduler_reset_events(); 156 return; 157 158 case IMSG_QUEUE_PAUSE_MTA: 159 log_trace(TRACE_SCHEDULER, "scheduler: IMSG_QUEUE_PAUSE_MTA"); 160 env->sc_flags |= SMTPD_MTA_PAUSED; 161 return; 162 163 case IMSG_QUEUE_RESUME_MTA: 164 log_trace(TRACE_SCHEDULER, "scheduler: IMSG_QUEUE_RESUME_MTA"); 165 env->sc_flags &= ~SMTPD_MTA_PAUSED; 166 scheduler_reset_events(); 167 return; 168 169 case IMSG_CTL_VERBOSE: 170 log_trace(TRACE_SCHEDULER, "scheduler: IMSG_CTL_VERBOSE"); 171 log_verbose(*(int *)imsg->data); 172 return; 173 174 case IMSG_SCHEDULER_SCHEDULE: 175 log_trace(TRACE_SCHEDULER, 176 "scheduler: IMSG_SCHEDULER_SCHEDULE: %016"PRIx64, 177 *(u_int64_t *)imsg->data); 178 backend->force(*(u_int64_t *)imsg->data); 179 scheduler_reset_events(); 180 return; 181 182 case IMSG_SCHEDULER_REMOVE: 183 log_trace(TRACE_SCHEDULER, 184 "scheduler: IMSG_SCHEDULER_REMOVE: %016"PRIx64, 185 *(u_int64_t *)imsg->data); 186 scheduler_remove(*(u_int64_t *)imsg->data); 187 scheduler_reset_events(); 188 return; 189 190 } 191 192 errx(1, "scheduler_imsg: unexpected %s imsg", imsg_to_str(imsg->hdr.type)); 193 } 194 195 void 196 scheduler_sig_handler(int sig, short event, void *p) 197 { 198 switch (sig) { 199 case SIGINT: 200 case SIGTERM: 201 scheduler_shutdown(); 202 break; 203 default: 204 fatalx("scheduler_sig_handler: unexpected signal"); 205 } 206 } 207 208 void 209 scheduler_shutdown(void) 210 { 211 log_info("scheduler handler exiting"); 212 _exit(0); 213 } 214 215 void 216 scheduler_setup_events(void) 217 { 218 struct timeval tv; 219 220 evtimer_set(&env->sc_ev, scheduler_timeout, NULL); 221 tv.tv_sec = 0; 222 tv.tv_usec = 10; 223 evtimer_add(&env->sc_ev, &tv); 224 } 225 226 void 227 scheduler_reset_events(void) 228 { 229 struct timeval tv; 230 231 tv.tv_sec = 0; 232 tv.tv_usec = 10; 233 evtimer_add(&env->sc_ev, &tv); 234 } 235 236 void 237 scheduler_disable_events(void) 238 { 239 evtimer_del(&env->sc_ev); 240 } 241 242 pid_t 243 scheduler(void) 244 { 245 pid_t pid; 246 struct passwd *pw; 247 248 struct event ev_sigint; 249 struct event ev_sigterm; 250 251 struct peer peers[] = { 252 { PROC_CONTROL, imsg_dispatch }, 253 { PROC_QUEUE, imsg_dispatch } 254 }; 255 256 switch (pid = fork()) { 257 case -1: 258 fatal("scheduler: cannot fork"); 259 case 0: 260 break; 261 default: 262 return (pid); 263 } 264 265 purge_config(PURGE_EVERYTHING); 266 267 pw = env->sc_pw; 268 269 if (chroot(PATH_SPOOL) == -1) 270 fatal("scheduler: chroot"); 271 if (chdir("/") == -1) 272 fatal("scheduler: chdir(\"/\")"); 273 274 smtpd_process = PROC_SCHEDULER; 275 setproctitle("%s", env->sc_title[smtpd_process]); 276 277 if (setgroups(1, &pw->pw_gid) || 278 setresgid(pw->pw_gid, pw->pw_gid, pw->pw_gid) || 279 setresuid(pw->pw_uid, pw->pw_uid, pw->pw_uid)) 280 fatal("scheduler: cannot drop privileges"); 281 282 /* see fdlimit()-related comment in queue.c */ 283 fdlimit(1.0); 284 if ((env->sc_maxconn = availdesc() / 4) < 1) 285 fatalx("scheduler: fd starvation"); 286 287 env->sc_scheduler = scheduler_backend_lookup(backend_scheduler); 288 if (env->sc_scheduler == NULL) 289 errx(1, "cannot find scheduler backend \"%s\"", backend_scheduler); 290 backend = env->sc_scheduler; 291 292 backend->init(); 293 294 imsg_callback = scheduler_imsg; 295 event_init(); 296 297 signal_set(&ev_sigint, SIGINT, scheduler_sig_handler, NULL); 298 signal_set(&ev_sigterm, SIGTERM, scheduler_sig_handler, NULL); 299 signal_add(&ev_sigint, NULL); 300 signal_add(&ev_sigterm, NULL); 301 signal(SIGPIPE, SIG_IGN); 302 signal(SIGHUP, SIG_IGN); 303 304 config_pipes(peers, nitems(peers)); 305 config_peers(peers, nitems(peers)); 306 307 scheduler_setup_events(); 308 event_dispatch(); 309 scheduler_disable_events(); 310 scheduler_shutdown(); 311 312 return (0); 313 } 314 315 void 316 scheduler_timeout(int fd, short event, void *p) 317 { 318 time_t nsched; 319 time_t curtm; 320 u_int64_t evpid; 321 static int setup = 0; 322 int delay = 0; 323 struct timeval tv; 324 325 log_trace(TRACE_SCHEDULER, "scheduler: entering scheduler_timeout"); 326 327 /* if we're not done setting up the scheduler, do it some more */ 328 if (! setup) 329 setup = backend->setup(); 330 331 /* we don't have a schedulable envelope ... sleep */ 332 if (! backend->next(&evpid, &nsched)) 333 goto scheduler_sleep; 334 335 /* is the envelope schedulable right away ? */ 336 curtm = time(NULL); 337 if (nsched <= curtm) { 338 /* yup */ 339 scheduler_process_envelope(evpid); 340 } 341 else { 342 /* nope, so we can either keep the timeout delay to 0 if we 343 * are not done setting up the scheduler, or sleep until it 344 * is time to schedule that envelope otherwise. 345 */ 346 if (setup) 347 delay = nsched - curtm; 348 } 349 350 if (delay) 351 log_info("scheduler: pausing for %d seconds", delay); 352 tv.tv_sec = delay; 353 tv.tv_usec = 0; 354 evtimer_add(&env->sc_ev, &tv); 355 return; 356 357 scheduler_sleep: 358 log_info("scheduler: sleeping"); 359 return; 360 } 361 362 static int 363 scheduler_process_envelope(u_int64_t evpid) 364 { 365 struct envelope envelope; 366 size_t mta_av, mda_av, bnc_av; 367 struct scheduler_info si; 368 369 mta_av = env->sc_maxconn - stat_get(STATS_MTA_SESSION, STAT_ACTIVE); 370 mda_av = env->sc_maxconn - stat_get(STATS_MDA_SESSION, STAT_ACTIVE); 371 bnc_av = env->sc_maxconn - stat_get(STATS_SCHEDULER_BOUNCES, STAT_ACTIVE); 372 373 if (! queue_envelope_load(evpid, &envelope)) 374 return 0; 375 376 if (envelope.type == D_MDA) 377 if (mda_av == 0) { 378 env->sc_flags |= SMTPD_MDA_BUSY; 379 return 0; 380 } 381 382 if (envelope.type == D_MTA) 383 if (mta_av == 0) { 384 env->sc_flags |= SMTPD_MTA_BUSY; 385 return 0; 386 } 387 388 if (envelope.type == D_BOUNCE) 389 if (bnc_av == 0) { 390 env->sc_flags |= SMTPD_BOUNCE_BUSY; 391 return 0; 392 } 393 394 if (scheduler_check_loop(&envelope)) { 395 struct envelope bounce; 396 397 envelope_set_errormsg(&envelope, "loop has been detected"); 398 if (bounce_record_message(&envelope, &bounce)) { 399 scheduler_info(&si, &bounce); 400 backend->insert(&si); 401 } 402 backend->remove(evpid); 403 queue_envelope_delete(&envelope); 404 405 scheduler_reset_events(); 406 407 return 0; 408 } 409 410 411 return scheduler_process_batch(envelope.type, evpid); 412 } 413 414 static int 415 scheduler_process_batch(enum delivery_type type, u_int64_t evpid) 416 { 417 struct envelope evp; 418 void *batch; 419 int fd; 420 421 batch = backend->batch(evpid); 422 switch (type) { 423 case D_BOUNCE: 424 while (backend->fetch(batch, &evpid)) { 425 if (! queue_envelope_load(evpid, &evp)) 426 goto end; 427 428 evp.lasttry = time(NULL); 429 imsg_compose_event(env->sc_ievs[PROC_QUEUE], 430 IMSG_SMTP_ENQUEUE, PROC_SMTP, 0, -1, &evp, 431 sizeof evp); 432 backend->schedule(evpid); 433 } 434 stat_increment(STATS_SCHEDULER); 435 stat_increment(STATS_SCHEDULER_BOUNCES); 436 break; 437 438 case D_MDA: 439 backend->fetch(batch, &evpid); 440 if (! queue_envelope_load(evpid, &evp)) 441 goto end; 442 443 evp.lasttry = time(NULL); 444 fd = queue_message_fd_r(evpid_to_msgid(evpid)); 445 imsg_compose_event(env->sc_ievs[PROC_QUEUE], 446 IMSG_MDA_SESS_NEW, PROC_MDA, 0, fd, &evp, 447 sizeof evp); 448 backend->schedule(evpid); 449 450 stat_increment(STATS_SCHEDULER); 451 stat_increment(STATS_MDA_SESSION); 452 break; 453 454 case D_MTA: { 455 struct mta_batch mta_batch; 456 457 /* FIXME */ 458 if (! backend->fetch(batch, &evpid)) 459 goto end; 460 if (! queue_envelope_load(evpid, &evp)) 461 goto end; 462 463 bzero(&mta_batch, sizeof mta_batch); 464 mta_batch.id = arc4random(); 465 mta_batch.relay = evp.agent.mta.relay; 466 467 imsg_compose_event(env->sc_ievs[PROC_QUEUE], 468 IMSG_BATCH_CREATE, PROC_MTA, 0, -1, &mta_batch, 469 sizeof mta_batch); 470 471 while (backend->fetch(batch, &evpid)) { 472 if (! queue_envelope_load(evpid, &evp)) 473 goto end; 474 evp.lasttry = time(NULL); /* FIXME */ 475 evp.batch_id = mta_batch.id; 476 477 imsg_compose_event(env->sc_ievs[PROC_QUEUE], 478 IMSG_BATCH_APPEND, PROC_MTA, 0, -1, &evp, 479 sizeof evp); 480 481 backend->schedule(evpid); 482 stat_increment(STATS_SCHEDULER); 483 } 484 485 imsg_compose_event(env->sc_ievs[PROC_QUEUE], 486 IMSG_BATCH_CLOSE, PROC_MTA, 0, -1, &mta_batch, 487 sizeof mta_batch); 488 489 stat_increment(STATS_MTA_SESSION); 490 break; 491 } 492 493 default: 494 fatalx("scheduler_process_batchqueue: unknown type"); 495 } 496 497 end: 498 backend->close(batch); 499 return 1; 500 } 501 502 static int 503 scheduler_load_message(u_int32_t msgid) 504 { 505 struct qwalk *q; 506 u_int64_t evpid; 507 struct envelope envelope; 508 struct scheduler_info si; 509 510 q = qwalk_new(msgid); 511 while (qwalk(q, &evpid)) { 512 if (! queue_envelope_load(evpid, &envelope)) 513 continue; 514 scheduler_info(&si, &envelope); 515 backend->insert(&si); 516 } 517 qwalk_close(q); 518 519 return 1; 520 } 521 522 static int 523 scheduler_check_loop(struct envelope *ep) 524 { 525 int fd; 526 FILE *fp; 527 char *buf, *lbuf; 528 size_t len; 529 struct mailaddr maddr; 530 int ret = 0; 531 int rcvcount = 0; 532 533 fd = queue_message_fd_r(evpid_to_msgid(ep->id)); 534 if ((fp = fdopen(fd, "r")) == NULL) 535 fatal("fdopen"); 536 537 lbuf = NULL; 538 while ((buf = fgetln(fp, &len))) { 539 if (buf[len - 1] == '\n') 540 buf[len - 1] = '\0'; 541 else { 542 /* EOF without EOL, copy and add the NUL */ 543 if ((lbuf = malloc(len + 1)) == NULL) 544 err(1, NULL); 545 memcpy(lbuf, buf, len); 546 lbuf[len] = '\0'; 547 buf = lbuf; 548 } 549 550 if (strchr(buf, ':') == NULL && !isspace((int)*buf)) 551 break; 552 553 if (strncasecmp("Received: ", buf, 10) == 0) { 554 rcvcount++; 555 if (rcvcount == MAX_HOPS_COUNT) { 556 ret = 1; 557 break; 558 } 559 } 560 561 else if (strncasecmp("Delivered-To: ", buf, 14) == 0) { 562 struct mailaddr dest; 563 564 bzero(&maddr, sizeof (struct mailaddr)); 565 if (! email_to_mailaddr(&maddr, buf + 14)) 566 continue; 567 568 dest = ep->dest; 569 if (ep->type == D_BOUNCE) 570 dest = ep->sender; 571 572 if (strcasecmp(maddr.user, dest.user) == 0 && 573 strcasecmp(maddr.domain, dest.domain) == 0) { 574 ret = 1; 575 break; 576 } 577 } 578 } 579 free(lbuf); 580 581 fclose(fp); 582 return ret; 583 } 584 585 static void 586 scheduler_remove(u_int64_t id) 587 { 588 void *msg; 589 590 /* removing by evpid */ 591 if (id > 0xffffffffL) { 592 scheduler_remove_envelope(id); 593 return; 594 } 595 596 /* removing by msgid */ 597 msg = backend->message(id); 598 while (backend->fetch(msg, &id)) 599 scheduler_remove_envelope(id); 600 backend->close(msg); 601 } 602 603 static void 604 scheduler_remove_envelope(u_int64_t evpid) 605 { 606 struct envelope evp; 607 608 evp.id = evpid; 609 queue_envelope_delete(&evp); 610 backend->remove(evpid); 611 } 612