1 /* $OpenBSD: queue_backend.c,v 1.52 2014/07/08 15:45:32 eric Exp $ */ 2 3 /* 4 * Copyright (c) 2011 Gilles Chehade <gilles@poolp.org> 5 * 6 * Permission to use, copy, modify, and distribute this software for any 7 * purpose with or without fee is hereby granted, provided that the above 8 * copyright notice and this permission notice appear in all copies. 9 * 10 * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES 11 * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF 12 * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR 13 * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES 14 * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN 15 * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF 16 * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. 17 */ 18 19 #include <sys/types.h> 20 #include <sys/queue.h> 21 #include <sys/tree.h> 22 #include <sys/socket.h> 23 #include <sys/stat.h> 24 25 #include <ctype.h> 26 #include <err.h> 27 #include <errno.h> 28 #include <event.h> 29 #include <fcntl.h> 30 #include <imsg.h> 31 #include <inttypes.h> 32 #include <libgen.h> 33 #include <pwd.h> 34 #include <stdio.h> 35 #include <stdlib.h> 36 #include <string.h> 37 #include <time.h> 38 #include <unistd.h> 39 40 #include "smtpd.h" 41 #include "log.h" 42 43 static const char* envelope_validate(struct envelope *); 44 45 extern struct queue_backend queue_backend_fs; 46 extern struct queue_backend queue_backend_null; 47 extern struct queue_backend queue_backend_proc; 48 extern struct queue_backend queue_backend_ram; 49 50 static void queue_envelope_cache_add(struct envelope *); 51 static void queue_envelope_cache_update(struct envelope *); 52 static void queue_envelope_cache_del(uint64_t evpid); 53 54 TAILQ_HEAD(evplst, envelope); 55 56 static struct tree evpcache_tree; 57 static struct evplst evpcache_list; 58 static struct queue_backend *backend; 59 60 static int (*handler_close)(void); 61 static int (*handler_message_create)(uint32_t *); 62 static int (*handler_message_commit)(uint32_t, const char*); 63 static int (*handler_message_delete)(uint32_t); 64 static int (*handler_message_fd_r)(uint32_t); 65 static int (*handler_message_corrupt)(uint32_t); 66 static int (*handler_envelope_create)(uint32_t, const char *, size_t, uint64_t *); 67 static int (*handler_envelope_delete)(uint64_t); 68 static int (*handler_envelope_update)(uint64_t, const char *, size_t); 69 static int (*handler_envelope_load)(uint64_t, char *, size_t); 70 static int (*handler_envelope_walk)(uint64_t *, char *, size_t); 71 72 #ifdef QUEUE_PROFILING 73 74 static struct { 75 struct timespec t0; 76 const char *name; 77 } profile; 78 79 static inline void profile_enter(const char *name) 80 { 81 if ((profiling & PROFILE_QUEUE) == 0) 82 return; 83 84 profile.name = name; 85 clock_gettime(CLOCK_MONOTONIC, &profile.t0); 86 } 87 88 static inline void profile_leave(void) 89 { 90 struct timespec t1, dt; 91 92 if ((profiling & PROFILE_QUEUE) == 0) 93 return; 94 95 clock_gettime(CLOCK_MONOTONIC, &t1); 96 timespecsub(&t1, &profile.t0, &dt); 97 log_debug("profile-queue: %s %lld.%09ld", profile.name, 98 (long long)dt.tv_sec, dt.tv_nsec); 99 } 100 #else 101 #define profile_enter(x) do {} while (0) 102 #define profile_leave() do {} while (0) 103 #endif 104 105 static int 106 queue_message_path(uint32_t msgid, char *buf, size_t len) 107 { 108 return bsnprintf(buf, len, "%s/%08"PRIx32, PATH_TEMPORARY, msgid); 109 } 110 111 int 112 queue_init(const char *name, int server) 113 { 114 struct passwd *pwq; 115 int r; 116 117 pwq = getpwnam(SMTPD_QUEUE_USER); 118 if (pwq == NULL) 119 errx(1, "unknown user %s", SMTPD_QUEUE_USER); 120 121 tree_init(&evpcache_tree); 122 TAILQ_INIT(&evpcache_list); 123 124 if (!strcmp(name, "fs")) 125 backend = &queue_backend_fs; 126 else if (!strcmp(name, "null")) 127 backend = &queue_backend_null; 128 else if (!strcmp(name, "ram")) 129 backend = &queue_backend_ram; 130 else 131 backend = &queue_backend_proc; 132 133 if (server) { 134 if (ckdir(PATH_SPOOL, 0711, 0, 0, 1) == 0) 135 errx(1, "error in spool directory setup"); 136 if (ckdir(PATH_SPOOL PATH_OFFLINE, 01777, 0, 0, 1) == 0) 137 errx(1, "error in offline directory setup"); 138 if (ckdir(PATH_SPOOL PATH_PURGE, 0700, pwq->pw_uid, 0, 1) == 0) 139 errx(1, "error in purge directory setup"); 140 141 mvpurge(PATH_SPOOL PATH_TEMPORARY, PATH_SPOOL PATH_PURGE); 142 143 if (ckdir(PATH_SPOOL PATH_TEMPORARY, 0700, pwq->pw_uid, 0, 1) == 0) 144 errx(1, "error in purge directory setup"); 145 } 146 147 r = backend->init(pwq, server, name); 148 149 log_trace(TRACE_QUEUE, "queue-backend: queue_init(%d) -> %d", server, r); 150 151 return (r); 152 } 153 154 int 155 queue_close(void) 156 { 157 if (handler_close) 158 return (handler_close()); 159 160 return (1); 161 } 162 163 int 164 queue_message_create(uint32_t *msgid) 165 { 166 int r; 167 168 profile_enter("queue_message_create"); 169 r = handler_message_create(msgid); 170 profile_leave(); 171 172 log_trace(TRACE_QUEUE, 173 "queue-backend: queue_message_create() -> %d (%08"PRIx32")", 174 r, *msgid); 175 176 return (r); 177 } 178 179 int 180 queue_message_delete(uint32_t msgid) 181 { 182 char msgpath[MAXPATHLEN]; 183 int r; 184 185 profile_enter("queue_message_delete"); 186 r = handler_message_delete(msgid); 187 profile_leave(); 188 189 /* in case the message is incoming */ 190 queue_message_path(msgid, msgpath, sizeof(msgpath)); 191 unlink(msgpath); 192 193 log_trace(TRACE_QUEUE, 194 "queue-backend: queue_message_delete(%08"PRIx32") -> %d", msgid, r); 195 196 return (r); 197 } 198 199 int 200 queue_message_commit(uint32_t msgid) 201 { 202 int r; 203 char msgpath[MAXPATHLEN]; 204 char tmppath[MAXPATHLEN]; 205 FILE *ifp = NULL; 206 FILE *ofp = NULL; 207 208 profile_enter("queue_message_commit"); 209 210 queue_message_path(msgid, msgpath, sizeof(msgpath)); 211 212 if (env->sc_queue_flags & QUEUE_COMPRESSION) { 213 bsnprintf(tmppath, sizeof tmppath, "%s.comp", msgpath); 214 ifp = fopen(msgpath, "r"); 215 ofp = fopen(tmppath, "w+"); 216 if (ifp == NULL || ofp == NULL) 217 goto err; 218 if (! compress_file(ifp, ofp)) 219 goto err; 220 fclose(ifp); 221 fclose(ofp); 222 ifp = NULL; 223 ofp = NULL; 224 225 if (rename(tmppath, msgpath) == -1) { 226 if (errno == ENOSPC) 227 return (0); 228 unlink(tmppath); 229 log_warn("rename"); 230 return (0); 231 } 232 } 233 234 if (env->sc_queue_flags & QUEUE_ENCRYPTION) { 235 bsnprintf(tmppath, sizeof tmppath, "%s.enc", msgpath); 236 ifp = fopen(msgpath, "r"); 237 ofp = fopen(tmppath, "w+"); 238 if (ifp == NULL || ofp == NULL) 239 goto err; 240 if (! crypto_encrypt_file(ifp, ofp)) 241 goto err; 242 fclose(ifp); 243 fclose(ofp); 244 ifp = NULL; 245 ofp = NULL; 246 247 if (rename(tmppath, msgpath) == -1) { 248 if (errno == ENOSPC) 249 return (0); 250 unlink(tmppath); 251 log_warn("rename"); 252 return (0); 253 } 254 } 255 256 r = handler_message_commit(msgid, msgpath); 257 profile_leave(); 258 259 /* in case it's not done by the backend */ 260 unlink(msgpath); 261 262 log_trace(TRACE_QUEUE, 263 "queue-backend: queue_message_commit(%08"PRIx32") -> %d", 264 msgid, r); 265 266 return (r); 267 268 err: 269 if (ifp) 270 fclose(ifp); 271 if (ofp) 272 fclose(ofp); 273 return 0; 274 } 275 276 int 277 queue_message_corrupt(uint32_t msgid) 278 { 279 int r; 280 281 profile_enter("queue_message_corrupt"); 282 r = handler_message_corrupt(msgid); 283 profile_leave(); 284 285 log_trace(TRACE_QUEUE, 286 "queue-backend: queue_message_corrupt(%08"PRIx32") -> %d", msgid, r); 287 288 return (r); 289 } 290 291 int 292 queue_message_fd_r(uint32_t msgid) 293 { 294 int fdin = -1, fdout = -1, fd = -1; 295 FILE *ifp = NULL; 296 FILE *ofp = NULL; 297 298 profile_enter("queue_message_fd_r"); 299 fdin = handler_message_fd_r(msgid); 300 profile_leave(); 301 302 log_trace(TRACE_QUEUE, 303 "queue-backend: queue_message_fd_r(%08"PRIx32") -> %d", msgid, fdin); 304 305 if (fdin == -1) 306 return (-1); 307 308 if (env->sc_queue_flags & QUEUE_ENCRYPTION) { 309 if ((fdout = mktmpfile()) == -1) 310 goto err; 311 if ((fd = dup(fdout)) == -1) 312 goto err; 313 if ((ifp = fdopen(fdin, "r")) == NULL) 314 goto err; 315 fdin = fd; 316 fd = -1; 317 if ((ofp = fdopen(fdout, "w+")) == NULL) 318 goto err; 319 320 if (! crypto_decrypt_file(ifp, ofp)) 321 goto err; 322 323 fclose(ifp); 324 ifp = NULL; 325 fclose(ofp); 326 ofp = NULL; 327 lseek(fdin, SEEK_SET, 0); 328 } 329 330 if (env->sc_queue_flags & QUEUE_COMPRESSION) { 331 if ((fdout = mktmpfile()) == -1) 332 goto err; 333 if ((fd = dup(fdout)) == -1) 334 goto err; 335 if ((ifp = fdopen(fdin, "r")) == NULL) 336 goto err; 337 fdin = fd; 338 fd = -1; 339 if ((ofp = fdopen(fdout, "w+")) == NULL) 340 goto err; 341 342 if (! uncompress_file(ifp, ofp)) 343 goto err; 344 345 fclose(ifp); 346 ifp = NULL; 347 fclose(ofp); 348 ofp = NULL; 349 lseek(fdin, SEEK_SET, 0); 350 } 351 352 return (fdin); 353 354 err: 355 if (fd != -1) 356 close(fd); 357 if (fdin != -1) 358 close(fdin); 359 if (fdout != -1) 360 close(fdout); 361 if (ifp) 362 fclose(ifp); 363 if (ofp) 364 fclose(ofp); 365 return -1; 366 } 367 368 int 369 queue_message_fd_rw(uint32_t msgid) 370 { 371 char buf[SMTPD_MAXPATHLEN]; 372 373 queue_message_path(msgid, buf, sizeof(buf)); 374 375 return open(buf, O_RDWR | O_CREAT | O_EXCL, 0600); 376 } 377 378 static int 379 queue_envelope_dump_buffer(struct envelope *ep, char *evpbuf, size_t evpbufsize) 380 { 381 char *evp; 382 size_t evplen; 383 size_t complen; 384 char compbuf[sizeof(struct envelope)]; 385 size_t enclen; 386 char encbuf[sizeof(struct envelope)]; 387 388 evp = evpbuf; 389 evplen = envelope_dump_buffer(ep, evpbuf, evpbufsize); 390 if (evplen == 0) 391 return (0); 392 393 if (env->sc_queue_flags & QUEUE_COMPRESSION) { 394 complen = compress_chunk(evp, evplen, compbuf, sizeof compbuf); 395 if (complen == 0) 396 return (0); 397 evp = compbuf; 398 evplen = complen; 399 } 400 401 if (env->sc_queue_flags & QUEUE_ENCRYPTION) { 402 enclen = crypto_encrypt_buffer(evp, evplen, encbuf, sizeof encbuf); 403 if (enclen == 0) 404 return (0); 405 evp = encbuf; 406 evplen = enclen; 407 } 408 409 memmove(evpbuf, evp, evplen); 410 411 return (evplen); 412 } 413 414 static int 415 queue_envelope_load_buffer(struct envelope *ep, char *evpbuf, size_t evpbufsize) 416 { 417 char *evp; 418 size_t evplen; 419 char compbuf[sizeof(struct envelope)]; 420 size_t complen; 421 char encbuf[sizeof(struct envelope)]; 422 size_t enclen; 423 424 evp = evpbuf; 425 evplen = evpbufsize; 426 427 if (env->sc_queue_flags & QUEUE_ENCRYPTION) { 428 enclen = crypto_decrypt_buffer(evp, evplen, encbuf, sizeof encbuf); 429 if (enclen == 0) 430 return (0); 431 evp = encbuf; 432 evplen = enclen; 433 } 434 435 if (env->sc_queue_flags & QUEUE_COMPRESSION) { 436 complen = uncompress_chunk(evp, evplen, compbuf, sizeof compbuf); 437 if (complen == 0) 438 return (0); 439 evp = compbuf; 440 evplen = complen; 441 } 442 443 return (envelope_load_buffer(ep, evp, evplen)); 444 } 445 446 static void 447 queue_envelope_cache_add(struct envelope *e) 448 { 449 struct envelope *cached; 450 451 while (tree_count(&evpcache_tree) >= env->sc_queue_evpcache_size) 452 queue_envelope_cache_del(TAILQ_LAST(&evpcache_list, evplst)->id); 453 454 cached = xcalloc(1, sizeof *cached, "queue_envelope_cache_add"); 455 *cached = *e; 456 TAILQ_INSERT_HEAD(&evpcache_list, cached, entry); 457 tree_xset(&evpcache_tree, e->id, cached); 458 stat_increment("queue.evpcache.size", 1); 459 } 460 461 static void 462 queue_envelope_cache_update(struct envelope *e) 463 { 464 struct envelope *cached; 465 466 if ((cached = tree_get(&evpcache_tree, e->id)) == NULL) { 467 queue_envelope_cache_add(e); 468 stat_increment("queue.evpcache.update.missed", 1); 469 } else { 470 TAILQ_REMOVE(&evpcache_list, cached, entry); 471 *cached = *e; 472 TAILQ_INSERT_HEAD(&evpcache_list, cached, entry); 473 stat_increment("queue.evpcache.update.hit", 1); 474 } 475 } 476 477 static void 478 queue_envelope_cache_del(uint64_t evpid) 479 { 480 struct envelope *cached; 481 482 if ((cached = tree_pop(&evpcache_tree, evpid)) == NULL) 483 return; 484 485 TAILQ_REMOVE(&evpcache_list, cached, entry); 486 free(cached); 487 stat_decrement("queue.evpcache.size", 1); 488 } 489 490 int 491 queue_envelope_create(struct envelope *ep) 492 { 493 int r; 494 char evpbuf[sizeof(struct envelope)]; 495 size_t evplen; 496 uint64_t evpid; 497 uint32_t msgid; 498 499 ep->creation = time(NULL); 500 evplen = queue_envelope_dump_buffer(ep, evpbuf, sizeof evpbuf); 501 if (evplen == 0) 502 return (0); 503 504 evpid = ep->id; 505 msgid = evpid_to_msgid(evpid); 506 507 profile_enter("queue_envelope_create"); 508 r = handler_envelope_create(msgid, evpbuf, evplen, &ep->id); 509 profile_leave(); 510 511 log_trace(TRACE_QUEUE, 512 "queue-backend: queue_envelope_create(%016"PRIx64", %zu) -> %d (%016"PRIx64")", 513 evpid, evplen, r, ep->id); 514 515 if (!r) { 516 ep->creation = 0; 517 ep->id = 0; 518 } 519 520 if (r && env->sc_queue_flags & QUEUE_EVPCACHE) 521 queue_envelope_cache_add(ep); 522 523 return (r); 524 } 525 526 int 527 queue_envelope_delete(uint64_t evpid) 528 { 529 int r; 530 531 if (env->sc_queue_flags & QUEUE_EVPCACHE) 532 queue_envelope_cache_del(evpid); 533 534 profile_enter("queue_envelope_delete"); 535 r = handler_envelope_delete(evpid); 536 profile_leave(); 537 538 log_trace(TRACE_QUEUE, 539 "queue-backend: queue_envelope_delete(%016"PRIx64") -> %d", 540 evpid, r); 541 542 return (r); 543 } 544 545 int 546 queue_envelope_load(uint64_t evpid, struct envelope *ep) 547 { 548 const char *e; 549 char evpbuf[sizeof(struct envelope)]; 550 size_t evplen; 551 struct envelope *cached; 552 553 if ((env->sc_queue_flags & QUEUE_EVPCACHE) && 554 (cached = tree_get(&evpcache_tree, evpid))) { 555 *ep = *cached; 556 stat_increment("queue.evpcache.load.hit", 1); 557 return (1); 558 } 559 560 ep->id = evpid; 561 profile_enter("queue_envelope_load"); 562 evplen = handler_envelope_load(ep->id, evpbuf, sizeof evpbuf); 563 profile_leave(); 564 565 log_trace(TRACE_QUEUE, 566 "queue-backend: queue_envelope_load(%016"PRIx64") -> %zu", 567 evpid, evplen); 568 569 if (evplen == 0) 570 return (0); 571 572 if (queue_envelope_load_buffer(ep, evpbuf, evplen)) { 573 if ((e = envelope_validate(ep)) == NULL) { 574 ep->id = evpid; 575 if (env->sc_queue_flags & QUEUE_EVPCACHE) { 576 queue_envelope_cache_add(ep); 577 stat_increment("queue.evpcache.load.missed", 1); 578 } 579 return (1); 580 } 581 log_debug("debug: invalid envelope %016" PRIx64 ": %s", 582 ep->id, e); 583 } 584 585 (void)queue_message_corrupt(evpid_to_msgid(evpid)); 586 return (0); 587 } 588 589 int 590 queue_envelope_update(struct envelope *ep) 591 { 592 char evpbuf[sizeof(struct envelope)]; 593 size_t evplen; 594 int r; 595 596 evplen = queue_envelope_dump_buffer(ep, evpbuf, sizeof evpbuf); 597 if (evplen == 0) 598 return (0); 599 600 profile_enter("queue_envelope_update"); 601 r = handler_envelope_update(ep->id, evpbuf, evplen); 602 profile_leave(); 603 604 if (r && env->sc_queue_flags & QUEUE_EVPCACHE) 605 queue_envelope_cache_update(ep); 606 607 log_trace(TRACE_QUEUE, 608 "queue-backend: queue_envelope_update(%016"PRIx64") -> %d", 609 ep->id, r); 610 611 return (r); 612 } 613 614 int 615 queue_envelope_walk(struct envelope *ep) 616 { 617 const char *e; 618 uint64_t evpid; 619 char evpbuf[sizeof(struct envelope)]; 620 int r; 621 622 profile_enter("queue_envelope_walk"); 623 r = handler_envelope_walk(&evpid, evpbuf, sizeof evpbuf); 624 profile_leave(); 625 626 log_trace(TRACE_QUEUE, 627 "queue-backend: queue_envelope_walk() -> %d (%016"PRIx64")", 628 r, evpid); 629 630 if (r == -1) 631 return (r); 632 633 if (r && queue_envelope_load_buffer(ep, evpbuf, (size_t)r)) { 634 if ((e = envelope_validate(ep)) == NULL) { 635 ep->id = evpid; 636 if (env->sc_queue_flags & QUEUE_EVPCACHE) 637 queue_envelope_cache_add(ep); 638 return (1); 639 } 640 log_debug("debug: invalid envelope %016" PRIx64 ": %s", 641 ep->id, e); 642 (void)queue_message_corrupt(evpid_to_msgid(evpid)); 643 } 644 645 return (0); 646 } 647 648 uint32_t 649 queue_generate_msgid(void) 650 { 651 uint32_t msgid; 652 653 while ((msgid = arc4random_uniform(0xffffffff)) == 0) 654 ; 655 656 return msgid; 657 } 658 659 uint64_t 660 queue_generate_evpid(uint32_t msgid) 661 { 662 uint32_t rnd; 663 uint64_t evpid; 664 665 while ((rnd = arc4random_uniform(0xffffffff)) == 0) 666 ; 667 668 evpid = msgid; 669 evpid <<= 32; 670 evpid |= rnd; 671 672 return evpid; 673 } 674 675 static const char* 676 envelope_validate(struct envelope *ep) 677 { 678 if (ep->version != SMTPD_ENVELOPE_VERSION) 679 return "version mismatch"; 680 681 if (memchr(ep->helo, '\0', sizeof(ep->helo)) == NULL) 682 return "invalid helo"; 683 if (ep->helo[0] == '\0') 684 return "empty helo"; 685 686 if (memchr(ep->hostname, '\0', sizeof(ep->hostname)) == NULL) 687 return "invalid hostname"; 688 if (ep->hostname[0] == '\0') 689 return "empty hostname"; 690 691 if (memchr(ep->errorline, '\0', sizeof(ep->errorline)) == NULL) 692 return "invalid error line"; 693 694 return NULL; 695 } 696 697 void 698 queue_api_on_close(int(*cb)(void)) 699 { 700 handler_close = cb; 701 } 702 703 void 704 queue_api_on_message_create(int(*cb)(uint32_t *)) 705 { 706 handler_message_create = cb; 707 } 708 709 void 710 queue_api_on_message_commit(int(*cb)(uint32_t, const char *)) 711 { 712 handler_message_commit = cb; 713 } 714 715 void 716 queue_api_on_message_delete(int(*cb)(uint32_t)) 717 { 718 handler_message_delete = cb; 719 } 720 721 void 722 queue_api_on_message_fd_r(int(*cb)(uint32_t)) 723 { 724 handler_message_fd_r = cb; 725 } 726 727 void 728 queue_api_on_message_corrupt(int(*cb)(uint32_t)) 729 { 730 handler_message_corrupt = cb; 731 } 732 733 void 734 queue_api_on_envelope_create(int(*cb)(uint32_t, const char *, size_t, uint64_t *)) 735 { 736 handler_envelope_create = cb; 737 } 738 739 void 740 queue_api_on_envelope_delete(int(*cb)(uint64_t)) 741 { 742 handler_envelope_delete = cb; 743 } 744 745 void 746 queue_api_on_envelope_update(int(*cb)(uint64_t, const char *, size_t)) 747 { 748 handler_envelope_update = cb; 749 } 750 751 void 752 queue_api_on_envelope_load(int(*cb)(uint64_t, char *, size_t)) 753 { 754 handler_envelope_load = cb; 755 } 756 757 void 758 queue_api_on_envelope_walk(int(*cb)(uint64_t *, char *, size_t)) 759 { 760 handler_envelope_walk = cb; 761 } 762