1 /* $OpenBSD: queue_backend.c,v 1.67 2021/05/26 18:08:55 eric Exp $ */ 2 3 /* 4 * Copyright (c) 2011 Gilles Chehade <gilles@poolp.org> 5 * 6 * Permission to use, copy, modify, and distribute this software for any 7 * purpose with or without fee is hereby granted, provided that the above 8 * copyright notice and this permission notice appear in all copies. 9 * 10 * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES 11 * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF 12 * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR 13 * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES 14 * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN 15 * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF 16 * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. 17 */ 18 19 #include <sys/types.h> 20 #include <sys/queue.h> 21 #include <sys/tree.h> 22 #include <sys/socket.h> 23 #include <sys/stat.h> 24 25 #include <ctype.h> 26 #include <errno.h> 27 #include <event.h> 28 #include <fcntl.h> 29 #include <grp.h> 30 #include <imsg.h> 31 #include <limits.h> 32 #include <inttypes.h> 33 #include <pwd.h> 34 #include <stdio.h> 35 #include <stdlib.h> 36 #include <string.h> 37 #include <time.h> 38 #include <unistd.h> 39 40 #include "smtpd.h" 41 #include "log.h" 42 43 static const char* envelope_validate(struct envelope *); 44 45 extern struct queue_backend queue_backend_fs; 46 extern struct queue_backend queue_backend_null; 47 extern struct queue_backend queue_backend_proc; 48 extern struct queue_backend queue_backend_ram; 49 50 static void queue_envelope_cache_add(struct envelope *); 51 static void queue_envelope_cache_update(struct envelope *); 52 static void queue_envelope_cache_del(uint64_t evpid); 53 54 TAILQ_HEAD(evplst, envelope); 55 56 static struct tree evpcache_tree; 57 static struct evplst evpcache_list; 58 static struct queue_backend *backend; 59 60 static int (*handler_close)(void); 61 static int (*handler_message_create)(uint32_t *); 62 static int (*handler_message_commit)(uint32_t, const char*); 63 static int (*handler_message_delete)(uint32_t); 64 static int (*handler_message_fd_r)(uint32_t); 65 static int (*handler_envelope_create)(uint32_t, const char *, size_t, uint64_t *); 66 static int (*handler_envelope_delete)(uint64_t); 67 static int (*handler_envelope_update)(uint64_t, const char *, size_t); 68 static int (*handler_envelope_load)(uint64_t, char *, size_t); 69 static int (*handler_envelope_walk)(uint64_t *, char *, size_t); 70 static int (*handler_message_walk)(uint64_t *, char *, size_t, 71 uint32_t, int *, void **); 72 73 #ifdef QUEUE_PROFILING 74 75 static struct { 76 struct timespec t0; 77 const char *name; 78 } profile; 79 80 static inline void profile_enter(const char *name) 81 { 82 if ((profiling & PROFILE_QUEUE) == 0) 83 return; 84 85 profile.name = name; 86 clock_gettime(CLOCK_MONOTONIC, &profile.t0); 87 } 88 89 static inline void profile_leave(void) 90 { 91 struct timespec t1, dt; 92 93 if ((profiling & PROFILE_QUEUE) == 0) 94 return; 95 96 clock_gettime(CLOCK_MONOTONIC, &t1); 97 timespecsub(&t1, &profile.t0, &dt); 98 log_debug("profile-queue: %s %lld.%09ld", profile.name, 99 (long long)dt.tv_sec, dt.tv_nsec); 100 } 101 #else 102 #define profile_enter(x) do {} while (0) 103 #define profile_leave() do {} while (0) 104 #endif 105 106 static int 107 queue_message_path(uint32_t msgid, char *buf, size_t len) 108 { 109 return bsnprintf(buf, len, "%s/%08"PRIx32, PATH_TEMPORARY, msgid); 110 } 111 112 int 113 queue_init(const char *name, int server) 114 { 115 struct passwd *pwq; 116 struct group *gr; 117 int r; 118 119 pwq = getpwnam(SMTPD_QUEUE_USER); 120 if (pwq == NULL) 121 fatalx("unknown user %s", SMTPD_QUEUE_USER); 122 123 gr = getgrnam(SMTPD_QUEUE_GROUP); 124 if (gr == NULL) 125 fatalx("unknown group %s", SMTPD_QUEUE_GROUP); 126 127 tree_init(&evpcache_tree); 128 TAILQ_INIT(&evpcache_list); 129 130 if (!strcmp(name, "fs")) 131 backend = &queue_backend_fs; 132 else if (!strcmp(name, "null")) 133 backend = &queue_backend_null; 134 else if (!strcmp(name, "ram")) 135 backend = &queue_backend_ram; 136 else 137 backend = &queue_backend_proc; 138 139 if (server) { 140 if (ckdir(PATH_SPOOL, 0711, 0, 0, 1) == 0) 141 fatalx("error in spool directory setup"); 142 if (ckdir(PATH_SPOOL PATH_OFFLINE, 0770, 0, gr->gr_gid, 1) == 0) 143 fatalx("error in offline directory setup"); 144 if (ckdir(PATH_SPOOL PATH_PURGE, 0700, pwq->pw_uid, 0, 1) == 0) 145 fatalx("error in purge directory setup"); 146 147 mvpurge(PATH_SPOOL PATH_TEMPORARY, PATH_SPOOL PATH_PURGE); 148 149 if (ckdir(PATH_SPOOL PATH_TEMPORARY, 0700, pwq->pw_uid, 0, 1) == 0) 150 fatalx("error in purge directory setup"); 151 } 152 153 r = backend->init(pwq, server, name); 154 155 log_trace(TRACE_QUEUE, "queue-backend: queue_init(%d) -> %d", server, r); 156 157 return (r); 158 } 159 160 int 161 queue_close(void) 162 { 163 if (handler_close) 164 return (handler_close()); 165 166 return (1); 167 } 168 169 int 170 queue_message_create(uint32_t *msgid) 171 { 172 int r; 173 174 profile_enter("queue_message_create"); 175 r = handler_message_create(msgid); 176 profile_leave(); 177 178 log_trace(TRACE_QUEUE, 179 "queue-backend: queue_message_create() -> %d (%08"PRIx32")", 180 r, *msgid); 181 182 return (r); 183 } 184 185 int 186 queue_message_delete(uint32_t msgid) 187 { 188 char msgpath[PATH_MAX]; 189 uint64_t evpid; 190 void *iter; 191 int r; 192 193 profile_enter("queue_message_delete"); 194 r = handler_message_delete(msgid); 195 profile_leave(); 196 197 /* in case the message is incoming */ 198 queue_message_path(msgid, msgpath, sizeof(msgpath)); 199 unlink(msgpath); 200 201 /* remove remaining envelopes from the cache if any (on rollback) */ 202 evpid = msgid_to_evpid(msgid); 203 for (;;) { 204 iter = NULL; 205 if (!tree_iterfrom(&evpcache_tree, &iter, evpid, &evpid, NULL)) 206 break; 207 if (evpid_to_msgid(evpid) != msgid) 208 break; 209 queue_envelope_cache_del(evpid); 210 } 211 212 log_trace(TRACE_QUEUE, 213 "queue-backend: queue_message_delete(%08"PRIx32") -> %d", msgid, r); 214 215 return (r); 216 } 217 218 int 219 queue_message_commit(uint32_t msgid) 220 { 221 int r; 222 char msgpath[PATH_MAX]; 223 char tmppath[PATH_MAX]; 224 FILE *ifp = NULL; 225 FILE *ofp = NULL; 226 227 profile_enter("queue_message_commit"); 228 229 queue_message_path(msgid, msgpath, sizeof(msgpath)); 230 231 if (env->sc_queue_flags & QUEUE_COMPRESSION) { 232 bsnprintf(tmppath, sizeof tmppath, "%s.comp", msgpath); 233 ifp = fopen(msgpath, "r"); 234 ofp = fopen(tmppath, "w+"); 235 if (ifp == NULL || ofp == NULL) 236 goto err; 237 if (!compress_file(ifp, ofp)) 238 goto err; 239 fclose(ifp); 240 fclose(ofp); 241 ifp = NULL; 242 ofp = NULL; 243 244 if (rename(tmppath, msgpath) == -1) { 245 if (errno == ENOSPC) 246 return (0); 247 unlink(tmppath); 248 log_warn("rename"); 249 return (0); 250 } 251 } 252 253 if (env->sc_queue_flags & QUEUE_ENCRYPTION) { 254 bsnprintf(tmppath, sizeof tmppath, "%s.enc", msgpath); 255 ifp = fopen(msgpath, "r"); 256 ofp = fopen(tmppath, "w+"); 257 if (ifp == NULL || ofp == NULL) 258 goto err; 259 if (!crypto_encrypt_file(ifp, ofp)) 260 goto err; 261 fclose(ifp); 262 fclose(ofp); 263 ifp = NULL; 264 ofp = NULL; 265 266 if (rename(tmppath, msgpath) == -1) { 267 if (errno == ENOSPC) 268 return (0); 269 unlink(tmppath); 270 log_warn("rename"); 271 return (0); 272 } 273 } 274 275 r = handler_message_commit(msgid, msgpath); 276 profile_leave(); 277 278 /* in case it's not done by the backend */ 279 unlink(msgpath); 280 281 log_trace(TRACE_QUEUE, 282 "queue-backend: queue_message_commit(%08"PRIx32") -> %d", 283 msgid, r); 284 285 return (r); 286 287 err: 288 if (ifp) 289 fclose(ifp); 290 if (ofp) 291 fclose(ofp); 292 return 0; 293 } 294 295 int 296 queue_message_fd_r(uint32_t msgid) 297 { 298 int fdin = -1, fdout = -1, fd = -1; 299 FILE *ifp = NULL; 300 FILE *ofp = NULL; 301 302 profile_enter("queue_message_fd_r"); 303 fdin = handler_message_fd_r(msgid); 304 profile_leave(); 305 306 log_trace(TRACE_QUEUE, 307 "queue-backend: queue_message_fd_r(%08"PRIx32") -> %d", msgid, fdin); 308 309 if (fdin == -1) 310 return (-1); 311 312 if (env->sc_queue_flags & QUEUE_ENCRYPTION) { 313 if ((fdout = mktmpfile()) == -1) 314 goto err; 315 if ((fd = dup(fdout)) == -1) 316 goto err; 317 if ((ifp = fdopen(fdin, "r")) == NULL) 318 goto err; 319 fdin = fd; 320 fd = -1; 321 if ((ofp = fdopen(fdout, "w+")) == NULL) 322 goto err; 323 324 if (!crypto_decrypt_file(ifp, ofp)) 325 goto err; 326 327 fclose(ifp); 328 ifp = NULL; 329 fclose(ofp); 330 ofp = NULL; 331 lseek(fdin, SEEK_SET, 0); 332 } 333 334 if (env->sc_queue_flags & QUEUE_COMPRESSION) { 335 if ((fdout = mktmpfile()) == -1) 336 goto err; 337 if ((fd = dup(fdout)) == -1) 338 goto err; 339 if ((ifp = fdopen(fdin, "r")) == NULL) 340 goto err; 341 fdin = fd; 342 fd = -1; 343 if ((ofp = fdopen(fdout, "w+")) == NULL) 344 goto err; 345 346 if (!uncompress_file(ifp, ofp)) 347 goto err; 348 349 fclose(ifp); 350 ifp = NULL; 351 fclose(ofp); 352 ofp = NULL; 353 lseek(fdin, SEEK_SET, 0); 354 } 355 356 return (fdin); 357 358 err: 359 if (fd != -1) 360 close(fd); 361 if (fdin != -1) 362 close(fdin); 363 if (fdout != -1) 364 close(fdout); 365 if (ifp) 366 fclose(ifp); 367 if (ofp) 368 fclose(ofp); 369 return -1; 370 } 371 372 int 373 queue_message_fd_rw(uint32_t msgid) 374 { 375 char buf[PATH_MAX]; 376 377 queue_message_path(msgid, buf, sizeof(buf)); 378 379 return open(buf, O_RDWR | O_CREAT | O_EXCL, 0600); 380 } 381 382 static int 383 queue_envelope_dump_buffer(struct envelope *ep, char *evpbuf, size_t evpbufsize) 384 { 385 char *evp; 386 size_t evplen; 387 size_t complen; 388 char compbuf[sizeof(struct envelope)]; 389 size_t enclen; 390 char encbuf[sizeof(struct envelope)]; 391 392 evp = evpbuf; 393 evplen = envelope_dump_buffer(ep, evpbuf, evpbufsize); 394 if (evplen == 0) 395 return (0); 396 397 if (env->sc_queue_flags & QUEUE_COMPRESSION) { 398 complen = compress_chunk(evp, evplen, compbuf, sizeof compbuf); 399 if (complen == 0) 400 return (0); 401 evp = compbuf; 402 evplen = complen; 403 } 404 405 if (env->sc_queue_flags & QUEUE_ENCRYPTION) { 406 enclen = crypto_encrypt_buffer(evp, evplen, encbuf, sizeof encbuf); 407 if (enclen == 0) 408 return (0); 409 evp = encbuf; 410 evplen = enclen; 411 } 412 413 memmove(evpbuf, evp, evplen); 414 415 return (evplen); 416 } 417 418 static int 419 queue_envelope_load_buffer(struct envelope *ep, char *evpbuf, size_t evpbufsize) 420 { 421 char *evp; 422 size_t evplen; 423 char compbuf[sizeof(struct envelope)]; 424 size_t complen; 425 char encbuf[sizeof(struct envelope)]; 426 size_t enclen; 427 428 evp = evpbuf; 429 evplen = evpbufsize; 430 431 if (env->sc_queue_flags & QUEUE_ENCRYPTION) { 432 enclen = crypto_decrypt_buffer(evp, evplen, encbuf, sizeof encbuf); 433 if (enclen == 0) 434 return (0); 435 evp = encbuf; 436 evplen = enclen; 437 } 438 439 if (env->sc_queue_flags & QUEUE_COMPRESSION) { 440 complen = uncompress_chunk(evp, evplen, compbuf, sizeof compbuf); 441 if (complen == 0) 442 return (0); 443 evp = compbuf; 444 evplen = complen; 445 } 446 447 return (envelope_load_buffer(ep, evp, evplen)); 448 } 449 450 static void 451 queue_envelope_cache_add(struct envelope *e) 452 { 453 struct envelope *cached; 454 455 while (tree_count(&evpcache_tree) >= env->sc_queue_evpcache_size) 456 queue_envelope_cache_del(TAILQ_LAST(&evpcache_list, evplst)->id); 457 458 cached = xcalloc(1, sizeof *cached); 459 *cached = *e; 460 TAILQ_INSERT_HEAD(&evpcache_list, cached, entry); 461 tree_xset(&evpcache_tree, e->id, cached); 462 stat_increment("queue.evpcache.size", 1); 463 } 464 465 static void 466 queue_envelope_cache_update(struct envelope *e) 467 { 468 struct envelope *cached; 469 470 if ((cached = tree_get(&evpcache_tree, e->id)) == NULL) { 471 queue_envelope_cache_add(e); 472 stat_increment("queue.evpcache.update.missed", 1); 473 } else { 474 TAILQ_REMOVE(&evpcache_list, cached, entry); 475 *cached = *e; 476 TAILQ_INSERT_HEAD(&evpcache_list, cached, entry); 477 stat_increment("queue.evpcache.update.hit", 1); 478 } 479 } 480 481 static void 482 queue_envelope_cache_del(uint64_t evpid) 483 { 484 struct envelope *cached; 485 486 if ((cached = tree_pop(&evpcache_tree, evpid)) == NULL) 487 return; 488 489 TAILQ_REMOVE(&evpcache_list, cached, entry); 490 free(cached); 491 stat_decrement("queue.evpcache.size", 1); 492 } 493 494 int 495 queue_envelope_create(struct envelope *ep) 496 { 497 int r; 498 char evpbuf[sizeof(struct envelope)]; 499 size_t evplen; 500 uint64_t evpid; 501 uint32_t msgid; 502 503 ep->creation = time(NULL); 504 evplen = queue_envelope_dump_buffer(ep, evpbuf, sizeof evpbuf); 505 if (evplen == 0) 506 return (0); 507 508 evpid = ep->id; 509 msgid = evpid_to_msgid(evpid); 510 511 profile_enter("queue_envelope_create"); 512 r = handler_envelope_create(msgid, evpbuf, evplen, &ep->id); 513 profile_leave(); 514 515 log_trace(TRACE_QUEUE, 516 "queue-backend: queue_envelope_create(%016"PRIx64", %zu) -> %d (%016"PRIx64")", 517 evpid, evplen, r, ep->id); 518 519 if (!r) { 520 ep->creation = 0; 521 ep->id = 0; 522 } 523 524 if (r && env->sc_queue_flags & QUEUE_EVPCACHE) 525 queue_envelope_cache_add(ep); 526 527 return (r); 528 } 529 530 int 531 queue_envelope_delete(uint64_t evpid) 532 { 533 int r; 534 535 if (env->sc_queue_flags & QUEUE_EVPCACHE) 536 queue_envelope_cache_del(evpid); 537 538 profile_enter("queue_envelope_delete"); 539 r = handler_envelope_delete(evpid); 540 profile_leave(); 541 542 log_trace(TRACE_QUEUE, 543 "queue-backend: queue_envelope_delete(%016"PRIx64") -> %d", 544 evpid, r); 545 546 return (r); 547 } 548 549 int 550 queue_envelope_load(uint64_t evpid, struct envelope *ep) 551 { 552 const char *e; 553 char evpbuf[sizeof(struct envelope)]; 554 size_t evplen; 555 struct envelope *cached; 556 557 if ((env->sc_queue_flags & QUEUE_EVPCACHE) && 558 (cached = tree_get(&evpcache_tree, evpid))) { 559 *ep = *cached; 560 stat_increment("queue.evpcache.load.hit", 1); 561 return (1); 562 } 563 564 ep->id = evpid; 565 profile_enter("queue_envelope_load"); 566 evplen = handler_envelope_load(ep->id, evpbuf, sizeof evpbuf); 567 profile_leave(); 568 569 log_trace(TRACE_QUEUE, 570 "queue-backend: queue_envelope_load(%016"PRIx64") -> %zu", 571 evpid, evplen); 572 573 if (evplen == 0) 574 return (0); 575 576 if (queue_envelope_load_buffer(ep, evpbuf, evplen)) { 577 if ((e = envelope_validate(ep)) == NULL) { 578 ep->id = evpid; 579 if (env->sc_queue_flags & QUEUE_EVPCACHE) { 580 queue_envelope_cache_add(ep); 581 stat_increment("queue.evpcache.load.missed", 1); 582 } 583 return (1); 584 } 585 log_warnx("warn: invalid envelope %016" PRIx64 ": %s", 586 evpid, e); 587 } 588 return (0); 589 } 590 591 int 592 queue_envelope_update(struct envelope *ep) 593 { 594 char evpbuf[sizeof(struct envelope)]; 595 size_t evplen; 596 int r; 597 598 evplen = queue_envelope_dump_buffer(ep, evpbuf, sizeof evpbuf); 599 if (evplen == 0) 600 return (0); 601 602 profile_enter("queue_envelope_update"); 603 r = handler_envelope_update(ep->id, evpbuf, evplen); 604 profile_leave(); 605 606 if (r && env->sc_queue_flags & QUEUE_EVPCACHE) 607 queue_envelope_cache_update(ep); 608 609 log_trace(TRACE_QUEUE, 610 "queue-backend: queue_envelope_update(%016"PRIx64") -> %d", 611 ep->id, r); 612 613 return (r); 614 } 615 616 int 617 queue_message_walk(struct envelope *ep, uint32_t msgid, int *done, void **data) 618 { 619 char evpbuf[sizeof(struct envelope)]; 620 uint64_t evpid; 621 int r; 622 const char *e; 623 624 profile_enter("queue_message_walk"); 625 r = handler_message_walk(&evpid, evpbuf, sizeof evpbuf, 626 msgid, done, data); 627 profile_leave(); 628 629 log_trace(TRACE_QUEUE, 630 "queue-backend: queue_message_walk() -> %d (%016"PRIx64")", 631 r, evpid); 632 633 if (r == -1) 634 return (r); 635 636 if (r && queue_envelope_load_buffer(ep, evpbuf, (size_t)r)) { 637 if ((e = envelope_validate(ep)) == NULL) { 638 ep->id = evpid; 639 /* 640 * do not cache the envelope here, while discovering 641 * envelopes one could re-run discover on already 642 * scheduled envelopes which leads to triggering of 643 * strict checks in caching. Envelopes could anyway 644 * be loaded from backend if it isn't cached. 645 */ 646 return (1); 647 } 648 log_warnx("warn: invalid envelope %016" PRIx64 ": %s", 649 evpid, e); 650 } 651 return (0); 652 } 653 654 int 655 queue_envelope_walk(struct envelope *ep) 656 { 657 const char *e; 658 uint64_t evpid; 659 char evpbuf[sizeof(struct envelope)]; 660 int r; 661 662 profile_enter("queue_envelope_walk"); 663 r = handler_envelope_walk(&evpid, evpbuf, sizeof evpbuf); 664 profile_leave(); 665 666 log_trace(TRACE_QUEUE, 667 "queue-backend: queue_envelope_walk() -> %d (%016"PRIx64")", 668 r, evpid); 669 670 if (r == -1) 671 return (r); 672 673 if (r && queue_envelope_load_buffer(ep, evpbuf, (size_t)r)) { 674 if ((e = envelope_validate(ep)) == NULL) { 675 ep->id = evpid; 676 if (env->sc_queue_flags & QUEUE_EVPCACHE) 677 queue_envelope_cache_add(ep); 678 return (1); 679 } 680 log_warnx("warn: invalid envelope %016" PRIx64 ": %s", 681 evpid, e); 682 } 683 return (0); 684 } 685 686 uint32_t 687 queue_generate_msgid(void) 688 { 689 uint32_t msgid; 690 691 while ((msgid = arc4random()) == 0) 692 ; 693 694 return msgid; 695 } 696 697 uint64_t 698 queue_generate_evpid(uint32_t msgid) 699 { 700 uint32_t rnd; 701 uint64_t evpid; 702 703 while ((rnd = arc4random()) == 0) 704 ; 705 706 evpid = msgid; 707 evpid <<= 32; 708 evpid |= rnd; 709 710 return evpid; 711 } 712 713 static const char* 714 envelope_validate(struct envelope *ep) 715 { 716 if (ep->version != SMTPD_ENVELOPE_VERSION) 717 return "version mismatch"; 718 719 if (memchr(ep->helo, '\0', sizeof(ep->helo)) == NULL) 720 return "invalid helo"; 721 if (ep->helo[0] == '\0') 722 return "empty helo"; 723 724 if (memchr(ep->hostname, '\0', sizeof(ep->hostname)) == NULL) 725 return "invalid hostname"; 726 if (ep->hostname[0] == '\0') 727 return "empty hostname"; 728 729 if (memchr(ep->errorline, '\0', sizeof(ep->errorline)) == NULL) 730 return "invalid error line"; 731 732 if (dict_get(env->sc_dispatchers, ep->dispatcher) == NULL) 733 return "unknown dispatcher"; 734 735 return NULL; 736 } 737 738 void 739 queue_api_on_close(int(*cb)(void)) 740 { 741 handler_close = cb; 742 } 743 744 void 745 queue_api_on_message_create(int(*cb)(uint32_t *)) 746 { 747 handler_message_create = cb; 748 } 749 750 void 751 queue_api_on_message_commit(int(*cb)(uint32_t, const char *)) 752 { 753 handler_message_commit = cb; 754 } 755 756 void 757 queue_api_on_message_delete(int(*cb)(uint32_t)) 758 { 759 handler_message_delete = cb; 760 } 761 762 void 763 queue_api_on_message_fd_r(int(*cb)(uint32_t)) 764 { 765 handler_message_fd_r = cb; 766 } 767 768 void 769 queue_api_on_envelope_create(int(*cb)(uint32_t, const char *, size_t, uint64_t *)) 770 { 771 handler_envelope_create = cb; 772 } 773 774 void 775 queue_api_on_envelope_delete(int(*cb)(uint64_t)) 776 { 777 handler_envelope_delete = cb; 778 } 779 780 void 781 queue_api_on_envelope_update(int(*cb)(uint64_t, const char *, size_t)) 782 { 783 handler_envelope_update = cb; 784 } 785 786 void 787 queue_api_on_envelope_load(int(*cb)(uint64_t, char *, size_t)) 788 { 789 handler_envelope_load = cb; 790 } 791 792 void 793 queue_api_on_envelope_walk(int(*cb)(uint64_t *, char *, size_t)) 794 { 795 handler_envelope_walk = cb; 796 } 797 798 void 799 queue_api_on_message_walk(int(*cb)(uint64_t *, char *, size_t, 800 uint32_t, int *, void **)) 801 { 802 handler_message_walk = cb; 803 } 804