1 /* $OpenBSD: queue_backend.c,v 1.56 2015/10/09 14:37:38 gilles Exp $ */ 2 3 /* 4 * Copyright (c) 2011 Gilles Chehade <gilles@poolp.org> 5 * 6 * Permission to use, copy, modify, and distribute this software for any 7 * purpose with or without fee is hereby granted, provided that the above 8 * copyright notice and this permission notice appear in all copies. 9 * 10 * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES 11 * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF 12 * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR 13 * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES 14 * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN 15 * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF 16 * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. 17 */ 18 19 #include <sys/types.h> 20 #include <sys/queue.h> 21 #include <sys/tree.h> 22 #include <sys/socket.h> 23 #include <sys/stat.h> 24 25 #include <ctype.h> 26 #include <err.h> 27 #include <errno.h> 28 #include <event.h> 29 #include <fcntl.h> 30 #include <grp.h> 31 #include <imsg.h> 32 #include <limits.h> 33 #include <inttypes.h> 34 #include <libgen.h> 35 #include <pwd.h> 36 #include <stdio.h> 37 #include <stdlib.h> 38 #include <string.h> 39 #include <time.h> 40 #include <unistd.h> 41 42 #include "smtpd.h" 43 #include "log.h" 44 45 static const char* envelope_validate(struct envelope *); 46 47 extern struct queue_backend queue_backend_fs; 48 extern struct queue_backend queue_backend_null; 49 extern struct queue_backend queue_backend_proc; 50 extern struct queue_backend queue_backend_ram; 51 52 static void queue_envelope_cache_add(struct envelope *); 53 static void queue_envelope_cache_update(struct envelope *); 54 static void queue_envelope_cache_del(uint64_t evpid); 55 56 TAILQ_HEAD(evplst, envelope); 57 58 static struct tree evpcache_tree; 59 static struct evplst evpcache_list; 60 static struct queue_backend *backend; 61 62 static int (*handler_close)(void); 63 static int (*handler_message_create)(uint32_t *); 64 static int (*handler_message_commit)(uint32_t, const char*); 65 static int (*handler_message_delete)(uint32_t); 66 static int (*handler_message_fd_r)(uint32_t); 67 static int (*handler_message_corrupt)(uint32_t); 68 static int (*handler_envelope_create)(uint32_t, const char *, size_t, uint64_t *); 69 static int (*handler_envelope_delete)(uint64_t); 70 static int (*handler_envelope_update)(uint64_t, const char *, size_t); 71 static int (*handler_envelope_load)(uint64_t, char *, size_t); 72 static int (*handler_envelope_walk)(uint64_t *, char *, size_t); 73 74 #ifdef QUEUE_PROFILING 75 76 static struct { 77 struct timespec t0; 78 const char *name; 79 } profile; 80 81 static inline void profile_enter(const char *name) 82 { 83 if ((profiling & PROFILE_QUEUE) == 0) 84 return; 85 86 profile.name = name; 87 clock_gettime(CLOCK_MONOTONIC, &profile.t0); 88 } 89 90 static inline void profile_leave(void) 91 { 92 struct timespec t1, dt; 93 94 if ((profiling & PROFILE_QUEUE) == 0) 95 return; 96 97 clock_gettime(CLOCK_MONOTONIC, &t1); 98 timespecsub(&t1, &profile.t0, &dt); 99 log_debug("profile-queue: %s %lld.%09ld", profile.name, 100 (long long)dt.tv_sec, dt.tv_nsec); 101 } 102 #else 103 #define profile_enter(x) do {} while (0) 104 #define profile_leave() do {} while (0) 105 #endif 106 107 static int 108 queue_message_path(uint32_t msgid, char *buf, size_t len) 109 { 110 return bsnprintf(buf, len, "%s/%08"PRIx32, PATH_TEMPORARY, msgid); 111 } 112 113 int 114 queue_init(const char *name, int server) 115 { 116 struct passwd *pwq; 117 struct group *gr; 118 int r; 119 120 pwq = getpwnam(SMTPD_QUEUE_USER); 121 if (pwq == NULL) 122 errx(1, "unknown user %s", SMTPD_QUEUE_USER); 123 124 gr = getgrnam(SMTPD_QUEUE_GROUP); 125 if (gr == NULL) 126 errx(1, "unknown group %s", SMTPD_QUEUE_GROUP); 127 128 tree_init(&evpcache_tree); 129 TAILQ_INIT(&evpcache_list); 130 131 if (!strcmp(name, "fs")) 132 backend = &queue_backend_fs; 133 else if (!strcmp(name, "null")) 134 backend = &queue_backend_null; 135 else if (!strcmp(name, "ram")) 136 backend = &queue_backend_ram; 137 else 138 backend = &queue_backend_proc; 139 140 if (server) { 141 if (ckdir(PATH_SPOOL, 0711, 0, 0, 1) == 0) 142 errx(1, "error in spool directory setup"); 143 if (ckdir(PATH_SPOOL PATH_OFFLINE, 0770, 0, gr->gr_gid, 1) == 0) 144 errx(1, "error in offline directory setup"); 145 if (ckdir(PATH_SPOOL PATH_PURGE, 0700, pwq->pw_uid, 0, 1) == 0) 146 errx(1, "error in purge directory setup"); 147 148 mvpurge(PATH_SPOOL PATH_TEMPORARY, PATH_SPOOL PATH_PURGE); 149 150 if (ckdir(PATH_SPOOL PATH_TEMPORARY, 0700, pwq->pw_uid, 0, 1) == 0) 151 errx(1, "error in purge directory setup"); 152 } 153 154 r = backend->init(pwq, server, name); 155 156 log_trace(TRACE_QUEUE, "queue-backend: queue_init(%d) -> %d", server, r); 157 158 return (r); 159 } 160 161 int 162 queue_close(void) 163 { 164 if (handler_close) 165 return (handler_close()); 166 167 return (1); 168 } 169 170 int 171 queue_message_create(uint32_t *msgid) 172 { 173 int r; 174 175 profile_enter("queue_message_create"); 176 r = handler_message_create(msgid); 177 profile_leave(); 178 179 log_trace(TRACE_QUEUE, 180 "queue-backend: queue_message_create() -> %d (%08"PRIx32")", 181 r, *msgid); 182 183 return (r); 184 } 185 186 int 187 queue_message_delete(uint32_t msgid) 188 { 189 char msgpath[PATH_MAX]; 190 int r; 191 192 profile_enter("queue_message_delete"); 193 r = handler_message_delete(msgid); 194 profile_leave(); 195 196 /* in case the message is incoming */ 197 queue_message_path(msgid, msgpath, sizeof(msgpath)); 198 unlink(msgpath); 199 200 log_trace(TRACE_QUEUE, 201 "queue-backend: queue_message_delete(%08"PRIx32") -> %d", msgid, r); 202 203 return (r); 204 } 205 206 int 207 queue_message_commit(uint32_t msgid) 208 { 209 int r; 210 char msgpath[PATH_MAX]; 211 char tmppath[PATH_MAX]; 212 FILE *ifp = NULL; 213 FILE *ofp = NULL; 214 215 profile_enter("queue_message_commit"); 216 217 queue_message_path(msgid, msgpath, sizeof(msgpath)); 218 219 if (env->sc_queue_flags & QUEUE_COMPRESSION) { 220 bsnprintf(tmppath, sizeof tmppath, "%s.comp", msgpath); 221 ifp = fopen(msgpath, "r"); 222 ofp = fopen(tmppath, "w+"); 223 if (ifp == NULL || ofp == NULL) 224 goto err; 225 if (! compress_file(ifp, ofp)) 226 goto err; 227 fclose(ifp); 228 fclose(ofp); 229 ifp = NULL; 230 ofp = NULL; 231 232 if (rename(tmppath, msgpath) == -1) { 233 if (errno == ENOSPC) 234 return (0); 235 unlink(tmppath); 236 log_warn("rename"); 237 return (0); 238 } 239 } 240 241 if (env->sc_queue_flags & QUEUE_ENCRYPTION) { 242 bsnprintf(tmppath, sizeof tmppath, "%s.enc", msgpath); 243 ifp = fopen(msgpath, "r"); 244 ofp = fopen(tmppath, "w+"); 245 if (ifp == NULL || ofp == NULL) 246 goto err; 247 if (! crypto_encrypt_file(ifp, ofp)) 248 goto err; 249 fclose(ifp); 250 fclose(ofp); 251 ifp = NULL; 252 ofp = NULL; 253 254 if (rename(tmppath, msgpath) == -1) { 255 if (errno == ENOSPC) 256 return (0); 257 unlink(tmppath); 258 log_warn("rename"); 259 return (0); 260 } 261 } 262 263 r = handler_message_commit(msgid, msgpath); 264 profile_leave(); 265 266 /* in case it's not done by the backend */ 267 unlink(msgpath); 268 269 log_trace(TRACE_QUEUE, 270 "queue-backend: queue_message_commit(%08"PRIx32") -> %d", 271 msgid, r); 272 273 return (r); 274 275 err: 276 if (ifp) 277 fclose(ifp); 278 if (ofp) 279 fclose(ofp); 280 return 0; 281 } 282 283 int 284 queue_message_corrupt(uint32_t msgid) 285 { 286 int r; 287 288 profile_enter("queue_message_corrupt"); 289 r = handler_message_corrupt(msgid); 290 profile_leave(); 291 292 log_trace(TRACE_QUEUE, 293 "queue-backend: queue_message_corrupt(%08"PRIx32") -> %d", msgid, r); 294 295 return (r); 296 } 297 298 int 299 queue_message_fd_r(uint32_t msgid) 300 { 301 int fdin = -1, fdout = -1, fd = -1; 302 FILE *ifp = NULL; 303 FILE *ofp = NULL; 304 305 profile_enter("queue_message_fd_r"); 306 fdin = handler_message_fd_r(msgid); 307 profile_leave(); 308 309 log_trace(TRACE_QUEUE, 310 "queue-backend: queue_message_fd_r(%08"PRIx32") -> %d", msgid, fdin); 311 312 if (fdin == -1) 313 return (-1); 314 315 if (env->sc_queue_flags & QUEUE_ENCRYPTION) { 316 if ((fdout = mktmpfile()) == -1) 317 goto err; 318 if ((fd = dup(fdout)) == -1) 319 goto err; 320 if ((ifp = fdopen(fdin, "r")) == NULL) 321 goto err; 322 fdin = fd; 323 fd = -1; 324 if ((ofp = fdopen(fdout, "w+")) == NULL) 325 goto err; 326 327 if (! crypto_decrypt_file(ifp, ofp)) 328 goto err; 329 330 fclose(ifp); 331 ifp = NULL; 332 fclose(ofp); 333 ofp = NULL; 334 lseek(fdin, SEEK_SET, 0); 335 } 336 337 if (env->sc_queue_flags & QUEUE_COMPRESSION) { 338 if ((fdout = mktmpfile()) == -1) 339 goto err; 340 if ((fd = dup(fdout)) == -1) 341 goto err; 342 if ((ifp = fdopen(fdin, "r")) == NULL) 343 goto err; 344 fdin = fd; 345 fd = -1; 346 if ((ofp = fdopen(fdout, "w+")) == NULL) 347 goto err; 348 349 if (! uncompress_file(ifp, ofp)) 350 goto err; 351 352 fclose(ifp); 353 ifp = NULL; 354 fclose(ofp); 355 ofp = NULL; 356 lseek(fdin, SEEK_SET, 0); 357 } 358 359 return (fdin); 360 361 err: 362 if (fd != -1) 363 close(fd); 364 if (fdin != -1) 365 close(fdin); 366 if (fdout != -1) 367 close(fdout); 368 if (ifp) 369 fclose(ifp); 370 if (ofp) 371 fclose(ofp); 372 return -1; 373 } 374 375 int 376 queue_message_fd_rw(uint32_t msgid) 377 { 378 char buf[PATH_MAX]; 379 380 queue_message_path(msgid, buf, sizeof(buf)); 381 382 return open(buf, O_RDWR | O_CREAT | O_EXCL, 0600); 383 } 384 385 static int 386 queue_envelope_dump_buffer(struct envelope *ep, char *evpbuf, size_t evpbufsize) 387 { 388 char *evp; 389 size_t evplen; 390 size_t complen; 391 char compbuf[sizeof(struct envelope)]; 392 size_t enclen; 393 char encbuf[sizeof(struct envelope)]; 394 395 evp = evpbuf; 396 evplen = envelope_dump_buffer(ep, evpbuf, evpbufsize); 397 if (evplen == 0) 398 return (0); 399 400 if (env->sc_queue_flags & QUEUE_COMPRESSION) { 401 complen = compress_chunk(evp, evplen, compbuf, sizeof compbuf); 402 if (complen == 0) 403 return (0); 404 evp = compbuf; 405 evplen = complen; 406 } 407 408 if (env->sc_queue_flags & QUEUE_ENCRYPTION) { 409 enclen = crypto_encrypt_buffer(evp, evplen, encbuf, sizeof encbuf); 410 if (enclen == 0) 411 return (0); 412 evp = encbuf; 413 evplen = enclen; 414 } 415 416 memmove(evpbuf, evp, evplen); 417 418 return (evplen); 419 } 420 421 static int 422 queue_envelope_load_buffer(struct envelope *ep, char *evpbuf, size_t evpbufsize) 423 { 424 char *evp; 425 size_t evplen; 426 char compbuf[sizeof(struct envelope)]; 427 size_t complen; 428 char encbuf[sizeof(struct envelope)]; 429 size_t enclen; 430 431 evp = evpbuf; 432 evplen = evpbufsize; 433 434 if (env->sc_queue_flags & QUEUE_ENCRYPTION) { 435 enclen = crypto_decrypt_buffer(evp, evplen, encbuf, sizeof encbuf); 436 if (enclen == 0) 437 return (0); 438 evp = encbuf; 439 evplen = enclen; 440 } 441 442 if (env->sc_queue_flags & QUEUE_COMPRESSION) { 443 complen = uncompress_chunk(evp, evplen, compbuf, sizeof compbuf); 444 if (complen == 0) 445 return (0); 446 evp = compbuf; 447 evplen = complen; 448 } 449 450 return (envelope_load_buffer(ep, evp, evplen)); 451 } 452 453 static void 454 queue_envelope_cache_add(struct envelope *e) 455 { 456 struct envelope *cached; 457 458 while (tree_count(&evpcache_tree) >= env->sc_queue_evpcache_size) 459 queue_envelope_cache_del(TAILQ_LAST(&evpcache_list, evplst)->id); 460 461 cached = xcalloc(1, sizeof *cached, "queue_envelope_cache_add"); 462 *cached = *e; 463 TAILQ_INSERT_HEAD(&evpcache_list, cached, entry); 464 tree_xset(&evpcache_tree, e->id, cached); 465 stat_increment("queue.evpcache.size", 1); 466 } 467 468 static void 469 queue_envelope_cache_update(struct envelope *e) 470 { 471 struct envelope *cached; 472 473 if ((cached = tree_get(&evpcache_tree, e->id)) == NULL) { 474 queue_envelope_cache_add(e); 475 stat_increment("queue.evpcache.update.missed", 1); 476 } else { 477 TAILQ_REMOVE(&evpcache_list, cached, entry); 478 *cached = *e; 479 TAILQ_INSERT_HEAD(&evpcache_list, cached, entry); 480 stat_increment("queue.evpcache.update.hit", 1); 481 } 482 } 483 484 static void 485 queue_envelope_cache_del(uint64_t evpid) 486 { 487 struct envelope *cached; 488 489 if ((cached = tree_pop(&evpcache_tree, evpid)) == NULL) 490 return; 491 492 TAILQ_REMOVE(&evpcache_list, cached, entry); 493 free(cached); 494 stat_decrement("queue.evpcache.size", 1); 495 } 496 497 int 498 queue_envelope_create(struct envelope *ep) 499 { 500 int r; 501 char evpbuf[sizeof(struct envelope)]; 502 size_t evplen; 503 uint64_t evpid; 504 uint32_t msgid; 505 506 ep->creation = time(NULL); 507 evplen = queue_envelope_dump_buffer(ep, evpbuf, sizeof evpbuf); 508 if (evplen == 0) 509 return (0); 510 511 evpid = ep->id; 512 msgid = evpid_to_msgid(evpid); 513 514 profile_enter("queue_envelope_create"); 515 r = handler_envelope_create(msgid, evpbuf, evplen, &ep->id); 516 profile_leave(); 517 518 log_trace(TRACE_QUEUE, 519 "queue-backend: queue_envelope_create(%016"PRIx64", %zu) -> %d (%016"PRIx64")", 520 evpid, evplen, r, ep->id); 521 522 if (!r) { 523 ep->creation = 0; 524 ep->id = 0; 525 } 526 527 if (r && env->sc_queue_flags & QUEUE_EVPCACHE) 528 queue_envelope_cache_add(ep); 529 530 return (r); 531 } 532 533 int 534 queue_envelope_delete(uint64_t evpid) 535 { 536 int r; 537 538 if (env->sc_queue_flags & QUEUE_EVPCACHE) 539 queue_envelope_cache_del(evpid); 540 541 profile_enter("queue_envelope_delete"); 542 r = handler_envelope_delete(evpid); 543 profile_leave(); 544 545 log_trace(TRACE_QUEUE, 546 "queue-backend: queue_envelope_delete(%016"PRIx64") -> %d", 547 evpid, r); 548 549 return (r); 550 } 551 552 int 553 queue_envelope_load(uint64_t evpid, struct envelope *ep) 554 { 555 const char *e; 556 char evpbuf[sizeof(struct envelope)]; 557 size_t evplen; 558 struct envelope *cached; 559 560 if ((env->sc_queue_flags & QUEUE_EVPCACHE) && 561 (cached = tree_get(&evpcache_tree, evpid))) { 562 *ep = *cached; 563 stat_increment("queue.evpcache.load.hit", 1); 564 return (1); 565 } 566 567 ep->id = evpid; 568 profile_enter("queue_envelope_load"); 569 evplen = handler_envelope_load(ep->id, evpbuf, sizeof evpbuf); 570 profile_leave(); 571 572 log_trace(TRACE_QUEUE, 573 "queue-backend: queue_envelope_load(%016"PRIx64") -> %zu", 574 evpid, evplen); 575 576 if (evplen == 0) 577 return (0); 578 579 if (queue_envelope_load_buffer(ep, evpbuf, evplen)) { 580 if ((e = envelope_validate(ep)) == NULL) { 581 ep->id = evpid; 582 if (env->sc_queue_flags & QUEUE_EVPCACHE) { 583 queue_envelope_cache_add(ep); 584 stat_increment("queue.evpcache.load.missed", 1); 585 } 586 return (1); 587 } 588 log_debug("debug: invalid envelope %016" PRIx64 ": %s", 589 ep->id, e); 590 } 591 592 (void)queue_message_corrupt(evpid_to_msgid(evpid)); 593 return (0); 594 } 595 596 int 597 queue_envelope_update(struct envelope *ep) 598 { 599 char evpbuf[sizeof(struct envelope)]; 600 size_t evplen; 601 int r; 602 603 evplen = queue_envelope_dump_buffer(ep, evpbuf, sizeof evpbuf); 604 if (evplen == 0) 605 return (0); 606 607 profile_enter("queue_envelope_update"); 608 r = handler_envelope_update(ep->id, evpbuf, evplen); 609 profile_leave(); 610 611 if (r && env->sc_queue_flags & QUEUE_EVPCACHE) 612 queue_envelope_cache_update(ep); 613 614 log_trace(TRACE_QUEUE, 615 "queue-backend: queue_envelope_update(%016"PRIx64") -> %d", 616 ep->id, r); 617 618 return (r); 619 } 620 621 int 622 queue_envelope_walk(struct envelope *ep) 623 { 624 const char *e; 625 uint64_t evpid; 626 char evpbuf[sizeof(struct envelope)]; 627 int r; 628 629 profile_enter("queue_envelope_walk"); 630 r = handler_envelope_walk(&evpid, evpbuf, sizeof evpbuf); 631 profile_leave(); 632 633 log_trace(TRACE_QUEUE, 634 "queue-backend: queue_envelope_walk() -> %d (%016"PRIx64")", 635 r, evpid); 636 637 if (r == -1) 638 return (r); 639 640 if (r && queue_envelope_load_buffer(ep, evpbuf, (size_t)r)) { 641 if ((e = envelope_validate(ep)) == NULL) { 642 ep->id = evpid; 643 if (env->sc_queue_flags & QUEUE_EVPCACHE) 644 queue_envelope_cache_add(ep); 645 return (1); 646 } 647 log_debug("debug: invalid envelope %016" PRIx64 ": %s", 648 ep->id, e); 649 (void)queue_message_corrupt(evpid_to_msgid(evpid)); 650 } 651 652 return (0); 653 } 654 655 uint32_t 656 queue_generate_msgid(void) 657 { 658 uint32_t msgid; 659 660 while ((msgid = arc4random()) == 0) 661 ; 662 663 return msgid; 664 } 665 666 uint64_t 667 queue_generate_evpid(uint32_t msgid) 668 { 669 uint32_t rnd; 670 uint64_t evpid; 671 672 while ((rnd = arc4random()) == 0) 673 ; 674 675 evpid = msgid; 676 evpid <<= 32; 677 evpid |= rnd; 678 679 return evpid; 680 } 681 682 static const char* 683 envelope_validate(struct envelope *ep) 684 { 685 if (ep->version != SMTPD_ENVELOPE_VERSION) 686 return "version mismatch"; 687 688 if (memchr(ep->helo, '\0', sizeof(ep->helo)) == NULL) 689 return "invalid helo"; 690 if (ep->helo[0] == '\0') 691 return "empty helo"; 692 693 if (memchr(ep->hostname, '\0', sizeof(ep->hostname)) == NULL) 694 return "invalid hostname"; 695 if (ep->hostname[0] == '\0') 696 return "empty hostname"; 697 698 if (memchr(ep->errorline, '\0', sizeof(ep->errorline)) == NULL) 699 return "invalid error line"; 700 701 return NULL; 702 } 703 704 void 705 queue_api_on_close(int(*cb)(void)) 706 { 707 handler_close = cb; 708 } 709 710 void 711 queue_api_on_message_create(int(*cb)(uint32_t *)) 712 { 713 handler_message_create = cb; 714 } 715 716 void 717 queue_api_on_message_commit(int(*cb)(uint32_t, const char *)) 718 { 719 handler_message_commit = cb; 720 } 721 722 void 723 queue_api_on_message_delete(int(*cb)(uint32_t)) 724 { 725 handler_message_delete = cb; 726 } 727 728 void 729 queue_api_on_message_fd_r(int(*cb)(uint32_t)) 730 { 731 handler_message_fd_r = cb; 732 } 733 734 void 735 queue_api_on_message_corrupt(int(*cb)(uint32_t)) 736 { 737 handler_message_corrupt = cb; 738 } 739 740 void 741 queue_api_on_envelope_create(int(*cb)(uint32_t, const char *, size_t, uint64_t *)) 742 { 743 handler_envelope_create = cb; 744 } 745 746 void 747 queue_api_on_envelope_delete(int(*cb)(uint64_t)) 748 { 749 handler_envelope_delete = cb; 750 } 751 752 void 753 queue_api_on_envelope_update(int(*cb)(uint64_t, const char *, size_t)) 754 { 755 handler_envelope_update = cb; 756 } 757 758 void 759 queue_api_on_envelope_load(int(*cb)(uint64_t, char *, size_t)) 760 { 761 handler_envelope_load = cb; 762 } 763 764 void 765 queue_api_on_envelope_walk(int(*cb)(uint64_t *, char *, size_t)) 766 { 767 handler_envelope_walk = cb; 768 } 769