1 /* $OpenBSD: queue_backend.c,v 1.62 2016/02/04 12:46:28 eric Exp $ */ 2 3 /* 4 * Copyright (c) 2011 Gilles Chehade <gilles@poolp.org> 5 * 6 * Permission to use, copy, modify, and distribute this software for any 7 * purpose with or without fee is hereby granted, provided that the above 8 * copyright notice and this permission notice appear in all copies. 9 * 10 * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES 11 * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF 12 * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR 13 * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES 14 * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN 15 * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF 16 * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. 17 */ 18 19 #include <sys/types.h> 20 #include <sys/queue.h> 21 #include <sys/tree.h> 22 #include <sys/socket.h> 23 #include <sys/stat.h> 24 25 #include <ctype.h> 26 #include <err.h> 27 #include <errno.h> 28 #include <event.h> 29 #include <fcntl.h> 30 #include <grp.h> 31 #include <imsg.h> 32 #include <limits.h> 33 #include <inttypes.h> 34 #include <libgen.h> 35 #include <pwd.h> 36 #include <stdio.h> 37 #include <stdlib.h> 38 #include <string.h> 39 #include <time.h> 40 #include <unistd.h> 41 42 #include "smtpd.h" 43 #include "log.h" 44 45 static const char* envelope_validate(struct envelope *); 46 47 extern struct queue_backend queue_backend_fs; 48 extern struct queue_backend queue_backend_null; 49 extern struct queue_backend queue_backend_proc; 50 extern struct queue_backend queue_backend_ram; 51 52 static void queue_envelope_cache_add(struct envelope *); 53 static void queue_envelope_cache_update(struct envelope *); 54 static void queue_envelope_cache_del(uint64_t evpid); 55 56 TAILQ_HEAD(evplst, envelope); 57 58 static struct tree evpcache_tree; 59 static struct evplst evpcache_list; 60 static struct queue_backend *backend; 61 62 static int (*handler_close)(void); 63 static int (*handler_message_create)(uint32_t *); 64 static int (*handler_message_commit)(uint32_t, const char*); 65 static int (*handler_message_delete)(uint32_t); 66 static int (*handler_message_fd_r)(uint32_t); 67 static int (*handler_message_corrupt)(uint32_t); 68 static int (*handler_message_uncorrupt)(uint32_t); 69 static int (*handler_envelope_create)(uint32_t, const char *, size_t, uint64_t *); 70 static int (*handler_envelope_delete)(uint64_t); 71 static int (*handler_envelope_update)(uint64_t, const char *, size_t); 72 static int (*handler_envelope_load)(uint64_t, char *, size_t); 73 static int (*handler_envelope_walk)(uint64_t *, char *, size_t); 74 static int (*handler_message_walk)(uint64_t *, char *, size_t, 75 uint32_t, int *, void **); 76 77 #ifdef QUEUE_PROFILING 78 79 static struct { 80 struct timespec t0; 81 const char *name; 82 } profile; 83 84 static inline void profile_enter(const char *name) 85 { 86 if ((profiling & PROFILE_QUEUE) == 0) 87 return; 88 89 profile.name = name; 90 clock_gettime(CLOCK_MONOTONIC, &profile.t0); 91 } 92 93 static inline void profile_leave(void) 94 { 95 struct timespec t1, dt; 96 97 if ((profiling & PROFILE_QUEUE) == 0) 98 return; 99 100 clock_gettime(CLOCK_MONOTONIC, &t1); 101 timespecsub(&t1, &profile.t0, &dt); 102 log_debug("profile-queue: %s %lld.%09ld", profile.name, 103 (long long)dt.tv_sec, dt.tv_nsec); 104 } 105 #else 106 #define profile_enter(x) do {} while (0) 107 #define profile_leave() do {} while (0) 108 #endif 109 110 static int 111 queue_message_path(uint32_t msgid, char *buf, size_t len) 112 { 113 return bsnprintf(buf, len, "%s/%08"PRIx32, PATH_TEMPORARY, msgid); 114 } 115 116 int 117 queue_init(const char *name, int server) 118 { 119 struct passwd *pwq; 120 struct group *gr; 121 int r; 122 123 pwq = getpwnam(SMTPD_QUEUE_USER); 124 if (pwq == NULL) 125 errx(1, "unknown user %s", SMTPD_QUEUE_USER); 126 127 gr = getgrnam(SMTPD_QUEUE_GROUP); 128 if (gr == NULL) 129 errx(1, "unknown group %s", SMTPD_QUEUE_GROUP); 130 131 tree_init(&evpcache_tree); 132 TAILQ_INIT(&evpcache_list); 133 134 if (!strcmp(name, "fs")) 135 backend = &queue_backend_fs; 136 else if (!strcmp(name, "null")) 137 backend = &queue_backend_null; 138 else if (!strcmp(name, "ram")) 139 backend = &queue_backend_ram; 140 else 141 backend = &queue_backend_proc; 142 143 if (server) { 144 if (ckdir(PATH_SPOOL, 0711, 0, 0, 1) == 0) 145 errx(1, "error in spool directory setup"); 146 if (ckdir(PATH_SPOOL PATH_OFFLINE, 0770, 0, gr->gr_gid, 1) == 0) 147 errx(1, "error in offline directory setup"); 148 if (ckdir(PATH_SPOOL PATH_PURGE, 0700, pwq->pw_uid, 0, 1) == 0) 149 errx(1, "error in purge directory setup"); 150 151 mvpurge(PATH_SPOOL PATH_TEMPORARY, PATH_SPOOL PATH_PURGE); 152 153 if (ckdir(PATH_SPOOL PATH_TEMPORARY, 0700, pwq->pw_uid, 0, 1) == 0) 154 errx(1, "error in purge directory setup"); 155 } 156 157 r = backend->init(pwq, server, name); 158 159 log_trace(TRACE_QUEUE, "queue-backend: queue_init(%d) -> %d", server, r); 160 161 return (r); 162 } 163 164 int 165 queue_close(void) 166 { 167 if (handler_close) 168 return (handler_close()); 169 170 return (1); 171 } 172 173 int 174 queue_message_create(uint32_t *msgid) 175 { 176 int r; 177 178 profile_enter("queue_message_create"); 179 r = handler_message_create(msgid); 180 profile_leave(); 181 182 log_trace(TRACE_QUEUE, 183 "queue-backend: queue_message_create() -> %d (%08"PRIx32")", 184 r, *msgid); 185 186 return (r); 187 } 188 189 int 190 queue_message_delete(uint32_t msgid) 191 { 192 char msgpath[PATH_MAX]; 193 uint64_t evpid; 194 void *iter; 195 int r; 196 197 profile_enter("queue_message_delete"); 198 r = handler_message_delete(msgid); 199 profile_leave(); 200 201 /* in case the message is incoming */ 202 queue_message_path(msgid, msgpath, sizeof(msgpath)); 203 unlink(msgpath); 204 205 /* remove remaining envelopes from the cache if any (on rollback) */ 206 evpid = msgid_to_evpid(msgid); 207 for (;;) { 208 iter = NULL; 209 if (!tree_iterfrom(&evpcache_tree, &iter, evpid, &evpid, NULL)) 210 break; 211 if (evpid_to_msgid(evpid) != msgid) 212 break; 213 queue_envelope_cache_del(evpid); 214 } 215 216 log_trace(TRACE_QUEUE, 217 "queue-backend: queue_message_delete(%08"PRIx32") -> %d", msgid, r); 218 219 return (r); 220 } 221 222 int 223 queue_message_commit(uint32_t msgid) 224 { 225 int r; 226 char msgpath[PATH_MAX]; 227 char tmppath[PATH_MAX]; 228 FILE *ifp = NULL; 229 FILE *ofp = NULL; 230 231 profile_enter("queue_message_commit"); 232 233 queue_message_path(msgid, msgpath, sizeof(msgpath)); 234 235 if (env->sc_queue_flags & QUEUE_COMPRESSION) { 236 bsnprintf(tmppath, sizeof tmppath, "%s.comp", msgpath); 237 ifp = fopen(msgpath, "r"); 238 ofp = fopen(tmppath, "w+"); 239 if (ifp == NULL || ofp == NULL) 240 goto err; 241 if (!compress_file(ifp, ofp)) 242 goto err; 243 fclose(ifp); 244 fclose(ofp); 245 ifp = NULL; 246 ofp = NULL; 247 248 if (rename(tmppath, msgpath) == -1) { 249 if (errno == ENOSPC) 250 return (0); 251 unlink(tmppath); 252 log_warn("rename"); 253 return (0); 254 } 255 } 256 257 if (env->sc_queue_flags & QUEUE_ENCRYPTION) { 258 bsnprintf(tmppath, sizeof tmppath, "%s.enc", msgpath); 259 ifp = fopen(msgpath, "r"); 260 ofp = fopen(tmppath, "w+"); 261 if (ifp == NULL || ofp == NULL) 262 goto err; 263 if (!crypto_encrypt_file(ifp, ofp)) 264 goto err; 265 fclose(ifp); 266 fclose(ofp); 267 ifp = NULL; 268 ofp = NULL; 269 270 if (rename(tmppath, msgpath) == -1) { 271 if (errno == ENOSPC) 272 return (0); 273 unlink(tmppath); 274 log_warn("rename"); 275 return (0); 276 } 277 } 278 279 r = handler_message_commit(msgid, msgpath); 280 profile_leave(); 281 282 /* in case it's not done by the backend */ 283 unlink(msgpath); 284 285 log_trace(TRACE_QUEUE, 286 "queue-backend: queue_message_commit(%08"PRIx32") -> %d", 287 msgid, r); 288 289 return (r); 290 291 err: 292 if (ifp) 293 fclose(ifp); 294 if (ofp) 295 fclose(ofp); 296 return 0; 297 } 298 299 int 300 queue_message_corrupt(uint32_t msgid) 301 { 302 int r; 303 304 profile_enter("queue_message_corrupt"); 305 r = handler_message_corrupt(msgid); 306 profile_leave(); 307 308 log_trace(TRACE_QUEUE, 309 "queue-backend: queue_message_corrupt(%08"PRIx32") -> %d", msgid, r); 310 311 return (r); 312 } 313 314 int 315 queue_message_uncorrupt(uint32_t msgid) 316 { 317 return handler_message_uncorrupt(msgid); 318 } 319 320 int 321 queue_message_fd_r(uint32_t msgid) 322 { 323 int fdin = -1, fdout = -1, fd = -1; 324 FILE *ifp = NULL; 325 FILE *ofp = NULL; 326 327 profile_enter("queue_message_fd_r"); 328 fdin = handler_message_fd_r(msgid); 329 profile_leave(); 330 331 log_trace(TRACE_QUEUE, 332 "queue-backend: queue_message_fd_r(%08"PRIx32") -> %d", msgid, fdin); 333 334 if (fdin == -1) 335 return (-1); 336 337 if (env->sc_queue_flags & QUEUE_ENCRYPTION) { 338 if ((fdout = mktmpfile()) == -1) 339 goto err; 340 if ((fd = dup(fdout)) == -1) 341 goto err; 342 if ((ifp = fdopen(fdin, "r")) == NULL) 343 goto err; 344 fdin = fd; 345 fd = -1; 346 if ((ofp = fdopen(fdout, "w+")) == NULL) 347 goto err; 348 349 if (!crypto_decrypt_file(ifp, ofp)) 350 goto err; 351 352 fclose(ifp); 353 ifp = NULL; 354 fclose(ofp); 355 ofp = NULL; 356 lseek(fdin, SEEK_SET, 0); 357 } 358 359 if (env->sc_queue_flags & QUEUE_COMPRESSION) { 360 if ((fdout = mktmpfile()) == -1) 361 goto err; 362 if ((fd = dup(fdout)) == -1) 363 goto err; 364 if ((ifp = fdopen(fdin, "r")) == NULL) 365 goto err; 366 fdin = fd; 367 fd = -1; 368 if ((ofp = fdopen(fdout, "w+")) == NULL) 369 goto err; 370 371 if (!uncompress_file(ifp, ofp)) 372 goto err; 373 374 fclose(ifp); 375 ifp = NULL; 376 fclose(ofp); 377 ofp = NULL; 378 lseek(fdin, SEEK_SET, 0); 379 } 380 381 return (fdin); 382 383 err: 384 if (fd != -1) 385 close(fd); 386 if (fdin != -1) 387 close(fdin); 388 if (fdout != -1) 389 close(fdout); 390 if (ifp) 391 fclose(ifp); 392 if (ofp) 393 fclose(ofp); 394 return -1; 395 } 396 397 int 398 queue_message_fd_rw(uint32_t msgid) 399 { 400 char buf[PATH_MAX]; 401 402 queue_message_path(msgid, buf, sizeof(buf)); 403 404 return open(buf, O_RDWR | O_CREAT | O_EXCL, 0600); 405 } 406 407 static int 408 queue_envelope_dump_buffer(struct envelope *ep, char *evpbuf, size_t evpbufsize) 409 { 410 char *evp; 411 size_t evplen; 412 size_t complen; 413 char compbuf[sizeof(struct envelope)]; 414 size_t enclen; 415 char encbuf[sizeof(struct envelope)]; 416 417 evp = evpbuf; 418 evplen = envelope_dump_buffer(ep, evpbuf, evpbufsize); 419 if (evplen == 0) 420 return (0); 421 422 if (env->sc_queue_flags & QUEUE_COMPRESSION) { 423 complen = compress_chunk(evp, evplen, compbuf, sizeof compbuf); 424 if (complen == 0) 425 return (0); 426 evp = compbuf; 427 evplen = complen; 428 } 429 430 if (env->sc_queue_flags & QUEUE_ENCRYPTION) { 431 enclen = crypto_encrypt_buffer(evp, evplen, encbuf, sizeof encbuf); 432 if (enclen == 0) 433 return (0); 434 evp = encbuf; 435 evplen = enclen; 436 } 437 438 memmove(evpbuf, evp, evplen); 439 440 return (evplen); 441 } 442 443 static int 444 queue_envelope_load_buffer(struct envelope *ep, char *evpbuf, size_t evpbufsize) 445 { 446 char *evp; 447 size_t evplen; 448 char compbuf[sizeof(struct envelope)]; 449 size_t complen; 450 char encbuf[sizeof(struct envelope)]; 451 size_t enclen; 452 453 evp = evpbuf; 454 evplen = evpbufsize; 455 456 if (env->sc_queue_flags & QUEUE_ENCRYPTION) { 457 enclen = crypto_decrypt_buffer(evp, evplen, encbuf, sizeof encbuf); 458 if (enclen == 0) 459 return (0); 460 evp = encbuf; 461 evplen = enclen; 462 } 463 464 if (env->sc_queue_flags & QUEUE_COMPRESSION) { 465 complen = uncompress_chunk(evp, evplen, compbuf, sizeof compbuf); 466 if (complen == 0) 467 return (0); 468 evp = compbuf; 469 evplen = complen; 470 } 471 472 return (envelope_load_buffer(ep, evp, evplen)); 473 } 474 475 static void 476 queue_envelope_cache_add(struct envelope *e) 477 { 478 struct envelope *cached; 479 480 while (tree_count(&evpcache_tree) >= env->sc_queue_evpcache_size) 481 queue_envelope_cache_del(TAILQ_LAST(&evpcache_list, evplst)->id); 482 483 cached = xcalloc(1, sizeof *cached, "queue_envelope_cache_add"); 484 *cached = *e; 485 TAILQ_INSERT_HEAD(&evpcache_list, cached, entry); 486 tree_xset(&evpcache_tree, e->id, cached); 487 stat_increment("queue.evpcache.size", 1); 488 } 489 490 static void 491 queue_envelope_cache_update(struct envelope *e) 492 { 493 struct envelope *cached; 494 495 if ((cached = tree_get(&evpcache_tree, e->id)) == NULL) { 496 queue_envelope_cache_add(e); 497 stat_increment("queue.evpcache.update.missed", 1); 498 } else { 499 TAILQ_REMOVE(&evpcache_list, cached, entry); 500 *cached = *e; 501 TAILQ_INSERT_HEAD(&evpcache_list, cached, entry); 502 stat_increment("queue.evpcache.update.hit", 1); 503 } 504 } 505 506 static void 507 queue_envelope_cache_del(uint64_t evpid) 508 { 509 struct envelope *cached; 510 511 if ((cached = tree_pop(&evpcache_tree, evpid)) == NULL) 512 return; 513 514 TAILQ_REMOVE(&evpcache_list, cached, entry); 515 free(cached); 516 stat_decrement("queue.evpcache.size", 1); 517 } 518 519 int 520 queue_envelope_create(struct envelope *ep) 521 { 522 int r; 523 char evpbuf[sizeof(struct envelope)]; 524 size_t evplen; 525 uint64_t evpid; 526 uint32_t msgid; 527 528 ep->creation = time(NULL); 529 evplen = queue_envelope_dump_buffer(ep, evpbuf, sizeof evpbuf); 530 if (evplen == 0) 531 return (0); 532 533 evpid = ep->id; 534 msgid = evpid_to_msgid(evpid); 535 536 profile_enter("queue_envelope_create"); 537 r = handler_envelope_create(msgid, evpbuf, evplen, &ep->id); 538 profile_leave(); 539 540 log_trace(TRACE_QUEUE, 541 "queue-backend: queue_envelope_create(%016"PRIx64", %zu) -> %d (%016"PRIx64")", 542 evpid, evplen, r, ep->id); 543 544 if (!r) { 545 ep->creation = 0; 546 ep->id = 0; 547 } 548 549 if (r && env->sc_queue_flags & QUEUE_EVPCACHE) 550 queue_envelope_cache_add(ep); 551 552 return (r); 553 } 554 555 int 556 queue_envelope_delete(uint64_t evpid) 557 { 558 int r; 559 560 if (env->sc_queue_flags & QUEUE_EVPCACHE) 561 queue_envelope_cache_del(evpid); 562 563 profile_enter("queue_envelope_delete"); 564 r = handler_envelope_delete(evpid); 565 profile_leave(); 566 567 log_trace(TRACE_QUEUE, 568 "queue-backend: queue_envelope_delete(%016"PRIx64") -> %d", 569 evpid, r); 570 571 return (r); 572 } 573 574 int 575 queue_envelope_load(uint64_t evpid, struct envelope *ep) 576 { 577 const char *e; 578 char evpbuf[sizeof(struct envelope)]; 579 size_t evplen; 580 struct envelope *cached; 581 582 if ((env->sc_queue_flags & QUEUE_EVPCACHE) && 583 (cached = tree_get(&evpcache_tree, evpid))) { 584 *ep = *cached; 585 stat_increment("queue.evpcache.load.hit", 1); 586 return (1); 587 } 588 589 ep->id = evpid; 590 profile_enter("queue_envelope_load"); 591 evplen = handler_envelope_load(ep->id, evpbuf, sizeof evpbuf); 592 profile_leave(); 593 594 log_trace(TRACE_QUEUE, 595 "queue-backend: queue_envelope_load(%016"PRIx64") -> %zu", 596 evpid, evplen); 597 598 if (evplen == 0) 599 return (0); 600 601 if (queue_envelope_load_buffer(ep, evpbuf, evplen)) { 602 if ((e = envelope_validate(ep)) == NULL) { 603 ep->id = evpid; 604 if (env->sc_queue_flags & QUEUE_EVPCACHE) { 605 queue_envelope_cache_add(ep); 606 stat_increment("queue.evpcache.load.missed", 1); 607 } 608 return (1); 609 } 610 log_debug("debug: invalid envelope %016" PRIx64 ": %s", 611 ep->id, e); 612 } 613 614 (void)queue_message_corrupt(evpid_to_msgid(evpid)); 615 return (0); 616 } 617 618 int 619 queue_envelope_update(struct envelope *ep) 620 { 621 char evpbuf[sizeof(struct envelope)]; 622 size_t evplen; 623 int r; 624 625 evplen = queue_envelope_dump_buffer(ep, evpbuf, sizeof evpbuf); 626 if (evplen == 0) 627 return (0); 628 629 profile_enter("queue_envelope_update"); 630 r = handler_envelope_update(ep->id, evpbuf, evplen); 631 profile_leave(); 632 633 if (r && env->sc_queue_flags & QUEUE_EVPCACHE) 634 queue_envelope_cache_update(ep); 635 636 log_trace(TRACE_QUEUE, 637 "queue-backend: queue_envelope_update(%016"PRIx64") -> %d", 638 ep->id, r); 639 640 return (r); 641 } 642 643 int 644 queue_message_walk(struct envelope *ep, uint32_t msgid, int *done, void **data) 645 { 646 char evpbuf[sizeof(struct envelope)]; 647 uint64_t evpid; 648 int r; 649 const char *e; 650 651 profile_enter("queue_message_walk"); 652 r = handler_message_walk(&evpid, evpbuf, sizeof evpbuf, 653 msgid, done, data); 654 profile_leave(); 655 656 log_trace(TRACE_QUEUE, 657 "queue-backend: queue_message_walk() -> %d (%016"PRIx64")", 658 r, evpid); 659 660 if (r == -1) 661 return (r); 662 663 if (r && queue_envelope_load_buffer(ep, evpbuf, (size_t)r)) { 664 if ((e = envelope_validate(ep)) == NULL) { 665 ep->id = evpid; 666 /* 667 * do not cache the envelope here, while discovering 668 * envelopes one could re-run discover on already 669 * scheduled envelopes which leads to triggering of 670 * strict checks in caching. Envelopes could anyway 671 * be loaded from backend if it isn't cached. 672 */ 673 return (1); 674 } 675 log_debug("debug: invalid envelope %016" PRIx64 ": %s", 676 ep->id, e); 677 (void)queue_message_corrupt(evpid_to_msgid(evpid)); 678 } 679 680 return (0); 681 } 682 683 int 684 queue_envelope_walk(struct envelope *ep) 685 { 686 const char *e; 687 uint64_t evpid; 688 char evpbuf[sizeof(struct envelope)]; 689 int r; 690 691 profile_enter("queue_envelope_walk"); 692 r = handler_envelope_walk(&evpid, evpbuf, sizeof evpbuf); 693 profile_leave(); 694 695 log_trace(TRACE_QUEUE, 696 "queue-backend: queue_envelope_walk() -> %d (%016"PRIx64")", 697 r, evpid); 698 699 if (r == -1) 700 return (r); 701 702 if (r && queue_envelope_load_buffer(ep, evpbuf, (size_t)r)) { 703 if ((e = envelope_validate(ep)) == NULL) { 704 ep->id = evpid; 705 if (env->sc_queue_flags & QUEUE_EVPCACHE) 706 queue_envelope_cache_add(ep); 707 return (1); 708 } 709 log_debug("debug: invalid envelope %016" PRIx64 ": %s", 710 ep->id, e); 711 (void)queue_message_corrupt(evpid_to_msgid(evpid)); 712 } 713 714 return (0); 715 } 716 717 uint32_t 718 queue_generate_msgid(void) 719 { 720 uint32_t msgid; 721 722 while ((msgid = arc4random()) == 0) 723 ; 724 725 return msgid; 726 } 727 728 uint64_t 729 queue_generate_evpid(uint32_t msgid) 730 { 731 uint32_t rnd; 732 uint64_t evpid; 733 734 while ((rnd = arc4random()) == 0) 735 ; 736 737 evpid = msgid; 738 evpid <<= 32; 739 evpid |= rnd; 740 741 return evpid; 742 } 743 744 static const char* 745 envelope_validate(struct envelope *ep) 746 { 747 if (ep->version != SMTPD_ENVELOPE_VERSION) 748 return "version mismatch"; 749 750 if (memchr(ep->helo, '\0', sizeof(ep->helo)) == NULL) 751 return "invalid helo"; 752 if (ep->helo[0] == '\0') 753 return "empty helo"; 754 755 if (memchr(ep->hostname, '\0', sizeof(ep->hostname)) == NULL) 756 return "invalid hostname"; 757 if (ep->hostname[0] == '\0') 758 return "empty hostname"; 759 760 if (memchr(ep->errorline, '\0', sizeof(ep->errorline)) == NULL) 761 return "invalid error line"; 762 763 return NULL; 764 } 765 766 void 767 queue_api_on_close(int(*cb)(void)) 768 { 769 handler_close = cb; 770 } 771 772 void 773 queue_api_on_message_create(int(*cb)(uint32_t *)) 774 { 775 handler_message_create = cb; 776 } 777 778 void 779 queue_api_on_message_commit(int(*cb)(uint32_t, const char *)) 780 { 781 handler_message_commit = cb; 782 } 783 784 void 785 queue_api_on_message_delete(int(*cb)(uint32_t)) 786 { 787 handler_message_delete = cb; 788 } 789 790 void 791 queue_api_on_message_fd_r(int(*cb)(uint32_t)) 792 { 793 handler_message_fd_r = cb; 794 } 795 796 void 797 queue_api_on_message_corrupt(int(*cb)(uint32_t)) 798 { 799 handler_message_corrupt = cb; 800 } 801 802 void 803 queue_api_on_message_uncorrupt(int(*cb)(uint32_t)) 804 { 805 handler_message_uncorrupt = cb; 806 } 807 808 void 809 queue_api_on_envelope_create(int(*cb)(uint32_t, const char *, size_t, uint64_t *)) 810 { 811 handler_envelope_create = cb; 812 } 813 814 void 815 queue_api_on_envelope_delete(int(*cb)(uint64_t)) 816 { 817 handler_envelope_delete = cb; 818 } 819 820 void 821 queue_api_on_envelope_update(int(*cb)(uint64_t, const char *, size_t)) 822 { 823 handler_envelope_update = cb; 824 } 825 826 void 827 queue_api_on_envelope_load(int(*cb)(uint64_t, char *, size_t)) 828 { 829 handler_envelope_load = cb; 830 } 831 832 void 833 queue_api_on_envelope_walk(int(*cb)(uint64_t *, char *, size_t)) 834 { 835 handler_envelope_walk = cb; 836 } 837 838 void 839 queue_api_on_message_walk(int(*cb)(uint64_t *, char *, size_t, 840 uint32_t, int *, void **)) 841 { 842 handler_message_walk = cb; 843 } 844