1 /* $OpenBSD: mproc.c,v 1.37 2020/12/20 14:06:12 martijn Exp $ */ 2 3 /* 4 * Copyright (c) 2012 Eric Faurot <eric@faurot.net> 5 * 6 * Permission to use, copy, modify, and distribute this software for any 7 * purpose with or without fee is hereby granted, provided that the above 8 * copyright notice and this permission notice appear in all copies. 9 * 10 * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES 11 * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF 12 * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR 13 * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES 14 * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN 15 * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF 16 * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. 17 */ 18 19 #include <sys/types.h> 20 #include <sys/socket.h> 21 #include <sys/tree.h> 22 #include <sys/queue.h> 23 #include <sys/uio.h> 24 25 #include <netinet/in.h> 26 #include <arpa/inet.h> 27 #include <arpa/nameser.h> 28 29 #include <err.h> 30 #include <errno.h> 31 #include <event.h> 32 #include <imsg.h> 33 #include <stdio.h> 34 #include <stdlib.h> 35 #include <limits.h> 36 #include <string.h> 37 #include <unistd.h> 38 39 #include "smtpd.h" 40 #include "log.h" 41 42 static void mproc_dispatch(int, short, void *); 43 44 static ssize_t imsg_read_nofd(struct imsgbuf *); 45 46 int 47 mproc_fork(struct mproc *p, const char *path, char *argv[]) 48 { 49 int sp[2]; 50 51 if (socketpair(AF_UNIX, SOCK_STREAM, PF_UNSPEC, sp) == -1) 52 return (-1); 53 54 io_set_nonblocking(sp[0]); 55 io_set_nonblocking(sp[1]); 56 57 if ((p->pid = fork()) == -1) 58 goto err; 59 60 if (p->pid == 0) { 61 /* child process */ 62 dup2(sp[0], STDIN_FILENO); 63 if (closefrom(STDERR_FILENO + 1) == -1) 64 exit(1); 65 66 execv(path, argv); 67 err(1, "execv: %s", path); 68 } 69 70 /* parent process */ 71 close(sp[0]); 72 mproc_init(p, sp[1]); 73 return (0); 74 75 err: 76 log_warn("warn: Failed to start process %s, instance of %s", argv[0], path); 77 close(sp[0]); 78 close(sp[1]); 79 return (-1); 80 } 81 82 void 83 mproc_init(struct mproc *p, int fd) 84 { 85 imsg_init(&p->imsgbuf, fd); 86 } 87 88 void 89 mproc_clear(struct mproc *p) 90 { 91 log_debug("debug: clearing p=%s, fd=%d, pid=%d", p->name, p->imsgbuf.fd, p->pid); 92 93 if (p->events) 94 event_del(&p->ev); 95 close(p->imsgbuf.fd); 96 imsg_clear(&p->imsgbuf); 97 } 98 99 void 100 mproc_enable(struct mproc *p) 101 { 102 if (p->enable == 0) { 103 log_trace(TRACE_MPROC, "mproc: %s -> %s: enabled", 104 proc_name(smtpd_process), 105 proc_name(p->proc)); 106 p->enable = 1; 107 } 108 mproc_event_add(p); 109 } 110 111 void 112 mproc_disable(struct mproc *p) 113 { 114 if (p->enable == 1) { 115 log_trace(TRACE_MPROC, "mproc: %s -> %s: disabled", 116 proc_name(smtpd_process), 117 proc_name(p->proc)); 118 p->enable = 0; 119 } 120 mproc_event_add(p); 121 } 122 123 void 124 mproc_event_add(struct mproc *p) 125 { 126 short events; 127 128 if (p->enable) 129 events = EV_READ; 130 else 131 events = 0; 132 133 if (p->imsgbuf.w.queued) 134 events |= EV_WRITE; 135 136 if (p->events) 137 event_del(&p->ev); 138 139 p->events = events; 140 if (events) { 141 event_set(&p->ev, p->imsgbuf.fd, events, mproc_dispatch, p); 142 event_add(&p->ev, NULL); 143 } 144 } 145 146 static void 147 mproc_dispatch(int fd, short event, void *arg) 148 { 149 struct mproc *p = arg; 150 struct imsg imsg; 151 ssize_t n; 152 153 p->events = 0; 154 155 if (event & EV_READ) { 156 157 if (p->proc == PROC_CLIENT) 158 n = imsg_read_nofd(&p->imsgbuf); 159 else 160 n = imsg_read(&p->imsgbuf); 161 162 switch (n) { 163 case -1: 164 if (errno == EAGAIN) 165 break; 166 log_warn("warn: %s -> %s: imsg_read", 167 proc_name(smtpd_process), p->name); 168 fatal("exiting"); 169 /* NOTREACHED */ 170 case 0: 171 /* this pipe is dead, so remove the event handler */ 172 log_debug("debug: %s -> %s: pipe closed", 173 proc_name(smtpd_process), p->name); 174 p->handler(p, NULL); 175 return; 176 default: 177 break; 178 } 179 } 180 181 if (event & EV_WRITE) { 182 n = msgbuf_write(&p->imsgbuf.w); 183 if (n == 0 || (n == -1 && errno != EAGAIN)) { 184 /* this pipe is dead, so remove the event handler */ 185 log_debug("debug: %s -> %s: pipe closed", 186 proc_name(smtpd_process), p->name); 187 p->handler(p, NULL); 188 return; 189 } 190 } 191 192 for (;;) { 193 if ((n = imsg_get(&p->imsgbuf, &imsg)) == -1) { 194 195 if (smtpd_process == PROC_CONTROL && 196 p->proc == PROC_CLIENT) { 197 log_warnx("warn: client sent invalid imsg " 198 "over control socket"); 199 p->handler(p, NULL); 200 return; 201 } 202 log_warn("fatal: %s: error in imsg_get for %s", 203 proc_name(smtpd_process), p->name); 204 fatalx(NULL); 205 } 206 if (n == 0) 207 break; 208 209 p->handler(p, &imsg); 210 211 imsg_free(&imsg); 212 } 213 214 mproc_event_add(p); 215 } 216 217 /* This should go into libutil */ 218 static ssize_t 219 imsg_read_nofd(struct imsgbuf *ibuf) 220 { 221 ssize_t n; 222 char *buf; 223 size_t len; 224 225 buf = ibuf->r.buf + ibuf->r.wpos; 226 len = sizeof(ibuf->r.buf) - ibuf->r.wpos; 227 228 while ((n = recv(ibuf->fd, buf, len, 0)) == -1) { 229 if (errno != EINTR) 230 return (n); 231 } 232 233 ibuf->r.wpos += n; 234 return (n); 235 } 236 237 void 238 m_forward(struct mproc *p, struct imsg *imsg) 239 { 240 imsg_compose(&p->imsgbuf, imsg->hdr.type, imsg->hdr.peerid, 241 imsg->hdr.pid, imsg->fd, imsg->data, 242 imsg->hdr.len - sizeof(imsg->hdr)); 243 244 if (imsg->hdr.type != IMSG_STAT_DECREMENT && 245 imsg->hdr.type != IMSG_STAT_INCREMENT) 246 log_trace(TRACE_MPROC, "mproc: %s -> %s : %zu %s (forward)", 247 proc_name(smtpd_process), 248 proc_name(p->proc), 249 imsg->hdr.len - sizeof(imsg->hdr), 250 imsg_to_str(imsg->hdr.type)); 251 252 mproc_event_add(p); 253 } 254 255 void 256 m_compose(struct mproc *p, uint32_t type, uint32_t peerid, pid_t pid, int fd, 257 void *data, size_t len) 258 { 259 imsg_compose(&p->imsgbuf, type, peerid, pid, fd, data, len); 260 261 if (type != IMSG_STAT_DECREMENT && 262 type != IMSG_STAT_INCREMENT) 263 log_trace(TRACE_MPROC, "mproc: %s -> %s : %zu %s", 264 proc_name(smtpd_process), 265 proc_name(p->proc), 266 len, 267 imsg_to_str(type)); 268 269 mproc_event_add(p); 270 } 271 272 void 273 m_composev(struct mproc *p, uint32_t type, uint32_t peerid, pid_t pid, 274 int fd, const struct iovec *iov, int n) 275 { 276 size_t len; 277 int i; 278 279 imsg_composev(&p->imsgbuf, type, peerid, pid, fd, iov, n); 280 281 len = 0; 282 for (i = 0; i < n; i++) 283 len += iov[i].iov_len; 284 285 if (type != IMSG_STAT_DECREMENT && 286 type != IMSG_STAT_INCREMENT) 287 log_trace(TRACE_MPROC, "mproc: %s -> %s : %zu %s", 288 proc_name(smtpd_process), 289 proc_name(p->proc), 290 len, 291 imsg_to_str(type)); 292 293 mproc_event_add(p); 294 } 295 296 void 297 m_create(struct mproc *p, uint32_t type, uint32_t peerid, pid_t pid, int fd) 298 { 299 p->m_pos = 0; 300 p->m_type = type; 301 p->m_peerid = peerid; 302 p->m_pid = pid; 303 p->m_fd = fd; 304 } 305 306 void 307 m_add(struct mproc *p, const void *data, size_t len) 308 { 309 size_t alloc; 310 void *tmp; 311 312 if (p->m_pos + len + IMSG_HEADER_SIZE > MAX_IMSGSIZE) { 313 log_warnx("warn: message too large"); 314 fatal(NULL); 315 } 316 317 alloc = p->m_alloc ? p->m_alloc : 128; 318 while (p->m_pos + len > alloc) 319 alloc *= 2; 320 if (alloc != p->m_alloc) { 321 log_trace(TRACE_MPROC, "mproc: %s -> %s: realloc %zu -> %zu", 322 proc_name(smtpd_process), 323 proc_name(p->proc), 324 p->m_alloc, 325 alloc); 326 327 tmp = recallocarray(p->m_buf, p->m_alloc, alloc, 1); 328 if (tmp == NULL) 329 fatal("realloc"); 330 p->m_alloc = alloc; 331 p->m_buf = tmp; 332 } 333 334 memmove(p->m_buf + p->m_pos, data, len); 335 p->m_pos += len; 336 } 337 338 void 339 m_close(struct mproc *p) 340 { 341 if (imsg_compose(&p->imsgbuf, p->m_type, p->m_peerid, p->m_pid, p->m_fd, 342 p->m_buf, p->m_pos) == -1) 343 fatal("imsg_compose"); 344 345 log_trace(TRACE_MPROC, "mproc: %s -> %s : %zu %s", 346 proc_name(smtpd_process), 347 proc_name(p->proc), 348 p->m_pos, 349 imsg_to_str(p->m_type)); 350 351 mproc_event_add(p); 352 } 353 354 void 355 m_flush(struct mproc *p) 356 { 357 if (imsg_compose(&p->imsgbuf, p->m_type, p->m_peerid, p->m_pid, p->m_fd, 358 p->m_buf, p->m_pos) == -1) 359 fatal("imsg_compose"); 360 361 log_trace(TRACE_MPROC, "mproc: %s -> %s : %zu %s (flush)", 362 proc_name(smtpd_process), 363 proc_name(p->proc), 364 p->m_pos, 365 imsg_to_str(p->m_type)); 366 367 p->m_pos = 0; 368 369 if (imsg_flush(&p->imsgbuf) == -1) 370 fatal("imsg_flush"); 371 } 372 373 static struct imsg * current; 374 375 static void 376 m_error(const char *error) 377 { 378 char buf[512]; 379 380 (void)snprintf(buf, sizeof buf, "%s: %s: %s", 381 proc_name(smtpd_process), 382 imsg_to_str(current->hdr.type), 383 error); 384 fatalx("%s", buf); 385 } 386 387 void 388 m_msg(struct msg *m, struct imsg *imsg) 389 { 390 current = imsg; 391 m->pos = imsg->data; 392 m->end = m->pos + (imsg->hdr.len - sizeof(imsg->hdr)); 393 } 394 395 void 396 m_end(struct msg *m) 397 { 398 if (m->pos != m->end) 399 m_error("not at msg end"); 400 } 401 402 int 403 m_is_eom(struct msg *m) 404 { 405 return (m->pos == m->end); 406 } 407 408 static inline void 409 m_get(struct msg *m, void *dst, size_t sz) 410 { 411 if (sz > MAX_IMSGSIZE || 412 m->end - m->pos < (ssize_t)sz) 413 fatalx("msg too short"); 414 415 memmove(dst, m->pos, sz); 416 m->pos += sz; 417 } 418 419 void 420 m_add_int(struct mproc *m, int v) 421 { 422 m_add(m, &v, sizeof(v)); 423 }; 424 425 void 426 m_add_u32(struct mproc *m, uint32_t u32) 427 { 428 m_add(m, &u32, sizeof(u32)); 429 }; 430 431 void 432 m_add_size(struct mproc *m, size_t sz) 433 { 434 m_add(m, &sz, sizeof(sz)); 435 }; 436 437 void 438 m_add_time(struct mproc *m, time_t v) 439 { 440 m_add(m, &v, sizeof(v)); 441 }; 442 443 void 444 m_add_timeval(struct mproc *m, struct timeval *tv) 445 { 446 m_add(m, tv, sizeof(*tv)); 447 } 448 449 450 void 451 m_add_string(struct mproc *m, const char *v) 452 { 453 if (v) { 454 m_add(m, "s", 1); 455 m_add(m, v, strlen(v) + 1); 456 } 457 else 458 m_add(m, "\0", 1); 459 }; 460 461 void 462 m_add_data(struct mproc *m, const void *v, size_t len) 463 { 464 m_add_size(m, len); 465 m_add(m, v, len); 466 }; 467 468 void 469 m_add_id(struct mproc *m, uint64_t v) 470 { 471 m_add(m, &v, sizeof(v)); 472 } 473 474 void 475 m_add_evpid(struct mproc *m, uint64_t v) 476 { 477 m_add(m, &v, sizeof(v)); 478 } 479 480 void 481 m_add_msgid(struct mproc *m, uint32_t v) 482 { 483 m_add(m, &v, sizeof(v)); 484 } 485 486 void 487 m_add_sockaddr(struct mproc *m, const struct sockaddr *sa) 488 { 489 m_add_size(m, sa->sa_len); 490 m_add(m, sa, sa->sa_len); 491 } 492 493 void 494 m_add_mailaddr(struct mproc *m, const struct mailaddr *maddr) 495 { 496 m_add(m, maddr, sizeof(*maddr)); 497 } 498 499 void 500 m_add_envelope(struct mproc *m, const struct envelope *evp) 501 { 502 char buf[sizeof(*evp)]; 503 504 envelope_dump_buffer(evp, buf, sizeof(buf)); 505 m_add_evpid(m, evp->id); 506 m_add_string(m, buf); 507 } 508 509 void 510 m_add_params(struct mproc *m, struct dict *d) 511 { 512 const char *key; 513 char *value; 514 void *iter; 515 516 if (d == NULL) { 517 m_add_size(m, 0); 518 return; 519 } 520 m_add_size(m, dict_count(d)); 521 iter = NULL; 522 while (dict_iter(d, &iter, &key, (void **)&value)) { 523 m_add_string(m, key); 524 m_add_string(m, value); 525 } 526 } 527 528 void 529 m_get_int(struct msg *m, int *i) 530 { 531 m_get(m, i, sizeof(*i)); 532 } 533 534 void 535 m_get_u32(struct msg *m, uint32_t *u32) 536 { 537 m_get(m, u32, sizeof(*u32)); 538 } 539 540 void 541 m_get_size(struct msg *m, size_t *sz) 542 { 543 m_get(m, sz, sizeof(*sz)); 544 } 545 546 void 547 m_get_time(struct msg *m, time_t *t) 548 { 549 m_get(m, t, sizeof(*t)); 550 } 551 552 void 553 m_get_timeval(struct msg *m, struct timeval *tv) 554 { 555 m_get(m, tv, sizeof(*tv)); 556 } 557 558 void 559 m_get_string(struct msg *m, const char **s) 560 { 561 uint8_t *end; 562 char c; 563 564 if (m->pos >= m->end) 565 m_error("msg too short"); 566 567 c = *m->pos++; 568 if (c == '\0') { 569 *s = NULL; 570 return; 571 } 572 573 if (m->pos >= m->end) 574 m_error("msg too short"); 575 end = memchr(m->pos, 0, m->end - m->pos); 576 if (end == NULL) 577 m_error("unterminated string"); 578 579 *s = m->pos; 580 m->pos = end + 1; 581 } 582 583 void 584 m_get_data(struct msg *m, const void **data, size_t *sz) 585 { 586 m_get_size(m, sz); 587 588 if (*sz == 0) { 589 *data = NULL; 590 return; 591 } 592 593 if (m->pos + *sz > m->end) 594 m_error("msg too short"); 595 596 *data = m->pos; 597 m->pos += *sz; 598 } 599 600 void 601 m_get_evpid(struct msg *m, uint64_t *evpid) 602 { 603 m_get(m, evpid, sizeof(*evpid)); 604 } 605 606 void 607 m_get_msgid(struct msg *m, uint32_t *msgid) 608 { 609 m_get(m, msgid, sizeof(*msgid)); 610 } 611 612 void 613 m_get_id(struct msg *m, uint64_t *id) 614 { 615 m_get(m, id, sizeof(*id)); 616 } 617 618 void 619 m_get_sockaddr(struct msg *m, struct sockaddr *sa) 620 { 621 size_t len; 622 623 m_get_size(m, &len); 624 m_get(m, sa, len); 625 } 626 627 void 628 m_get_mailaddr(struct msg *m, struct mailaddr *maddr) 629 { 630 m_get(m, maddr, sizeof(*maddr)); 631 } 632 633 void 634 m_get_envelope(struct msg *m, struct envelope *evp) 635 { 636 uint64_t evpid; 637 const char *buf; 638 639 m_get_evpid(m, &evpid); 640 m_get_string(m, &buf); 641 if (buf == NULL) 642 fatalx("empty envelope buffer"); 643 644 if (!envelope_load_buffer(evp, buf, strlen(buf))) 645 fatalx("failed to retrieve envelope"); 646 evp->id = evpid; 647 } 648 649 void 650 m_get_params(struct msg *m, struct dict *d) 651 { 652 size_t c; 653 const char *key; 654 const char *value; 655 char *tmp; 656 657 dict_init(d); 658 659 m_get_size(m, &c); 660 661 for (; c; c--) { 662 m_get_string(m, &key); 663 m_get_string(m, &value); 664 if ((tmp = strdup(value)) == NULL) 665 fatal("m_get_params"); 666 dict_set(d, key, tmp); 667 } 668 } 669 670 void 671 m_clear_params(struct dict *d) 672 { 673 char *value; 674 675 while (dict_poproot(d, (void **)&value)) 676 free(value); 677 } 678