1 /* $OpenBSD: queue_backend.c,v 1.51 2014/07/07 09:11:24 eric Exp $ */ 2 3 /* 4 * Copyright (c) 2011 Gilles Chehade <gilles@poolp.org> 5 * 6 * Permission to use, copy, modify, and distribute this software for any 7 * purpose with or without fee is hereby granted, provided that the above 8 * copyright notice and this permission notice appear in all copies. 9 * 10 * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES 11 * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF 12 * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR 13 * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES 14 * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN 15 * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF 16 * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. 17 */ 18 19 #include <sys/types.h> 20 #include <sys/queue.h> 21 #include <sys/tree.h> 22 #include <sys/socket.h> 23 #include <sys/stat.h> 24 25 #include <ctype.h> 26 #include <err.h> 27 #include <errno.h> 28 #include <event.h> 29 #include <fcntl.h> 30 #include <imsg.h> 31 #include <inttypes.h> 32 #include <libgen.h> 33 #include <pwd.h> 34 #include <stdio.h> 35 #include <stdlib.h> 36 #include <string.h> 37 #include <time.h> 38 #include <unistd.h> 39 40 #include "smtpd.h" 41 #include "log.h" 42 43 static const char* envelope_validate(struct envelope *); 44 45 extern struct queue_backend queue_backend_fs; 46 extern struct queue_backend queue_backend_null; 47 extern struct queue_backend queue_backend_proc; 48 extern struct queue_backend queue_backend_ram; 49 50 static void queue_envelope_cache_add(struct envelope *); 51 static void queue_envelope_cache_update(struct envelope *); 52 static void queue_envelope_cache_del(uint64_t evpid); 53 54 TAILQ_HEAD(evplst, envelope); 55 56 static struct tree evpcache_tree; 57 static struct evplst evpcache_list; 58 static struct queue_backend *backend; 59 60 static int (*handler_message_create)(uint32_t *); 61 static int (*handler_message_commit)(uint32_t, const char*); 62 static int (*handler_message_delete)(uint32_t); 63 static int (*handler_message_fd_r)(uint32_t); 64 static int (*handler_message_corrupt)(uint32_t); 65 static int (*handler_envelope_create)(uint32_t, const char *, size_t, uint64_t *); 66 static int (*handler_envelope_delete)(uint64_t); 67 static int (*handler_envelope_update)(uint64_t, const char *, size_t); 68 static int (*handler_envelope_load)(uint64_t, char *, size_t); 69 static int (*handler_envelope_walk)(uint64_t *, char *, size_t); 70 71 #ifdef QUEUE_PROFILING 72 73 static struct { 74 struct timespec t0; 75 const char *name; 76 } profile; 77 78 static inline void profile_enter(const char *name) 79 { 80 if ((profiling & PROFILE_QUEUE) == 0) 81 return; 82 83 profile.name = name; 84 clock_gettime(CLOCK_MONOTONIC, &profile.t0); 85 } 86 87 static inline void profile_leave(void) 88 { 89 struct timespec t1, dt; 90 91 if ((profiling & PROFILE_QUEUE) == 0) 92 return; 93 94 clock_gettime(CLOCK_MONOTONIC, &t1); 95 timespecsub(&t1, &profile.t0, &dt); 96 log_debug("profile-queue: %s %lld.%09ld", profile.name, 97 (long long)dt.tv_sec, dt.tv_nsec); 98 } 99 #else 100 #define profile_enter(x) do {} while (0) 101 #define profile_leave() do {} while (0) 102 #endif 103 104 static int 105 queue_message_path(uint32_t msgid, char *buf, size_t len) 106 { 107 return bsnprintf(buf, len, "%s/%08"PRIx32, PATH_TEMPORARY, msgid); 108 } 109 110 int 111 queue_init(const char *name, int server) 112 { 113 struct passwd *pwq; 114 int r; 115 116 pwq = getpwnam(SMTPD_QUEUE_USER); 117 if (pwq == NULL) 118 errx(1, "unknown user %s", SMTPD_QUEUE_USER); 119 120 tree_init(&evpcache_tree); 121 TAILQ_INIT(&evpcache_list); 122 123 if (!strcmp(name, "fs")) 124 backend = &queue_backend_fs; 125 if (!strcmp(name, "null")) 126 backend = &queue_backend_null; 127 if (!strcmp(name, "proc")) 128 backend = &queue_backend_proc; 129 if (!strcmp(name, "ram")) 130 backend = &queue_backend_ram; 131 132 if (backend == NULL) { 133 log_warn("could not find queue backend \"%s\"", name); 134 return (0); 135 } 136 137 if (server) { 138 if (ckdir(PATH_SPOOL, 0711, 0, 0, 1) == 0) 139 errx(1, "error in spool directory setup"); 140 if (ckdir(PATH_SPOOL PATH_OFFLINE, 01777, 0, 0, 1) == 0) 141 errx(1, "error in offline directory setup"); 142 if (ckdir(PATH_SPOOL PATH_PURGE, 0700, pwq->pw_uid, 0, 1) == 0) 143 errx(1, "error in purge directory setup"); 144 145 mvpurge(PATH_SPOOL PATH_TEMPORARY, PATH_SPOOL PATH_PURGE); 146 147 if (ckdir(PATH_SPOOL PATH_TEMPORARY, 0700, pwq->pw_uid, 0, 1) == 0) 148 errx(1, "error in purge directory setup"); 149 } 150 151 r = backend->init(pwq, server); 152 153 log_trace(TRACE_QUEUE, "queue-backend: queue_init(%d) -> %d", server, r); 154 155 return (r); 156 } 157 158 int 159 queue_message_create(uint32_t *msgid) 160 { 161 int r; 162 163 profile_enter("queue_message_create"); 164 r = handler_message_create(msgid); 165 profile_leave(); 166 167 log_trace(TRACE_QUEUE, 168 "queue-backend: queue_message_create() -> %d (%08"PRIx32")", 169 r, *msgid); 170 171 return (r); 172 } 173 174 int 175 queue_message_delete(uint32_t msgid) 176 { 177 char msgpath[MAXPATHLEN]; 178 int r; 179 180 profile_enter("queue_message_delete"); 181 r = handler_message_delete(msgid); 182 profile_leave(); 183 184 /* in case the message is incoming */ 185 queue_message_path(msgid, msgpath, sizeof(msgpath)); 186 unlink(msgpath); 187 188 log_trace(TRACE_QUEUE, 189 "queue-backend: queue_message_delete(%08"PRIx32") -> %d", msgid, r); 190 191 return (r); 192 } 193 194 int 195 queue_message_commit(uint32_t msgid) 196 { 197 int r; 198 char msgpath[MAXPATHLEN]; 199 char tmppath[MAXPATHLEN]; 200 FILE *ifp = NULL; 201 FILE *ofp = NULL; 202 203 profile_enter("queue_message_commit"); 204 205 queue_message_path(msgid, msgpath, sizeof(msgpath)); 206 207 if (env->sc_queue_flags & QUEUE_COMPRESSION) { 208 bsnprintf(tmppath, sizeof tmppath, "%s.comp", msgpath); 209 ifp = fopen(msgpath, "r"); 210 ofp = fopen(tmppath, "w+"); 211 if (ifp == NULL || ofp == NULL) 212 goto err; 213 if (! compress_file(ifp, ofp)) 214 goto err; 215 fclose(ifp); 216 fclose(ofp); 217 ifp = NULL; 218 ofp = NULL; 219 220 if (rename(tmppath, msgpath) == -1) { 221 if (errno == ENOSPC) 222 return (0); 223 unlink(tmppath); 224 log_warn("rename"); 225 return (0); 226 } 227 } 228 229 if (env->sc_queue_flags & QUEUE_ENCRYPTION) { 230 bsnprintf(tmppath, sizeof tmppath, "%s.enc", msgpath); 231 ifp = fopen(msgpath, "r"); 232 ofp = fopen(tmppath, "w+"); 233 if (ifp == NULL || ofp == NULL) 234 goto err; 235 if (! crypto_encrypt_file(ifp, ofp)) 236 goto err; 237 fclose(ifp); 238 fclose(ofp); 239 ifp = NULL; 240 ofp = NULL; 241 242 if (rename(tmppath, msgpath) == -1) { 243 if (errno == ENOSPC) 244 return (0); 245 unlink(tmppath); 246 log_warn("rename"); 247 return (0); 248 } 249 } 250 251 r = handler_message_commit(msgid, msgpath); 252 profile_leave(); 253 254 /* in case it's not done by the backend */ 255 unlink(msgpath); 256 257 log_trace(TRACE_QUEUE, 258 "queue-backend: queue_message_commit(%08"PRIx32") -> %d", 259 msgid, r); 260 261 return (r); 262 263 err: 264 if (ifp) 265 fclose(ifp); 266 if (ofp) 267 fclose(ofp); 268 return 0; 269 } 270 271 int 272 queue_message_corrupt(uint32_t msgid) 273 { 274 int r; 275 276 profile_enter("queue_message_corrupt"); 277 r = handler_message_corrupt(msgid); 278 profile_leave(); 279 280 log_trace(TRACE_QUEUE, 281 "queue-backend: queue_message_corrupt(%08"PRIx32") -> %d", msgid, r); 282 283 return (r); 284 } 285 286 int 287 queue_message_fd_r(uint32_t msgid) 288 { 289 int fdin = -1, fdout = -1, fd = -1; 290 FILE *ifp = NULL; 291 FILE *ofp = NULL; 292 293 profile_enter("queue_message_fd_r"); 294 fdin = handler_message_fd_r(msgid); 295 profile_leave(); 296 297 log_trace(TRACE_QUEUE, 298 "queue-backend: queue_message_fd_r(%08"PRIx32") -> %d", msgid, fdin); 299 300 if (fdin == -1) 301 return (-1); 302 303 if (env->sc_queue_flags & QUEUE_ENCRYPTION) { 304 if ((fdout = mktmpfile()) == -1) 305 goto err; 306 if ((fd = dup(fdout)) == -1) 307 goto err; 308 if ((ifp = fdopen(fdin, "r")) == NULL) 309 goto err; 310 fdin = fd; 311 fd = -1; 312 if ((ofp = fdopen(fdout, "w+")) == NULL) 313 goto err; 314 315 if (! crypto_decrypt_file(ifp, ofp)) 316 goto err; 317 318 fclose(ifp); 319 ifp = NULL; 320 fclose(ofp); 321 ofp = NULL; 322 lseek(fdin, SEEK_SET, 0); 323 } 324 325 if (env->sc_queue_flags & QUEUE_COMPRESSION) { 326 if ((fdout = mktmpfile()) == -1) 327 goto err; 328 if ((fd = dup(fdout)) == -1) 329 goto err; 330 if ((ifp = fdopen(fdin, "r")) == NULL) 331 goto err; 332 fdin = fd; 333 fd = -1; 334 if ((ofp = fdopen(fdout, "w+")) == NULL) 335 goto err; 336 337 if (! uncompress_file(ifp, ofp)) 338 goto err; 339 340 fclose(ifp); 341 ifp = NULL; 342 fclose(ofp); 343 ofp = NULL; 344 lseek(fdin, SEEK_SET, 0); 345 } 346 347 return (fdin); 348 349 err: 350 if (fd != -1) 351 close(fd); 352 if (fdin != -1) 353 close(fdin); 354 if (fdout != -1) 355 close(fdout); 356 if (ifp) 357 fclose(ifp); 358 if (ofp) 359 fclose(ofp); 360 return -1; 361 } 362 363 int 364 queue_message_fd_rw(uint32_t msgid) 365 { 366 char buf[SMTPD_MAXPATHLEN]; 367 368 queue_message_path(msgid, buf, sizeof(buf)); 369 370 return open(buf, O_RDWR | O_CREAT | O_EXCL, 0600); 371 } 372 373 static int 374 queue_envelope_dump_buffer(struct envelope *ep, char *evpbuf, size_t evpbufsize) 375 { 376 char *evp; 377 size_t evplen; 378 size_t complen; 379 char compbuf[sizeof(struct envelope)]; 380 size_t enclen; 381 char encbuf[sizeof(struct envelope)]; 382 383 evp = evpbuf; 384 evplen = envelope_dump_buffer(ep, evpbuf, evpbufsize); 385 if (evplen == 0) 386 return (0); 387 388 if (env->sc_queue_flags & QUEUE_COMPRESSION) { 389 complen = compress_chunk(evp, evplen, compbuf, sizeof compbuf); 390 if (complen == 0) 391 return (0); 392 evp = compbuf; 393 evplen = complen; 394 } 395 396 if (env->sc_queue_flags & QUEUE_ENCRYPTION) { 397 enclen = crypto_encrypt_buffer(evp, evplen, encbuf, sizeof encbuf); 398 if (enclen == 0) 399 return (0); 400 evp = encbuf; 401 evplen = enclen; 402 } 403 404 memmove(evpbuf, evp, evplen); 405 406 return (evplen); 407 } 408 409 static int 410 queue_envelope_load_buffer(struct envelope *ep, char *evpbuf, size_t evpbufsize) 411 { 412 char *evp; 413 size_t evplen; 414 char compbuf[sizeof(struct envelope)]; 415 size_t complen; 416 char encbuf[sizeof(struct envelope)]; 417 size_t enclen; 418 419 evp = evpbuf; 420 evplen = evpbufsize; 421 422 if (env->sc_queue_flags & QUEUE_ENCRYPTION) { 423 enclen = crypto_decrypt_buffer(evp, evplen, encbuf, sizeof encbuf); 424 if (enclen == 0) 425 return (0); 426 evp = encbuf; 427 evplen = enclen; 428 } 429 430 if (env->sc_queue_flags & QUEUE_COMPRESSION) { 431 complen = uncompress_chunk(evp, evplen, compbuf, sizeof compbuf); 432 if (complen == 0) 433 return (0); 434 evp = compbuf; 435 evplen = complen; 436 } 437 438 return (envelope_load_buffer(ep, evp, evplen)); 439 } 440 441 static void 442 queue_envelope_cache_add(struct envelope *e) 443 { 444 struct envelope *cached; 445 446 while (tree_count(&evpcache_tree) >= env->sc_queue_evpcache_size) 447 queue_envelope_cache_del(TAILQ_LAST(&evpcache_list, evplst)->id); 448 449 cached = xcalloc(1, sizeof *cached, "queue_envelope_cache_add"); 450 *cached = *e; 451 TAILQ_INSERT_HEAD(&evpcache_list, cached, entry); 452 tree_xset(&evpcache_tree, e->id, cached); 453 stat_increment("queue.evpcache.size", 1); 454 } 455 456 static void 457 queue_envelope_cache_update(struct envelope *e) 458 { 459 struct envelope *cached; 460 461 if ((cached = tree_get(&evpcache_tree, e->id)) == NULL) { 462 queue_envelope_cache_add(e); 463 stat_increment("queue.evpcache.update.missed", 1); 464 } else { 465 TAILQ_REMOVE(&evpcache_list, cached, entry); 466 *cached = *e; 467 TAILQ_INSERT_HEAD(&evpcache_list, cached, entry); 468 stat_increment("queue.evpcache.update.hit", 1); 469 } 470 } 471 472 static void 473 queue_envelope_cache_del(uint64_t evpid) 474 { 475 struct envelope *cached; 476 477 if ((cached = tree_pop(&evpcache_tree, evpid)) == NULL) 478 return; 479 480 TAILQ_REMOVE(&evpcache_list, cached, entry); 481 free(cached); 482 stat_decrement("queue.evpcache.size", 1); 483 } 484 485 int 486 queue_envelope_create(struct envelope *ep) 487 { 488 int r; 489 char evpbuf[sizeof(struct envelope)]; 490 size_t evplen; 491 uint64_t evpid; 492 uint32_t msgid; 493 494 ep->creation = time(NULL); 495 evplen = queue_envelope_dump_buffer(ep, evpbuf, sizeof evpbuf); 496 if (evplen == 0) 497 return (0); 498 499 evpid = ep->id; 500 msgid = evpid_to_msgid(evpid); 501 502 profile_enter("queue_envelope_create"); 503 r = handler_envelope_create(msgid, evpbuf, evplen, &ep->id); 504 profile_leave(); 505 506 log_trace(TRACE_QUEUE, 507 "queue-backend: queue_envelope_create(%016"PRIx64", %zu) -> %d (%016"PRIx64")", 508 evpid, evplen, r, ep->id); 509 510 if (!r) { 511 ep->creation = 0; 512 ep->id = 0; 513 } 514 515 if (r && env->sc_queue_flags & QUEUE_EVPCACHE) 516 queue_envelope_cache_add(ep); 517 518 return (r); 519 } 520 521 int 522 queue_envelope_delete(uint64_t evpid) 523 { 524 int r; 525 526 if (env->sc_queue_flags & QUEUE_EVPCACHE) 527 queue_envelope_cache_del(evpid); 528 529 profile_enter("queue_envelope_delete"); 530 r = handler_envelope_delete(evpid); 531 profile_leave(); 532 533 log_trace(TRACE_QUEUE, 534 "queue-backend: queue_envelope_delete(%016"PRIx64") -> %d", 535 evpid, r); 536 537 return (r); 538 } 539 540 int 541 queue_envelope_load(uint64_t evpid, struct envelope *ep) 542 { 543 const char *e; 544 char evpbuf[sizeof(struct envelope)]; 545 size_t evplen; 546 struct envelope *cached; 547 548 if ((env->sc_queue_flags & QUEUE_EVPCACHE) && 549 (cached = tree_get(&evpcache_tree, evpid))) { 550 *ep = *cached; 551 stat_increment("queue.evpcache.load.hit", 1); 552 return (1); 553 } 554 555 ep->id = evpid; 556 profile_enter("queue_envelope_load"); 557 evplen = handler_envelope_load(ep->id, evpbuf, sizeof evpbuf); 558 profile_leave(); 559 560 log_trace(TRACE_QUEUE, 561 "queue-backend: queue_envelope_load(%016"PRIx64") -> %zu", 562 evpid, evplen); 563 564 if (evplen == 0) 565 return (0); 566 567 if (queue_envelope_load_buffer(ep, evpbuf, evplen)) { 568 if ((e = envelope_validate(ep)) == NULL) { 569 ep->id = evpid; 570 if (env->sc_queue_flags & QUEUE_EVPCACHE) { 571 queue_envelope_cache_add(ep); 572 stat_increment("queue.evpcache.load.missed", 1); 573 } 574 return (1); 575 } 576 log_debug("debug: invalid envelope %016" PRIx64 ": %s", 577 ep->id, e); 578 } 579 580 (void)queue_message_corrupt(evpid_to_msgid(evpid)); 581 return (0); 582 } 583 584 int 585 queue_envelope_update(struct envelope *ep) 586 { 587 char evpbuf[sizeof(struct envelope)]; 588 size_t evplen; 589 int r; 590 591 evplen = queue_envelope_dump_buffer(ep, evpbuf, sizeof evpbuf); 592 if (evplen == 0) 593 return (0); 594 595 profile_enter("queue_envelope_update"); 596 r = handler_envelope_update(ep->id, evpbuf, evplen); 597 profile_leave(); 598 599 if (r && env->sc_queue_flags & QUEUE_EVPCACHE) 600 queue_envelope_cache_update(ep); 601 602 log_trace(TRACE_QUEUE, 603 "queue-backend: queue_envelope_update(%016"PRIx64") -> %d", 604 ep->id, r); 605 606 return (r); 607 } 608 609 int 610 queue_envelope_walk(struct envelope *ep) 611 { 612 const char *e; 613 uint64_t evpid; 614 char evpbuf[sizeof(struct envelope)]; 615 int r; 616 617 profile_enter("queue_envelope_walk"); 618 r = handler_envelope_walk(&evpid, evpbuf, sizeof evpbuf); 619 profile_leave(); 620 621 log_trace(TRACE_QUEUE, 622 "queue-backend: queue_envelope_walk() -> %d (%016"PRIx64")", 623 r, evpid); 624 625 if (r == -1) 626 return (r); 627 628 if (r && queue_envelope_load_buffer(ep, evpbuf, (size_t)r)) { 629 if ((e = envelope_validate(ep)) == NULL) { 630 ep->id = evpid; 631 if (env->sc_queue_flags & QUEUE_EVPCACHE) 632 queue_envelope_cache_add(ep); 633 return (1); 634 } 635 log_debug("debug: invalid envelope %016" PRIx64 ": %s", 636 ep->id, e); 637 } 638 639 (void)queue_message_corrupt(evpid_to_msgid(evpid)); 640 return (0); 641 } 642 643 uint32_t 644 queue_generate_msgid(void) 645 { 646 uint32_t msgid; 647 648 while ((msgid = arc4random_uniform(0xffffffff)) == 0) 649 ; 650 651 return msgid; 652 } 653 654 uint64_t 655 queue_generate_evpid(uint32_t msgid) 656 { 657 uint32_t rnd; 658 uint64_t evpid; 659 660 while ((rnd = arc4random_uniform(0xffffffff)) == 0) 661 ; 662 663 evpid = msgid; 664 evpid <<= 32; 665 evpid |= rnd; 666 667 return evpid; 668 } 669 670 static const char* 671 envelope_validate(struct envelope *ep) 672 { 673 if (ep->version != SMTPD_ENVELOPE_VERSION) 674 return "version mismatch"; 675 676 if (memchr(ep->helo, '\0', sizeof(ep->helo)) == NULL) 677 return "invalid helo"; 678 if (ep->helo[0] == '\0') 679 return "empty helo"; 680 681 if (memchr(ep->hostname, '\0', sizeof(ep->hostname)) == NULL) 682 return "invalid hostname"; 683 if (ep->hostname[0] == '\0') 684 return "empty hostname"; 685 686 if (memchr(ep->errorline, '\0', sizeof(ep->errorline)) == NULL) 687 return "invalid error line"; 688 689 return NULL; 690 } 691 692 void 693 queue_api_on_message_create(int(*cb)(uint32_t *)) 694 { 695 handler_message_create = cb; 696 } 697 698 void 699 queue_api_on_message_commit(int(*cb)(uint32_t, const char *)) 700 { 701 handler_message_commit = cb; 702 } 703 704 void 705 queue_api_on_message_delete(int(*cb)(uint32_t)) 706 { 707 handler_message_delete = cb; 708 } 709 710 void 711 queue_api_on_message_fd_r(int(*cb)(uint32_t)) 712 { 713 handler_message_fd_r = cb; 714 } 715 716 void 717 queue_api_on_message_corrupt(int(*cb)(uint32_t)) 718 { 719 handler_message_corrupt = cb; 720 } 721 722 void 723 queue_api_on_envelope_create(int(*cb)(uint32_t, const char *, size_t, uint64_t *)) 724 { 725 handler_envelope_create = cb; 726 } 727 728 void 729 queue_api_on_envelope_delete(int(*cb)(uint64_t)) 730 { 731 handler_envelope_delete = cb; 732 } 733 734 void 735 queue_api_on_envelope_update(int(*cb)(uint64_t, const char *, size_t)) 736 { 737 handler_envelope_update = cb; 738 } 739 740 void 741 queue_api_on_envelope_load(int(*cb)(uint64_t, char *, size_t)) 742 { 743 handler_envelope_load = cb; 744 } 745 746 void 747 queue_api_on_envelope_walk(int(*cb)(uint64_t *, char *, size_t)) 748 { 749 handler_envelope_walk = cb; 750 } 751