1 /* $OpenBSD: queue_backend.c,v 1.63 2018/05/14 15:23:05 gilles Exp $ */ 2 3 /* 4 * Copyright (c) 2011 Gilles Chehade <gilles@poolp.org> 5 * 6 * Permission to use, copy, modify, and distribute this software for any 7 * purpose with or without fee is hereby granted, provided that the above 8 * copyright notice and this permission notice appear in all copies. 9 * 10 * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES 11 * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF 12 * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR 13 * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES 14 * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN 15 * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF 16 * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. 17 */ 18 19 #include <sys/types.h> 20 #include <sys/queue.h> 21 #include <sys/tree.h> 22 #include <sys/socket.h> 23 #include <sys/stat.h> 24 25 #include <ctype.h> 26 #include <err.h> 27 #include <errno.h> 28 #include <event.h> 29 #include <fcntl.h> 30 #include <grp.h> 31 #include <imsg.h> 32 #include <limits.h> 33 #include <inttypes.h> 34 #include <libgen.h> 35 #include <pwd.h> 36 #include <stdio.h> 37 #include <stdlib.h> 38 #include <string.h> 39 #include <time.h> 40 #include <unistd.h> 41 42 #include "smtpd.h" 43 #include "log.h" 44 45 static const char* envelope_validate(struct envelope *); 46 47 extern struct queue_backend queue_backend_fs; 48 extern struct queue_backend queue_backend_null; 49 extern struct queue_backend queue_backend_proc; 50 extern struct queue_backend queue_backend_ram; 51 52 static void queue_envelope_cache_add(struct envelope *); 53 static void queue_envelope_cache_update(struct envelope *); 54 static void queue_envelope_cache_del(uint64_t evpid); 55 56 TAILQ_HEAD(evplst, envelope); 57 58 static struct tree evpcache_tree; 59 static struct evplst evpcache_list; 60 static struct queue_backend *backend; 61 62 static int (*handler_close)(void); 63 static int (*handler_message_create)(uint32_t *); 64 static int (*handler_message_commit)(uint32_t, const char*); 65 static int (*handler_message_delete)(uint32_t); 66 static int (*handler_message_fd_r)(uint32_t); 67 static int (*handler_envelope_create)(uint32_t, const char *, size_t, uint64_t *); 68 static int (*handler_envelope_delete)(uint64_t); 69 static int (*handler_envelope_update)(uint64_t, const char *, size_t); 70 static int (*handler_envelope_load)(uint64_t, char *, size_t); 71 static int (*handler_envelope_walk)(uint64_t *, char *, size_t); 72 static int (*handler_message_walk)(uint64_t *, char *, size_t, 73 uint32_t, int *, void **); 74 75 #ifdef QUEUE_PROFILING 76 77 static struct { 78 struct timespec t0; 79 const char *name; 80 } profile; 81 82 static inline void profile_enter(const char *name) 83 { 84 if ((profiling & PROFILE_QUEUE) == 0) 85 return; 86 87 profile.name = name; 88 clock_gettime(CLOCK_MONOTONIC, &profile.t0); 89 } 90 91 static inline void profile_leave(void) 92 { 93 struct timespec t1, dt; 94 95 if ((profiling & PROFILE_QUEUE) == 0) 96 return; 97 98 clock_gettime(CLOCK_MONOTONIC, &t1); 99 timespecsub(&t1, &profile.t0, &dt); 100 log_debug("profile-queue: %s %lld.%09ld", profile.name, 101 (long long)dt.tv_sec, dt.tv_nsec); 102 } 103 #else 104 #define profile_enter(x) do {} while (0) 105 #define profile_leave() do {} while (0) 106 #endif 107 108 static int 109 queue_message_path(uint32_t msgid, char *buf, size_t len) 110 { 111 return bsnprintf(buf, len, "%s/%08"PRIx32, PATH_TEMPORARY, msgid); 112 } 113 114 int 115 queue_init(const char *name, int server) 116 { 117 struct passwd *pwq; 118 struct group *gr; 119 int r; 120 121 pwq = getpwnam(SMTPD_QUEUE_USER); 122 if (pwq == NULL) 123 errx(1, "unknown user %s", SMTPD_QUEUE_USER); 124 125 gr = getgrnam(SMTPD_QUEUE_GROUP); 126 if (gr == NULL) 127 errx(1, "unknown group %s", SMTPD_QUEUE_GROUP); 128 129 tree_init(&evpcache_tree); 130 TAILQ_INIT(&evpcache_list); 131 132 if (!strcmp(name, "fs")) 133 backend = &queue_backend_fs; 134 else if (!strcmp(name, "null")) 135 backend = &queue_backend_null; 136 else if (!strcmp(name, "ram")) 137 backend = &queue_backend_ram; 138 else 139 backend = &queue_backend_proc; 140 141 if (server) { 142 if (ckdir(PATH_SPOOL, 0711, 0, 0, 1) == 0) 143 errx(1, "error in spool directory setup"); 144 if (ckdir(PATH_SPOOL PATH_OFFLINE, 0770, 0, gr->gr_gid, 1) == 0) 145 errx(1, "error in offline directory setup"); 146 if (ckdir(PATH_SPOOL PATH_PURGE, 0700, pwq->pw_uid, 0, 1) == 0) 147 errx(1, "error in purge directory setup"); 148 149 mvpurge(PATH_SPOOL PATH_TEMPORARY, PATH_SPOOL PATH_PURGE); 150 151 if (ckdir(PATH_SPOOL PATH_TEMPORARY, 0700, pwq->pw_uid, 0, 1) == 0) 152 errx(1, "error in purge directory setup"); 153 } 154 155 r = backend->init(pwq, server, name); 156 157 log_trace(TRACE_QUEUE, "queue-backend: queue_init(%d) -> %d", server, r); 158 159 return (r); 160 } 161 162 int 163 queue_close(void) 164 { 165 if (handler_close) 166 return (handler_close()); 167 168 return (1); 169 } 170 171 int 172 queue_message_create(uint32_t *msgid) 173 { 174 int r; 175 176 profile_enter("queue_message_create"); 177 r = handler_message_create(msgid); 178 profile_leave(); 179 180 log_trace(TRACE_QUEUE, 181 "queue-backend: queue_message_create() -> %d (%08"PRIx32")", 182 r, *msgid); 183 184 return (r); 185 } 186 187 int 188 queue_message_delete(uint32_t msgid) 189 { 190 char msgpath[PATH_MAX]; 191 uint64_t evpid; 192 void *iter; 193 int r; 194 195 profile_enter("queue_message_delete"); 196 r = handler_message_delete(msgid); 197 profile_leave(); 198 199 /* in case the message is incoming */ 200 queue_message_path(msgid, msgpath, sizeof(msgpath)); 201 unlink(msgpath); 202 203 /* remove remaining envelopes from the cache if any (on rollback) */ 204 evpid = msgid_to_evpid(msgid); 205 for (;;) { 206 iter = NULL; 207 if (!tree_iterfrom(&evpcache_tree, &iter, evpid, &evpid, NULL)) 208 break; 209 if (evpid_to_msgid(evpid) != msgid) 210 break; 211 queue_envelope_cache_del(evpid); 212 } 213 214 log_trace(TRACE_QUEUE, 215 "queue-backend: queue_message_delete(%08"PRIx32") -> %d", msgid, r); 216 217 return (r); 218 } 219 220 int 221 queue_message_commit(uint32_t msgid) 222 { 223 int r; 224 char msgpath[PATH_MAX]; 225 char tmppath[PATH_MAX]; 226 FILE *ifp = NULL; 227 FILE *ofp = NULL; 228 229 profile_enter("queue_message_commit"); 230 231 queue_message_path(msgid, msgpath, sizeof(msgpath)); 232 233 if (env->sc_queue_flags & QUEUE_COMPRESSION) { 234 bsnprintf(tmppath, sizeof tmppath, "%s.comp", msgpath); 235 ifp = fopen(msgpath, "r"); 236 ofp = fopen(tmppath, "w+"); 237 if (ifp == NULL || ofp == NULL) 238 goto err; 239 if (!compress_file(ifp, ofp)) 240 goto err; 241 fclose(ifp); 242 fclose(ofp); 243 ifp = NULL; 244 ofp = NULL; 245 246 if (rename(tmppath, msgpath) == -1) { 247 if (errno == ENOSPC) 248 return (0); 249 unlink(tmppath); 250 log_warn("rename"); 251 return (0); 252 } 253 } 254 255 if (env->sc_queue_flags & QUEUE_ENCRYPTION) { 256 bsnprintf(tmppath, sizeof tmppath, "%s.enc", msgpath); 257 ifp = fopen(msgpath, "r"); 258 ofp = fopen(tmppath, "w+"); 259 if (ifp == NULL || ofp == NULL) 260 goto err; 261 if (!crypto_encrypt_file(ifp, ofp)) 262 goto err; 263 fclose(ifp); 264 fclose(ofp); 265 ifp = NULL; 266 ofp = NULL; 267 268 if (rename(tmppath, msgpath) == -1) { 269 if (errno == ENOSPC) 270 return (0); 271 unlink(tmppath); 272 log_warn("rename"); 273 return (0); 274 } 275 } 276 277 r = handler_message_commit(msgid, msgpath); 278 profile_leave(); 279 280 /* in case it's not done by the backend */ 281 unlink(msgpath); 282 283 log_trace(TRACE_QUEUE, 284 "queue-backend: queue_message_commit(%08"PRIx32") -> %d", 285 msgid, r); 286 287 return (r); 288 289 err: 290 if (ifp) 291 fclose(ifp); 292 if (ofp) 293 fclose(ofp); 294 return 0; 295 } 296 297 int 298 queue_message_fd_r(uint32_t msgid) 299 { 300 int fdin = -1, fdout = -1, fd = -1; 301 FILE *ifp = NULL; 302 FILE *ofp = NULL; 303 304 profile_enter("queue_message_fd_r"); 305 fdin = handler_message_fd_r(msgid); 306 profile_leave(); 307 308 log_trace(TRACE_QUEUE, 309 "queue-backend: queue_message_fd_r(%08"PRIx32") -> %d", msgid, fdin); 310 311 if (fdin == -1) 312 return (-1); 313 314 if (env->sc_queue_flags & QUEUE_ENCRYPTION) { 315 if ((fdout = mktmpfile()) == -1) 316 goto err; 317 if ((fd = dup(fdout)) == -1) 318 goto err; 319 if ((ifp = fdopen(fdin, "r")) == NULL) 320 goto err; 321 fdin = fd; 322 fd = -1; 323 if ((ofp = fdopen(fdout, "w+")) == NULL) 324 goto err; 325 326 if (!crypto_decrypt_file(ifp, ofp)) 327 goto err; 328 329 fclose(ifp); 330 ifp = NULL; 331 fclose(ofp); 332 ofp = NULL; 333 lseek(fdin, SEEK_SET, 0); 334 } 335 336 if (env->sc_queue_flags & QUEUE_COMPRESSION) { 337 if ((fdout = mktmpfile()) == -1) 338 goto err; 339 if ((fd = dup(fdout)) == -1) 340 goto err; 341 if ((ifp = fdopen(fdin, "r")) == NULL) 342 goto err; 343 fdin = fd; 344 fd = -1; 345 if ((ofp = fdopen(fdout, "w+")) == NULL) 346 goto err; 347 348 if (!uncompress_file(ifp, ofp)) 349 goto err; 350 351 fclose(ifp); 352 ifp = NULL; 353 fclose(ofp); 354 ofp = NULL; 355 lseek(fdin, SEEK_SET, 0); 356 } 357 358 return (fdin); 359 360 err: 361 if (fd != -1) 362 close(fd); 363 if (fdin != -1) 364 close(fdin); 365 if (fdout != -1) 366 close(fdout); 367 if (ifp) 368 fclose(ifp); 369 if (ofp) 370 fclose(ofp); 371 return -1; 372 } 373 374 int 375 queue_message_fd_rw(uint32_t msgid) 376 { 377 char buf[PATH_MAX]; 378 379 queue_message_path(msgid, buf, sizeof(buf)); 380 381 return open(buf, O_RDWR | O_CREAT | O_EXCL, 0600); 382 } 383 384 static int 385 queue_envelope_dump_buffer(struct envelope *ep, char *evpbuf, size_t evpbufsize) 386 { 387 char *evp; 388 size_t evplen; 389 size_t complen; 390 char compbuf[sizeof(struct envelope)]; 391 size_t enclen; 392 char encbuf[sizeof(struct envelope)]; 393 394 evp = evpbuf; 395 evplen = envelope_dump_buffer(ep, evpbuf, evpbufsize); 396 if (evplen == 0) 397 return (0); 398 399 if (env->sc_queue_flags & QUEUE_COMPRESSION) { 400 complen = compress_chunk(evp, evplen, compbuf, sizeof compbuf); 401 if (complen == 0) 402 return (0); 403 evp = compbuf; 404 evplen = complen; 405 } 406 407 if (env->sc_queue_flags & QUEUE_ENCRYPTION) { 408 enclen = crypto_encrypt_buffer(evp, evplen, encbuf, sizeof encbuf); 409 if (enclen == 0) 410 return (0); 411 evp = encbuf; 412 evplen = enclen; 413 } 414 415 memmove(evpbuf, evp, evplen); 416 417 return (evplen); 418 } 419 420 static int 421 queue_envelope_load_buffer(struct envelope *ep, char *evpbuf, size_t evpbufsize) 422 { 423 char *evp; 424 size_t evplen; 425 char compbuf[sizeof(struct envelope)]; 426 size_t complen; 427 char encbuf[sizeof(struct envelope)]; 428 size_t enclen; 429 430 evp = evpbuf; 431 evplen = evpbufsize; 432 433 if (env->sc_queue_flags & QUEUE_ENCRYPTION) { 434 enclen = crypto_decrypt_buffer(evp, evplen, encbuf, sizeof encbuf); 435 if (enclen == 0) 436 return (0); 437 evp = encbuf; 438 evplen = enclen; 439 } 440 441 if (env->sc_queue_flags & QUEUE_COMPRESSION) { 442 complen = uncompress_chunk(evp, evplen, compbuf, sizeof compbuf); 443 if (complen == 0) 444 return (0); 445 evp = compbuf; 446 evplen = complen; 447 } 448 449 return (envelope_load_buffer(ep, evp, evplen)); 450 } 451 452 static void 453 queue_envelope_cache_add(struct envelope *e) 454 { 455 struct envelope *cached; 456 457 while (tree_count(&evpcache_tree) >= env->sc_queue_evpcache_size) 458 queue_envelope_cache_del(TAILQ_LAST(&evpcache_list, evplst)->id); 459 460 cached = xcalloc(1, sizeof *cached, "queue_envelope_cache_add"); 461 *cached = *e; 462 TAILQ_INSERT_HEAD(&evpcache_list, cached, entry); 463 tree_xset(&evpcache_tree, e->id, cached); 464 stat_increment("queue.evpcache.size", 1); 465 } 466 467 static void 468 queue_envelope_cache_update(struct envelope *e) 469 { 470 struct envelope *cached; 471 472 if ((cached = tree_get(&evpcache_tree, e->id)) == NULL) { 473 queue_envelope_cache_add(e); 474 stat_increment("queue.evpcache.update.missed", 1); 475 } else { 476 TAILQ_REMOVE(&evpcache_list, cached, entry); 477 *cached = *e; 478 TAILQ_INSERT_HEAD(&evpcache_list, cached, entry); 479 stat_increment("queue.evpcache.update.hit", 1); 480 } 481 } 482 483 static void 484 queue_envelope_cache_del(uint64_t evpid) 485 { 486 struct envelope *cached; 487 488 if ((cached = tree_pop(&evpcache_tree, evpid)) == NULL) 489 return; 490 491 TAILQ_REMOVE(&evpcache_list, cached, entry); 492 free(cached); 493 stat_decrement("queue.evpcache.size", 1); 494 } 495 496 int 497 queue_envelope_create(struct envelope *ep) 498 { 499 int r; 500 char evpbuf[sizeof(struct envelope)]; 501 size_t evplen; 502 uint64_t evpid; 503 uint32_t msgid; 504 505 ep->creation = time(NULL); 506 evplen = queue_envelope_dump_buffer(ep, evpbuf, sizeof evpbuf); 507 if (evplen == 0) 508 return (0); 509 510 evpid = ep->id; 511 msgid = evpid_to_msgid(evpid); 512 513 profile_enter("queue_envelope_create"); 514 r = handler_envelope_create(msgid, evpbuf, evplen, &ep->id); 515 profile_leave(); 516 517 log_trace(TRACE_QUEUE, 518 "queue-backend: queue_envelope_create(%016"PRIx64", %zu) -> %d (%016"PRIx64")", 519 evpid, evplen, r, ep->id); 520 521 if (!r) { 522 ep->creation = 0; 523 ep->id = 0; 524 } 525 526 if (r && env->sc_queue_flags & QUEUE_EVPCACHE) 527 queue_envelope_cache_add(ep); 528 529 return (r); 530 } 531 532 int 533 queue_envelope_delete(uint64_t evpid) 534 { 535 int r; 536 537 if (env->sc_queue_flags & QUEUE_EVPCACHE) 538 queue_envelope_cache_del(evpid); 539 540 profile_enter("queue_envelope_delete"); 541 r = handler_envelope_delete(evpid); 542 profile_leave(); 543 544 log_trace(TRACE_QUEUE, 545 "queue-backend: queue_envelope_delete(%016"PRIx64") -> %d", 546 evpid, r); 547 548 return (r); 549 } 550 551 int 552 queue_envelope_load(uint64_t evpid, struct envelope *ep) 553 { 554 const char *e; 555 char evpbuf[sizeof(struct envelope)]; 556 size_t evplen; 557 struct envelope *cached; 558 559 if ((env->sc_queue_flags & QUEUE_EVPCACHE) && 560 (cached = tree_get(&evpcache_tree, evpid))) { 561 *ep = *cached; 562 stat_increment("queue.evpcache.load.hit", 1); 563 return (1); 564 } 565 566 ep->id = evpid; 567 profile_enter("queue_envelope_load"); 568 evplen = handler_envelope_load(ep->id, evpbuf, sizeof evpbuf); 569 profile_leave(); 570 571 log_trace(TRACE_QUEUE, 572 "queue-backend: queue_envelope_load(%016"PRIx64") -> %zu", 573 evpid, evplen); 574 575 if (evplen == 0) 576 return (0); 577 578 if (queue_envelope_load_buffer(ep, evpbuf, evplen)) { 579 if ((e = envelope_validate(ep)) == NULL) { 580 ep->id = evpid; 581 if (env->sc_queue_flags & QUEUE_EVPCACHE) { 582 queue_envelope_cache_add(ep); 583 stat_increment("queue.evpcache.load.missed", 1); 584 } 585 return (1); 586 } 587 log_warnx("warn: invalid envelope %016" PRIx64 ": %s", 588 evpid, e); 589 } 590 return (0); 591 } 592 593 int 594 queue_envelope_update(struct envelope *ep) 595 { 596 char evpbuf[sizeof(struct envelope)]; 597 size_t evplen; 598 int r; 599 600 evplen = queue_envelope_dump_buffer(ep, evpbuf, sizeof evpbuf); 601 if (evplen == 0) 602 return (0); 603 604 profile_enter("queue_envelope_update"); 605 r = handler_envelope_update(ep->id, evpbuf, evplen); 606 profile_leave(); 607 608 if (r && env->sc_queue_flags & QUEUE_EVPCACHE) 609 queue_envelope_cache_update(ep); 610 611 log_trace(TRACE_QUEUE, 612 "queue-backend: queue_envelope_update(%016"PRIx64") -> %d", 613 ep->id, r); 614 615 return (r); 616 } 617 618 int 619 queue_message_walk(struct envelope *ep, uint32_t msgid, int *done, void **data) 620 { 621 char evpbuf[sizeof(struct envelope)]; 622 uint64_t evpid; 623 int r; 624 const char *e; 625 626 profile_enter("queue_message_walk"); 627 r = handler_message_walk(&evpid, evpbuf, sizeof evpbuf, 628 msgid, done, data); 629 profile_leave(); 630 631 log_trace(TRACE_QUEUE, 632 "queue-backend: queue_message_walk() -> %d (%016"PRIx64")", 633 r, evpid); 634 635 if (r == -1) 636 return (r); 637 638 if (r && queue_envelope_load_buffer(ep, evpbuf, (size_t)r)) { 639 if ((e = envelope_validate(ep)) == NULL) { 640 ep->id = evpid; 641 /* 642 * do not cache the envelope here, while discovering 643 * envelopes one could re-run discover on already 644 * scheduled envelopes which leads to triggering of 645 * strict checks in caching. Envelopes could anyway 646 * be loaded from backend if it isn't cached. 647 */ 648 return (1); 649 } 650 log_warnx("warn: invalid envelope %016" PRIx64 ": %s", 651 evpid, e); 652 } 653 return (0); 654 } 655 656 int 657 queue_envelope_walk(struct envelope *ep) 658 { 659 const char *e; 660 uint64_t evpid; 661 char evpbuf[sizeof(struct envelope)]; 662 int r; 663 664 profile_enter("queue_envelope_walk"); 665 r = handler_envelope_walk(&evpid, evpbuf, sizeof evpbuf); 666 profile_leave(); 667 668 log_trace(TRACE_QUEUE, 669 "queue-backend: queue_envelope_walk() -> %d (%016"PRIx64")", 670 r, evpid); 671 672 if (r == -1) 673 return (r); 674 675 if (r && queue_envelope_load_buffer(ep, evpbuf, (size_t)r)) { 676 if ((e = envelope_validate(ep)) == NULL) { 677 ep->id = evpid; 678 if (env->sc_queue_flags & QUEUE_EVPCACHE) 679 queue_envelope_cache_add(ep); 680 return (1); 681 } 682 log_warnx("warn: invalid envelope %016" PRIx64 ": %s", 683 evpid, e); 684 } 685 return (0); 686 } 687 688 uint32_t 689 queue_generate_msgid(void) 690 { 691 uint32_t msgid; 692 693 while ((msgid = arc4random()) == 0) 694 ; 695 696 return msgid; 697 } 698 699 uint64_t 700 queue_generate_evpid(uint32_t msgid) 701 { 702 uint32_t rnd; 703 uint64_t evpid; 704 705 while ((rnd = arc4random()) == 0) 706 ; 707 708 evpid = msgid; 709 evpid <<= 32; 710 evpid |= rnd; 711 712 return evpid; 713 } 714 715 static const char* 716 envelope_validate(struct envelope *ep) 717 { 718 if (ep->version != SMTPD_ENVELOPE_VERSION) 719 return "version mismatch"; 720 721 if (memchr(ep->helo, '\0', sizeof(ep->helo)) == NULL) 722 return "invalid helo"; 723 if (ep->helo[0] == '\0') 724 return "empty helo"; 725 726 if (memchr(ep->hostname, '\0', sizeof(ep->hostname)) == NULL) 727 return "invalid hostname"; 728 if (ep->hostname[0] == '\0') 729 return "empty hostname"; 730 731 if (memchr(ep->errorline, '\0', sizeof(ep->errorline)) == NULL) 732 return "invalid error line"; 733 734 return NULL; 735 } 736 737 void 738 queue_api_on_close(int(*cb)(void)) 739 { 740 handler_close = cb; 741 } 742 743 void 744 queue_api_on_message_create(int(*cb)(uint32_t *)) 745 { 746 handler_message_create = cb; 747 } 748 749 void 750 queue_api_on_message_commit(int(*cb)(uint32_t, const char *)) 751 { 752 handler_message_commit = cb; 753 } 754 755 void 756 queue_api_on_message_delete(int(*cb)(uint32_t)) 757 { 758 handler_message_delete = cb; 759 } 760 761 void 762 queue_api_on_message_fd_r(int(*cb)(uint32_t)) 763 { 764 handler_message_fd_r = cb; 765 } 766 767 void 768 queue_api_on_envelope_create(int(*cb)(uint32_t, const char *, size_t, uint64_t *)) 769 { 770 handler_envelope_create = cb; 771 } 772 773 void 774 queue_api_on_envelope_delete(int(*cb)(uint64_t)) 775 { 776 handler_envelope_delete = cb; 777 } 778 779 void 780 queue_api_on_envelope_update(int(*cb)(uint64_t, const char *, size_t)) 781 { 782 handler_envelope_update = cb; 783 } 784 785 void 786 queue_api_on_envelope_load(int(*cb)(uint64_t, char *, size_t)) 787 { 788 handler_envelope_load = cb; 789 } 790 791 void 792 queue_api_on_envelope_walk(int(*cb)(uint64_t *, char *, size_t)) 793 { 794 handler_envelope_walk = cb; 795 } 796 797 void 798 queue_api_on_message_walk(int(*cb)(uint64_t *, char *, size_t, 799 uint32_t, int *, void **)) 800 { 801 handler_message_walk = cb; 802 } 803