1 /* $OpenBSD: queue_backend.c,v 1.59 2015/11/05 09:14:31 sunil Exp $ */ 2 3 /* 4 * Copyright (c) 2011 Gilles Chehade <gilles@poolp.org> 5 * 6 * Permission to use, copy, modify, and distribute this software for any 7 * purpose with or without fee is hereby granted, provided that the above 8 * copyright notice and this permission notice appear in all copies. 9 * 10 * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES 11 * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF 12 * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR 13 * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES 14 * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN 15 * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF 16 * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. 17 */ 18 19 #include <sys/types.h> 20 #include <sys/queue.h> 21 #include <sys/tree.h> 22 #include <sys/socket.h> 23 #include <sys/stat.h> 24 25 #include <ctype.h> 26 #include <err.h> 27 #include <errno.h> 28 #include <event.h> 29 #include <fcntl.h> 30 #include <grp.h> 31 #include <imsg.h> 32 #include <limits.h> 33 #include <inttypes.h> 34 #include <libgen.h> 35 #include <pwd.h> 36 #include <stdio.h> 37 #include <stdlib.h> 38 #include <string.h> 39 #include <time.h> 40 #include <unistd.h> 41 42 #include "smtpd.h" 43 #include "log.h" 44 45 static const char* envelope_validate(struct envelope *); 46 47 extern struct queue_backend queue_backend_fs; 48 extern struct queue_backend queue_backend_null; 49 extern struct queue_backend queue_backend_proc; 50 extern struct queue_backend queue_backend_ram; 51 52 static void queue_envelope_cache_add(struct envelope *); 53 static void queue_envelope_cache_update(struct envelope *); 54 static void queue_envelope_cache_del(uint64_t evpid); 55 56 TAILQ_HEAD(evplst, envelope); 57 58 static struct tree evpcache_tree; 59 static struct evplst evpcache_list; 60 static struct queue_backend *backend; 61 62 static int (*handler_close)(void); 63 static int (*handler_message_create)(uint32_t *); 64 static int (*handler_message_commit)(uint32_t, const char*); 65 static int (*handler_message_delete)(uint32_t); 66 static int (*handler_message_fd_r)(uint32_t); 67 static int (*handler_message_corrupt)(uint32_t); 68 static int (*handler_message_uncorrupt)(uint32_t); 69 static int (*handler_envelope_create)(uint32_t, const char *, size_t, uint64_t *); 70 static int (*handler_envelope_delete)(uint64_t); 71 static int (*handler_envelope_update)(uint64_t, const char *, size_t); 72 static int (*handler_envelope_load)(uint64_t, char *, size_t); 73 static int (*handler_envelope_walk)(uint64_t *, char *, size_t); 74 static int (*handler_message_walk)(uint64_t *, char *, size_t, 75 uint32_t, int *, void **); 76 77 #ifdef QUEUE_PROFILING 78 79 static struct { 80 struct timespec t0; 81 const char *name; 82 } profile; 83 84 static inline void profile_enter(const char *name) 85 { 86 if ((profiling & PROFILE_QUEUE) == 0) 87 return; 88 89 profile.name = name; 90 clock_gettime(CLOCK_MONOTONIC, &profile.t0); 91 } 92 93 static inline void profile_leave(void) 94 { 95 struct timespec t1, dt; 96 97 if ((profiling & PROFILE_QUEUE) == 0) 98 return; 99 100 clock_gettime(CLOCK_MONOTONIC, &t1); 101 timespecsub(&t1, &profile.t0, &dt); 102 log_debug("profile-queue: %s %lld.%09ld", profile.name, 103 (long long)dt.tv_sec, dt.tv_nsec); 104 } 105 #else 106 #define profile_enter(x) do {} while (0) 107 #define profile_leave() do {} while (0) 108 #endif 109 110 static int 111 queue_message_path(uint32_t msgid, char *buf, size_t len) 112 { 113 return bsnprintf(buf, len, "%s/%08"PRIx32, PATH_TEMPORARY, msgid); 114 } 115 116 int 117 queue_init(const char *name, int server) 118 { 119 struct passwd *pwq; 120 struct group *gr; 121 int r; 122 123 pwq = getpwnam(SMTPD_QUEUE_USER); 124 if (pwq == NULL) 125 errx(1, "unknown user %s", SMTPD_QUEUE_USER); 126 127 gr = getgrnam(SMTPD_QUEUE_GROUP); 128 if (gr == NULL) 129 errx(1, "unknown group %s", SMTPD_QUEUE_GROUP); 130 131 tree_init(&evpcache_tree); 132 TAILQ_INIT(&evpcache_list); 133 134 if (!strcmp(name, "fs")) 135 backend = &queue_backend_fs; 136 else if (!strcmp(name, "null")) 137 backend = &queue_backend_null; 138 else if (!strcmp(name, "ram")) 139 backend = &queue_backend_ram; 140 else 141 backend = &queue_backend_proc; 142 143 if (server) { 144 if (ckdir(PATH_SPOOL, 0711, 0, 0, 1) == 0) 145 errx(1, "error in spool directory setup"); 146 if (ckdir(PATH_SPOOL PATH_OFFLINE, 0770, 0, gr->gr_gid, 1) == 0) 147 errx(1, "error in offline directory setup"); 148 if (ckdir(PATH_SPOOL PATH_PURGE, 0700, pwq->pw_uid, 0, 1) == 0) 149 errx(1, "error in purge directory setup"); 150 151 mvpurge(PATH_SPOOL PATH_TEMPORARY, PATH_SPOOL PATH_PURGE); 152 153 if (ckdir(PATH_SPOOL PATH_TEMPORARY, 0700, pwq->pw_uid, 0, 1) == 0) 154 errx(1, "error in purge directory setup"); 155 } 156 157 r = backend->init(pwq, server, name); 158 159 log_trace(TRACE_QUEUE, "queue-backend: queue_init(%d) -> %d", server, r); 160 161 return (r); 162 } 163 164 int 165 queue_close(void) 166 { 167 if (handler_close) 168 return (handler_close()); 169 170 return (1); 171 } 172 173 int 174 queue_message_create(uint32_t *msgid) 175 { 176 int r; 177 178 profile_enter("queue_message_create"); 179 r = handler_message_create(msgid); 180 profile_leave(); 181 182 log_trace(TRACE_QUEUE, 183 "queue-backend: queue_message_create() -> %d (%08"PRIx32")", 184 r, *msgid); 185 186 return (r); 187 } 188 189 int 190 queue_message_delete(uint32_t msgid) 191 { 192 char msgpath[PATH_MAX]; 193 int r; 194 195 profile_enter("queue_message_delete"); 196 r = handler_message_delete(msgid); 197 profile_leave(); 198 199 /* in case the message is incoming */ 200 queue_message_path(msgid, msgpath, sizeof(msgpath)); 201 unlink(msgpath); 202 203 log_trace(TRACE_QUEUE, 204 "queue-backend: queue_message_delete(%08"PRIx32") -> %d", msgid, r); 205 206 return (r); 207 } 208 209 int 210 queue_message_commit(uint32_t msgid) 211 { 212 int r; 213 char msgpath[PATH_MAX]; 214 char tmppath[PATH_MAX]; 215 FILE *ifp = NULL; 216 FILE *ofp = NULL; 217 218 profile_enter("queue_message_commit"); 219 220 queue_message_path(msgid, msgpath, sizeof(msgpath)); 221 222 if (env->sc_queue_flags & QUEUE_COMPRESSION) { 223 bsnprintf(tmppath, sizeof tmppath, "%s.comp", msgpath); 224 ifp = fopen(msgpath, "r"); 225 ofp = fopen(tmppath, "w+"); 226 if (ifp == NULL || ofp == NULL) 227 goto err; 228 if (! compress_file(ifp, ofp)) 229 goto err; 230 fclose(ifp); 231 fclose(ofp); 232 ifp = NULL; 233 ofp = NULL; 234 235 if (rename(tmppath, msgpath) == -1) { 236 if (errno == ENOSPC) 237 return (0); 238 unlink(tmppath); 239 log_warn("rename"); 240 return (0); 241 } 242 } 243 244 if (env->sc_queue_flags & QUEUE_ENCRYPTION) { 245 bsnprintf(tmppath, sizeof tmppath, "%s.enc", msgpath); 246 ifp = fopen(msgpath, "r"); 247 ofp = fopen(tmppath, "w+"); 248 if (ifp == NULL || ofp == NULL) 249 goto err; 250 if (! crypto_encrypt_file(ifp, ofp)) 251 goto err; 252 fclose(ifp); 253 fclose(ofp); 254 ifp = NULL; 255 ofp = NULL; 256 257 if (rename(tmppath, msgpath) == -1) { 258 if (errno == ENOSPC) 259 return (0); 260 unlink(tmppath); 261 log_warn("rename"); 262 return (0); 263 } 264 } 265 266 r = handler_message_commit(msgid, msgpath); 267 profile_leave(); 268 269 /* in case it's not done by the backend */ 270 unlink(msgpath); 271 272 log_trace(TRACE_QUEUE, 273 "queue-backend: queue_message_commit(%08"PRIx32") -> %d", 274 msgid, r); 275 276 return (r); 277 278 err: 279 if (ifp) 280 fclose(ifp); 281 if (ofp) 282 fclose(ofp); 283 return 0; 284 } 285 286 int 287 queue_message_corrupt(uint32_t msgid) 288 { 289 int r; 290 291 profile_enter("queue_message_corrupt"); 292 r = handler_message_corrupt(msgid); 293 profile_leave(); 294 295 log_trace(TRACE_QUEUE, 296 "queue-backend: queue_message_corrupt(%08"PRIx32") -> %d", msgid, r); 297 298 return (r); 299 } 300 301 int 302 queue_message_uncorrupt(uint32_t msgid) 303 { 304 return handler_message_uncorrupt(msgid); 305 } 306 307 int 308 queue_message_fd_r(uint32_t msgid) 309 { 310 int fdin = -1, fdout = -1, fd = -1; 311 FILE *ifp = NULL; 312 FILE *ofp = NULL; 313 314 profile_enter("queue_message_fd_r"); 315 fdin = handler_message_fd_r(msgid); 316 profile_leave(); 317 318 log_trace(TRACE_QUEUE, 319 "queue-backend: queue_message_fd_r(%08"PRIx32") -> %d", msgid, fdin); 320 321 if (fdin == -1) 322 return (-1); 323 324 if (env->sc_queue_flags & QUEUE_ENCRYPTION) { 325 if ((fdout = mktmpfile()) == -1) 326 goto err; 327 if ((fd = dup(fdout)) == -1) 328 goto err; 329 if ((ifp = fdopen(fdin, "r")) == NULL) 330 goto err; 331 fdin = fd; 332 fd = -1; 333 if ((ofp = fdopen(fdout, "w+")) == NULL) 334 goto err; 335 336 if (! crypto_decrypt_file(ifp, ofp)) 337 goto err; 338 339 fclose(ifp); 340 ifp = NULL; 341 fclose(ofp); 342 ofp = NULL; 343 lseek(fdin, SEEK_SET, 0); 344 } 345 346 if (env->sc_queue_flags & QUEUE_COMPRESSION) { 347 if ((fdout = mktmpfile()) == -1) 348 goto err; 349 if ((fd = dup(fdout)) == -1) 350 goto err; 351 if ((ifp = fdopen(fdin, "r")) == NULL) 352 goto err; 353 fdin = fd; 354 fd = -1; 355 if ((ofp = fdopen(fdout, "w+")) == NULL) 356 goto err; 357 358 if (! uncompress_file(ifp, ofp)) 359 goto err; 360 361 fclose(ifp); 362 ifp = NULL; 363 fclose(ofp); 364 ofp = NULL; 365 lseek(fdin, SEEK_SET, 0); 366 } 367 368 return (fdin); 369 370 err: 371 if (fd != -1) 372 close(fd); 373 if (fdin != -1) 374 close(fdin); 375 if (fdout != -1) 376 close(fdout); 377 if (ifp) 378 fclose(ifp); 379 if (ofp) 380 fclose(ofp); 381 return -1; 382 } 383 384 int 385 queue_message_fd_rw(uint32_t msgid) 386 { 387 char buf[PATH_MAX]; 388 389 queue_message_path(msgid, buf, sizeof(buf)); 390 391 return open(buf, O_RDWR | O_CREAT | O_EXCL, 0600); 392 } 393 394 static int 395 queue_envelope_dump_buffer(struct envelope *ep, char *evpbuf, size_t evpbufsize) 396 { 397 char *evp; 398 size_t evplen; 399 size_t complen; 400 char compbuf[sizeof(struct envelope)]; 401 size_t enclen; 402 char encbuf[sizeof(struct envelope)]; 403 404 evp = evpbuf; 405 evplen = envelope_dump_buffer(ep, evpbuf, evpbufsize); 406 if (evplen == 0) 407 return (0); 408 409 if (env->sc_queue_flags & QUEUE_COMPRESSION) { 410 complen = compress_chunk(evp, evplen, compbuf, sizeof compbuf); 411 if (complen == 0) 412 return (0); 413 evp = compbuf; 414 evplen = complen; 415 } 416 417 if (env->sc_queue_flags & QUEUE_ENCRYPTION) { 418 enclen = crypto_encrypt_buffer(evp, evplen, encbuf, sizeof encbuf); 419 if (enclen == 0) 420 return (0); 421 evp = encbuf; 422 evplen = enclen; 423 } 424 425 memmove(evpbuf, evp, evplen); 426 427 return (evplen); 428 } 429 430 static int 431 queue_envelope_load_buffer(struct envelope *ep, char *evpbuf, size_t evpbufsize) 432 { 433 char *evp; 434 size_t evplen; 435 char compbuf[sizeof(struct envelope)]; 436 size_t complen; 437 char encbuf[sizeof(struct envelope)]; 438 size_t enclen; 439 440 evp = evpbuf; 441 evplen = evpbufsize; 442 443 if (env->sc_queue_flags & QUEUE_ENCRYPTION) { 444 enclen = crypto_decrypt_buffer(evp, evplen, encbuf, sizeof encbuf); 445 if (enclen == 0) 446 return (0); 447 evp = encbuf; 448 evplen = enclen; 449 } 450 451 if (env->sc_queue_flags & QUEUE_COMPRESSION) { 452 complen = uncompress_chunk(evp, evplen, compbuf, sizeof compbuf); 453 if (complen == 0) 454 return (0); 455 evp = compbuf; 456 evplen = complen; 457 } 458 459 return (envelope_load_buffer(ep, evp, evplen)); 460 } 461 462 static void 463 queue_envelope_cache_add(struct envelope *e) 464 { 465 struct envelope *cached; 466 467 while (tree_count(&evpcache_tree) >= env->sc_queue_evpcache_size) 468 queue_envelope_cache_del(TAILQ_LAST(&evpcache_list, evplst)->id); 469 470 cached = xcalloc(1, sizeof *cached, "queue_envelope_cache_add"); 471 *cached = *e; 472 TAILQ_INSERT_HEAD(&evpcache_list, cached, entry); 473 tree_xset(&evpcache_tree, e->id, cached); 474 stat_increment("queue.evpcache.size", 1); 475 } 476 477 static void 478 queue_envelope_cache_update(struct envelope *e) 479 { 480 struct envelope *cached; 481 482 if ((cached = tree_get(&evpcache_tree, e->id)) == NULL) { 483 queue_envelope_cache_add(e); 484 stat_increment("queue.evpcache.update.missed", 1); 485 } else { 486 TAILQ_REMOVE(&evpcache_list, cached, entry); 487 *cached = *e; 488 TAILQ_INSERT_HEAD(&evpcache_list, cached, entry); 489 stat_increment("queue.evpcache.update.hit", 1); 490 } 491 } 492 493 static void 494 queue_envelope_cache_del(uint64_t evpid) 495 { 496 struct envelope *cached; 497 498 if ((cached = tree_pop(&evpcache_tree, evpid)) == NULL) 499 return; 500 501 TAILQ_REMOVE(&evpcache_list, cached, entry); 502 free(cached); 503 stat_decrement("queue.evpcache.size", 1); 504 } 505 506 int 507 queue_envelope_create(struct envelope *ep) 508 { 509 int r; 510 char evpbuf[sizeof(struct envelope)]; 511 size_t evplen; 512 uint64_t evpid; 513 uint32_t msgid; 514 515 ep->creation = time(NULL); 516 evplen = queue_envelope_dump_buffer(ep, evpbuf, sizeof evpbuf); 517 if (evplen == 0) 518 return (0); 519 520 evpid = ep->id; 521 msgid = evpid_to_msgid(evpid); 522 523 profile_enter("queue_envelope_create"); 524 r = handler_envelope_create(msgid, evpbuf, evplen, &ep->id); 525 profile_leave(); 526 527 log_trace(TRACE_QUEUE, 528 "queue-backend: queue_envelope_create(%016"PRIx64", %zu) -> %d (%016"PRIx64")", 529 evpid, evplen, r, ep->id); 530 531 if (!r) { 532 ep->creation = 0; 533 ep->id = 0; 534 } 535 536 if (r && env->sc_queue_flags & QUEUE_EVPCACHE) 537 queue_envelope_cache_add(ep); 538 539 return (r); 540 } 541 542 int 543 queue_envelope_delete(uint64_t evpid) 544 { 545 int r; 546 547 if (env->sc_queue_flags & QUEUE_EVPCACHE) 548 queue_envelope_cache_del(evpid); 549 550 profile_enter("queue_envelope_delete"); 551 r = handler_envelope_delete(evpid); 552 profile_leave(); 553 554 log_trace(TRACE_QUEUE, 555 "queue-backend: queue_envelope_delete(%016"PRIx64") -> %d", 556 evpid, r); 557 558 return (r); 559 } 560 561 int 562 queue_envelope_load(uint64_t evpid, struct envelope *ep) 563 { 564 const char *e; 565 char evpbuf[sizeof(struct envelope)]; 566 size_t evplen; 567 struct envelope *cached; 568 569 if ((env->sc_queue_flags & QUEUE_EVPCACHE) && 570 (cached = tree_get(&evpcache_tree, evpid))) { 571 *ep = *cached; 572 stat_increment("queue.evpcache.load.hit", 1); 573 return (1); 574 } 575 576 ep->id = evpid; 577 profile_enter("queue_envelope_load"); 578 evplen = handler_envelope_load(ep->id, evpbuf, sizeof evpbuf); 579 profile_leave(); 580 581 log_trace(TRACE_QUEUE, 582 "queue-backend: queue_envelope_load(%016"PRIx64") -> %zu", 583 evpid, evplen); 584 585 if (evplen == 0) 586 return (0); 587 588 if (queue_envelope_load_buffer(ep, evpbuf, evplen)) { 589 if ((e = envelope_validate(ep)) == NULL) { 590 ep->id = evpid; 591 if (env->sc_queue_flags & QUEUE_EVPCACHE) { 592 queue_envelope_cache_add(ep); 593 stat_increment("queue.evpcache.load.missed", 1); 594 } 595 return (1); 596 } 597 log_debug("debug: invalid envelope %016" PRIx64 ": %s", 598 ep->id, e); 599 } 600 601 (void)queue_message_corrupt(evpid_to_msgid(evpid)); 602 return (0); 603 } 604 605 int 606 queue_envelope_update(struct envelope *ep) 607 { 608 char evpbuf[sizeof(struct envelope)]; 609 size_t evplen; 610 int r; 611 612 evplen = queue_envelope_dump_buffer(ep, evpbuf, sizeof evpbuf); 613 if (evplen == 0) 614 return (0); 615 616 profile_enter("queue_envelope_update"); 617 r = handler_envelope_update(ep->id, evpbuf, evplen); 618 profile_leave(); 619 620 if (r && env->sc_queue_flags & QUEUE_EVPCACHE) 621 queue_envelope_cache_update(ep); 622 623 log_trace(TRACE_QUEUE, 624 "queue-backend: queue_envelope_update(%016"PRIx64") -> %d", 625 ep->id, r); 626 627 return (r); 628 } 629 630 int 631 queue_message_walk(struct envelope *ep, uint32_t msgid, int *done, void **data) 632 { 633 char evpbuf[sizeof(struct envelope)]; 634 uint64_t evpid; 635 int r; 636 const char *e; 637 638 profile_enter("queue_message_walk"); 639 r = handler_message_walk(&evpid, evpbuf, sizeof evpbuf, 640 msgid, done, data); 641 profile_leave(); 642 643 log_trace(TRACE_QUEUE, 644 "queue-backend: queue_message_walk() -> %d (%016"PRIx64")", 645 r, evpid); 646 647 if (r == -1) 648 return (r); 649 650 if (r && queue_envelope_load_buffer(ep, evpbuf, (size_t)r)) { 651 if ((e = envelope_validate(ep)) == NULL) { 652 ep->id = evpid; 653 /* 654 * do not cache the envelope here, while discovering 655 * envelopes one could re-run discover on already 656 * scheduled envelopes which leads to triggering of 657 * strict checks in caching. Envelopes could anyway 658 * be loaded from backend if it isn't cached. 659 */ 660 return (1); 661 } 662 log_debug("debug: invalid envelope %016" PRIx64 ": %s", 663 ep->id, e); 664 (void)queue_message_corrupt(evpid_to_msgid(evpid)); 665 } 666 667 return (0); 668 } 669 670 int 671 queue_envelope_walk(struct envelope *ep) 672 { 673 const char *e; 674 uint64_t evpid; 675 char evpbuf[sizeof(struct envelope)]; 676 int r; 677 678 profile_enter("queue_envelope_walk"); 679 r = handler_envelope_walk(&evpid, evpbuf, sizeof evpbuf); 680 profile_leave(); 681 682 log_trace(TRACE_QUEUE, 683 "queue-backend: queue_envelope_walk() -> %d (%016"PRIx64")", 684 r, evpid); 685 686 if (r == -1) 687 return (r); 688 689 if (r && queue_envelope_load_buffer(ep, evpbuf, (size_t)r)) { 690 if ((e = envelope_validate(ep)) == NULL) { 691 ep->id = evpid; 692 if (env->sc_queue_flags & QUEUE_EVPCACHE) 693 queue_envelope_cache_add(ep); 694 return (1); 695 } 696 log_debug("debug: invalid envelope %016" PRIx64 ": %s", 697 ep->id, e); 698 (void)queue_message_corrupt(evpid_to_msgid(evpid)); 699 } 700 701 return (0); 702 } 703 704 uint32_t 705 queue_generate_msgid(void) 706 { 707 uint32_t msgid; 708 709 while ((msgid = arc4random()) == 0) 710 ; 711 712 return msgid; 713 } 714 715 uint64_t 716 queue_generate_evpid(uint32_t msgid) 717 { 718 uint32_t rnd; 719 uint64_t evpid; 720 721 while ((rnd = arc4random()) == 0) 722 ; 723 724 evpid = msgid; 725 evpid <<= 32; 726 evpid |= rnd; 727 728 return evpid; 729 } 730 731 static const char* 732 envelope_validate(struct envelope *ep) 733 { 734 if (ep->version != SMTPD_ENVELOPE_VERSION) 735 return "version mismatch"; 736 737 if (memchr(ep->helo, '\0', sizeof(ep->helo)) == NULL) 738 return "invalid helo"; 739 if (ep->helo[0] == '\0') 740 return "empty helo"; 741 742 if (memchr(ep->hostname, '\0', sizeof(ep->hostname)) == NULL) 743 return "invalid hostname"; 744 if (ep->hostname[0] == '\0') 745 return "empty hostname"; 746 747 if (memchr(ep->errorline, '\0', sizeof(ep->errorline)) == NULL) 748 return "invalid error line"; 749 750 return NULL; 751 } 752 753 void 754 queue_api_on_close(int(*cb)(void)) 755 { 756 handler_close = cb; 757 } 758 759 void 760 queue_api_on_message_create(int(*cb)(uint32_t *)) 761 { 762 handler_message_create = cb; 763 } 764 765 void 766 queue_api_on_message_commit(int(*cb)(uint32_t, const char *)) 767 { 768 handler_message_commit = cb; 769 } 770 771 void 772 queue_api_on_message_delete(int(*cb)(uint32_t)) 773 { 774 handler_message_delete = cb; 775 } 776 777 void 778 queue_api_on_message_fd_r(int(*cb)(uint32_t)) 779 { 780 handler_message_fd_r = cb; 781 } 782 783 void 784 queue_api_on_message_corrupt(int(*cb)(uint32_t)) 785 { 786 handler_message_corrupt = cb; 787 } 788 789 void 790 queue_api_on_message_uncorrupt(int(*cb)(uint32_t)) 791 { 792 handler_message_uncorrupt = cb; 793 } 794 795 void 796 queue_api_on_envelope_create(int(*cb)(uint32_t, const char *, size_t, uint64_t *)) 797 { 798 handler_envelope_create = cb; 799 } 800 801 void 802 queue_api_on_envelope_delete(int(*cb)(uint64_t)) 803 { 804 handler_envelope_delete = cb; 805 } 806 807 void 808 queue_api_on_envelope_update(int(*cb)(uint64_t, const char *, size_t)) 809 { 810 handler_envelope_update = cb; 811 } 812 813 void 814 queue_api_on_envelope_load(int(*cb)(uint64_t, char *, size_t)) 815 { 816 handler_envelope_load = cb; 817 } 818 819 void 820 queue_api_on_envelope_walk(int(*cb)(uint64_t *, char *, size_t)) 821 { 822 handler_envelope_walk = cb; 823 } 824 825 void 826 queue_api_on_message_walk(int(*cb)(uint64_t *, char *, size_t, 827 uint32_t, int *, void **)) 828 { 829 handler_message_walk = cb; 830 } 831