1 /* $OpenBSD: queue_backend.c,v 1.47 2013/10/26 12:27:59 eric Exp $ */ 2 3 /* 4 * Copyright (c) 2011 Gilles Chehade <gilles@poolp.org> 5 * 6 * Permission to use, copy, modify, and distribute this software for any 7 * purpose with or without fee is hereby granted, provided that the above 8 * copyright notice and this permission notice appear in all copies. 9 * 10 * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES 11 * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF 12 * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR 13 * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES 14 * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN 15 * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF 16 * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. 17 */ 18 19 #include <sys/types.h> 20 #include <sys/queue.h> 21 #include <sys/tree.h> 22 #include <sys/socket.h> 23 #include <sys/stat.h> 24 25 #include <ctype.h> 26 #include <err.h> 27 #include <errno.h> 28 #include <event.h> 29 #include <fcntl.h> 30 #include <imsg.h> 31 #include <inttypes.h> 32 #include <libgen.h> 33 #include <pwd.h> 34 #include <stdio.h> 35 #include <stdlib.h> 36 #include <string.h> 37 #include <time.h> 38 #include <unistd.h> 39 40 #include "smtpd.h" 41 #include "log.h" 42 43 static const char* envelope_validate(struct envelope *); 44 45 extern struct queue_backend queue_backend_fs; 46 extern struct queue_backend queue_backend_null; 47 extern struct queue_backend queue_backend_proc; 48 extern struct queue_backend queue_backend_ram; 49 50 static void queue_envelope_cache_add(struct envelope *); 51 static void queue_envelope_cache_update(struct envelope *); 52 static void queue_envelope_cache_del(uint64_t evpid); 53 54 TAILQ_HEAD(evplst, envelope); 55 56 static struct tree evpcache_tree; 57 static struct evplst evpcache_list; 58 static struct queue_backend *backend; 59 60 static int (*handler_message_create)(uint32_t *); 61 static int (*handler_message_commit)(uint32_t, const char*); 62 static int (*handler_message_delete)(uint32_t); 63 static int (*handler_message_fd_r)(uint32_t); 64 static int (*handler_message_corrupt)(uint32_t); 65 static int (*handler_envelope_create)(uint32_t, const char *, size_t, uint64_t *); 66 static int (*handler_envelope_delete)(uint64_t); 67 static int (*handler_envelope_update)(uint64_t, const char *, size_t); 68 static int (*handler_envelope_load)(uint64_t, char *, size_t); 69 static int (*handler_envelope_walk)(uint64_t *, char *, size_t); 70 71 #ifdef QUEUE_PROFILING 72 73 static struct { 74 struct timespec t0; 75 const char *name; 76 } profile; 77 78 static inline void profile_enter(const char *name) 79 { 80 if ((profiling & PROFILE_QUEUE) == 0) 81 return; 82 83 profile.name = name; 84 clock_gettime(CLOCK_MONOTONIC, &profile.t0); 85 } 86 87 static inline void profile_leave(void) 88 { 89 struct timespec t1, dt; 90 91 if ((profiling & PROFILE_QUEUE) == 0) 92 return; 93 94 clock_gettime(CLOCK_MONOTONIC, &t1); 95 timespecsub(&t1, &profile.t0, &dt); 96 log_debug("profile-queue: %s %lld.%06ld", profile.name, 97 (long long)dt.tv_sec * 1000000 + dt.tv_nsec / 1000000, 98 dt.tv_nsec % 1000000); 99 } 100 #else 101 #define profile_enter(x) do {} while (0) 102 #define profile_leave() do {} while (0) 103 #endif 104 105 static int 106 queue_message_path(uint32_t msgid, char *buf, size_t len) 107 { 108 return bsnprintf(buf, len, "%s/%08"PRIx32, PATH_TEMPORARY, msgid); 109 } 110 111 int 112 queue_init(const char *name, int server) 113 { 114 struct passwd *pwq; 115 int r; 116 117 pwq = getpwnam(SMTPD_QUEUE_USER); 118 if (pwq == NULL) 119 pwq = getpwnam(SMTPD_USER); 120 if (pwq == NULL) 121 errx(1, "unknown user %s", SMTPD_USER); 122 123 tree_init(&evpcache_tree); 124 TAILQ_INIT(&evpcache_list); 125 126 if (!strcmp(name, "fs")) 127 backend = &queue_backend_fs; 128 if (!strcmp(name, "null")) 129 backend = &queue_backend_null; 130 if (!strcmp(name, "proc")) 131 backend = &queue_backend_proc; 132 if (!strcmp(name, "ram")) 133 backend = &queue_backend_ram; 134 135 if (backend == NULL) { 136 log_warn("could not find queue backend \"%s\"", name); 137 return (0); 138 } 139 140 if (server) { 141 if (ckdir(PATH_SPOOL, 0711, 0, 0, 1) == 0) 142 errx(1, "error in spool directory setup"); 143 if (ckdir(PATH_SPOOL PATH_OFFLINE, 01777, 0, 0, 1) == 0) 144 errx(1, "error in offline directory setup"); 145 if (ckdir(PATH_SPOOL PATH_PURGE, 0700, pwq->pw_uid, 0, 1) == 0) 146 errx(1, "error in purge directory setup"); 147 148 mvpurge(PATH_SPOOL PATH_TEMPORARY, PATH_SPOOL PATH_PURGE); 149 150 if (ckdir(PATH_SPOOL PATH_TEMPORARY, 0700, pwq->pw_uid, 0, 1) == 0) 151 errx(1, "error in purge directory setup"); 152 } 153 154 r = backend->init(pwq, server); 155 156 log_trace(TRACE_QUEUE, "queue-backend: queue_init(%d) -> %d", server, r); 157 158 return (r); 159 } 160 161 int 162 queue_message_create(uint32_t *msgid) 163 { 164 int r; 165 166 profile_enter("queue_message_create"); 167 r = handler_message_create(msgid); 168 profile_leave(); 169 170 log_trace(TRACE_QUEUE, 171 "queue-backend: queue_message_create() -> %d (%08"PRIx32")", 172 r, *msgid); 173 174 return (r); 175 } 176 177 int 178 queue_message_delete(uint32_t msgid) 179 { 180 char msgpath[MAXPATHLEN]; 181 int r; 182 183 profile_enter("queue_message_delete"); 184 r = handler_message_delete(msgid); 185 profile_leave(); 186 187 /* in case the message is incoming */ 188 queue_message_path(msgid, msgpath, sizeof(msgpath)); 189 unlink(msgpath); 190 191 log_trace(TRACE_QUEUE, 192 "queue-backend: queue_message_delete(%08"PRIx32") -> %d", msgid, r); 193 194 return (r); 195 } 196 197 int 198 queue_message_commit(uint32_t msgid) 199 { 200 int r; 201 char msgpath[MAXPATHLEN]; 202 char tmppath[MAXPATHLEN]; 203 FILE *ifp = NULL; 204 FILE *ofp = NULL; 205 206 profile_enter("queue_message_commit"); 207 208 queue_message_path(msgid, msgpath, sizeof(msgpath)); 209 210 if (env->sc_queue_flags & QUEUE_COMPRESSION) { 211 bsnprintf(tmppath, sizeof tmppath, "%s.comp", msgpath); 212 ifp = fopen(msgpath, "r"); 213 ofp = fopen(tmppath, "w+"); 214 if (ifp == NULL || ofp == NULL) 215 goto err; 216 if (! compress_file(ifp, ofp)) 217 goto err; 218 fclose(ifp); 219 fclose(ofp); 220 ifp = NULL; 221 ofp = NULL; 222 223 if (rename(tmppath, msgpath) == -1) { 224 if (errno == ENOSPC) 225 return (0); 226 unlink(tmppath); 227 log_warn("rename"); 228 return (0); 229 } 230 } 231 232 if (env->sc_queue_flags & QUEUE_ENCRYPTION) { 233 bsnprintf(tmppath, sizeof tmppath, "%s.enc", msgpath); 234 ifp = fopen(msgpath, "r"); 235 ofp = fopen(tmppath, "w+"); 236 if (ifp == NULL || ofp == NULL) 237 goto err; 238 if (! crypto_encrypt_file(ifp, ofp)) 239 goto err; 240 fclose(ifp); 241 fclose(ofp); 242 ifp = NULL; 243 ofp = NULL; 244 245 if (rename(tmppath, msgpath) == -1) { 246 if (errno == ENOSPC) 247 return (0); 248 unlink(tmppath); 249 log_warn("rename"); 250 return (0); 251 } 252 } 253 254 r = handler_message_commit(msgid, msgpath); 255 profile_leave(); 256 257 /* in case it's not done by the backend */ 258 unlink(msgpath); 259 260 log_trace(TRACE_QUEUE, 261 "queue-backend: queue_message_commit(%08"PRIx32") -> %d", 262 msgid, r); 263 264 return (r); 265 266 err: 267 if (ifp) 268 fclose(ifp); 269 if (ofp) 270 fclose(ofp); 271 return 0; 272 } 273 274 int 275 queue_message_corrupt(uint32_t msgid) 276 { 277 int r; 278 279 profile_enter("queue_message_corrupt"); 280 r = handler_message_corrupt(msgid); 281 profile_leave(); 282 283 log_trace(TRACE_QUEUE, 284 "queue-backend: queue_message_corrupt(%08"PRIx32") -> %d", msgid, r); 285 286 return (r); 287 } 288 289 int 290 queue_message_fd_r(uint32_t msgid) 291 { 292 int fdin = -1, fdout = -1, fd = -1; 293 FILE *ifp = NULL; 294 FILE *ofp = NULL; 295 296 profile_enter("queue_message_fd_r"); 297 fdin = handler_message_fd_r(msgid); 298 profile_leave(); 299 300 log_trace(TRACE_QUEUE, 301 "queue-backend: queue_message_fd_r(%08"PRIx32") -> %d", msgid, fdin); 302 303 if (fdin == -1) 304 return (-1); 305 306 if (env->sc_queue_flags & QUEUE_ENCRYPTION) { 307 if ((fdout = mktmpfile()) == -1) 308 goto err; 309 if ((fd = dup(fdout)) == -1) 310 goto err; 311 if ((ifp = fdopen(fdin, "r")) == NULL) 312 goto err; 313 fdin = fd; 314 fd = -1; 315 if ((ofp = fdopen(fdout, "w+")) == NULL) 316 goto err; 317 318 if (! crypto_decrypt_file(ifp, ofp)) 319 goto err; 320 321 fclose(ifp); 322 fclose(ofp); 323 lseek(fdin, SEEK_SET, 0); 324 } 325 326 if (env->sc_queue_flags & QUEUE_COMPRESSION) { 327 if ((fdout = mktmpfile()) == -1) 328 goto err; 329 if ((fd = dup(fdout)) == -1) 330 goto err; 331 if ((ifp = fdopen(fdin, "r")) == NULL) 332 goto err; 333 fdin = fd; 334 fd = -1; 335 if ((ofp = fdopen(fdout, "w+")) == NULL) 336 goto err; 337 338 if (! uncompress_file(ifp, ofp)) 339 goto err; 340 341 fclose(ifp); 342 fclose(ofp); 343 lseek(fdin, SEEK_SET, 0); 344 } 345 346 return (fdin); 347 348 err: 349 if (fd != -1) 350 close(fd); 351 if (fdin != -1) 352 close(fdin); 353 if (fdout != -1) 354 close(fdout); 355 if (ifp) 356 fclose(ifp); 357 if (ofp) 358 fclose(ofp); 359 return -1; 360 } 361 362 int 363 queue_message_fd_rw(uint32_t msgid) 364 { 365 char buf[SMTPD_MAXPATHLEN]; 366 367 queue_message_path(msgid, buf, sizeof(buf)); 368 369 return open(buf, O_RDWR | O_CREAT | O_EXCL, 0600); 370 } 371 372 static int 373 queue_envelope_dump_buffer(struct envelope *ep, char *evpbuf, size_t evpbufsize) 374 { 375 char *evp; 376 size_t evplen; 377 size_t complen; 378 char compbuf[sizeof(struct envelope)]; 379 size_t enclen; 380 char encbuf[sizeof(struct envelope)]; 381 382 evp = evpbuf; 383 evplen = envelope_dump_buffer(ep, evpbuf, evpbufsize); 384 if (evplen == 0) 385 return (0); 386 387 if (env->sc_queue_flags & QUEUE_COMPRESSION) { 388 complen = compress_chunk(evp, evplen, compbuf, sizeof compbuf); 389 if (complen == 0) 390 return (0); 391 evp = compbuf; 392 evplen = complen; 393 } 394 395 if (env->sc_queue_flags & QUEUE_ENCRYPTION) { 396 enclen = crypto_encrypt_buffer(evp, evplen, encbuf, sizeof encbuf); 397 if (enclen == 0) 398 return (0); 399 evp = encbuf; 400 evplen = enclen; 401 } 402 403 memmove(evpbuf, evp, evplen); 404 405 return (evplen); 406 } 407 408 static int 409 queue_envelope_load_buffer(struct envelope *ep, char *evpbuf, size_t evpbufsize) 410 { 411 char *evp; 412 size_t evplen; 413 char compbuf[sizeof(struct envelope)]; 414 size_t complen; 415 char encbuf[sizeof(struct envelope)]; 416 size_t enclen; 417 418 evp = evpbuf; 419 evplen = evpbufsize; 420 421 if (env->sc_queue_flags & QUEUE_ENCRYPTION) { 422 enclen = crypto_decrypt_buffer(evp, evplen, encbuf, sizeof encbuf); 423 if (enclen == 0) 424 return (0); 425 evp = encbuf; 426 evplen = enclen; 427 } 428 429 if (env->sc_queue_flags & QUEUE_COMPRESSION) { 430 complen = uncompress_chunk(evp, evplen, compbuf, sizeof compbuf); 431 if (complen == 0) 432 return (0); 433 evp = compbuf; 434 evplen = complen; 435 } 436 437 return (envelope_load_buffer(ep, evp, evplen)); 438 } 439 440 static void 441 queue_envelope_cache_add(struct envelope *e) 442 { 443 struct envelope *cached; 444 445 while (tree_count(&evpcache_tree) >= env->sc_queue_evpcache_size) 446 queue_envelope_cache_del(TAILQ_LAST(&evpcache_list, evplst)->id); 447 448 cached = xcalloc(1, sizeof *cached, "queue_envelope_cache_add"); 449 *cached = *e; 450 TAILQ_INSERT_HEAD(&evpcache_list, cached, entry); 451 tree_xset(&evpcache_tree, e->id, cached); 452 stat_increment("queue.evpcache.size", 1); 453 } 454 455 static void 456 queue_envelope_cache_update(struct envelope *e) 457 { 458 struct envelope *cached; 459 460 if ((cached = tree_get(&evpcache_tree, e->id)) == NULL) { 461 queue_envelope_cache_add(e); 462 stat_increment("queue.evpcache.update.missed", 1); 463 } else { 464 TAILQ_REMOVE(&evpcache_list, cached, entry); 465 *cached = *e; 466 TAILQ_INSERT_HEAD(&evpcache_list, cached, entry); 467 stat_increment("queue.evpcache.update.hit", 1); 468 } 469 } 470 471 static void 472 queue_envelope_cache_del(uint64_t evpid) 473 { 474 struct envelope *cached; 475 476 if ((cached = tree_pop(&evpcache_tree, evpid)) == NULL) 477 return; 478 479 TAILQ_REMOVE(&evpcache_list, cached, entry); 480 free(cached); 481 stat_decrement("queue.evpcache.size", 1); 482 } 483 484 int 485 queue_envelope_create(struct envelope *ep) 486 { 487 int r; 488 char evpbuf[sizeof(struct envelope)]; 489 size_t evplen; 490 uint64_t evpid; 491 uint32_t msgid; 492 493 ep->creation = time(NULL); 494 evplen = queue_envelope_dump_buffer(ep, evpbuf, sizeof evpbuf); 495 if (evplen == 0) 496 return (0); 497 498 evpid = ep->id; 499 msgid = evpid_to_msgid(evpid); 500 501 profile_enter("queue_envelope_create"); 502 r = handler_envelope_create(msgid, evpbuf, evplen, &ep->id); 503 profile_leave(); 504 505 log_trace(TRACE_QUEUE, 506 "queue-backend: queue_envelope_create(%016"PRIx64", %zu) -> %d (%016"PRIx64")", 507 evpid, evplen, r, ep->id); 508 509 if (!r) { 510 ep->creation = 0; 511 ep->id = 0; 512 } 513 514 if (r && env->sc_queue_flags & QUEUE_EVPCACHE) 515 queue_envelope_cache_add(ep); 516 517 return (r); 518 } 519 520 int 521 queue_envelope_delete(uint64_t evpid) 522 { 523 int r; 524 525 if (env->sc_queue_flags & QUEUE_EVPCACHE) 526 queue_envelope_cache_del(evpid); 527 528 profile_enter("queue_envelope_delete"); 529 r = handler_envelope_delete(evpid); 530 profile_leave(); 531 532 log_trace(TRACE_QUEUE, 533 "queue-backend: queue_envelope_delete(%016"PRIx64") -> %d", 534 evpid, r); 535 536 return (r); 537 } 538 539 int 540 queue_envelope_load(uint64_t evpid, struct envelope *ep) 541 { 542 const char *e; 543 char evpbuf[sizeof(struct envelope)]; 544 size_t evplen; 545 struct envelope *cached; 546 547 if ((env->sc_queue_flags & QUEUE_EVPCACHE) && 548 (cached = tree_get(&evpcache_tree, evpid))) { 549 *ep = *cached; 550 stat_increment("queue.evpcache.load.hit", 1); 551 return (1); 552 } 553 554 ep->id = evpid; 555 profile_enter("queue_envelope_load"); 556 evplen = handler_envelope_load(ep->id, evpbuf, sizeof evpbuf); 557 profile_leave(); 558 559 log_trace(TRACE_QUEUE, 560 "queue-backend: queue_envelope_load(%016"PRIx64") -> %zu", 561 evpid, evplen); 562 563 if (evplen == 0) 564 return (0); 565 566 if (queue_envelope_load_buffer(ep, evpbuf, evplen)) { 567 if ((e = envelope_validate(ep)) == NULL) { 568 ep->id = evpid; 569 if (env->sc_queue_flags & QUEUE_EVPCACHE) { 570 queue_envelope_cache_add(ep); 571 stat_increment("queue.evpcache.load.missed", 1); 572 } 573 return (1); 574 } 575 log_debug("debug: invalid envelope %016" PRIx64 ": %s", 576 ep->id, e); 577 } 578 579 (void)queue_message_corrupt(evpid_to_msgid(evpid)); 580 return (0); 581 } 582 583 int 584 queue_envelope_update(struct envelope *ep) 585 { 586 char evpbuf[sizeof(struct envelope)]; 587 size_t evplen; 588 int r; 589 590 evplen = queue_envelope_dump_buffer(ep, evpbuf, sizeof evpbuf); 591 if (evplen == 0) 592 return (0); 593 594 profile_enter("queue_envelope_update"); 595 r = handler_envelope_update(ep->id, evpbuf, evplen); 596 profile_leave(); 597 598 if (r && env->sc_queue_flags & QUEUE_EVPCACHE) 599 queue_envelope_cache_update(ep); 600 601 log_trace(TRACE_QUEUE, 602 "queue-backend: queue_envelope_update(%016"PRIx64") -> %d", 603 ep->id, r); 604 605 return (r); 606 } 607 608 int 609 queue_envelope_walk(struct envelope *ep) 610 { 611 const char *e; 612 uint64_t evpid; 613 char evpbuf[sizeof(struct envelope)]; 614 int r; 615 616 profile_enter("queue_envelope_walk"); 617 r = handler_envelope_walk(&evpid, evpbuf, sizeof evpbuf); 618 profile_leave(); 619 620 log_trace(TRACE_QUEUE, 621 "queue-backend: queue_envelope_walk() -> %d (%016"PRIx64")", 622 r, evpid); 623 624 if (r == -1) 625 return (r); 626 627 if (r && queue_envelope_load_buffer(ep, evpbuf, (size_t)r)) { 628 if ((e = envelope_validate(ep)) == NULL) { 629 ep->id = evpid; 630 if (env->sc_queue_flags & QUEUE_EVPCACHE) 631 queue_envelope_cache_add(ep); 632 return (1); 633 } 634 log_debug("debug: invalid envelope %016" PRIx64 ": %s", 635 ep->id, e); 636 } 637 638 (void)queue_message_corrupt(evpid_to_msgid(evpid)); 639 return (0); 640 } 641 642 uint32_t 643 queue_generate_msgid(void) 644 { 645 uint32_t msgid; 646 647 while ((msgid = arc4random_uniform(0xffffffff)) == 0) 648 ; 649 650 return msgid; 651 } 652 653 uint64_t 654 queue_generate_evpid(uint32_t msgid) 655 { 656 uint32_t rnd; 657 uint64_t evpid; 658 659 while ((rnd = arc4random_uniform(0xffffffff)) == 0) 660 ; 661 662 evpid = msgid; 663 evpid <<= 32; 664 evpid |= rnd; 665 666 return evpid; 667 } 668 669 static const char* 670 envelope_validate(struct envelope *ep) 671 { 672 if (ep->version != SMTPD_ENVELOPE_VERSION) 673 return "version mismatch"; 674 675 if (memchr(ep->helo, '\0', sizeof(ep->helo)) == NULL) 676 return "invalid helo"; 677 if (ep->helo[0] == '\0') 678 return "empty helo"; 679 680 if (memchr(ep->hostname, '\0', sizeof(ep->hostname)) == NULL) 681 return "invalid hostname"; 682 if (ep->hostname[0] == '\0') 683 return "empty hostname"; 684 685 if (memchr(ep->errorline, '\0', sizeof(ep->errorline)) == NULL) 686 return "invalid error line"; 687 688 return NULL; 689 } 690 691 void 692 queue_api_on_message_create(int(*cb)(uint32_t *)) 693 { 694 handler_message_create = cb; 695 } 696 697 void 698 queue_api_on_message_commit(int(*cb)(uint32_t, const char *)) 699 { 700 handler_message_commit = cb; 701 } 702 703 void 704 queue_api_on_message_delete(int(*cb)(uint32_t)) 705 { 706 handler_message_delete = cb; 707 } 708 709 void 710 queue_api_on_message_fd_r(int(*cb)(uint32_t)) 711 { 712 handler_message_fd_r = cb; 713 } 714 715 void 716 queue_api_on_message_corrupt(int(*cb)(uint32_t)) 717 { 718 handler_message_corrupt = cb; 719 } 720 721 void 722 queue_api_on_envelope_create(int(*cb)(uint32_t, const char *, size_t, uint64_t *)) 723 { 724 handler_envelope_create = cb; 725 } 726 727 void 728 queue_api_on_envelope_delete(int(*cb)(uint64_t)) 729 { 730 handler_envelope_delete = cb; 731 } 732 733 void 734 queue_api_on_envelope_update(int(*cb)(uint64_t, const char *, size_t)) 735 { 736 handler_envelope_update = cb; 737 } 738 739 void 740 queue_api_on_envelope_load(int(*cb)(uint64_t, char *, size_t)) 741 { 742 handler_envelope_load = cb; 743 } 744 745 void 746 queue_api_on_envelope_walk(int(*cb)(uint64_t *, char *, size_t)) 747 { 748 handler_envelope_walk = cb; 749 } 750