1 /* $OpenBSD: queue.c,v 1.61 2009/04/21 14:37:32 eric Exp $ */ 2 3 /* 4 * Copyright (c) 2008 Gilles Chehade <gilles@openbsd.org> 5 * Copyright (c) 2008 Pierre-Yves Ritschard <pyr@openbsd.org> 6 * 7 * Permission to use, copy, modify, and distribute this software for any 8 * purpose with or without fee is hereby granted, provided that the above 9 * copyright notice and this permission notice appear in all copies. 10 * 11 * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES 12 * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF 13 * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR 14 * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES 15 * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN 16 * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF 17 * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. 18 */ 19 20 #include <sys/types.h> 21 #include <sys/queue.h> 22 #include <sys/tree.h> 23 #include <sys/param.h> 24 #include <sys/socket.h> 25 #include <sys/stat.h> 26 27 #include <errno.h> 28 #include <event.h> 29 #include <fcntl.h> 30 #include <libgen.h> 31 #include <pwd.h> 32 #include <signal.h> 33 #include <stdio.h> 34 #include <stdlib.h> 35 #include <string.h> 36 #include <unistd.h> 37 38 #include "smtpd.h" 39 40 __dead void queue_shutdown(void); 41 void queue_sig_handler(int, short, void *); 42 void queue_dispatch_control(int, short, void *); 43 void queue_dispatch_smtp(int, short, void *); 44 void queue_dispatch_mda(int, short, void *); 45 void queue_dispatch_mta(int, short, void *); 46 void queue_dispatch_lka(int, short, void *); 47 void queue_dispatch_runner(int, short, void *); 48 void queue_setup_events(struct smtpd *); 49 void queue_disable_events(struct smtpd *); 50 void queue_purge(char *); 51 52 int queue_create_layout_message(char *, char *); 53 void queue_delete_layout_message(char *, char *); 54 int queue_record_layout_envelope(char *, struct message *); 55 int queue_remove_layout_envelope(char *, struct message *); 56 int queue_commit_layout_message(char *, struct message *); 57 int queue_open_layout_messagefile(char *, struct message *); 58 59 struct s_queue s_queue; 60 61 void 62 queue_sig_handler(int sig, short event, void *p) 63 { 64 switch (sig) { 65 case SIGINT: 66 case SIGTERM: 67 queue_shutdown(); 68 break; 69 default: 70 fatalx("queue_sig_handler: unexpected signal"); 71 } 72 } 73 74 void 75 queue_dispatch_control(int sig, short event, void *p) 76 { 77 struct smtpd *env = p; 78 struct imsgbuf *ibuf; 79 struct imsg imsg; 80 ssize_t n; 81 82 ibuf = env->sc_ibufs[PROC_CONTROL]; 83 switch (event) { 84 case EV_READ: 85 if ((n = imsg_read(ibuf)) == -1) 86 fatal("imsg_read_error"); 87 if (n == 0) { 88 /* this pipe is dead, so remove the event handler */ 89 event_del(&ibuf->ev); 90 event_loopexit(NULL); 91 return; 92 } 93 break; 94 case EV_WRITE: 95 if (msgbuf_write(&ibuf->w) == -1) 96 fatal("msgbuf_write"); 97 imsg_event_add(ibuf); 98 return; 99 default: 100 fatalx("unknown event"); 101 } 102 103 for (;;) { 104 if ((n = imsg_get(ibuf, &imsg)) == -1) 105 fatalx("queue_dispatch_control: imsg_get error"); 106 if (n == 0) 107 break; 108 109 switch (imsg.hdr.type) { 110 case IMSG_STATS: { 111 struct stats *s; 112 113 s = imsg.data; 114 s->u.queue = s_queue; 115 imsg_compose(ibuf, IMSG_STATS, 0, 0, -1, s, sizeof(*s)); 116 break; 117 } 118 default: 119 log_warnx("queue_dispatch_control: got imsg %d", 120 imsg.hdr.type); 121 fatalx("queue_dispatch_control: unexpected imsg"); 122 } 123 imsg_free(&imsg); 124 } 125 imsg_event_add(ibuf); 126 } 127 128 void 129 queue_dispatch_smtp(int sig, short event, void *p) 130 { 131 struct smtpd *env = p; 132 struct imsgbuf *ibuf; 133 struct imsg imsg; 134 ssize_t n; 135 136 ibuf = env->sc_ibufs[PROC_SMTP]; 137 switch (event) { 138 case EV_READ: 139 if ((n = imsg_read(ibuf)) == -1) 140 fatal("imsg_read_error"); 141 if (n == 0) { 142 /* this pipe is dead, so remove the event handler */ 143 event_del(&ibuf->ev); 144 event_loopexit(NULL); 145 return; 146 } 147 break; 148 case EV_WRITE: 149 if (msgbuf_write(&ibuf->w) == -1) 150 fatal("msgbuf_write"); 151 imsg_event_add(ibuf); 152 return; 153 default: 154 fatalx("unknown event"); 155 } 156 157 for (;;) { 158 if ((n = imsg_get(ibuf, &imsg)) == -1) 159 fatalx("queue_dispatch_smtp: imsg_get error"); 160 if (n == 0) 161 break; 162 163 switch (imsg.hdr.type) { 164 case IMSG_QUEUE_CREATE_MESSAGE: { 165 struct message *messagep; 166 struct submit_status ss; 167 int (*f)(char *); 168 169 log_debug("queue_dispatch_smtp: creating message file"); 170 messagep = imsg.data; 171 ss.id = messagep->session_id; 172 ss.code = 250; 173 bzero(ss.u.msgid, MAX_ID_SIZE); 174 175 if (messagep->flags & F_MESSAGE_ENQUEUED) 176 f = enqueue_create_layout; 177 else 178 f = queue_create_incoming_layout; 179 180 if (! f(ss.u.msgid)) 181 ss.code = 421; 182 183 imsg_compose(ibuf, IMSG_QUEUE_CREATE_MESSAGE, 0, 0, -1, 184 &ss, sizeof(ss)); 185 break; 186 } 187 case IMSG_QUEUE_REMOVE_MESSAGE: { 188 struct message *messagep; 189 void (*f)(char *); 190 191 messagep = imsg.data; 192 if (messagep->flags & F_MESSAGE_ENQUEUED) 193 f = enqueue_delete_message; 194 else 195 f = queue_delete_incoming_message; 196 197 f(messagep->message_id); 198 199 break; 200 } 201 case IMSG_QUEUE_COMMIT_MESSAGE: { 202 struct message *messagep; 203 struct submit_status ss; 204 size_t *counter; 205 int (*f)(struct message *); 206 207 messagep = imsg.data; 208 ss.id = messagep->session_id; 209 210 if (messagep->flags & F_MESSAGE_ENQUEUED) { 211 f = enqueue_commit_message; 212 counter = &s_queue.inserts_local; 213 } else { 214 f = queue_commit_incoming_message; 215 counter = &s_queue.inserts_remote; 216 } 217 218 if (f(messagep)) 219 (*counter)++; 220 else 221 ss.code = 421; 222 223 imsg_compose(ibuf, IMSG_QUEUE_COMMIT_MESSAGE, 0, 0, -1, 224 &ss, sizeof(ss)); 225 226 break; 227 } 228 case IMSG_QUEUE_MESSAGE_FILE: { 229 struct message *messagep; 230 struct submit_status ss; 231 int fd; 232 int (*f)(struct message *); 233 234 messagep = imsg.data; 235 ss.id = messagep->session_id; 236 237 if (messagep->flags & F_MESSAGE_ENQUEUED) 238 f = enqueue_open_messagefile; 239 else 240 f = queue_open_incoming_message_file; 241 242 fd = f(messagep); 243 if (fd == -1) 244 ss.code = 421; 245 246 imsg_compose(ibuf, IMSG_QUEUE_MESSAGE_FILE, 0, 0, fd, 247 &ss, sizeof(ss)); 248 break; 249 } 250 default: 251 log_warnx("queue_dispatch_smtp: got imsg %d", 252 imsg.hdr.type); 253 fatalx("queue_dispatch_smtp: unexpected imsg"); 254 } 255 imsg_free(&imsg); 256 } 257 imsg_event_add(ibuf); 258 } 259 260 void 261 queue_dispatch_mda(int sig, short event, void *p) 262 { 263 struct smtpd *env = p; 264 struct imsgbuf *ibuf; 265 struct imsg imsg; 266 ssize_t n; 267 268 ibuf = env->sc_ibufs[PROC_MDA]; 269 switch (event) { 270 case EV_READ: 271 if ((n = imsg_read(ibuf)) == -1) 272 fatal("imsg_read_error"); 273 if (n == 0) { 274 /* this pipe is dead, so remove the event handler */ 275 event_del(&ibuf->ev); 276 event_loopexit(NULL); 277 return; 278 } 279 break; 280 case EV_WRITE: 281 if (msgbuf_write(&ibuf->w) == -1) 282 fatal("msgbuf_write"); 283 imsg_event_add(ibuf); 284 return; 285 default: 286 fatalx("unknown event"); 287 } 288 289 for (;;) { 290 if ((n = imsg_get(ibuf, &imsg)) == -1) 291 fatalx("queue_dispatch_mda: imsg_get error"); 292 if (n == 0) 293 break; 294 295 switch (imsg.hdr.type) { 296 297 case IMSG_QUEUE_MESSAGE_UPDATE: { 298 imsg_compose(env->sc_ibufs[PROC_RUNNER], IMSG_RUNNER_UPDATE_ENVELOPE, 299 0, 0, -1, imsg.data, sizeof(struct message)); 300 break; 301 } 302 303 default: 304 log_warnx("got imsg %d", imsg.hdr.type); 305 fatalx("queue_dispatch_mda: unexpected imsg"); 306 } 307 imsg_free(&imsg); 308 } 309 imsg_event_add(ibuf); 310 } 311 312 void 313 queue_dispatch_mta(int sig, short event, void *p) 314 { 315 struct smtpd *env = p; 316 struct imsgbuf *ibuf; 317 struct imsg imsg; 318 ssize_t n; 319 320 ibuf = env->sc_ibufs[PROC_MTA]; 321 switch (event) { 322 case EV_READ: 323 if ((n = imsg_read(ibuf)) == -1) 324 fatal("imsg_read_error"); 325 if (n == 0) { 326 /* this pipe is dead, so remove the event handler */ 327 event_del(&ibuf->ev); 328 event_loopexit(NULL); 329 return; 330 } 331 break; 332 case EV_WRITE: 333 if (msgbuf_write(&ibuf->w) == -1) 334 fatal("msgbuf_write"); 335 imsg_event_add(ibuf); 336 return; 337 default: 338 fatalx("unknown event"); 339 } 340 341 for (;;) { 342 if ((n = imsg_get(ibuf, &imsg)) == -1) 343 fatalx("queue_dispatch_mta: imsg_get error"); 344 if (n == 0) 345 break; 346 347 switch (imsg.hdr.type) { 348 349 case IMSG_QUEUE_MESSAGE_FD: { 350 int fd; 351 struct batch *batchp; 352 353 batchp = imsg.data; 354 fd = queue_open_message_file(batchp->message_id); 355 imsg_compose(ibuf, IMSG_QUEUE_MESSAGE_FD, 0, 0, fd, batchp, 356 sizeof(*batchp)); 357 break; 358 } 359 360 case IMSG_QUEUE_MESSAGE_UPDATE: { 361 imsg_compose(env->sc_ibufs[PROC_RUNNER], IMSG_RUNNER_UPDATE_ENVELOPE, 362 0, 0, -1, imsg.data, sizeof(struct message)); 363 break; 364 } 365 366 default: 367 log_warnx("got imsg %d", imsg.hdr.type); 368 fatalx("queue_dispatch_mda: unexpected imsg"); 369 } 370 imsg_free(&imsg); 371 } 372 imsg_event_add(ibuf); 373 } 374 375 void 376 queue_dispatch_lka(int sig, short event, void *p) 377 { 378 struct smtpd *env = p; 379 struct imsgbuf *ibuf; 380 struct imsg imsg; 381 ssize_t n; 382 383 ibuf = env->sc_ibufs[PROC_LKA]; 384 switch (event) { 385 case EV_READ: 386 if ((n = imsg_read(ibuf)) == -1) 387 fatal("imsg_read_error"); 388 if (n == 0) { 389 /* this pipe is dead, so remove the event handler */ 390 event_del(&ibuf->ev); 391 event_loopexit(NULL); 392 return; 393 } 394 break; 395 case EV_WRITE: 396 if (msgbuf_write(&ibuf->w) == -1) 397 fatal("msgbuf_write"); 398 imsg_event_add(ibuf); 399 return; 400 default: 401 fatalx("unknown event"); 402 } 403 404 for (;;) { 405 if ((n = imsg_get(ibuf, &imsg)) == -1) 406 fatalx("queue_dispatch_lka: imsg_get error"); 407 if (n == 0) 408 break; 409 410 switch (imsg.hdr.type) { 411 412 case IMSG_QUEUE_SUBMIT_ENVELOPE: { 413 struct message *messagep; 414 struct submit_status ss; 415 int (*f)(struct message *); 416 417 messagep = imsg.data; 418 messagep->id = queue_generate_id(); 419 ss.id = messagep->session_id; 420 421 if (IS_MAILBOX(messagep->recipient.rule.r_action) || 422 IS_EXT(messagep->recipient.rule.r_action)) 423 messagep->type = T_MDA_MESSAGE; 424 else 425 messagep->type = T_MTA_MESSAGE; 426 427 /* Write to disk */ 428 if (messagep->flags & F_MESSAGE_ENQUEUED) 429 f = enqueue_record_envelope; 430 else 431 f = queue_record_incoming_envelope; 432 433 if (! f(messagep)) { 434 ss.code = 421; 435 imsg_compose(env->sc_ibufs[PROC_SMTP], 436 IMSG_QUEUE_TEMPFAIL, 0, 0, -1, &ss, 437 sizeof(ss)); 438 } 439 440 break; 441 } 442 443 case IMSG_QUEUE_COMMIT_ENVELOPES: { 444 struct message *messagep; 445 struct submit_status ss; 446 447 messagep = imsg.data; 448 ss.id = messagep->session_id; 449 ss.code = 250; 450 451 imsg_compose(env->sc_ibufs[PROC_SMTP], IMSG_QUEUE_COMMIT_ENVELOPES, 452 0, 0, -1, &ss, sizeof(ss)); 453 454 break; 455 } 456 457 default: 458 log_warnx("got imsg %d", imsg.hdr.type); 459 fatalx("queue_dispatch_lka: unexpected imsg"); 460 } 461 imsg_free(&imsg); 462 } 463 imsg_event_add(ibuf); 464 } 465 466 void 467 queue_dispatch_runner(int sig, short event, void *p) 468 { 469 struct smtpd *env = p; 470 struct imsgbuf *ibuf; 471 struct imsg imsg; 472 ssize_t n; 473 474 ibuf = env->sc_ibufs[PROC_RUNNER]; 475 switch (event) { 476 case EV_READ: 477 if ((n = imsg_read(ibuf)) == -1) 478 fatal("imsg_read_error"); 479 if (n == 0) { 480 /* this pipe is dead, so remove the event handler */ 481 event_del(&ibuf->ev); 482 event_loopexit(NULL); 483 return; 484 } 485 break; 486 case EV_WRITE: 487 if (msgbuf_write(&ibuf->w) == -1) 488 fatal("msgbuf_write"); 489 imsg_event_add(ibuf); 490 return; 491 default: 492 fatalx("unknown event"); 493 } 494 495 for (;;) { 496 if ((n = imsg_get(ibuf, &imsg)) == -1) 497 fatalx("queue_dispatch_runner: imsg_get error"); 498 if (n == 0) 499 break; 500 501 switch (imsg.hdr.type) { 502 default: 503 log_warnx("got imsg %d", imsg.hdr.type); 504 fatalx("queue_dispatch_runner: unexpected imsg"); 505 } 506 imsg_free(&imsg); 507 } 508 imsg_event_add(ibuf); 509 } 510 511 void 512 queue_shutdown(void) 513 { 514 log_info("queue handler"); 515 _exit(0); 516 } 517 518 void 519 queue_setup_events(struct smtpd *env) 520 { 521 } 522 523 void 524 queue_disable_events(struct smtpd *env) 525 { 526 } 527 528 pid_t 529 queue(struct smtpd *env) 530 { 531 pid_t pid; 532 struct passwd *pw; 533 534 struct event ev_sigint; 535 struct event ev_sigterm; 536 537 struct peer peers[] = { 538 { PROC_CONTROL, queue_dispatch_control }, 539 { PROC_SMTP, queue_dispatch_smtp }, 540 { PROC_MDA, queue_dispatch_mda }, 541 { PROC_MTA, queue_dispatch_mta }, 542 { PROC_LKA, queue_dispatch_lka }, 543 { PROC_RUNNER, queue_dispatch_runner } 544 }; 545 546 switch (pid = fork()) { 547 case -1: 548 fatal("queue: cannot fork"); 549 case 0: 550 break; 551 default: 552 return (pid); 553 } 554 555 purge_config(env, PURGE_EVERYTHING); 556 557 pw = env->sc_pw; 558 559 #ifndef DEBUG 560 if (chroot(PATH_SPOOL) == -1) 561 fatal("queue: chroot"); 562 if (chdir("/") == -1) 563 fatal("queue: chdir(\"/\")"); 564 #else 565 #warning disabling privilege revocation and chroot in DEBUG MODE 566 #endif 567 568 setproctitle("queue handler"); 569 smtpd_process = PROC_QUEUE; 570 571 #ifndef DEBUG 572 if (setgroups(1, &pw->pw_gid) || 573 setresgid(pw->pw_gid, pw->pw_gid, pw->pw_gid) || 574 setresuid(pw->pw_uid, pw->pw_uid, pw->pw_uid)) 575 fatal("queue: cannot drop privileges"); 576 #endif 577 578 event_init(); 579 580 signal_set(&ev_sigint, SIGINT, queue_sig_handler, env); 581 signal_set(&ev_sigterm, SIGTERM, queue_sig_handler, env); 582 signal_add(&ev_sigint, NULL); 583 signal_add(&ev_sigterm, NULL); 584 signal(SIGPIPE, SIG_IGN); 585 signal(SIGHUP, SIG_IGN); 586 587 config_pipes(env, peers, 6); 588 config_peers(env, peers, 6); 589 590 queue_purge(PATH_INCOMING); 591 queue_purge(PATH_ENQUEUE); 592 593 queue_setup_events(env); 594 event_dispatch(); 595 queue_shutdown(); 596 597 return (0); 598 } 599 600 int 601 queue_remove_batch_message(struct smtpd *env, struct batch *batchp, struct message *messagep) 602 { 603 TAILQ_REMOVE(&batchp->messages, messagep, entry); 604 bzero(messagep, sizeof(struct message)); 605 free(messagep); 606 607 if (TAILQ_FIRST(&batchp->messages) == NULL) { 608 SPLAY_REMOVE(batchtree, &env->batch_queue, batchp); 609 bzero(batchp, sizeof(struct batch)); 610 free(batchp); 611 return 1; 612 } 613 return 0; 614 } 615 616 struct batch * 617 batch_by_id(struct smtpd *env, u_int64_t id) 618 { 619 struct batch lookup; 620 621 lookup.id = id; 622 return SPLAY_FIND(batchtree, &env->batch_queue, &lookup); 623 } 624 625 626 struct message * 627 message_by_id(struct smtpd *env, struct batch *batchp, u_int64_t id) 628 { 629 struct message *messagep; 630 631 if (batchp != NULL) { 632 TAILQ_FOREACH(messagep, &batchp->messages, entry) { 633 if (messagep->id == id) 634 break; 635 } 636 return messagep; 637 } 638 639 SPLAY_FOREACH(batchp, batchtree, &env->batch_queue) { 640 TAILQ_FOREACH(messagep, &batchp->messages, entry) { 641 if (messagep->id == id) 642 return messagep; 643 } 644 } 645 return NULL; 646 } 647 648 void 649 queue_purge(char *queuepath) 650 { 651 char path[MAXPATHLEN]; 652 struct qwalk *q; 653 654 q = qwalk_new(queuepath); 655 656 while (qwalk(q, path)) 657 queue_delete_layout_message(queuepath, basename(path)); 658 659 qwalk_close(q); 660 } 661