1 /* $OpenBSD: queue.c,v 1.73 2009/11/08 21:40:05 gilles Exp $ */ 2 3 /* 4 * Copyright (c) 2008 Gilles Chehade <gilles@openbsd.org> 5 * Copyright (c) 2008 Pierre-Yves Ritschard <pyr@openbsd.org> 6 * 7 * Permission to use, copy, modify, and distribute this software for any 8 * purpose with or without fee is hereby granted, provided that the above 9 * copyright notice and this permission notice appear in all copies. 10 * 11 * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES 12 * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF 13 * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR 14 * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES 15 * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN 16 * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF 17 * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. 18 */ 19 20 #include <sys/types.h> 21 #include <sys/queue.h> 22 #include <sys/tree.h> 23 #include <sys/param.h> 24 #include <sys/socket.h> 25 #include <sys/stat.h> 26 27 #include <errno.h> 28 #include <event.h> 29 #include <fcntl.h> 30 #include <libgen.h> 31 #include <pwd.h> 32 #include <signal.h> 33 #include <stdio.h> 34 #include <stdlib.h> 35 #include <string.h> 36 #include <unistd.h> 37 38 #include "smtpd.h" 39 40 __dead void queue_shutdown(void); 41 void queue_sig_handler(int, short, void *); 42 void queue_dispatch_control(int, short, void *); 43 void queue_dispatch_smtp(int, short, void *); 44 void queue_dispatch_mda(int, short, void *); 45 void queue_dispatch_mta(int, short, void *); 46 void queue_dispatch_lka(int, short, void *); 47 void queue_dispatch_runner(int, short, void *); 48 void queue_setup_events(struct smtpd *); 49 void queue_disable_events(struct smtpd *); 50 void queue_purge(char *); 51 52 int queue_create_layout_message(char *, char *); 53 void queue_delete_layout_message(char *, char *); 54 int queue_record_layout_envelope(char *, struct message *); 55 int queue_remove_layout_envelope(char *, struct message *); 56 int queue_commit_layout_message(char *, struct message *); 57 int queue_open_layout_messagefile(char *, struct message *); 58 59 void queue_submit_envelope(struct smtpd *, struct message *); 60 void queue_commit_envelopes(struct smtpd *, struct message*); 61 62 void 63 queue_sig_handler(int sig, short event, void *p) 64 { 65 switch (sig) { 66 case SIGINT: 67 case SIGTERM: 68 queue_shutdown(); 69 break; 70 default: 71 fatalx("queue_sig_handler: unexpected signal"); 72 } 73 } 74 75 void 76 queue_dispatch_control(int sig, short event, void *p) 77 { 78 struct smtpd *env = p; 79 struct imsgev *iev; 80 struct imsgbuf *ibuf; 81 struct imsg imsg; 82 ssize_t n; 83 84 iev = env->sc_ievs[PROC_CONTROL]; 85 ibuf = &iev->ibuf; 86 87 if (event & EV_READ) { 88 if ((n = imsg_read(ibuf)) == -1) 89 fatal("imsg_read_error"); 90 if (n == 0) { 91 /* this pipe is dead, so remove the event handler */ 92 event_del(&iev->ev); 93 event_loopexit(NULL); 94 return; 95 } 96 } 97 98 if (event & EV_WRITE) { 99 if (msgbuf_write(&ibuf->w) == -1) 100 fatal("msgbuf_write"); 101 } 102 103 for (;;) { 104 if ((n = imsg_get(ibuf, &imsg)) == -1) 105 fatal("queue_dispatch_control: imsg_get error"); 106 if (n == 0) 107 break; 108 109 switch (imsg.hdr.type) { 110 default: 111 log_warnx("queue_dispatch_control: got imsg %d", 112 imsg.hdr.type); 113 fatalx("queue_dispatch_control: unexpected imsg"); 114 } 115 imsg_free(&imsg); 116 } 117 imsg_event_add(iev); 118 } 119 120 void 121 queue_dispatch_smtp(int sig, short event, void *p) 122 { 123 struct smtpd *env = p; 124 struct imsgev *iev; 125 struct imsgbuf *ibuf; 126 struct imsg imsg; 127 ssize_t n; 128 129 iev = env->sc_ievs[PROC_SMTP]; 130 ibuf = &iev->ibuf; 131 132 if (event & EV_READ) { 133 if ((n = imsg_read(ibuf)) == -1) 134 fatal("imsg_read_error"); 135 if (n == 0) { 136 /* this pipe is dead, so remove the event handler */ 137 event_del(&iev->ev); 138 event_loopexit(NULL); 139 return; 140 } 141 } 142 143 if (event & EV_WRITE) { 144 if (msgbuf_write(&ibuf->w) == -1) 145 fatal("msgbuf_write"); 146 } 147 148 for (;;) { 149 if ((n = imsg_get(ibuf, &imsg)) == -1) 150 fatal("queue_dispatch_smtp: imsg_get error"); 151 if (n == 0) 152 break; 153 154 switch (imsg.hdr.type) { 155 case IMSG_QUEUE_CREATE_MESSAGE: { 156 struct message *messagep = imsg.data; 157 struct submit_status ss; 158 int (*f)(char *); 159 160 log_debug("queue_dispatch_smtp: creating message file"); 161 162 IMSG_SIZE_CHECK(messagep); 163 164 ss.id = messagep->session_id; 165 ss.code = 250; 166 bzero(ss.u.msgid, MAX_ID_SIZE); 167 168 if (messagep->flags & F_MESSAGE_ENQUEUED) 169 f = enqueue_create_layout; 170 else 171 f = queue_create_incoming_layout; 172 173 if (! f(ss.u.msgid)) 174 ss.code = 421; 175 176 imsg_compose_event(iev, IMSG_QUEUE_CREATE_MESSAGE, 0, 0, -1, 177 &ss, sizeof(ss)); 178 break; 179 } 180 case IMSG_QUEUE_REMOVE_MESSAGE: { 181 struct message *messagep = imsg.data; 182 void (*f)(char *); 183 184 IMSG_SIZE_CHECK(messagep); 185 186 if (messagep->flags & F_MESSAGE_ENQUEUED) 187 f = enqueue_delete_message; 188 else 189 f = queue_delete_incoming_message; 190 191 f(messagep->message_id); 192 193 break; 194 } 195 case IMSG_QUEUE_COMMIT_MESSAGE: { 196 struct message *messagep = imsg.data; 197 struct submit_status ss; 198 size_t *counter; 199 int (*f)(struct message *); 200 201 IMSG_SIZE_CHECK(messagep); 202 203 ss.id = messagep->session_id; 204 205 if (messagep->flags & F_MESSAGE_ENQUEUED) { 206 f = enqueue_commit_message; 207 counter = &env->stats->queue.inserts_local; 208 } else { 209 f = queue_commit_incoming_message; 210 counter = &env->stats->queue.inserts_remote; 211 } 212 213 if (f(messagep)) 214 (*counter)++; 215 else 216 ss.code = 421; 217 218 imsg_compose_event(iev, IMSG_QUEUE_COMMIT_MESSAGE, 0, 0, -1, 219 &ss, sizeof(ss)); 220 221 break; 222 } 223 case IMSG_QUEUE_MESSAGE_FILE: { 224 struct message *messagep = imsg.data; 225 struct submit_status ss; 226 int fd; 227 int (*f)(struct message *); 228 229 IMSG_SIZE_CHECK(messagep); 230 231 ss.id = messagep->session_id; 232 233 if (messagep->flags & F_MESSAGE_ENQUEUED) 234 f = enqueue_open_messagefile; 235 else 236 f = queue_open_incoming_message_file; 237 238 fd = f(messagep); 239 if (fd == -1) 240 ss.code = 421; 241 242 imsg_compose_event(iev, IMSG_QUEUE_MESSAGE_FILE, 0, 0, fd, 243 &ss, sizeof(ss)); 244 break; 245 } 246 default: 247 log_warnx("queue_dispatch_smtp: got imsg %d", 248 imsg.hdr.type); 249 fatalx("queue_dispatch_smtp: unexpected imsg"); 250 } 251 imsg_free(&imsg); 252 } 253 imsg_event_add(iev); 254 } 255 256 void 257 queue_dispatch_mda(int sig, short event, void *p) 258 { 259 struct smtpd *env = p; 260 struct imsgev *iev; 261 struct imsgbuf *ibuf; 262 struct imsg imsg; 263 ssize_t n; 264 265 iev = env->sc_ievs[PROC_MDA]; 266 ibuf = &iev->ibuf; 267 268 if (event & EV_READ) { 269 if ((n = imsg_read(ibuf)) == -1) 270 fatal("imsg_read_error"); 271 if (n == 0) { 272 /* this pipe is dead, so remove the event handler */ 273 event_del(&iev->ev); 274 event_loopexit(NULL); 275 return; 276 } 277 } 278 279 if (event & EV_WRITE) { 280 if (msgbuf_write(&ibuf->w) == -1) 281 fatal("msgbuf_write"); 282 } 283 284 for (;;) { 285 if ((n = imsg_get(ibuf, &imsg)) == -1) 286 fatal("queue_dispatch_mda: imsg_get error"); 287 if (n == 0) 288 break; 289 290 switch (imsg.hdr.type) { 291 292 case IMSG_QUEUE_MESSAGE_UPDATE: { 293 imsg_compose_event(env->sc_ievs[PROC_RUNNER], IMSG_RUNNER_UPDATE_ENVELOPE, 294 0, 0, -1, imsg.data, sizeof(struct message)); 295 break; 296 } 297 298 default: 299 log_warnx("got imsg %d", imsg.hdr.type); 300 fatalx("queue_dispatch_mda: unexpected imsg"); 301 } 302 imsg_free(&imsg); 303 } 304 imsg_event_add(iev); 305 } 306 307 void 308 queue_dispatch_mta(int sig, short event, void *p) 309 { 310 struct smtpd *env = p; 311 struct imsgev *iev; 312 struct imsgbuf *ibuf; 313 struct imsg imsg; 314 ssize_t n; 315 316 iev = env->sc_ievs[PROC_MTA]; 317 ibuf = &iev->ibuf; 318 319 if (event & EV_READ) { 320 if ((n = imsg_read(ibuf)) == -1) 321 fatal("imsg_read_error"); 322 if (n == 0) { 323 /* this pipe is dead, so remove the event handler */ 324 event_del(&iev->ev); 325 event_loopexit(NULL); 326 return; 327 } 328 } 329 330 if (event & EV_WRITE) { 331 if (msgbuf_write(&ibuf->w) == -1) 332 fatal("msgbuf_write"); 333 } 334 335 for (;;) { 336 if ((n = imsg_get(ibuf, &imsg)) == -1) 337 fatal("queue_dispatch_mta: imsg_get error"); 338 if (n == 0) 339 break; 340 341 switch (imsg.hdr.type) { 342 343 case IMSG_QUEUE_MESSAGE_FD: { 344 struct batch *batchp = imsg.data; 345 int fd; 346 347 IMSG_SIZE_CHECK(batchp); 348 349 fd = queue_open_message_file(batchp->message_id); 350 imsg_compose_event(iev, IMSG_QUEUE_MESSAGE_FD, 0, 0, fd, batchp, 351 sizeof(*batchp)); 352 break; 353 } 354 355 case IMSG_QUEUE_MESSAGE_UPDATE: { 356 imsg_compose_event(env->sc_ievs[PROC_RUNNER], IMSG_RUNNER_UPDATE_ENVELOPE, 357 0, 0, -1, imsg.data, sizeof(struct message)); 358 break; 359 } 360 361 default: 362 log_warnx("got imsg %d", imsg.hdr.type); 363 fatalx("queue_dispatch_mda: unexpected imsg"); 364 } 365 imsg_free(&imsg); 366 } 367 imsg_event_add(iev); 368 } 369 370 void 371 queue_dispatch_lka(int sig, short event, void *p) 372 { 373 struct smtpd *env = p; 374 struct imsgev *iev; 375 struct imsgbuf *ibuf; 376 struct imsg imsg; 377 ssize_t n; 378 379 iev = env->sc_ievs[PROC_LKA]; 380 ibuf = &iev->ibuf; 381 382 if (event & EV_READ) { 383 if ((n = imsg_read(ibuf)) == -1) 384 fatal("imsg_read_error"); 385 if (n == 0) { 386 /* this pipe is dead, so remove the event handler */ 387 event_del(&iev->ev); 388 event_loopexit(NULL); 389 return; 390 } 391 } 392 393 if (event & EV_WRITE) { 394 if (msgbuf_write(&ibuf->w) == -1) 395 fatal("msgbuf_write"); 396 } 397 398 for (;;) { 399 if ((n = imsg_get(ibuf, &imsg)) == -1) 400 fatal("queue_dispatch_lka: imsg_get error"); 401 if (n == 0) 402 break; 403 404 switch (imsg.hdr.type) { 405 406 case IMSG_QUEUE_SUBMIT_ENVELOPE: { 407 struct message *messagep = imsg.data; 408 struct submit_status ss; 409 int (*f)(struct message *); 410 411 IMSG_SIZE_CHECK(messagep); 412 413 messagep->id = generate_uid(); 414 ss.id = messagep->session_id; 415 416 if (IS_MAILBOX(messagep->recipient) || 417 IS_EXT(messagep->recipient)) 418 messagep->type = T_MDA_MESSAGE; 419 else 420 messagep->type = T_MTA_MESSAGE; 421 422 /* Write to disk */ 423 if (messagep->flags & F_MESSAGE_ENQUEUED) 424 f = enqueue_record_envelope; 425 else 426 f = queue_record_incoming_envelope; 427 428 if (! f(messagep)) { 429 ss.code = 421; 430 imsg_compose_event(env->sc_ievs[PROC_SMTP], 431 IMSG_QUEUE_TEMPFAIL, 0, 0, -1, &ss, 432 sizeof(ss)); 433 } 434 435 break; 436 } 437 438 case IMSG_QUEUE_COMMIT_ENVELOPES: { 439 struct message *messagep = imsg.data; 440 struct submit_status ss; 441 442 IMSG_SIZE_CHECK(messagep); 443 444 ss.id = messagep->session_id; 445 ss.code = 250; 446 447 imsg_compose_event(env->sc_ievs[PROC_SMTP], IMSG_QUEUE_COMMIT_ENVELOPES, 448 0, 0, -1, &ss, sizeof(ss)); 449 450 break; 451 } 452 453 default: 454 log_warnx("got imsg %d", imsg.hdr.type); 455 fatalx("queue_dispatch_lka: unexpected imsg"); 456 } 457 imsg_free(&imsg); 458 } 459 imsg_event_add(iev); 460 } 461 462 void 463 queue_dispatch_runner(int sig, short event, void *p) 464 { 465 struct smtpd *env = p; 466 struct imsgev *iev; 467 struct imsgbuf *ibuf; 468 struct imsg imsg; 469 ssize_t n; 470 471 iev = env->sc_ievs[PROC_RUNNER]; 472 ibuf = &iev->ibuf; 473 474 if (event & EV_READ) { 475 if ((n = imsg_read(ibuf)) == -1) 476 fatal("imsg_read_error"); 477 if (n == 0) { 478 /* this pipe is dead, so remove the event handler */ 479 event_del(&iev->ev); 480 event_loopexit(NULL); 481 return; 482 } 483 } 484 485 if (event & EV_WRITE) { 486 if (msgbuf_write(&ibuf->w) == -1) 487 fatal("msgbuf_write"); 488 } 489 490 for (;;) { 491 if ((n = imsg_get(ibuf, &imsg)) == -1) 492 fatal("queue_dispatch_runner: imsg_get error"); 493 if (n == 0) 494 break; 495 496 switch (imsg.hdr.type) { 497 default: 498 log_warnx("got imsg %d", imsg.hdr.type); 499 fatalx("queue_dispatch_runner: unexpected imsg"); 500 } 501 imsg_free(&imsg); 502 } 503 imsg_event_add(iev); 504 } 505 506 void 507 queue_shutdown(void) 508 { 509 log_info("queue handler exiting"); 510 _exit(0); 511 } 512 513 void 514 queue_setup_events(struct smtpd *env) 515 { 516 } 517 518 void 519 queue_disable_events(struct smtpd *env) 520 { 521 } 522 523 pid_t 524 queue(struct smtpd *env) 525 { 526 pid_t pid; 527 struct passwd *pw; 528 529 struct event ev_sigint; 530 struct event ev_sigterm; 531 532 struct peer peers[] = { 533 { PROC_CONTROL, queue_dispatch_control }, 534 { PROC_SMTP, queue_dispatch_smtp }, 535 { PROC_MDA, queue_dispatch_mda }, 536 { PROC_MTA, queue_dispatch_mta }, 537 { PROC_LKA, queue_dispatch_lka }, 538 { PROC_RUNNER, queue_dispatch_runner } 539 }; 540 541 switch (pid = fork()) { 542 case -1: 543 fatal("queue: cannot fork"); 544 case 0: 545 break; 546 default: 547 return (pid); 548 } 549 550 purge_config(env, PURGE_EVERYTHING); 551 552 pw = env->sc_pw; 553 554 #ifndef DEBUG 555 if (chroot(PATH_SPOOL) == -1) 556 fatal("queue: chroot"); 557 if (chdir("/") == -1) 558 fatal("queue: chdir(\"/\")"); 559 #else 560 #warning disabling privilege revocation and chroot in DEBUG MODE 561 #endif 562 563 smtpd_process = PROC_QUEUE; 564 setproctitle("%s", env->sc_title[smtpd_process]); 565 566 #ifndef DEBUG 567 if (setgroups(1, &pw->pw_gid) || 568 setresgid(pw->pw_gid, pw->pw_gid, pw->pw_gid) || 569 setresuid(pw->pw_uid, pw->pw_uid, pw->pw_uid)) 570 fatal("queue: cannot drop privileges"); 571 #endif 572 573 event_init(); 574 575 signal_set(&ev_sigint, SIGINT, queue_sig_handler, env); 576 signal_set(&ev_sigterm, SIGTERM, queue_sig_handler, env); 577 signal_add(&ev_sigint, NULL); 578 signal_add(&ev_sigterm, NULL); 579 signal(SIGPIPE, SIG_IGN); 580 signal(SIGHUP, SIG_IGN); 581 582 config_pipes(env, peers, nitems(peers)); 583 config_peers(env, peers, nitems(peers)); 584 585 queue_purge(PATH_INCOMING); 586 queue_purge(PATH_ENQUEUE); 587 588 queue_setup_events(env); 589 event_dispatch(); 590 queue_shutdown(); 591 592 return (0); 593 } 594 595 struct batch * 596 batch_by_id(struct smtpd *env, u_int64_t id) 597 { 598 struct batch lookup; 599 600 lookup.id = id; 601 return SPLAY_FIND(batchtree, &env->batch_queue, &lookup); 602 } 603 604 605 void 606 queue_purge(char *queuepath) 607 { 608 char path[MAXPATHLEN]; 609 struct qwalk *q; 610 611 q = qwalk_new(queuepath); 612 613 while (qwalk(q, path)) 614 queue_delete_layout_message(queuepath, basename(path)); 615 616 qwalk_close(q); 617 } 618 619 void 620 queue_submit_envelope(struct smtpd *env, struct message *message) 621 { 622 imsg_compose_event(env->sc_ievs[PROC_QUEUE], 623 IMSG_QUEUE_SUBMIT_ENVELOPE, 0, 0, -1, 624 message, sizeof(struct message)); 625 } 626 627 void 628 queue_commit_envelopes(struct smtpd *env, struct message *message) 629 { 630 imsg_compose_event(env->sc_ievs[PROC_QUEUE], 631 IMSG_QUEUE_COMMIT_ENVELOPES, 0, 0, -1, 632 message, sizeof(struct message)); 633 } 634