/*	$OpenBSD: queue_backend.c,v 1.68 2021/06/14 17:58:16 eric Exp $	*/

/*
 * Copyright (c) 2011 Gilles Chehade <gilles@poolp.org>
 *
 * Permission to use, copy, modify, and distribute this software for any
 * purpose with or without fee is hereby granted, provided that the above
 * copyright notice and this permission notice appear in all copies.
 *
 * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
 * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
 * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
 * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
 * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
 * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
 * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
 */

#include <errno.h>
#include <fcntl.h>
#include <grp.h>
#include <inttypes.h>
#include <pwd.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>

#include "smtpd.h"
#include "log.h"

static const char* envelope_validate(struct envelope *);

extern struct queue_backend queue_backend_fs;
extern struct queue_backend queue_backend_null;
extern struct queue_backend queue_backend_proc;
extern struct queue_backend queue_backend_ram;

static void queue_envelope_cache_add(struct envelope *);
static void queue_envelope_cache_update(struct envelope *);
static void queue_envelope_cache_del(uint64_t evpid);

TAILQ_HEAD(evplst, envelope);

static struct tree		 evpcache_tree;
static struct evplst		 evpcache_list;
static struct queue_backend	*backend;

static int (*handler_close)(void);
static int (*handler_message_create)(uint32_t *);
static int (*handler_message_commit)(uint32_t, const char*);
static int (*handler_message_delete)(uint32_t);
static int (*handler_message_fd_r)(uint32_t);
static int (*handler_envelope_create)(uint32_t, const char *, size_t, uint64_t *);
static int (*handler_envelope_delete)(uint64_t);
static int (*handler_envelope_update)(uint64_t, const char *, size_t);
static int (*handler_envelope_load)(uint64_t, char *, size_t);
static int (*handler_envelope_walk)(uint64_t *, char *, size_t);
static int (*handler_message_walk)(uint64_t *, char *, size_t,
    uint32_t, int *, void **);

#ifdef QUEUE_PROFILING

static struct {
	struct timespec	 t0;
	const char	*name;
} profile;

static inline void profile_enter(const char *name)
{
	if ((profiling & PROFILE_QUEUE) == 0)
		return;

	profile.name = name;
	clock_gettime(CLOCK_MONOTONIC, &profile.t0);
}

static inline void profile_leave(void)
{
	struct timespec	 t1, dt;

	if ((profiling & PROFILE_QUEUE) == 0)
		return;

	clock_gettime(CLOCK_MONOTONIC, &t1);
	timespecsub(&t1, &profile.t0, &dt);
	log_debug("profile-queue: %s %lld.%09ld", profile.name,
	    (long long)dt.tv_sec, dt.tv_nsec);
}
#else
#define profile_enter(x)	do {} while (0)
#define profile_leave()		do {} while (0)
#endif
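/*
 * queue_message_path() builds the location of an incoming message body
 * under PATH_TEMPORARY, with the message id formatted as eight hex
 * digits.  For instance, a msgid of 0x1a2b3c4d (chosen here only as an
 * illustration) maps to PATH_TEMPORARY "/1a2b3c4d".
 */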
static int
queue_message_path(uint32_t msgid, char *buf, size_t len)
{
	return bsnprintf(buf, len, "%s/%08"PRIx32, PATH_TEMPORARY, msgid);
}

int
queue_init(const char *name, int server)
{
	struct passwd	*pwq;
	struct group	*gr;
	int		 r;

	pwq = getpwnam(SMTPD_QUEUE_USER);
	if (pwq == NULL)
		fatalx("unknown user %s", SMTPD_QUEUE_USER);

	gr = getgrnam(SMTPD_QUEUE_GROUP);
	if (gr == NULL)
		fatalx("unknown group %s", SMTPD_QUEUE_GROUP);

	tree_init(&evpcache_tree);
	TAILQ_INIT(&evpcache_list);

	if (!strcmp(name, "fs"))
		backend = &queue_backend_fs;
	else if (!strcmp(name, "null"))
		backend = &queue_backend_null;
	else if (!strcmp(name, "ram"))
		backend = &queue_backend_ram;
	else
		backend = &queue_backend_proc;

	if (server) {
		if (ckdir(PATH_SPOOL, 0711, 0, 0, 1) == 0)
			fatalx("error in spool directory setup");
		if (ckdir(PATH_SPOOL PATH_OFFLINE, 0770, 0, gr->gr_gid, 1) == 0)
			fatalx("error in offline directory setup");
		if (ckdir(PATH_SPOOL PATH_PURGE, 0700, pwq->pw_uid, 0, 1) == 0)
			fatalx("error in purge directory setup");

		mvpurge(PATH_SPOOL PATH_TEMPORARY, PATH_SPOOL PATH_PURGE);

		if (ckdir(PATH_SPOOL PATH_TEMPORARY, 0700, pwq->pw_uid, 0, 1) == 0)
			fatalx("error in temporary directory setup");
	}

	r = backend->init(pwq, server, name);

	log_trace(TRACE_QUEUE, "queue-backend: queue_init(%d) -> %d", server, r);

	return (r);
}

int
queue_close(void)
{
	if (handler_close)
		return (handler_close());

	return (1);
}

int
queue_message_create(uint32_t *msgid)
{
	int	r;

	profile_enter("queue_message_create");
	r = handler_message_create(msgid);
	profile_leave();

	log_trace(TRACE_QUEUE,
	    "queue-backend: queue_message_create() -> %d (%08"PRIx32")",
	    r, *msgid);

	return (r);
}

int
queue_message_delete(uint32_t msgid)
{
	char		 msgpath[PATH_MAX];
	uint64_t	 evpid;
	void		*iter;
	int		 r;

	profile_enter("queue_message_delete");
	r = handler_message_delete(msgid);
	profile_leave();

	/* in case the message is incoming */
	queue_message_path(msgid, msgpath, sizeof(msgpath));
	unlink(msgpath);

	/* remove remaining envelopes from the cache if any (on rollback) */
	evpid = msgid_to_evpid(msgid);
	for (;;) {
		iter = NULL;
		if (!tree_iterfrom(&evpcache_tree, &iter, evpid, &evpid, NULL))
			break;
		if (evpid_to_msgid(evpid) != msgid)
			break;
		queue_envelope_cache_del(evpid);
	}

	log_trace(TRACE_QUEUE,
	    "queue-backend: queue_message_delete(%08"PRIx32") -> %d", msgid, r);

	return (r);
}
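/*
 * Commit path: when enabled in the configuration, the spooled message
 * file is compressed and then encrypted in place (through a temporary
 * "<msgpath>.comp" or "<msgpath>.enc" file) before being handed to the
 * backend.  queue_message_fd_r() undoes the transformations in the
 * reverse order: decrypt first, then uncompress.
 */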
int
queue_message_commit(uint32_t msgid)
{
	int	 r;
	char	 msgpath[PATH_MAX];
	char	 tmppath[PATH_MAX];
	FILE	*ifp = NULL;
	FILE	*ofp = NULL;

	profile_enter("queue_message_commit");

	queue_message_path(msgid, msgpath, sizeof(msgpath));

	if (env->sc_queue_flags & QUEUE_COMPRESSION) {
		bsnprintf(tmppath, sizeof tmppath, "%s.comp", msgpath);
		ifp = fopen(msgpath, "r");
		ofp = fopen(tmppath, "w+");
		if (ifp == NULL || ofp == NULL)
			goto err;
		if (!compress_file(ifp, ofp))
			goto err;
		fclose(ifp);
		fclose(ofp);
		ifp = NULL;
		ofp = NULL;

		if (rename(tmppath, msgpath) == -1) {
			if (errno == ENOSPC)
				return (0);
			unlink(tmppath);
			log_warn("rename");
			return (0);
		}
	}

	if (env->sc_queue_flags & QUEUE_ENCRYPTION) {
		bsnprintf(tmppath, sizeof tmppath, "%s.enc", msgpath);
		ifp = fopen(msgpath, "r");
		ofp = fopen(tmppath, "w+");
		if (ifp == NULL || ofp == NULL)
			goto err;
		if (!crypto_encrypt_file(ifp, ofp))
			goto err;
		fclose(ifp);
		fclose(ofp);
		ifp = NULL;
		ofp = NULL;

		if (rename(tmppath, msgpath) == -1) {
			if (errno == ENOSPC)
				return (0);
			unlink(tmppath);
			log_warn("rename");
			return (0);
		}
	}

	r = handler_message_commit(msgid, msgpath);
	profile_leave();

	/* in case it's not done by the backend */
	unlink(msgpath);

	log_trace(TRACE_QUEUE,
	    "queue-backend: queue_message_commit(%08"PRIx32") -> %d",
	    msgid, r);

	return (r);

err:
	if (ifp)
		fclose(ifp);
	if (ofp)
		fclose(ofp);
	return 0;
}

int
queue_message_fd_r(uint32_t msgid)
{
	int	 fdin = -1, fdout = -1, fd = -1;
	FILE	*ifp = NULL;
	FILE	*ofp = NULL;

	profile_enter("queue_message_fd_r");
	fdin = handler_message_fd_r(msgid);
	profile_leave();

	log_trace(TRACE_QUEUE,
	    "queue-backend: queue_message_fd_r(%08"PRIx32") -> %d", msgid, fdin);

	if (fdin == -1)
		return (-1);

	if (env->sc_queue_flags & QUEUE_ENCRYPTION) {
		if ((fdout = mktmpfile()) == -1)
			goto err;
		if ((fd = dup(fdout)) == -1)
			goto err;
		if ((ifp = fdopen(fdin, "r")) == NULL)
			goto err;
		fdin = fd;
		fd = -1;
		if ((ofp = fdopen(fdout, "w+")) == NULL)
			goto err;

		if (!crypto_decrypt_file(ifp, ofp))
			goto err;

		fclose(ifp);
		ifp = NULL;
		fclose(ofp);
		ofp = NULL;
		/* rewind the decrypted copy before handing it out */
		lseek(fdin, 0, SEEK_SET);
	}

	if (env->sc_queue_flags & QUEUE_COMPRESSION) {
		if ((fdout = mktmpfile()) == -1)
			goto err;
		if ((fd = dup(fdout)) == -1)
			goto err;
		if ((ifp = fdopen(fdin, "r")) == NULL)
			goto err;
		fdin = fd;
		fd = -1;
		if ((ofp = fdopen(fdout, "w+")) == NULL)
			goto err;

		if (!uncompress_file(ifp, ofp))
			goto err;

		fclose(ifp);
		ifp = NULL;
		fclose(ofp);
		ofp = NULL;
		/* rewind the uncompressed copy before handing it out */
		lseek(fdin, 0, SEEK_SET);
	}

	return (fdin);

err:
	if (fd != -1)
		close(fd);
	if (fdin != -1)
		close(fdin);
	if (fdout != -1)
		close(fdout);
	if (ifp)
		fclose(ifp);
	if (ofp)
		fclose(ofp);
	return -1;
}

int
queue_message_fd_rw(uint32_t msgid)
{
	char buf[PATH_MAX];

	queue_message_path(msgid, buf, sizeof(buf));

	return open(buf, O_RDWR | O_CREAT | O_EXCL, 0600);
}

static int
queue_envelope_dump_buffer(struct envelope *ep, char *evpbuf, size_t evpbufsize)
{
	char	*evp;
	size_t	 evplen;
	size_t	 complen;
	char	 compbuf[sizeof(struct envelope)];
	size_t	 enclen;
	char	 encbuf[sizeof(struct envelope)];

	evp = evpbuf;
	evplen = envelope_dump_buffer(ep, evpbuf, evpbufsize);
	if (evplen == 0)
		return (0);

	if (env->sc_queue_flags & QUEUE_COMPRESSION) {
		complen = compress_chunk(evp, evplen, compbuf, sizeof compbuf);
		if (complen == 0)
			return (0);
		evp = compbuf;
		evplen = complen;
	}

	if (env->sc_queue_flags & QUEUE_ENCRYPTION) {
		enclen = crypto_encrypt_buffer(evp, evplen, encbuf, sizeof encbuf);
		if (enclen == 0)
			return (0);
		evp = encbuf;
		evplen = enclen;
	}

	memmove(evpbuf, evp, evplen);

	return (evplen);
}

static int
queue_envelope_load_buffer(struct envelope *ep, char *evpbuf, size_t evpbufsize)
{
	char	*evp;
	size_t	 evplen;
	char	 compbuf[sizeof(struct envelope)];
	size_t	 complen;
	char	 encbuf[sizeof(struct envelope)];
	size_t	 enclen;

	evp = evpbuf;
	evplen = evpbufsize;

	if (env->sc_queue_flags & QUEUE_ENCRYPTION) {
		enclen = crypto_decrypt_buffer(evp, evplen, encbuf, sizeof encbuf);
		if (enclen == 0)
			return (0);
		evp = encbuf;
		evplen = enclen;
	}

	if (env->sc_queue_flags & QUEUE_COMPRESSION) {
		complen = uncompress_chunk(evp, evplen, compbuf, sizeof compbuf);
		if (complen == 0)
			return (0);
		evp = compbuf;
		evplen = complen;
	}

	return (envelope_load_buffer(ep, evp, evplen));
}
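/*
 * Envelope cache: entries are kept in a tree keyed by evpid for lookup
 * and in a list ordered by recency of insertion or update for
 * eviction.  Once the cache holds env->sc_queue_evpcache_size entries,
 * the entry at the tail of the list is evicted to make room.
 */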
static void
queue_envelope_cache_add(struct envelope *e)
{
	struct envelope *cached;

	while (tree_count(&evpcache_tree) >= env->sc_queue_evpcache_size)
		queue_envelope_cache_del(TAILQ_LAST(&evpcache_list, evplst)->id);

	cached = xcalloc(1, sizeof *cached);
	*cached = *e;
	TAILQ_INSERT_HEAD(&evpcache_list, cached, entry);
	tree_xset(&evpcache_tree, e->id, cached);
	stat_increment("queue.evpcache.size", 1);
}

static void
queue_envelope_cache_update(struct envelope *e)
{
	struct envelope *cached;

	if ((cached = tree_get(&evpcache_tree, e->id)) == NULL) {
		queue_envelope_cache_add(e);
		stat_increment("queue.evpcache.update.missed", 1);
	} else {
		TAILQ_REMOVE(&evpcache_list, cached, entry);
		*cached = *e;
		TAILQ_INSERT_HEAD(&evpcache_list, cached, entry);
		stat_increment("queue.evpcache.update.hit", 1);
	}
}

static void
queue_envelope_cache_del(uint64_t evpid)
{
	struct envelope *cached;

	if ((cached = tree_pop(&evpcache_tree, evpid)) == NULL)
		return;

	TAILQ_REMOVE(&evpcache_list, cached, entry);
	free(cached);
	stat_decrement("queue.evpcache.size", 1);
}

int
queue_envelope_create(struct envelope *ep)
{
	int		 r;
	char		 evpbuf[sizeof(struct envelope)];
	size_t		 evplen;
	uint64_t	 evpid;
	uint32_t	 msgid;

	ep->creation = time(NULL);
	evplen = queue_envelope_dump_buffer(ep, evpbuf, sizeof evpbuf);
	if (evplen == 0)
		return (0);

	evpid = ep->id;
	msgid = evpid_to_msgid(evpid);

	profile_enter("queue_envelope_create");
	r = handler_envelope_create(msgid, evpbuf, evplen, &ep->id);
	profile_leave();

	log_trace(TRACE_QUEUE,
	    "queue-backend: queue_envelope_create(%016"PRIx64", %zu) -> %d (%016"PRIx64")",
	    evpid, evplen, r, ep->id);

	if (!r) {
		ep->creation = 0;
		ep->id = 0;
	}

	if (r && env->sc_queue_flags & QUEUE_EVPCACHE)
		queue_envelope_cache_add(ep);

	return (r);
}

int
queue_envelope_delete(uint64_t evpid)
{
	int	r;

	if (env->sc_queue_flags & QUEUE_EVPCACHE)
		queue_envelope_cache_del(evpid);

	profile_enter("queue_envelope_delete");
	r = handler_envelope_delete(evpid);
	profile_leave();

	log_trace(TRACE_QUEUE,
	    "queue-backend: queue_envelope_delete(%016"PRIx64") -> %d",
	    evpid, r);

	return (r);
}
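/*
 * Envelope ids embed their message id in the upper 32 bits (see
 * queue_generate_evpid() below), which is what evpid_to_msgid() and
 * msgid_to_evpid() rely on above.  As an illustration, a hypothetical
 * evpid of 0x1a2b3c4d00000007 belongs to message 0x1a2b3c4d.
 */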
int
queue_envelope_load(uint64_t evpid, struct envelope *ep)
{
	const char	*e;
	char		 evpbuf[sizeof(struct envelope)];
	size_t		 evplen;
	struct envelope	*cached;

	if ((env->sc_queue_flags & QUEUE_EVPCACHE) &&
	    (cached = tree_get(&evpcache_tree, evpid))) {
		*ep = *cached;
		stat_increment("queue.evpcache.load.hit", 1);
		return (1);
	}

	ep->id = evpid;
	profile_enter("queue_envelope_load");
	evplen = handler_envelope_load(ep->id, evpbuf, sizeof evpbuf);
	profile_leave();

	log_trace(TRACE_QUEUE,
	    "queue-backend: queue_envelope_load(%016"PRIx64") -> %zu",
	    evpid, evplen);

	if (evplen == 0)
		return (0);

	if (queue_envelope_load_buffer(ep, evpbuf, evplen)) {
		if ((e = envelope_validate(ep)) == NULL) {
			ep->id = evpid;
			if (env->sc_queue_flags & QUEUE_EVPCACHE) {
				queue_envelope_cache_add(ep);
				stat_increment("queue.evpcache.load.missed", 1);
			}
			return (1);
		}
		log_warnx("warn: invalid envelope %016" PRIx64 ": %s",
		    evpid, e);
	}
	return (0);
}

int
queue_envelope_update(struct envelope *ep)
{
	char	 evpbuf[sizeof(struct envelope)];
	size_t	 evplen;
	int	 r;

	evplen = queue_envelope_dump_buffer(ep, evpbuf, sizeof evpbuf);
	if (evplen == 0)
		return (0);

	profile_enter("queue_envelope_update");
	r = handler_envelope_update(ep->id, evpbuf, evplen);
	profile_leave();

	if (r && env->sc_queue_flags & QUEUE_EVPCACHE)
		queue_envelope_cache_update(ep);

	log_trace(TRACE_QUEUE,
	    "queue-backend: queue_envelope_update(%016"PRIx64") -> %d",
	    ep->id, r);

	return (r);
}

int
queue_message_walk(struct envelope *ep, uint32_t msgid, int *done, void **data)
{
	char		 evpbuf[sizeof(struct envelope)];
	uint64_t	 evpid;
	int		 r;
	const char	*e;

	profile_enter("queue_message_walk");
	r = handler_message_walk(&evpid, evpbuf, sizeof evpbuf,
	    msgid, done, data);
	profile_leave();

	log_trace(TRACE_QUEUE,
	    "queue-backend: queue_message_walk() -> %d (%016"PRIx64")",
	    r, evpid);

	if (r == -1)
		return (r);

	if (r && queue_envelope_load_buffer(ep, evpbuf, (size_t)r)) {
		if ((e = envelope_validate(ep)) == NULL) {
			ep->id = evpid;
			/*
			 * Do not cache the envelope here: while discovering
			 * envelopes, a re-run of discover on already
			 * scheduled envelopes would trigger the strict
			 * checks in the cache.  The envelope can always be
			 * loaded from the backend if it is not cached.
			 */
			return (1);
		}
		log_warnx("warn: invalid envelope %016" PRIx64 ": %s",
		    evpid, e);
	}
	return (0);
}

int
queue_envelope_walk(struct envelope *ep)
{
	const char	*e;
	uint64_t	 evpid;
	char		 evpbuf[sizeof(struct envelope)];
	int		 r;

	profile_enter("queue_envelope_walk");
	r = handler_envelope_walk(&evpid, evpbuf, sizeof evpbuf);
	profile_leave();

	log_trace(TRACE_QUEUE,
	    "queue-backend: queue_envelope_walk() -> %d (%016"PRIx64")",
	    r, evpid);

	if (r == -1)
		return (r);

	if (r && queue_envelope_load_buffer(ep, evpbuf, (size_t)r)) {
		if ((e = envelope_validate(ep)) == NULL) {
			ep->id = evpid;
			if (env->sc_queue_flags & QUEUE_EVPCACHE)
				queue_envelope_cache_add(ep);
			return (1);
		}
		log_warnx("warn: invalid envelope %016" PRIx64 ": %s",
		    evpid, e);
	}
	return (0);
}

uint32_t
queue_generate_msgid(void)
{
	uint32_t msgid;

	while ((msgid = arc4random()) == 0)
		;

	return msgid;
}

uint64_t
queue_generate_evpid(uint32_t msgid)
{
	uint32_t rnd;
	uint64_t evpid;

	while ((rnd = arc4random()) == 0)
		;

	evpid = msgid;
	evpid <<= 32;
	evpid |= rnd;

	return evpid;
}

static const char*
envelope_validate(struct envelope *ep)
{
	if (ep->version != SMTPD_ENVELOPE_VERSION)
		return "version mismatch";

	if (memchr(ep->helo, '\0', sizeof(ep->helo)) == NULL)
		return "invalid helo";
	if (ep->helo[0] == '\0')
		return "empty helo";

	if (memchr(ep->hostname, '\0', sizeof(ep->hostname)) == NULL)
		return "invalid hostname";
	if (ep->hostname[0] == '\0')
		return "empty hostname";

	if (memchr(ep->errorline, '\0', sizeof(ep->errorline)) == NULL)
		return "invalid error line";

	if (dict_get(env->sc_dispatchers, ep->dispatcher) == NULL)
		return "unknown dispatcher";

	return NULL;
}

void
queue_api_on_close(int(*cb)(void))
{
	handler_close = cb;
}

void
queue_api_on_message_create(int(*cb)(uint32_t *))
{
	handler_message_create = cb;
}

void
queue_api_on_message_commit(int(*cb)(uint32_t, const char *))
{
	handler_message_commit = cb;
}

void
queue_api_on_message_delete(int(*cb)(uint32_t))
{
	handler_message_delete = cb;
}

void
queue_api_on_message_fd_r(int(*cb)(uint32_t))
{
	handler_message_fd_r = cb;
}

void
queue_api_on_envelope_create(int(*cb)(uint32_t, const char *, size_t, uint64_t *))
{
	handler_envelope_create = cb;
}

void
queue_api_on_envelope_delete(int(*cb)(uint64_t))
{
	handler_envelope_delete = cb;
}

void
queue_api_on_envelope_update(int(*cb)(uint64_t, const char *, size_t))
{
	handler_envelope_update = cb;
}

void
queue_api_on_envelope_load(int(*cb)(uint64_t, char *, size_t))
{
	handler_envelope_load = cb;
}

void
queue_api_on_envelope_walk(int(*cb)(uint64_t *, char *, size_t))
{
	handler_envelope_walk = cb;
}

void
queue_api_on_message_walk(int(*cb)(uint64_t *, char *, size_t,
    uint32_t, int *, void **))
{
	handler_message_walk = cb;
}
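/*
 * A queue backend wires itself to this layer through the
 * queue_api_on_*() setters above, typically from its init() routine.
 * A minimal sketch, using hypothetical names:
 *
 *	static int
 *	my_message_create(uint32_t *msgid)
 *	{
 *		*msgid = queue_generate_msgid();
 *		return (1);
 *	}
 *
 *	static int
 *	my_init(struct passwd *pw, int server, const char *conf)
 *	{
 *		queue_api_on_message_create(my_message_create);
 *		... register the remaining handlers the same way ...
 *		return (1);
 *	}
 */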