1 /* $OpenBSD: queue.c,v 1.58 2009/03/29 14:18:20 jacekm Exp $ */ 2 3 /* 4 * Copyright (c) 2008 Gilles Chehade <gilles@openbsd.org> 5 * Copyright (c) 2008 Pierre-Yves Ritschard <pyr@openbsd.org> 6 * 7 * Permission to use, copy, modify, and distribute this software for any 8 * purpose with or without fee is hereby granted, provided that the above 9 * copyright notice and this permission notice appear in all copies. 10 * 11 * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES 12 * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF 13 * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR 14 * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES 15 * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN 16 * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF 17 * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. 18 */ 19 20 #include <sys/types.h> 21 #include <sys/queue.h> 22 #include <sys/tree.h> 23 #include <sys/param.h> 24 #include <sys/socket.h> 25 #include <sys/stat.h> 26 27 #include <errno.h> 28 #include <event.h> 29 #include <fcntl.h> 30 #include <libgen.h> 31 #include <pwd.h> 32 #include <signal.h> 33 #include <stdio.h> 34 #include <stdlib.h> 35 #include <string.h> 36 #include <unistd.h> 37 38 #include "smtpd.h" 39 40 __dead void queue_shutdown(void); 41 void queue_sig_handler(int, short, void *); 42 void queue_dispatch_control(int, short, void *); 43 void queue_dispatch_smtp(int, short, void *); 44 void queue_dispatch_mda(int, short, void *); 45 void queue_dispatch_mta(int, short, void *); 46 void queue_dispatch_lka(int, short, void *); 47 void queue_dispatch_runner(int, short, void *); 48 void queue_setup_events(struct smtpd *); 49 void queue_disable_events(struct smtpd *); 50 void queue_purge(char *); 51 52 int queue_create_layout_message(char *, char *); 53 void queue_delete_layout_message(char *, char *); 54 int queue_record_layout_envelope(char *, struct message *); 55 int queue_remove_layout_envelope(char *, struct message *); 56 int queue_commit_layout_message(char *, struct message *); 57 int queue_open_layout_messagefile(char *, struct message *); 58 59 struct s_queue s_queue; 60 61 void 62 queue_sig_handler(int sig, short event, void *p) 63 { 64 switch (sig) { 65 case SIGINT: 66 case SIGTERM: 67 queue_shutdown(); 68 break; 69 default: 70 fatalx("queue_sig_handler: unexpected signal"); 71 } 72 } 73 74 void 75 queue_dispatch_control(int sig, short event, void *p) 76 { 77 struct smtpd *env = p; 78 struct imsgbuf *ibuf; 79 struct imsg imsg; 80 ssize_t n; 81 82 ibuf = env->sc_ibufs[PROC_CONTROL]; 83 switch (event) { 84 case EV_READ: 85 if ((n = imsg_read(ibuf)) == -1) 86 fatal("imsg_read_error"); 87 if (n == 0) { 88 /* this pipe is dead, so remove the event handler */ 89 event_del(&ibuf->ev); 90 event_loopexit(NULL); 91 return; 92 } 93 break; 94 case EV_WRITE: 95 if (msgbuf_write(&ibuf->w) == -1) 96 fatal("msgbuf_write"); 97 imsg_event_add(ibuf); 98 return; 99 default: 100 fatalx("unknown event"); 101 } 102 103 for (;;) { 104 if ((n = imsg_get(ibuf, &imsg)) == -1) 105 fatal("queue_dispatch_control: imsg_read error"); 106 if (n == 0) 107 break; 108 109 switch (imsg.hdr.type) { 110 case IMSG_QUEUE_CREATE_MESSAGE: { 111 struct message *messagep; 112 struct submit_status ss; 113 114 log_debug("queue_dispatch_control: creating message file"); 115 messagep = imsg.data; 116 117 ss.id = messagep->session_id; 118 ss.code = 250; 119 bzero(ss.u.msgid, MAX_ID_SIZE); 120 121 if (! enqueue_create_layout(ss.u.msgid)) 122 ss.code = 421; 123 124 imsg_compose(ibuf, IMSG_QUEUE_CREATE_MESSAGE, 0, 0, -1, 125 &ss, sizeof(ss)); 126 127 break; 128 } 129 case IMSG_QUEUE_MESSAGE_FILE: { 130 int fd; 131 struct submit_status ss; 132 struct message *messagep; 133 134 messagep = imsg.data; 135 ss.msg = *messagep; 136 ss.id = messagep->session_id; 137 ss.code = 250; 138 fd = enqueue_open_messagefile(messagep); 139 if (fd == -1) 140 ss.code = 421; 141 142 imsg_compose(ibuf, IMSG_QUEUE_MESSAGE_FILE, 0, 0, fd, &ss, 143 sizeof(ss)); 144 145 break; 146 } 147 case IMSG_QUEUE_COMMIT_MESSAGE: { 148 struct message *messagep; 149 struct submit_status ss; 150 151 messagep = imsg.data; 152 ss.id = messagep->session_id; 153 154 ss.code = 250; 155 if (enqueue_commit_message(messagep)) 156 s_queue.inserts_local++; 157 else 158 ss.code = 421; 159 160 imsg_compose(ibuf, IMSG_QUEUE_COMMIT_MESSAGE, 0, 0, -1, 161 &ss, sizeof(ss)); 162 163 break; 164 } 165 case IMSG_STATS: { 166 struct stats *s; 167 168 s = imsg.data; 169 s->u.queue = s_queue; 170 imsg_compose(ibuf, IMSG_STATS, 0, 0, -1, s, sizeof(*s)); 171 break; 172 } 173 default: 174 log_warnx("queue_dispatch_control: got imsg %d", 175 imsg.hdr.type); 176 fatalx("queue_dispatch_control: unexpected imsg"); 177 } 178 imsg_free(&imsg); 179 } 180 imsg_event_add(ibuf); 181 } 182 183 void 184 queue_dispatch_smtp(int sig, short event, void *p) 185 { 186 struct smtpd *env = p; 187 struct imsgbuf *ibuf; 188 struct imsg imsg; 189 ssize_t n; 190 191 ibuf = env->sc_ibufs[PROC_SMTP]; 192 switch (event) { 193 case EV_READ: 194 if ((n = imsg_read(ibuf)) == -1) 195 fatal("imsg_read_error"); 196 if (n == 0) { 197 /* this pipe is dead, so remove the event handler */ 198 event_del(&ibuf->ev); 199 event_loopexit(NULL); 200 return; 201 } 202 break; 203 case EV_WRITE: 204 if (msgbuf_write(&ibuf->w) == -1) 205 fatal("msgbuf_write"); 206 imsg_event_add(ibuf); 207 return; 208 default: 209 fatalx("unknown event"); 210 } 211 212 for (;;) { 213 if ((n = imsg_get(ibuf, &imsg)) == -1) 214 fatal("queue_dispatch_smtp: imsg_read error"); 215 if (n == 0) 216 break; 217 218 switch (imsg.hdr.type) { 219 case IMSG_QUEUE_CREATE_MESSAGE: { 220 struct message *messagep; 221 struct submit_status ss; 222 223 log_debug("queue_dispatch_smtp: creating message file"); 224 messagep = imsg.data; 225 ss.id = messagep->session_id; 226 ss.code = 250; 227 bzero(ss.u.msgid, MAX_ID_SIZE); 228 229 if (! queue_create_incoming_layout(ss.u.msgid)) 230 ss.code = 421; 231 232 imsg_compose(ibuf, IMSG_QUEUE_CREATE_MESSAGE, 0, 0, -1, 233 &ss, sizeof(ss)); 234 break; 235 } 236 case IMSG_QUEUE_REMOVE_MESSAGE: { 237 struct message *messagep; 238 239 messagep = imsg.data; 240 queue_delete_incoming_message(messagep->message_id); 241 break; 242 } 243 case IMSG_QUEUE_COMMIT_MESSAGE: { 244 struct message *messagep; 245 struct submit_status ss; 246 247 messagep = imsg.data; 248 ss.id = messagep->session_id; 249 250 if (queue_commit_incoming_message(messagep)) 251 s_queue.inserts_remote++; 252 else 253 ss.code = 421; 254 255 imsg_compose(ibuf, IMSG_QUEUE_COMMIT_MESSAGE, 0, 0, -1, 256 &ss, sizeof(ss)); 257 258 break; 259 } 260 case IMSG_QUEUE_MESSAGE_FILE: { 261 struct message *messagep; 262 struct submit_status ss; 263 int fd; 264 265 messagep = imsg.data; 266 ss.id = messagep->session_id; 267 268 fd = queue_open_incoming_message_file(messagep); 269 if (fd == -1) 270 ss.code = 421; 271 272 imsg_compose(ibuf, IMSG_QUEUE_MESSAGE_FILE, 0, 0, fd, 273 &ss, sizeof(ss)); 274 break; 275 } 276 default: 277 log_warnx("queue_dispatch_smtp: got imsg %d", 278 imsg.hdr.type); 279 fatalx("queue_dispatch_smtp: unexpected imsg"); 280 } 281 imsg_free(&imsg); 282 } 283 imsg_event_add(ibuf); 284 } 285 286 void 287 queue_dispatch_mda(int sig, short event, void *p) 288 { 289 struct smtpd *env = p; 290 struct imsgbuf *ibuf; 291 struct imsg imsg; 292 ssize_t n; 293 294 ibuf = env->sc_ibufs[PROC_MDA]; 295 switch (event) { 296 case EV_READ: 297 if ((n = imsg_read(ibuf)) == -1) 298 fatal("imsg_read_error"); 299 if (n == 0) { 300 /* this pipe is dead, so remove the event handler */ 301 event_del(&ibuf->ev); 302 event_loopexit(NULL); 303 return; 304 } 305 break; 306 case EV_WRITE: 307 if (msgbuf_write(&ibuf->w) == -1) 308 fatal("msgbuf_write"); 309 imsg_event_add(ibuf); 310 return; 311 default: 312 fatalx("unknown event"); 313 } 314 315 for (;;) { 316 if ((n = imsg_get(ibuf, &imsg)) == -1) 317 fatal("queue_dispatch_mda: imsg_read error"); 318 if (n == 0) 319 break; 320 321 switch (imsg.hdr.type) { 322 323 case IMSG_QUEUE_MESSAGE_UPDATE: { 324 imsg_compose(env->sc_ibufs[PROC_RUNNER], IMSG_RUNNER_UPDATE_ENVELOPE, 325 0, 0, -1, imsg.data, sizeof(struct message)); 326 break; 327 } 328 329 default: 330 log_warnx("got imsg %d", imsg.hdr.type); 331 fatalx("queue_dispatch_mda: unexpected imsg"); 332 } 333 imsg_free(&imsg); 334 } 335 imsg_event_add(ibuf); 336 } 337 338 void 339 queue_dispatch_mta(int sig, short event, void *p) 340 { 341 struct smtpd *env = p; 342 struct imsgbuf *ibuf; 343 struct imsg imsg; 344 ssize_t n; 345 346 ibuf = env->sc_ibufs[PROC_MTA]; 347 switch (event) { 348 case EV_READ: 349 if ((n = imsg_read(ibuf)) == -1) 350 fatal("imsg_read_error"); 351 if (n == 0) { 352 /* this pipe is dead, so remove the event handler */ 353 event_del(&ibuf->ev); 354 event_loopexit(NULL); 355 return; 356 } 357 break; 358 case EV_WRITE: 359 if (msgbuf_write(&ibuf->w) == -1) 360 fatal("msgbuf_write"); 361 imsg_event_add(ibuf); 362 return; 363 default: 364 fatalx("unknown event"); 365 } 366 367 for (;;) { 368 if ((n = imsg_get(ibuf, &imsg)) == -1) 369 fatal("queue_dispatch_mda: imsg_read error"); 370 if (n == 0) 371 break; 372 373 switch (imsg.hdr.type) { 374 375 case IMSG_QUEUE_MESSAGE_FD: { 376 int fd; 377 struct batch *batchp; 378 379 batchp = imsg.data; 380 fd = queue_open_message_file(batchp->message_id); 381 imsg_compose(ibuf, IMSG_QUEUE_MESSAGE_FD, 0, 0, fd, batchp, 382 sizeof(*batchp)); 383 break; 384 } 385 386 case IMSG_QUEUE_MESSAGE_UPDATE: { 387 imsg_compose(env->sc_ibufs[PROC_RUNNER], IMSG_RUNNER_UPDATE_ENVELOPE, 388 0, 0, -1, imsg.data, sizeof(struct message)); 389 break; 390 } 391 392 default: 393 log_warnx("got imsg %d", imsg.hdr.type); 394 fatalx("queue_dispatch_mda: unexpected imsg"); 395 } 396 imsg_free(&imsg); 397 } 398 imsg_event_add(ibuf); 399 } 400 401 void 402 queue_dispatch_lka(int sig, short event, void *p) 403 { 404 struct smtpd *env = p; 405 struct imsgbuf *ibuf; 406 struct imsg imsg; 407 ssize_t n; 408 409 ibuf = env->sc_ibufs[PROC_LKA]; 410 switch (event) { 411 case EV_READ: 412 if ((n = imsg_read(ibuf)) == -1) 413 fatal("imsg_read_error"); 414 if (n == 0) { 415 /* this pipe is dead, so remove the event handler */ 416 event_del(&ibuf->ev); 417 event_loopexit(NULL); 418 return; 419 } 420 break; 421 case EV_WRITE: 422 if (msgbuf_write(&ibuf->w) == -1) 423 fatal("msgbuf_write"); 424 imsg_event_add(ibuf); 425 return; 426 default: 427 fatalx("unknown event"); 428 } 429 430 for (;;) { 431 if ((n = imsg_get(ibuf, &imsg)) == -1) 432 fatal("queue_dispatch_lka: imsg_read error"); 433 if (n == 0) 434 break; 435 436 switch (imsg.hdr.type) { 437 438 case IMSG_QUEUE_SUBMIT_ENVELOPE: { 439 struct message *messagep; 440 struct submit_status ss; 441 int (*f)(struct message *); 442 enum smtp_proc_type peer; 443 444 messagep = imsg.data; 445 messagep->id = queue_generate_id(); 446 ss.id = messagep->session_id; 447 448 if (IS_MAILBOX(messagep->recipient.rule.r_action) || 449 IS_EXT(messagep->recipient.rule.r_action)) 450 messagep->type = T_MDA_MESSAGE; 451 else 452 messagep->type = T_MTA_MESSAGE; 453 454 /* Write to disk */ 455 if (messagep->flags & F_MESSAGE_ENQUEUED) { 456 f = enqueue_record_envelope; 457 peer = PROC_CONTROL; 458 } 459 else { 460 f = queue_record_incoming_envelope; 461 peer = PROC_SMTP; 462 } 463 464 if (! f(messagep)) { 465 ss.code = 421; 466 imsg_compose(env->sc_ibufs[peer], IMSG_QUEUE_TEMPFAIL, 467 0, 0, -1, &ss, sizeof(ss)); 468 } 469 470 break; 471 } 472 473 case IMSG_QUEUE_COMMIT_ENVELOPES: { 474 struct message *messagep; 475 struct submit_status ss; 476 enum smtp_proc_type peer; 477 478 messagep = imsg.data; 479 ss.id = messagep->session_id; 480 ss.code = 250; 481 482 if (messagep->flags & F_MESSAGE_ENQUEUED) 483 peer = PROC_CONTROL; 484 else 485 peer = PROC_SMTP; 486 487 imsg_compose(env->sc_ibufs[peer], IMSG_QUEUE_COMMIT_ENVELOPES, 488 0, 0, -1, &ss, sizeof(ss)); 489 490 break; 491 } 492 493 default: 494 log_warnx("got imsg %d", imsg.hdr.type); 495 fatalx("queue_dispatch_lka: unexpected imsg"); 496 } 497 imsg_free(&imsg); 498 } 499 imsg_event_add(ibuf); 500 } 501 502 void 503 queue_dispatch_runner(int sig, short event, void *p) 504 { 505 struct smtpd *env = p; 506 struct imsgbuf *ibuf; 507 struct imsg imsg; 508 ssize_t n; 509 510 ibuf = env->sc_ibufs[PROC_RUNNER]; 511 switch (event) { 512 case EV_READ: 513 if ((n = imsg_read(ibuf)) == -1) 514 fatal("imsg_read_error"); 515 if (n == 0) { 516 /* this pipe is dead, so remove the event handler */ 517 event_del(&ibuf->ev); 518 event_loopexit(NULL); 519 return; 520 } 521 break; 522 case EV_WRITE: 523 if (msgbuf_write(&ibuf->w) == -1) 524 fatal("msgbuf_write"); 525 imsg_event_add(ibuf); 526 return; 527 default: 528 fatalx("unknown event"); 529 } 530 531 for (;;) { 532 if ((n = imsg_get(ibuf, &imsg)) == -1) 533 fatal("queue_dispatch_runner: imsg_read error"); 534 if (n == 0) 535 break; 536 537 switch (imsg.hdr.type) { 538 default: 539 log_warnx("got imsg %d", imsg.hdr.type); 540 fatalx("queue_dispatch_runner: unexpected imsg"); 541 } 542 imsg_free(&imsg); 543 } 544 imsg_event_add(ibuf); 545 } 546 547 void 548 queue_shutdown(void) 549 { 550 log_info("queue handler"); 551 _exit(0); 552 } 553 554 void 555 queue_setup_events(struct smtpd *env) 556 { 557 } 558 559 void 560 queue_disable_events(struct smtpd *env) 561 { 562 } 563 564 pid_t 565 queue(struct smtpd *env) 566 { 567 pid_t pid; 568 struct passwd *pw; 569 570 struct event ev_sigint; 571 struct event ev_sigterm; 572 573 struct peer peers[] = { 574 { PROC_CONTROL, queue_dispatch_control }, 575 { PROC_SMTP, queue_dispatch_smtp }, 576 { PROC_MDA, queue_dispatch_mda }, 577 { PROC_MTA, queue_dispatch_mta }, 578 { PROC_LKA, queue_dispatch_lka }, 579 { PROC_RUNNER, queue_dispatch_runner } 580 }; 581 582 switch (pid = fork()) { 583 case -1: 584 fatal("queue: cannot fork"); 585 case 0: 586 break; 587 default: 588 return (pid); 589 } 590 591 purge_config(env, PURGE_EVERYTHING); 592 593 pw = env->sc_pw; 594 595 #ifndef DEBUG 596 if (chroot(PATH_SPOOL) == -1) 597 fatal("queue: chroot"); 598 if (chdir("/") == -1) 599 fatal("queue: chdir(\"/\")"); 600 #else 601 #warning disabling privilege revocation and chroot in DEBUG MODE 602 #endif 603 604 setproctitle("queue handler"); 605 smtpd_process = PROC_QUEUE; 606 607 #ifndef DEBUG 608 if (setgroups(1, &pw->pw_gid) || 609 setresgid(pw->pw_gid, pw->pw_gid, pw->pw_gid) || 610 setresuid(pw->pw_uid, pw->pw_uid, pw->pw_uid)) 611 fatal("queue: cannot drop privileges"); 612 #endif 613 614 event_init(); 615 616 signal_set(&ev_sigint, SIGINT, queue_sig_handler, env); 617 signal_set(&ev_sigterm, SIGTERM, queue_sig_handler, env); 618 signal_add(&ev_sigint, NULL); 619 signal_add(&ev_sigterm, NULL); 620 signal(SIGPIPE, SIG_IGN); 621 signal(SIGHUP, SIG_IGN); 622 623 config_pipes(env, peers, 6); 624 config_peers(env, peers, 6); 625 626 queue_purge(PATH_INCOMING); 627 queue_purge(PATH_ENQUEUE); 628 629 queue_setup_events(env); 630 event_dispatch(); 631 queue_shutdown(); 632 633 return (0); 634 } 635 636 int 637 queue_remove_batch_message(struct smtpd *env, struct batch *batchp, struct message *messagep) 638 { 639 TAILQ_REMOVE(&batchp->messages, messagep, entry); 640 bzero(messagep, sizeof(struct message)); 641 free(messagep); 642 643 if (TAILQ_FIRST(&batchp->messages) == NULL) { 644 SPLAY_REMOVE(batchtree, &env->batch_queue, batchp); 645 bzero(batchp, sizeof(struct batch)); 646 free(batchp); 647 return 1; 648 } 649 return 0; 650 } 651 652 struct batch * 653 batch_by_id(struct smtpd *env, u_int64_t id) 654 { 655 struct batch lookup; 656 657 lookup.id = id; 658 return SPLAY_FIND(batchtree, &env->batch_queue, &lookup); 659 } 660 661 662 struct message * 663 message_by_id(struct smtpd *env, struct batch *batchp, u_int64_t id) 664 { 665 struct message *messagep; 666 667 if (batchp != NULL) { 668 TAILQ_FOREACH(messagep, &batchp->messages, entry) { 669 if (messagep->id == id) 670 break; 671 } 672 return messagep; 673 } 674 675 SPLAY_FOREACH(batchp, batchtree, &env->batch_queue) { 676 TAILQ_FOREACH(messagep, &batchp->messages, entry) { 677 if (messagep->id == id) 678 return messagep; 679 } 680 } 681 return NULL; 682 } 683 684 void 685 queue_purge(char *queuepath) 686 { 687 char path[MAXPATHLEN]; 688 struct qwalk *q; 689 690 q = qwalk_new(queuepath); 691 692 while (qwalk(q, path)) 693 queue_delete_layout_message(queuepath, basename(path)); 694 695 qwalk_close(q); 696 } 697