1 /* $OpenBSD: mproc.c,v 1.10 2014/07/08 13:49:09 eric Exp $ */ 2 3 /* 4 * Copyright (c) 2012 Eric Faurot <eric@faurot.net> 5 * 6 * Permission to use, copy, modify, and distribute this software for any 7 * purpose with or without fee is hereby granted, provided that the above 8 * copyright notice and this permission notice appear in all copies. 9 * 10 * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES 11 * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF 12 * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR 13 * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES 14 * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN 15 * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF 16 * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. 17 */ 18 19 #include <sys/types.h> 20 #include <sys/socket.h> 21 #include <sys/tree.h> 22 #include <sys/queue.h> 23 #include <sys/uio.h> 24 25 #include <netinet/in.h> 26 #include <arpa/inet.h> 27 #include <arpa/nameser.h> 28 29 #include <err.h> 30 #include <errno.h> 31 #include <event.h> 32 #include <imsg.h> 33 #include <stdio.h> 34 #include <stdlib.h> 35 #include <string.h> 36 #include <unistd.h> 37 38 #include "smtpd.h" 39 #include "log.h" 40 41 static void mproc_dispatch(int, short, void *); 42 43 static ssize_t msgbuf_write2(struct msgbuf *); 44 45 int 46 mproc_fork(struct mproc *p, const char *path, char *argv[]) 47 { 48 int sp[2]; 49 50 if (socketpair(AF_UNIX, SOCK_STREAM, PF_UNSPEC, sp) < 0) 51 return (-1); 52 53 session_socket_blockmode(sp[0], BM_NONBLOCK); 54 session_socket_blockmode(sp[1], BM_NONBLOCK); 55 56 if ((p->pid = fork()) == -1) 57 goto err; 58 59 if (p->pid == 0) { 60 /* child process */ 61 dup2(sp[0], STDIN_FILENO); 62 if (closefrom(STDERR_FILENO + 1) < 0) 63 exit(1); 64 65 execv(path, argv); 66 err(1, "execv: %s", path); 67 } 68 69 /* parent process */ 70 close(sp[0]); 71 mproc_init(p, sp[1]); 72 return (0); 73 74 err: 75 log_warn("warn: Failed to start process %s, instance of %s", argv[0], path); 76 close(sp[0]); 77 close(sp[1]); 78 return (-1); 79 } 80 81 void 82 mproc_init(struct mproc *p, int fd) 83 { 84 imsg_init(&p->imsgbuf, fd); 85 } 86 87 void 88 mproc_clear(struct mproc *p) 89 { 90 event_del(&p->ev); 91 close(p->imsgbuf.fd); 92 imsg_clear(&p->imsgbuf); 93 } 94 95 void 96 mproc_enable(struct mproc *p) 97 { 98 if (p->enable == 0) { 99 log_trace(TRACE_MPROC, "mproc: %s -> %s: enabled", 100 proc_name(smtpd_process), 101 proc_name(p->proc)); 102 p->enable = 1; 103 } 104 mproc_event_add(p); 105 } 106 107 void 108 mproc_disable(struct mproc *p) 109 { 110 if (p->enable == 1) { 111 log_trace(TRACE_MPROC, "mproc: %s -> %s: disabled", 112 proc_name(smtpd_process), 113 proc_name(p->proc)); 114 p->enable = 0; 115 } 116 mproc_event_add(p); 117 } 118 119 void 120 mproc_event_add(struct mproc *p) 121 { 122 short events; 123 124 if (p->enable) 125 events = EV_READ; 126 else 127 events = 0; 128 129 if (p->imsgbuf.w.queued) 130 events |= EV_WRITE; 131 132 if (p->events) 133 event_del(&p->ev); 134 135 p->events = events; 136 if (events) { 137 event_set(&p->ev, p->imsgbuf.fd, events, mproc_dispatch, p); 138 event_add(&p->ev, NULL); 139 } 140 } 141 142 static void 143 mproc_dispatch(int fd, short event, void *arg) 144 { 145 struct mproc *p = arg; 146 struct imsg imsg; 147 ssize_t n; 148 149 p->events = 0; 150 151 if (event & EV_READ) { 152 153 if ((n = imsg_read(&p->imsgbuf)) == -1) { 154 log_warn("warn: %s -> %s: imsg_read", 155 proc_name(smtpd_process), p->name); 156 fatal("exiting"); 157 } 158 if (n == 0) { 159 /* this pipe is dead, so remove the event handler */ 160 if (smtpd_process != PROC_CONTROL || 161 p->proc != PROC_CLIENT) 162 log_warnx("warn: %s -> %s: pipe closed", 163 proc_name(smtpd_process), p->name); 164 p->handler(p, NULL); 165 return; 166 } 167 p->bytes_in += n; 168 } 169 170 if (event & EV_WRITE) { 171 n = msgbuf_write2(&p->imsgbuf.w); 172 if (n == 0 || (n == -1 && errno != EAGAIN)) { 173 /* this pipe is dead, so remove the event handler */ 174 if (smtpd_process != PROC_CONTROL || 175 p->proc != PROC_CLIENT) 176 log_warnx("warn: %s -> %s: pipe closed", 177 proc_name(smtpd_process), p->name); 178 p->handler(p, NULL); 179 return; 180 } else if (n != -1) { 181 p->bytes_out += n; 182 p->bytes_queued -= n; 183 } 184 } 185 186 for (;;) { 187 if ((n = imsg_get(&p->imsgbuf, &imsg)) == -1) { 188 log_warn("fatal: %s: error in imsg_get for %s", 189 proc_name(smtpd_process), p->name); 190 fatalx(NULL); 191 } 192 if (n == 0) 193 break; 194 195 p->msg_in += 1; 196 p->handler(p, &imsg); 197 198 imsg_free(&imsg); 199 } 200 201 #if 0 202 if (smtpd_process == PROC_QUEUE) 203 queue_flow_control(); 204 #endif 205 206 mproc_event_add(p); 207 } 208 209 /* XXX msgbuf_write() should return n ... */ 210 static ssize_t 211 msgbuf_write2(struct msgbuf *msgbuf) 212 { 213 struct iovec iov[IOV_MAX]; 214 struct ibuf *buf; 215 unsigned int i = 0; 216 ssize_t n; 217 struct msghdr msg; 218 struct cmsghdr *cmsg; 219 union { 220 struct cmsghdr hdr; 221 char buf[CMSG_SPACE(sizeof(int))]; 222 } cmsgbuf; 223 224 memset(&iov, 0, sizeof(iov)); 225 memset(&msg, 0, sizeof(msg)); 226 TAILQ_FOREACH(buf, &msgbuf->bufs, entry) { 227 if (i >= IOV_MAX) 228 break; 229 iov[i].iov_base = buf->buf + buf->rpos; 230 iov[i].iov_len = buf->wpos - buf->rpos; 231 i++; 232 if (buf->fd != -1) 233 break; 234 } 235 236 msg.msg_iov = iov; 237 msg.msg_iovlen = i; 238 239 if (buf != NULL && buf->fd != -1) { 240 msg.msg_control = (caddr_t)&cmsgbuf.buf; 241 msg.msg_controllen = sizeof(cmsgbuf.buf); 242 cmsg = CMSG_FIRSTHDR(&msg); 243 cmsg->cmsg_len = CMSG_LEN(sizeof(int)); 244 cmsg->cmsg_level = SOL_SOCKET; 245 cmsg->cmsg_type = SCM_RIGHTS; 246 *(int *)CMSG_DATA(cmsg) = buf->fd; 247 } 248 249 again: 250 if ((n = sendmsg(msgbuf->fd, &msg, 0)) == -1) { 251 if (errno == EINTR) 252 goto again; 253 if (errno == ENOBUFS) 254 errno = EAGAIN; 255 return (-1); 256 } 257 258 if (n == 0) { /* connection closed */ 259 errno = 0; 260 return (0); 261 } 262 263 /* 264 * assumption: fd got sent if sendmsg sent anything 265 * this works because fds are passed one at a time 266 */ 267 if (buf != NULL && buf->fd != -1) { 268 close(buf->fd); 269 buf->fd = -1; 270 } 271 272 msgbuf_drain(msgbuf, n); 273 274 return (n); 275 } 276 277 void 278 m_forward(struct mproc *p, struct imsg *imsg) 279 { 280 imsg_compose(&p->imsgbuf, imsg->hdr.type, imsg->hdr.peerid, 281 imsg->hdr.pid, imsg->fd, imsg->data, 282 imsg->hdr.len - sizeof(imsg->hdr)); 283 284 log_trace(TRACE_MPROC, "mproc: %s -> %s : %zu %s (forward)", 285 proc_name(smtpd_process), 286 proc_name(p->proc), 287 imsg->hdr.len - sizeof(imsg->hdr), 288 imsg_to_str(imsg->hdr.type)); 289 290 p->msg_out += 1; 291 p->bytes_queued += imsg->hdr.len; 292 if (p->bytes_queued > p->bytes_queued_max) 293 p->bytes_queued_max = p->bytes_queued; 294 295 mproc_event_add(p); 296 } 297 298 void 299 m_compose(struct mproc *p, uint32_t type, uint32_t peerid, pid_t pid, int fd, 300 void *data, size_t len) 301 { 302 imsg_compose(&p->imsgbuf, type, peerid, pid, fd, data, len); 303 304 log_trace(TRACE_MPROC, "mproc: %s -> %s : %zu %s", 305 proc_name(smtpd_process), 306 proc_name(p->proc), 307 len, 308 imsg_to_str(type)); 309 310 p->msg_out += 1; 311 p->bytes_queued += len + IMSG_HEADER_SIZE; 312 if (p->bytes_queued > p->bytes_queued_max) 313 p->bytes_queued_max = p->bytes_queued; 314 315 mproc_event_add(p); 316 } 317 318 void 319 m_composev(struct mproc *p, uint32_t type, uint32_t peerid, pid_t pid, 320 int fd, const struct iovec *iov, int n) 321 { 322 size_t len; 323 int i; 324 325 imsg_composev(&p->imsgbuf, type, peerid, pid, fd, iov, n); 326 327 len = 0; 328 for (i = 0; i < n; i++) 329 len += iov[i].iov_len; 330 331 p->msg_out += 1; 332 p->bytes_queued += IMSG_HEADER_SIZE + len; 333 if (p->bytes_queued > p->bytes_queued_max) 334 p->bytes_queued_max = p->bytes_queued; 335 336 log_trace(TRACE_MPROC, "mproc: %s -> %s : %zu %s", 337 proc_name(smtpd_process), 338 proc_name(p->proc), 339 len, 340 imsg_to_str(type)); 341 342 mproc_event_add(p); 343 } 344 345 void 346 m_create(struct mproc *p, uint32_t type, uint32_t peerid, pid_t pid, int fd) 347 { 348 if (p->m_buf == NULL) { 349 p->m_alloc = 128; 350 log_trace(TRACE_MPROC, "mproc: %s -> %s: allocating %zu", 351 proc_name(smtpd_process), 352 proc_name(p->proc), 353 p->m_alloc); 354 p->m_buf = malloc(p->m_alloc); 355 if (p->m_buf == NULL) 356 fatal("warn: m_create: malloc"); 357 } 358 359 p->m_pos = 0; 360 p->m_type = type; 361 p->m_peerid = peerid; 362 p->m_pid = pid; 363 p->m_fd = fd; 364 } 365 366 void 367 m_add(struct mproc *p, const void *data, size_t len) 368 { 369 size_t alloc; 370 void *tmp; 371 372 if (p->m_pos + len + IMSG_HEADER_SIZE > MAX_IMSGSIZE) { 373 log_warnx("warn: message to large"); 374 fatal(NULL); 375 } 376 377 alloc = p->m_alloc; 378 while (p->m_pos + len > alloc) 379 alloc *= 2; 380 if (alloc != p->m_alloc) { 381 log_trace(TRACE_MPROC, "mproc: %s -> %s: realloc %zu -> %zu", 382 proc_name(smtpd_process), 383 proc_name(p->proc), 384 p->m_alloc, 385 alloc); 386 387 tmp = realloc(p->m_buf, alloc); 388 if (tmp == NULL) 389 fatal("realloc"); 390 p->m_alloc = alloc; 391 p->m_buf = tmp; 392 } 393 394 memmove(p->m_buf + p->m_pos, data, len); 395 p->m_pos += len; 396 } 397 398 void 399 m_close(struct mproc *p) 400 { 401 if (imsg_compose(&p->imsgbuf, p->m_type, p->m_peerid, p->m_pid, p->m_fd, 402 p->m_buf, p->m_pos) == -1) 403 fatal("imsg_compose"); 404 405 log_trace(TRACE_MPROC, "mproc: %s -> %s : %zu %s", 406 proc_name(smtpd_process), 407 proc_name(p->proc), 408 p->m_pos, 409 imsg_to_str(p->m_type)); 410 411 p->msg_out += 1; 412 p->bytes_queued += p->m_pos + IMSG_HEADER_SIZE; 413 if (p->bytes_queued > p->bytes_queued_max) 414 p->bytes_queued_max = p->bytes_queued; 415 416 mproc_event_add(p); 417 } 418 419 void 420 m_flush(struct mproc *p) 421 { 422 if (imsg_compose(&p->imsgbuf, p->m_type, p->m_peerid, p->m_pid, p->m_fd, 423 p->m_buf, p->m_pos) == -1) 424 fatal("imsg_compose"); 425 426 log_trace(TRACE_MPROC, "mproc: %s -> %s : %zu %s (flush)", 427 proc_name(smtpd_process), 428 proc_name(p->proc), 429 p->m_pos, 430 imsg_to_str(p->m_type)); 431 432 p->msg_out += 1; 433 p->m_pos = 0; 434 435 imsg_flush(&p->imsgbuf); 436 } 437 438 static struct imsg * current; 439 440 static void 441 m_error(const char *error) 442 { 443 char buf[512]; 444 445 (void)snprintf(buf, sizeof buf, "%s: %s: %s", 446 proc_name(smtpd_process), 447 imsg_to_str(current->hdr.type), 448 error); 449 fatalx("%s", buf); 450 } 451 452 void 453 m_msg(struct msg *m, struct imsg *imsg) 454 { 455 current = imsg; 456 m->pos = imsg->data; 457 m->end = m->pos + (imsg->hdr.len - sizeof(imsg->hdr)); 458 } 459 460 void 461 m_end(struct msg *m) 462 { 463 if (m->pos != m->end) 464 m_error("not at msg end"); 465 } 466 467 int 468 m_is_eom(struct msg *m) 469 { 470 return (m->pos == m->end); 471 } 472 473 static inline void 474 m_get(struct msg *m, void *dst, size_t sz) 475 { 476 if (m->pos + sz > m->end) 477 m_error("msg too short"); 478 memmove(dst, m->pos, sz); 479 m->pos += sz; 480 } 481 482 static inline void 483 m_get_typed(struct msg *m, uint8_t type, void *dst, size_t sz) 484 { 485 if (m->pos + 1 + sz > m->end) 486 m_error("msg too short"); 487 if (*m->pos != type) 488 m_error("msg bad type"); 489 memmove(dst, m->pos + 1, sz); 490 m->pos += sz + 1; 491 } 492 493 static inline void 494 m_get_typed_sized(struct msg *m, uint8_t type, const void **dst, size_t *sz) 495 { 496 if (m->pos + 1 + sizeof(*sz) > m->end) 497 m_error("msg too short"); 498 if (*m->pos != type) 499 m_error("msg bad type"); 500 memmove(sz, m->pos + 1, sizeof(*sz)); 501 m->pos += sizeof(sz) + 1; 502 if (m->pos + *sz > m->end) 503 m_error("msg too short"); 504 *dst = m->pos; 505 m->pos += *sz; 506 } 507 508 static void 509 m_add_typed(struct mproc *p, uint8_t type, const void *data, size_t len) 510 { 511 m_add(p, &type, 1); 512 m_add(p, data, len); 513 } 514 515 static void 516 m_add_typed_sized(struct mproc *p, uint8_t type, const void *data, size_t len) 517 { 518 m_add(p, &type, 1); 519 m_add(p, &len, sizeof(len)); 520 m_add(p, data, len); 521 } 522 523 enum { 524 M_INT, 525 M_UINT32, 526 M_SIZET, 527 M_TIME, 528 M_STRING, 529 M_DATA, 530 M_ID, 531 M_EVPID, 532 M_MSGID, 533 M_SOCKADDR, 534 M_MAILADDR, 535 M_ENVELOPE, 536 }; 537 538 void 539 m_add_int(struct mproc *m, int v) 540 { 541 m_add_typed(m, M_INT, &v, sizeof v); 542 }; 543 544 void 545 m_add_u32(struct mproc *m, uint32_t u32) 546 { 547 m_add_typed(m, M_UINT32, &u32, sizeof u32); 548 }; 549 550 void 551 m_add_size(struct mproc *m, size_t sz) 552 { 553 m_add_typed(m, M_SIZET, &sz, sizeof sz); 554 }; 555 556 void 557 m_add_time(struct mproc *m, time_t v) 558 { 559 m_add_typed(m, M_TIME, &v, sizeof v); 560 }; 561 562 void 563 m_add_string(struct mproc *m, const char *v) 564 { 565 m_add_typed(m, M_STRING, v, strlen(v) + 1); 566 }; 567 568 void 569 m_add_data(struct mproc *m, const void *v, size_t len) 570 { 571 m_add_typed_sized(m, M_DATA, v, len); 572 }; 573 574 void 575 m_add_id(struct mproc *m, uint64_t v) 576 { 577 m_add_typed(m, M_ID, &v, sizeof(v)); 578 } 579 580 void 581 m_add_evpid(struct mproc *m, uint64_t v) 582 { 583 m_add_typed(m, M_EVPID, &v, sizeof(v)); 584 } 585 586 void 587 m_add_msgid(struct mproc *m, uint32_t v) 588 { 589 m_add_typed(m, M_MSGID, &v, sizeof(v)); 590 } 591 592 void 593 m_add_sockaddr(struct mproc *m, const struct sockaddr *sa) 594 { 595 m_add_typed_sized(m, M_SOCKADDR, sa, sa->sa_len); 596 } 597 598 void 599 m_add_mailaddr(struct mproc *m, const struct mailaddr *maddr) 600 { 601 m_add_typed(m, M_MAILADDR, maddr, sizeof(*maddr)); 602 } 603 604 #ifndef BUILD_FILTER 605 void 606 m_add_envelope(struct mproc *m, const struct envelope *evp) 607 { 608 #if 0 609 m_add_typed(m, M_ENVELOPE, evp, sizeof(*evp)); 610 #else 611 char buf[sizeof(*evp)]; 612 613 envelope_dump_buffer(evp, buf, sizeof(buf)); 614 m_add_evpid(m, evp->id); 615 m_add_typed_sized(m, M_ENVELOPE, buf, strlen(buf) + 1); 616 #endif 617 } 618 #endif 619 620 void 621 m_add_params(struct mproc *m, struct dict *d) 622 { 623 const char *key; 624 char *value; 625 void *iter; 626 627 if (d == NULL) { 628 m_add_size(m, 0); 629 return; 630 } 631 m_add_size(m, dict_count(d)); 632 iter = NULL; 633 while (dict_iter(d, &iter, &key, (void **)&value)) { 634 m_add_string(m, key); 635 m_add_string(m, value); 636 } 637 } 638 639 void 640 m_get_int(struct msg *m, int *i) 641 { 642 m_get_typed(m, M_INT, i, sizeof(*i)); 643 } 644 645 void 646 m_get_u32(struct msg *m, uint32_t *u32) 647 { 648 m_get_typed(m, M_UINT32, u32, sizeof(*u32)); 649 } 650 651 void 652 m_get_size(struct msg *m, size_t *sz) 653 { 654 m_get_typed(m, M_SIZET, sz, sizeof(*sz)); 655 } 656 657 void 658 m_get_time(struct msg *m, time_t *t) 659 { 660 m_get_typed(m, M_TIME, t, sizeof(*t)); 661 } 662 663 void 664 m_get_string(struct msg *m, const char **s) 665 { 666 uint8_t *end; 667 668 if (m->pos + 2 > m->end) 669 m_error("msg too short"); 670 if (*m->pos != M_STRING) 671 m_error("bad msg type"); 672 673 end = memchr(m->pos + 1, 0, m->end - (m->pos + 1)); 674 if (end == NULL) 675 m_error("unterminated string"); 676 677 *s = m->pos + 1; 678 m->pos = end + 1; 679 } 680 681 void 682 m_get_data(struct msg *m, const void **data, size_t *sz) 683 { 684 m_get_typed_sized(m, M_DATA, data, sz); 685 } 686 687 void 688 m_get_evpid(struct msg *m, uint64_t *evpid) 689 { 690 m_get_typed(m, M_EVPID, evpid, sizeof(*evpid)); 691 } 692 693 void 694 m_get_msgid(struct msg *m, uint32_t *msgid) 695 { 696 m_get_typed(m, M_MSGID, msgid, sizeof(*msgid)); 697 } 698 699 void 700 m_get_id(struct msg *m, uint64_t *id) 701 { 702 m_get_typed(m, M_ID, id, sizeof(*id)); 703 } 704 705 void 706 m_get_sockaddr(struct msg *m, struct sockaddr *sa) 707 { 708 size_t s; 709 const void *d; 710 711 m_get_typed_sized(m, M_SOCKADDR, &d, &s); 712 memmove(sa, d, s); 713 } 714 715 void 716 m_get_mailaddr(struct msg *m, struct mailaddr *maddr) 717 { 718 m_get_typed(m, M_MAILADDR, maddr, sizeof(*maddr)); 719 } 720 721 #ifndef BUILD_FILTER 722 void 723 m_get_envelope(struct msg *m, struct envelope *evp) 724 { 725 #if 0 726 m_get_typed(m, M_ENVELOPE, evp, sizeof(*evp)); 727 #else 728 uint64_t evpid; 729 size_t s; 730 const void *d; 731 732 m_get_evpid(m, &evpid); 733 m_get_typed_sized(m, M_ENVELOPE, &d, &s); 734 735 if (!envelope_load_buffer(evp, d, s - 1)) 736 fatalx("failed to retrieve envelope"); 737 evp->id = evpid; 738 #endif 739 } 740 #endif 741 742 void 743 m_get_params(struct msg *m, struct dict *d) 744 { 745 size_t c; 746 const char *key; 747 const char *value; 748 char *tmp; 749 750 dict_init(d); 751 752 m_get_size(m, &c); 753 754 for (; c; c--) { 755 m_get_string(m, &key); 756 m_get_string(m, &value); 757 if ((tmp = strdup(value)) == NULL) 758 fatal("m_get_params"); 759 dict_set(d, key, tmp); 760 } 761 } 762 763 void 764 m_clear_params(struct dict *d) 765 { 766 char *value; 767 768 while (dict_poproot(d, (void **)&value)) 769 free(value); 770 } 771