/*	$OpenBSD: queue_backend.c,v 1.57 2015/10/29 10:25:36 sunil Exp $	*/

/*
 * Copyright (c) 2011 Gilles Chehade <gilles@poolp.org>
 *
 * Permission to use, copy, modify, and distribute this software for any
 * purpose with or without fee is hereby granted, provided that the above
 * copyright notice and this permission notice appear in all copies.
 *
 * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
 * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
 * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
 * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
 * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
 * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
 * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
 */

#include <sys/types.h>
#include <sys/queue.h>
#include <sys/tree.h>
#include <sys/socket.h>
#include <sys/stat.h>

#include <ctype.h>
#include <err.h>
#include <errno.h>
#include <event.h>
#include <fcntl.h>
#include <grp.h>
#include <imsg.h>
#include <limits.h>
#include <inttypes.h>
#include <libgen.h>
#include <pwd.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>
#include <unistd.h>

#include "smtpd.h"
#include "log.h"

static const char* envelope_validate(struct envelope *);

extern struct queue_backend queue_backend_fs;
extern struct queue_backend queue_backend_null;
extern struct queue_backend queue_backend_proc;
extern struct queue_backend queue_backend_ram;

static void queue_envelope_cache_add(struct envelope *);
static void queue_envelope_cache_update(struct envelope *);
static void queue_envelope_cache_del(uint64_t evpid);

TAILQ_HEAD(evplst, envelope);

static struct tree		 evpcache_tree;
static struct evplst		 evpcache_list;
static struct queue_backend	*backend;

static int (*handler_close)(void);
static int (*handler_message_create)(uint32_t *);
static int (*handler_message_commit)(uint32_t, const char*);
static int (*handler_message_delete)(uint32_t);
static int (*handler_message_fd_r)(uint32_t);
static int (*handler_message_corrupt)(uint32_t);
static int (*handler_envelope_create)(uint32_t, const char *, size_t, uint64_t *);
static int (*handler_envelope_delete)(uint64_t);
static int (*handler_envelope_update)(uint64_t, const char *, size_t);
static int (*handler_envelope_load)(uint64_t, char *, size_t);
static int (*handler_envelope_walk)(uint64_t *, char *, size_t);
static int (*handler_message_walk)(uint64_t *, char *, size_t,
    uint32_t, int *, void **);

#ifdef QUEUE_PROFILING

static struct {
	struct timespec	 t0;
	const char	*name;
} profile;

static inline void profile_enter(const char *name)
{
	if ((profiling & PROFILE_QUEUE) == 0)
		return;

	profile.name = name;
	clock_gettime(CLOCK_MONOTONIC, &profile.t0);
}

static inline void profile_leave(void)
{
	struct timespec	 t1, dt;

	if ((profiling & PROFILE_QUEUE) == 0)
		return;

	clock_gettime(CLOCK_MONOTONIC, &t1);
	timespecsub(&t1, &profile.t0, &dt);
	log_debug("profile-queue: %s %lld.%09ld", profile.name,
	    (long long)dt.tv_sec, dt.tv_nsec);
}
#else
#define profile_enter(x)	do {} while (0)
#define profile_leave()		do {} while (0)
#endif

static int
queue_message_path(uint32_t msgid, char *buf, size_t len)
{
	return bsnprintf(buf, len, "%s/%08"PRIx32, PATH_TEMPORARY, msgid);
}

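/*
 * Initialize the queue layer: sanity-check the queue user and group,
 * set up the envelope cache, pick the backend named by "name" ("fs",
 * "null" or "ram", anything else selects the proc backend) and, when
 * running as the server, verify the spool directory layout before
 * calling the backend's own init function.
 */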
int
queue_init(const char *name, int server)
{
	struct passwd	*pwq;
	struct group	*gr;
	int		 r;

	pwq = getpwnam(SMTPD_QUEUE_USER);
	if (pwq == NULL)
		errx(1, "unknown user %s", SMTPD_QUEUE_USER);

	gr = getgrnam(SMTPD_QUEUE_GROUP);
	if (gr == NULL)
		errx(1, "unknown group %s", SMTPD_QUEUE_GROUP);

	tree_init(&evpcache_tree);
	TAILQ_INIT(&evpcache_list);

	if (!strcmp(name, "fs"))
		backend = &queue_backend_fs;
	else if (!strcmp(name, "null"))
		backend = &queue_backend_null;
	else if (!strcmp(name, "ram"))
		backend = &queue_backend_ram;
	else
		backend = &queue_backend_proc;

	if (server) {
		if (ckdir(PATH_SPOOL, 0711, 0, 0, 1) == 0)
			errx(1, "error in spool directory setup");
		if (ckdir(PATH_SPOOL PATH_OFFLINE, 0770, 0, gr->gr_gid, 1) == 0)
			errx(1, "error in offline directory setup");
		if (ckdir(PATH_SPOOL PATH_PURGE, 0700, pwq->pw_uid, 0, 1) == 0)
			errx(1, "error in purge directory setup");

		mvpurge(PATH_SPOOL PATH_TEMPORARY, PATH_SPOOL PATH_PURGE);

		if (ckdir(PATH_SPOOL PATH_TEMPORARY, 0700, pwq->pw_uid, 0, 1) == 0)
			errx(1, "error in temporary directory setup");
	}

	r = backend->init(pwq, server, name);

	log_trace(TRACE_QUEUE, "queue-backend: queue_init(%d) -> %d", server, r);

	return (r);
}

int
queue_close(void)
{
	if (handler_close)
		return (handler_close());

	return (1);
}

int
queue_message_create(uint32_t *msgid)
{
	int	r;

	profile_enter("queue_message_create");
	r = handler_message_create(msgid);
	profile_leave();

	log_trace(TRACE_QUEUE,
	    "queue-backend: queue_message_create() -> %d (%08"PRIx32")",
	    r, *msgid);

	return (r);
}

int
queue_message_delete(uint32_t msgid)
{
	char	msgpath[PATH_MAX];
	int	r;

	profile_enter("queue_message_delete");
	r = handler_message_delete(msgid);
	profile_leave();

	/* in case the message is incoming */
	queue_message_path(msgid, msgpath, sizeof(msgpath));
	unlink(msgpath);

	log_trace(TRACE_QUEUE,
	    "queue-backend: queue_message_delete(%08"PRIx32") -> %d", msgid, r);

	return (r);
}

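/*
 * Commit an incoming message: if the queue is configured for
 * compression and/or encryption, rewrite the spooled file in place
 * (via a temporary ".comp"/".enc" file) before handing its path to
 * the backend's commit handler.
 */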
int
queue_message_commit(uint32_t msgid)
{
	int	 r;
	char	 msgpath[PATH_MAX];
	char	 tmppath[PATH_MAX];
	FILE	*ifp = NULL;
	FILE	*ofp = NULL;

	profile_enter("queue_message_commit");

	queue_message_path(msgid, msgpath, sizeof(msgpath));

	if (env->sc_queue_flags & QUEUE_COMPRESSION) {
		bsnprintf(tmppath, sizeof tmppath, "%s.comp", msgpath);
		ifp = fopen(msgpath, "r");
		ofp = fopen(tmppath, "w+");
		if (ifp == NULL || ofp == NULL)
			goto err;
		if (! compress_file(ifp, ofp))
			goto err;
		fclose(ifp);
		fclose(ofp);
		ifp = NULL;
		ofp = NULL;

		if (rename(tmppath, msgpath) == -1) {
			if (errno == ENOSPC)
				return (0);
			unlink(tmppath);
			log_warn("rename");
			return (0);
		}
	}

	if (env->sc_queue_flags & QUEUE_ENCRYPTION) {
		bsnprintf(tmppath, sizeof tmppath, "%s.enc", msgpath);
		ifp = fopen(msgpath, "r");
		ofp = fopen(tmppath, "w+");
		if (ifp == NULL || ofp == NULL)
			goto err;
		if (! crypto_encrypt_file(ifp, ofp))
			goto err;
		fclose(ifp);
		fclose(ofp);
		ifp = NULL;
		ofp = NULL;

		if (rename(tmppath, msgpath) == -1) {
			if (errno == ENOSPC)
				return (0);
			unlink(tmppath);
			log_warn("rename");
			return (0);
		}
	}

	r = handler_message_commit(msgid, msgpath);
	profile_leave();

	/* in case it's not done by the backend */
	unlink(msgpath);

	log_trace(TRACE_QUEUE,
	    "queue-backend: queue_message_commit(%08"PRIx32") -> %d",
	    msgid, r);

	return (r);

err:
	if (ifp)
		fclose(ifp);
	if (ofp)
		fclose(ofp);
	return 0;
}

int
queue_message_corrupt(uint32_t msgid)
{
	int	r;

	profile_enter("queue_message_corrupt");
	r = handler_message_corrupt(msgid);
	profile_leave();

	log_trace(TRACE_QUEUE,
	    "queue-backend: queue_message_corrupt(%08"PRIx32") -> %d", msgid, r);

	return (r);
}

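/*
 * Return a read-only file descriptor for the message content.  When
 * the queue is encrypted and/or compressed, the backend's descriptor
 * is decrypted/uncompressed into an anonymous temporary file and a
 * descriptor to that file is returned instead.
 */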
int
queue_message_fd_r(uint32_t msgid)
{
	int	 fdin = -1, fdout = -1, fd = -1;
	FILE	*ifp = NULL;
	FILE	*ofp = NULL;

	profile_enter("queue_message_fd_r");
	fdin = handler_message_fd_r(msgid);
	profile_leave();

	log_trace(TRACE_QUEUE,
	    "queue-backend: queue_message_fd_r(%08"PRIx32") -> %d", msgid, fdin);

	if (fdin == -1)
		return (-1);

	if (env->sc_queue_flags & QUEUE_ENCRYPTION) {
		if ((fdout = mktmpfile()) == -1)
			goto err;
		if ((fd = dup(fdout)) == -1)
			goto err;
		if ((ifp = fdopen(fdin, "r")) == NULL)
			goto err;
		fdin = fd;
		fd = -1;
		if ((ofp = fdopen(fdout, "w+")) == NULL)
			goto err;

		if (! crypto_decrypt_file(ifp, ofp))
			goto err;

		fclose(ifp);
		ifp = NULL;
		fclose(ofp);
		ofp = NULL;
		/* rewind to the start of the decrypted content */
		lseek(fdin, 0, SEEK_SET);
	}

	if (env->sc_queue_flags & QUEUE_COMPRESSION) {
		if ((fdout = mktmpfile()) == -1)
			goto err;
		if ((fd = dup(fdout)) == -1)
			goto err;
		if ((ifp = fdopen(fdin, "r")) == NULL)
			goto err;
		fdin = fd;
		fd = -1;
		if ((ofp = fdopen(fdout, "w+")) == NULL)
			goto err;

		if (! uncompress_file(ifp, ofp))
			goto err;

		fclose(ifp);
		ifp = NULL;
		fclose(ofp);
		ofp = NULL;
		/* rewind to the start of the uncompressed content */
		lseek(fdin, 0, SEEK_SET);
	}

	return (fdin);

err:
	if (fd != -1)
		close(fd);
	if (fdin != -1)
		close(fdin);
	if (fdout != -1)
		close(fdout);
	if (ifp)
		fclose(ifp);
	if (ofp)
		fclose(ofp);
	return -1;
}

int
queue_message_fd_rw(uint32_t msgid)
{
	char buf[PATH_MAX];

	queue_message_path(msgid, buf, sizeof(buf));

	return open(buf, O_RDWR | O_CREAT | O_EXCL, 0600);
}

static int
queue_envelope_dump_buffer(struct envelope *ep, char *evpbuf, size_t evpbufsize)
{
	char	*evp;
	size_t	 evplen;
	size_t	 complen;
	char	 compbuf[sizeof(struct envelope)];
	size_t	 enclen;
	char	 encbuf[sizeof(struct envelope)];

	evp = evpbuf;
	evplen = envelope_dump_buffer(ep, evpbuf, evpbufsize);
	if (evplen == 0)
		return (0);

	if (env->sc_queue_flags & QUEUE_COMPRESSION) {
		complen = compress_chunk(evp, evplen, compbuf, sizeof compbuf);
		if (complen == 0)
			return (0);
		evp = compbuf;
		evplen = complen;
	}

	if (env->sc_queue_flags & QUEUE_ENCRYPTION) {
		enclen = crypto_encrypt_buffer(evp, evplen, encbuf, sizeof encbuf);
		if (enclen == 0)
			return (0);
		evp = encbuf;
		evplen = enclen;
	}

	memmove(evpbuf, evp, evplen);

	return (evplen);
}

static int
queue_envelope_load_buffer(struct envelope *ep, char *evpbuf, size_t evpbufsize)
{
	char	*evp;
	size_t	 evplen;
	char	 compbuf[sizeof(struct envelope)];
	size_t	 complen;
	char	 encbuf[sizeof(struct envelope)];
	size_t	 enclen;

	evp = evpbuf;
	evplen = evpbufsize;

	if (env->sc_queue_flags & QUEUE_ENCRYPTION) {
		enclen = crypto_decrypt_buffer(evp, evplen, encbuf, sizeof encbuf);
		if (enclen == 0)
			return (0);
		evp = encbuf;
		evplen = enclen;
	}

	if (env->sc_queue_flags & QUEUE_COMPRESSION) {
		complen = uncompress_chunk(evp, evplen, compbuf, sizeof compbuf);
		if (complen == 0)
			return (0);
		evp = compbuf;
		evplen = complen;
	}

	return (envelope_load_buffer(ep, evp, evplen));
}

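/*
 * Envelope cache: a bounded cache of envelopes kept both in a tree
 * indexed by evpid and in an LRU-ordered list.  When the cache is
 * full, the least recently used entry (tail of the list) is evicted
 * before a copy of the new envelope is inserted at the head.
 */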
static void
queue_envelope_cache_add(struct envelope *e)
{
	struct envelope *cached;

	while (tree_count(&evpcache_tree) >= env->sc_queue_evpcache_size)
		queue_envelope_cache_del(TAILQ_LAST(&evpcache_list, evplst)->id);

	cached = xcalloc(1, sizeof *cached, "queue_envelope_cache_add");
	*cached = *e;
	TAILQ_INSERT_HEAD(&evpcache_list, cached, entry);
	tree_xset(&evpcache_tree, e->id, cached);
	stat_increment("queue.evpcache.size", 1);
}

static void
queue_envelope_cache_update(struct envelope *e)
{
	struct envelope *cached;

	if ((cached = tree_get(&evpcache_tree, e->id)) == NULL) {
		queue_envelope_cache_add(e);
		stat_increment("queue.evpcache.update.missed", 1);
	} else {
		TAILQ_REMOVE(&evpcache_list, cached, entry);
		*cached = *e;
		TAILQ_INSERT_HEAD(&evpcache_list, cached, entry);
		stat_increment("queue.evpcache.update.hit", 1);
	}
}

static void
queue_envelope_cache_del(uint64_t evpid)
{
	struct envelope *cached;

	if ((cached = tree_pop(&evpcache_tree, evpid)) == NULL)
		return;

	TAILQ_REMOVE(&evpcache_list, cached, entry);
	free(cached);
	stat_decrement("queue.evpcache.size", 1);
}

int
queue_envelope_create(struct envelope *ep)
{
	int		 r;
	char		 evpbuf[sizeof(struct envelope)];
	size_t		 evplen;
	uint64_t	 evpid;
	uint32_t	 msgid;

	ep->creation = time(NULL);
	evplen = queue_envelope_dump_buffer(ep, evpbuf, sizeof evpbuf);
	if (evplen == 0)
		return (0);

	evpid = ep->id;
	msgid = evpid_to_msgid(evpid);

	profile_enter("queue_envelope_create");
	r = handler_envelope_create(msgid, evpbuf, evplen, &ep->id);
	profile_leave();

	log_trace(TRACE_QUEUE,
	    "queue-backend: queue_envelope_create(%016"PRIx64", %zu) -> %d (%016"PRIx64")",
	    evpid, evplen, r, ep->id);

	if (!r) {
		ep->creation = 0;
		ep->id = 0;
	}

	if (r && env->sc_queue_flags & QUEUE_EVPCACHE)
		queue_envelope_cache_add(ep);

	return (r);
}

int
queue_envelope_delete(uint64_t evpid)
{
	int	r;

	if (env->sc_queue_flags & QUEUE_EVPCACHE)
		queue_envelope_cache_del(evpid);

	profile_enter("queue_envelope_delete");
	r = handler_envelope_delete(evpid);
	profile_leave();

	log_trace(TRACE_QUEUE,
	    "queue-backend: queue_envelope_delete(%016"PRIx64") -> %d",
	    evpid, r);

	return (r);
}

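/*
 * Load an envelope by id.  The cache is consulted first when enabled;
 * on a miss the envelope is fetched from the backend, validated and
 * then cached.  An envelope that fails to load or validate causes the
 * whole message to be flagged as corrupt.
 */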
int
queue_envelope_load(uint64_t evpid, struct envelope *ep)
{
	const char	*e;
	char		 evpbuf[sizeof(struct envelope)];
	size_t		 evplen;
	struct envelope	*cached;

	if ((env->sc_queue_flags & QUEUE_EVPCACHE) &&
	    (cached = tree_get(&evpcache_tree, evpid))) {
		*ep = *cached;
		stat_increment("queue.evpcache.load.hit", 1);
		return (1);
	}

	ep->id = evpid;
	profile_enter("queue_envelope_load");
	evplen = handler_envelope_load(ep->id, evpbuf, sizeof evpbuf);
	profile_leave();

	log_trace(TRACE_QUEUE,
	    "queue-backend: queue_envelope_load(%016"PRIx64") -> %zu",
	    evpid, evplen);

	if (evplen == 0)
		return (0);

	if (queue_envelope_load_buffer(ep, evpbuf, evplen)) {
		if ((e = envelope_validate(ep)) == NULL) {
			ep->id = evpid;
			if (env->sc_queue_flags & QUEUE_EVPCACHE) {
				queue_envelope_cache_add(ep);
				stat_increment("queue.evpcache.load.missed", 1);
			}
			return (1);
		}
		log_debug("debug: invalid envelope %016" PRIx64 ": %s",
		    ep->id, e);
	}

	(void)queue_message_corrupt(evpid_to_msgid(evpid));
	return (0);
}

int
queue_envelope_update(struct envelope *ep)
{
	char	 evpbuf[sizeof(struct envelope)];
	size_t	 evplen;
	int	 r;

	evplen = queue_envelope_dump_buffer(ep, evpbuf, sizeof evpbuf);
	if (evplen == 0)
		return (0);

	profile_enter("queue_envelope_update");
	r = handler_envelope_update(ep->id, evpbuf, evplen);
	profile_leave();

	if (r && env->sc_queue_flags & QUEUE_EVPCACHE)
		queue_envelope_cache_update(ep);

	log_trace(TRACE_QUEUE,
	    "queue-backend: queue_envelope_update(%016"PRIx64") -> %d",
	    ep->id, r);

	return (r);
}

int
queue_message_walk(struct envelope *ep, uint32_t msgid, int *done, void **data)
{
	char		 evpbuf[sizeof(struct envelope)];
	uint64_t	 evpid;
	int		 r;
	const char	*e;

	profile_enter("queue_message_walk");
	r = handler_message_walk(&evpid, evpbuf, sizeof evpbuf,
	    msgid, done, data);
	profile_leave();

	log_trace(TRACE_QUEUE,
	    "queue-backend: queue_message_walk() -> %d (%016"PRIx64")",
	    r, evpid);

	if (r == -1)
		return (r);

	if (r && queue_envelope_load_buffer(ep, evpbuf, (size_t)r)) {
		if ((e = envelope_validate(ep)) == NULL) {
			ep->id = evpid;
			/*
			 * Do not cache the envelope here: while discovering
			 * envelopes, the walk may be re-run over envelopes
			 * that are already scheduled, which would trigger
			 * the strict checks in the cache.  The envelope can
			 * always be loaded from the backend if it is not
			 * cached.
			 */
			return (1);
		}
		log_debug("debug: invalid envelope %016" PRIx64 ": %s",
		    ep->id, e);
		(void)queue_message_corrupt(evpid_to_msgid(evpid));
	}

	return (-1);
}

int
queue_envelope_walk(struct envelope *ep)
{
	const char	*e;
	uint64_t	 evpid;
	char		 evpbuf[sizeof(struct envelope)];
	int		 r;

	profile_enter("queue_envelope_walk");
	r = handler_envelope_walk(&evpid, evpbuf, sizeof evpbuf);
	profile_leave();

	log_trace(TRACE_QUEUE,
	    "queue-backend: queue_envelope_walk() -> %d (%016"PRIx64")",
	    r, evpid);

	if (r == -1)
		return (r);

	if (r && queue_envelope_load_buffer(ep, evpbuf, (size_t)r)) {
		if ((e = envelope_validate(ep)) == NULL) {
			ep->id = evpid;
			if (env->sc_queue_flags & QUEUE_EVPCACHE)
				queue_envelope_cache_add(ep);
			return (1);
		}
		log_debug("debug: invalid envelope %016" PRIx64 ": %s",
		    ep->id, e);
		(void)queue_message_corrupt(evpid_to_msgid(evpid));
	}

	return (0);
}

uint32_t
queue_generate_msgid(void)
{
	uint32_t msgid;

	while ((msgid = arc4random()) == 0)
		;

	return msgid;
}

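/*
 * An evpid embeds its message id: the msgid occupies the upper
 * 32 bits and a non-zero random value the lower 32 bits.
 */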
uint64_t
queue_generate_evpid(uint32_t msgid)
{
	uint32_t rnd;
	uint64_t evpid;

	while ((rnd = arc4random()) == 0)
		;

	evpid = msgid;
	evpid <<= 32;
	evpid |= rnd;

	return evpid;
}

static const char*
envelope_validate(struct envelope *ep)
{
	if (ep->version != SMTPD_ENVELOPE_VERSION)
		return "version mismatch";

	if (memchr(ep->helo, '\0', sizeof(ep->helo)) == NULL)
		return "invalid helo";
	if (ep->helo[0] == '\0')
		return "empty helo";

	if (memchr(ep->hostname, '\0', sizeof(ep->hostname)) == NULL)
		return "invalid hostname";
	if (ep->hostname[0] == '\0')
		return "empty hostname";

	if (memchr(ep->errorline, '\0', sizeof(ep->errorline)) == NULL)
		return "invalid error line";

	return NULL;
}

void
queue_api_on_close(int(*cb)(void))
{
	handler_close = cb;
}

void
queue_api_on_message_create(int(*cb)(uint32_t *))
{
	handler_message_create = cb;
}

void
queue_api_on_message_commit(int(*cb)(uint32_t, const char *))
{
	handler_message_commit = cb;
}

void
queue_api_on_message_delete(int(*cb)(uint32_t))
{
	handler_message_delete = cb;
}

void
queue_api_on_message_fd_r(int(*cb)(uint32_t))
{
	handler_message_fd_r = cb;
}

void
queue_api_on_message_corrupt(int(*cb)(uint32_t))
{
	handler_message_corrupt = cb;
}

void
queue_api_on_envelope_create(int(*cb)(uint32_t, const char *, size_t, uint64_t *))
{
	handler_envelope_create = cb;
}

void
queue_api_on_envelope_delete(int(*cb)(uint64_t))
{
	handler_envelope_delete = cb;
}

void
queue_api_on_envelope_update(int(*cb)(uint64_t, const char *, size_t))
{
	handler_envelope_update = cb;
}

void
queue_api_on_envelope_load(int(*cb)(uint64_t, char *, size_t))
{
	handler_envelope_load = cb;
}

void
queue_api_on_envelope_walk(int(*cb)(uint64_t *, char *, size_t))
{
	handler_envelope_walk = cb;
}

void
queue_api_on_message_walk(int(*cb)(uint64_t *, char *, size_t,
    uint32_t, int *, void **))
{
	handler_message_walk = cb;
}