1 /* $OpenBSD: queue.c,v 1.70 2009/09/15 16:50:06 jacekm Exp $ */ 2 3 /* 4 * Copyright (c) 2008 Gilles Chehade <gilles@openbsd.org> 5 * Copyright (c) 2008 Pierre-Yves Ritschard <pyr@openbsd.org> 6 * 7 * Permission to use, copy, modify, and distribute this software for any 8 * purpose with or without fee is hereby granted, provided that the above 9 * copyright notice and this permission notice appear in all copies. 10 * 11 * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES 12 * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF 13 * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR 14 * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES 15 * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN 16 * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF 17 * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. 18 */ 19 20 #include <sys/types.h> 21 #include <sys/queue.h> 22 #include <sys/tree.h> 23 #include <sys/param.h> 24 #include <sys/socket.h> 25 #include <sys/stat.h> 26 27 #include <errno.h> 28 #include <event.h> 29 #include <fcntl.h> 30 #include <libgen.h> 31 #include <pwd.h> 32 #include <signal.h> 33 #include <stdio.h> 34 #include <stdlib.h> 35 #include <string.h> 36 #include <unistd.h> 37 38 #include "smtpd.h" 39 40 __dead void queue_shutdown(void); 41 void queue_sig_handler(int, short, void *); 42 void queue_dispatch_control(int, short, void *); 43 void queue_dispatch_smtp(int, short, void *); 44 void queue_dispatch_mda(int, short, void *); 45 void queue_dispatch_mta(int, short, void *); 46 void queue_dispatch_lka(int, short, void *); 47 void queue_dispatch_runner(int, short, void *); 48 void queue_setup_events(struct smtpd *); 49 void queue_disable_events(struct smtpd *); 50 void queue_purge(char *); 51 52 int queue_create_layout_message(char *, char *); 53 void queue_delete_layout_message(char *, char *); 54 int queue_record_layout_envelope(char *, struct message *); 55 int queue_remove_layout_envelope(char *, struct message *); 56 int queue_commit_layout_message(char *, struct message *); 57 int queue_open_layout_messagefile(char *, struct message *); 58 59 void 60 queue_sig_handler(int sig, short event, void *p) 61 { 62 switch (sig) { 63 case SIGINT: 64 case SIGTERM: 65 queue_shutdown(); 66 break; 67 default: 68 fatalx("queue_sig_handler: unexpected signal"); 69 } 70 } 71 72 void 73 queue_dispatch_control(int sig, short event, void *p) 74 { 75 struct smtpd *env = p; 76 struct imsgev *iev; 77 struct imsgbuf *ibuf; 78 struct imsg imsg; 79 ssize_t n; 80 81 iev = env->sc_ievs[PROC_CONTROL]; 82 ibuf = &iev->ibuf; 83 84 if (event & EV_READ) { 85 if ((n = imsg_read(ibuf)) == -1) 86 fatal("imsg_read_error"); 87 if (n == 0) { 88 /* this pipe is dead, so remove the event handler */ 89 event_del(&iev->ev); 90 event_loopexit(NULL); 91 return; 92 } 93 } 94 95 if (event & EV_WRITE) { 96 if (msgbuf_write(&ibuf->w) == -1) 97 fatal("msgbuf_write"); 98 } 99 100 for (;;) { 101 if ((n = imsg_get(ibuf, &imsg)) == -1) 102 fatal("queue_dispatch_control: imsg_get error"); 103 if (n == 0) 104 break; 105 106 switch (imsg.hdr.type) { 107 default: 108 log_warnx("queue_dispatch_control: got imsg %d", 109 imsg.hdr.type); 110 fatalx("queue_dispatch_control: unexpected imsg"); 111 } 112 imsg_free(&imsg); 113 } 114 imsg_event_add(iev); 115 } 116 117 void 118 queue_dispatch_smtp(int sig, short event, void *p) 119 { 120 struct smtpd *env = p; 121 struct imsgev *iev; 122 struct imsgbuf *ibuf; 123 struct imsg imsg; 124 ssize_t n; 125 126 iev = env->sc_ievs[PROC_SMTP]; 127 ibuf = &iev->ibuf; 128 129 if (event & EV_READ) { 130 if ((n = imsg_read(ibuf)) == -1) 131 fatal("imsg_read_error"); 132 if (n == 0) { 133 /* this pipe is dead, so remove the event handler */ 134 event_del(&iev->ev); 135 event_loopexit(NULL); 136 return; 137 } 138 } 139 140 if (event & EV_WRITE) { 141 if (msgbuf_write(&ibuf->w) == -1) 142 fatal("msgbuf_write"); 143 } 144 145 for (;;) { 146 if ((n = imsg_get(ibuf, &imsg)) == -1) 147 fatal("queue_dispatch_smtp: imsg_get error"); 148 if (n == 0) 149 break; 150 151 switch (imsg.hdr.type) { 152 case IMSG_QUEUE_CREATE_MESSAGE: { 153 struct message *messagep = imsg.data; 154 struct submit_status ss; 155 int (*f)(char *); 156 157 log_debug("queue_dispatch_smtp: creating message file"); 158 159 IMSG_SIZE_CHECK(messagep); 160 161 ss.id = messagep->session_id; 162 ss.code = 250; 163 bzero(ss.u.msgid, MAX_ID_SIZE); 164 165 if (messagep->flags & F_MESSAGE_ENQUEUED) 166 f = enqueue_create_layout; 167 else 168 f = queue_create_incoming_layout; 169 170 if (! f(ss.u.msgid)) 171 ss.code = 421; 172 173 imsg_compose_event(iev, IMSG_QUEUE_CREATE_MESSAGE, 0, 0, -1, 174 &ss, sizeof(ss)); 175 break; 176 } 177 case IMSG_QUEUE_REMOVE_MESSAGE: { 178 struct message *messagep = imsg.data; 179 void (*f)(char *); 180 181 IMSG_SIZE_CHECK(messagep); 182 183 if (messagep->flags & F_MESSAGE_ENQUEUED) 184 f = enqueue_delete_message; 185 else 186 f = queue_delete_incoming_message; 187 188 f(messagep->message_id); 189 190 break; 191 } 192 case IMSG_QUEUE_COMMIT_MESSAGE: { 193 struct message *messagep = imsg.data; 194 struct submit_status ss; 195 size_t *counter; 196 int (*f)(struct message *); 197 198 IMSG_SIZE_CHECK(messagep); 199 200 ss.id = messagep->session_id; 201 202 if (messagep->flags & F_MESSAGE_ENQUEUED) { 203 f = enqueue_commit_message; 204 counter = &env->stats->queue.inserts_local; 205 } else { 206 f = queue_commit_incoming_message; 207 counter = &env->stats->queue.inserts_remote; 208 } 209 210 if (f(messagep)) 211 (*counter)++; 212 else 213 ss.code = 421; 214 215 imsg_compose_event(iev, IMSG_QUEUE_COMMIT_MESSAGE, 0, 0, -1, 216 &ss, sizeof(ss)); 217 218 break; 219 } 220 case IMSG_QUEUE_MESSAGE_FILE: { 221 struct message *messagep = imsg.data; 222 struct submit_status ss; 223 int fd; 224 int (*f)(struct message *); 225 226 IMSG_SIZE_CHECK(messagep); 227 228 ss.id = messagep->session_id; 229 230 if (messagep->flags & F_MESSAGE_ENQUEUED) 231 f = enqueue_open_messagefile; 232 else 233 f = queue_open_incoming_message_file; 234 235 fd = f(messagep); 236 if (fd == -1) 237 ss.code = 421; 238 239 imsg_compose_event(iev, IMSG_QUEUE_MESSAGE_FILE, 0, 0, fd, 240 &ss, sizeof(ss)); 241 break; 242 } 243 default: 244 log_warnx("queue_dispatch_smtp: got imsg %d", 245 imsg.hdr.type); 246 fatalx("queue_dispatch_smtp: unexpected imsg"); 247 } 248 imsg_free(&imsg); 249 } 250 imsg_event_add(iev); 251 } 252 253 void 254 queue_dispatch_mda(int sig, short event, void *p) 255 { 256 struct smtpd *env = p; 257 struct imsgev *iev; 258 struct imsgbuf *ibuf; 259 struct imsg imsg; 260 ssize_t n; 261 262 iev = env->sc_ievs[PROC_MDA]; 263 ibuf = &iev->ibuf; 264 265 if (event & EV_READ) { 266 if ((n = imsg_read(ibuf)) == -1) 267 fatal("imsg_read_error"); 268 if (n == 0) { 269 /* this pipe is dead, so remove the event handler */ 270 event_del(&iev->ev); 271 event_loopexit(NULL); 272 return; 273 } 274 } 275 276 if (event & EV_WRITE) { 277 if (msgbuf_write(&ibuf->w) == -1) 278 fatal("msgbuf_write"); 279 } 280 281 for (;;) { 282 if ((n = imsg_get(ibuf, &imsg)) == -1) 283 fatal("queue_dispatch_mda: imsg_get error"); 284 if (n == 0) 285 break; 286 287 switch (imsg.hdr.type) { 288 289 case IMSG_QUEUE_MESSAGE_UPDATE: { 290 imsg_compose_event(env->sc_ievs[PROC_RUNNER], IMSG_RUNNER_UPDATE_ENVELOPE, 291 0, 0, -1, imsg.data, sizeof(struct message)); 292 break; 293 } 294 295 default: 296 log_warnx("got imsg %d", imsg.hdr.type); 297 fatalx("queue_dispatch_mda: unexpected imsg"); 298 } 299 imsg_free(&imsg); 300 } 301 imsg_event_add(iev); 302 } 303 304 void 305 queue_dispatch_mta(int sig, short event, void *p) 306 { 307 struct smtpd *env = p; 308 struct imsgev *iev; 309 struct imsgbuf *ibuf; 310 struct imsg imsg; 311 ssize_t n; 312 313 iev = env->sc_ievs[PROC_MTA]; 314 ibuf = &iev->ibuf; 315 316 if (event & EV_READ) { 317 if ((n = imsg_read(ibuf)) == -1) 318 fatal("imsg_read_error"); 319 if (n == 0) { 320 /* this pipe is dead, so remove the event handler */ 321 event_del(&iev->ev); 322 event_loopexit(NULL); 323 return; 324 } 325 } 326 327 if (event & EV_WRITE) { 328 if (msgbuf_write(&ibuf->w) == -1) 329 fatal("msgbuf_write"); 330 } 331 332 for (;;) { 333 if ((n = imsg_get(ibuf, &imsg)) == -1) 334 fatal("queue_dispatch_mta: imsg_get error"); 335 if (n == 0) 336 break; 337 338 switch (imsg.hdr.type) { 339 340 case IMSG_QUEUE_MESSAGE_FD: { 341 struct batch *batchp = imsg.data; 342 int fd; 343 344 IMSG_SIZE_CHECK(batchp); 345 346 fd = queue_open_message_file(batchp->message_id); 347 imsg_compose_event(iev, IMSG_QUEUE_MESSAGE_FD, 0, 0, fd, batchp, 348 sizeof(*batchp)); 349 break; 350 } 351 352 case IMSG_QUEUE_MESSAGE_UPDATE: { 353 imsg_compose_event(env->sc_ievs[PROC_RUNNER], IMSG_RUNNER_UPDATE_ENVELOPE, 354 0, 0, -1, imsg.data, sizeof(struct message)); 355 break; 356 } 357 358 default: 359 log_warnx("got imsg %d", imsg.hdr.type); 360 fatalx("queue_dispatch_mda: unexpected imsg"); 361 } 362 imsg_free(&imsg); 363 } 364 imsg_event_add(iev); 365 } 366 367 void 368 queue_dispatch_lka(int sig, short event, void *p) 369 { 370 struct smtpd *env = p; 371 struct imsgev *iev; 372 struct imsgbuf *ibuf; 373 struct imsg imsg; 374 ssize_t n; 375 376 iev = env->sc_ievs[PROC_LKA]; 377 ibuf = &iev->ibuf; 378 379 if (event & EV_READ) { 380 if ((n = imsg_read(ibuf)) == -1) 381 fatal("imsg_read_error"); 382 if (n == 0) { 383 /* this pipe is dead, so remove the event handler */ 384 event_del(&iev->ev); 385 event_loopexit(NULL); 386 return; 387 } 388 } 389 390 if (event & EV_WRITE) { 391 if (msgbuf_write(&ibuf->w) == -1) 392 fatal("msgbuf_write"); 393 } 394 395 for (;;) { 396 if ((n = imsg_get(ibuf, &imsg)) == -1) 397 fatal("queue_dispatch_lka: imsg_get error"); 398 if (n == 0) 399 break; 400 401 switch (imsg.hdr.type) { 402 403 case IMSG_QUEUE_SUBMIT_ENVELOPE: { 404 struct message *messagep = imsg.data; 405 struct submit_status ss; 406 int (*f)(struct message *); 407 408 IMSG_SIZE_CHECK(messagep); 409 410 messagep->id = queue_generate_id(); 411 ss.id = messagep->session_id; 412 413 if (IS_MAILBOX(messagep->recipient.rule.r_action) || 414 IS_EXT(messagep->recipient.rule.r_action)) 415 messagep->type = T_MDA_MESSAGE; 416 else 417 messagep->type = T_MTA_MESSAGE; 418 419 /* Write to disk */ 420 if (messagep->flags & F_MESSAGE_ENQUEUED) 421 f = enqueue_record_envelope; 422 else 423 f = queue_record_incoming_envelope; 424 425 if (! f(messagep)) { 426 ss.code = 421; 427 imsg_compose_event(env->sc_ievs[PROC_SMTP], 428 IMSG_QUEUE_TEMPFAIL, 0, 0, -1, &ss, 429 sizeof(ss)); 430 } 431 432 break; 433 } 434 435 case IMSG_QUEUE_COMMIT_ENVELOPES: { 436 struct message *messagep = imsg.data; 437 struct submit_status ss; 438 439 IMSG_SIZE_CHECK(messagep); 440 441 ss.id = messagep->session_id; 442 ss.code = 250; 443 444 imsg_compose_event(env->sc_ievs[PROC_SMTP], IMSG_QUEUE_COMMIT_ENVELOPES, 445 0, 0, -1, &ss, sizeof(ss)); 446 447 break; 448 } 449 450 default: 451 log_warnx("got imsg %d", imsg.hdr.type); 452 fatalx("queue_dispatch_lka: unexpected imsg"); 453 } 454 imsg_free(&imsg); 455 } 456 imsg_event_add(iev); 457 } 458 459 void 460 queue_dispatch_runner(int sig, short event, void *p) 461 { 462 struct smtpd *env = p; 463 struct imsgev *iev; 464 struct imsgbuf *ibuf; 465 struct imsg imsg; 466 ssize_t n; 467 468 iev = env->sc_ievs[PROC_RUNNER]; 469 ibuf = &iev->ibuf; 470 471 if (event & EV_READ) { 472 if ((n = imsg_read(ibuf)) == -1) 473 fatal("imsg_read_error"); 474 if (n == 0) { 475 /* this pipe is dead, so remove the event handler */ 476 event_del(&iev->ev); 477 event_loopexit(NULL); 478 return; 479 } 480 } 481 482 if (event & EV_WRITE) { 483 if (msgbuf_write(&ibuf->w) == -1) 484 fatal("msgbuf_write"); 485 } 486 487 for (;;) { 488 if ((n = imsg_get(ibuf, &imsg)) == -1) 489 fatal("queue_dispatch_runner: imsg_get error"); 490 if (n == 0) 491 break; 492 493 switch (imsg.hdr.type) { 494 default: 495 log_warnx("got imsg %d", imsg.hdr.type); 496 fatalx("queue_dispatch_runner: unexpected imsg"); 497 } 498 imsg_free(&imsg); 499 } 500 imsg_event_add(iev); 501 } 502 503 void 504 queue_shutdown(void) 505 { 506 log_info("queue handler"); 507 _exit(0); 508 } 509 510 void 511 queue_setup_events(struct smtpd *env) 512 { 513 } 514 515 void 516 queue_disable_events(struct smtpd *env) 517 { 518 } 519 520 pid_t 521 queue(struct smtpd *env) 522 { 523 pid_t pid; 524 struct passwd *pw; 525 526 struct event ev_sigint; 527 struct event ev_sigterm; 528 529 struct peer peers[] = { 530 { PROC_CONTROL, queue_dispatch_control }, 531 { PROC_SMTP, queue_dispatch_smtp }, 532 { PROC_MDA, queue_dispatch_mda }, 533 { PROC_MTA, queue_dispatch_mta }, 534 { PROC_LKA, queue_dispatch_lka }, 535 { PROC_RUNNER, queue_dispatch_runner } 536 }; 537 538 switch (pid = fork()) { 539 case -1: 540 fatal("queue: cannot fork"); 541 case 0: 542 break; 543 default: 544 return (pid); 545 } 546 547 purge_config(env, PURGE_EVERYTHING); 548 549 pw = env->sc_pw; 550 551 #ifndef DEBUG 552 if (chroot(PATH_SPOOL) == -1) 553 fatal("queue: chroot"); 554 if (chdir("/") == -1) 555 fatal("queue: chdir(\"/\")"); 556 #else 557 #warning disabling privilege revocation and chroot in DEBUG MODE 558 #endif 559 560 smtpd_process = PROC_QUEUE; 561 setproctitle("%s", env->sc_title[smtpd_process]); 562 563 #ifndef DEBUG 564 if (setgroups(1, &pw->pw_gid) || 565 setresgid(pw->pw_gid, pw->pw_gid, pw->pw_gid) || 566 setresuid(pw->pw_uid, pw->pw_uid, pw->pw_uid)) 567 fatal("queue: cannot drop privileges"); 568 #endif 569 570 event_init(); 571 572 signal_set(&ev_sigint, SIGINT, queue_sig_handler, env); 573 signal_set(&ev_sigterm, SIGTERM, queue_sig_handler, env); 574 signal_add(&ev_sigint, NULL); 575 signal_add(&ev_sigterm, NULL); 576 signal(SIGPIPE, SIG_IGN); 577 signal(SIGHUP, SIG_IGN); 578 579 config_pipes(env, peers, nitems(peers)); 580 config_peers(env, peers, nitems(peers)); 581 582 queue_purge(PATH_INCOMING); 583 queue_purge(PATH_ENQUEUE); 584 585 queue_setup_events(env); 586 event_dispatch(); 587 queue_shutdown(); 588 589 return (0); 590 } 591 592 struct batch * 593 batch_by_id(struct smtpd *env, u_int64_t id) 594 { 595 struct batch lookup; 596 597 lookup.id = id; 598 return SPLAY_FIND(batchtree, &env->batch_queue, &lookup); 599 } 600 601 602 void 603 queue_purge(char *queuepath) 604 { 605 char path[MAXPATHLEN]; 606 struct qwalk *q; 607 608 q = qwalk_new(queuepath); 609 610 while (qwalk(q, path)) 611 queue_delete_layout_message(queuepath, basename(path)); 612 613 qwalk_close(q); 614 } 615