xref: /openbsd-src/usr.sbin/smtpd/queue_backend.c (revision 98f67d1688f1b9fc1ab749b00f67fbe7f50e49a3)
1 /*	$OpenBSD: queue_backend.c,v 1.52 2014/07/08 15:45:32 eric Exp $	*/
2 
3 /*
4  * Copyright (c) 2011 Gilles Chehade <gilles@poolp.org>
5  *
6  * Permission to use, copy, modify, and distribute this software for any
7  * purpose with or without fee is hereby granted, provided that the above
8  * copyright notice and this permission notice appear in all copies.
9  *
10  * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
11  * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
12  * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
13  * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
14  * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
15  * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
16  * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
17  */
18 
19 #include <sys/types.h>
20 #include <sys/queue.h>
21 #include <sys/tree.h>
22 #include <sys/socket.h>
23 #include <sys/stat.h>
24 
25 #include <ctype.h>
26 #include <err.h>
27 #include <errno.h>
28 #include <event.h>
29 #include <fcntl.h>
30 #include <imsg.h>
31 #include <inttypes.h>
32 #include <libgen.h>
33 #include <pwd.h>
34 #include <stdio.h>
35 #include <stdlib.h>
36 #include <string.h>
37 #include <time.h>
38 #include <unistd.h>
39 
40 #include "smtpd.h"
41 #include "log.h"
42 
/* Defined at the end of this file; applied to envelopes loaded from the backend. */
43 static const char* envelope_validate(struct envelope *);
44 
/* Backend implementations; queue_init() picks one of them by name. */
45 extern struct queue_backend	queue_backend_fs;
46 extern struct queue_backend	queue_backend_null;
47 extern struct queue_backend	queue_backend_proc;
48 extern struct queue_backend	queue_backend_ram;
49 
/* Envelope cache helpers; their callers check QUEUE_EVPCACHE first. */
50 static void queue_envelope_cache_add(struct envelope *);
51 static void queue_envelope_cache_update(struct envelope *);
52 static void queue_envelope_cache_del(uint64_t evpid);
53 
/* List head type used for the envelope cache list below. */
54 TAILQ_HEAD(evplst, envelope);
55 
/*
 * Envelope cache: evpcache_tree holds the cached copies keyed by envelope
 * id, evpcache_list links the same copies with the most recently added or
 * updated one at the head (eviction takes the tail).
 * backend is the implementation selected by queue_init().
 */
56 static struct tree		evpcache_tree;
57 static struct evplst		evpcache_list;
58 static struct queue_backend	*backend;
59 
/*
 * Callbacks installed through the queue_api_on_*() functions at the end of
 * this file.  queue_close() tolerates handler_close being NULL; the other
 * handlers are called without a NULL check.
 */
60 static int (*handler_close)(void);
61 static int (*handler_message_create)(uint32_t *);
62 static int (*handler_message_commit)(uint32_t, const char*);
63 static int (*handler_message_delete)(uint32_t);
64 static int (*handler_message_fd_r)(uint32_t);
65 static int (*handler_message_corrupt)(uint32_t);
66 static int (*handler_envelope_create)(uint32_t, const char *, size_t, uint64_t *);
67 static int (*handler_envelope_delete)(uint64_t);
68 static int (*handler_envelope_update)(uint64_t, const char *, size_t);
69 static int (*handler_envelope_load)(uint64_t, char *, size_t);
70 static int (*handler_envelope_walk)(uint64_t *, char *, size_t);
71 
#ifdef QUEUE_PROFILING

/* Label and start time of the backend call currently being timed. */
static struct {
	struct timespec	 t0;
	const char	*name;
} profile;

/*
 * Record `name` and the current monotonic time, but only when the
 * PROFILE_QUEUE bit is set in `profiling`.
 */
static inline void
profile_enter(const char *name)
{
	if (profiling & PROFILE_QUEUE) {
		profile.name = name;
		clock_gettime(CLOCK_MONOTONIC, &profile.t0);
	}
}

/*
 * Log the time elapsed since the matching profile_enter(), but only when
 * the PROFILE_QUEUE bit is set in `profiling`.
 */
static inline void
profile_leave(void)
{
	struct timespec	 now, elapsed;

	if (!(profiling & PROFILE_QUEUE))
		return;

	clock_gettime(CLOCK_MONOTONIC, &now);
	timespecsub(&now, &profile.t0, &elapsed);
	log_debug("profile-queue: %s %lld.%09ld", profile.name,
	    (long long)elapsed.tv_sec, elapsed.tv_nsec);
}
#else
/* Profiling compiled out: both hooks expand to empty statements. */
#define profile_enter(x)	do {} while (0)
#define profile_leave()		do {} while (0)
#endif
104 
105 static int
106 queue_message_path(uint32_t msgid, char *buf, size_t len)
107 {
108 	return bsnprintf(buf, len, "%s/%08"PRIx32, PATH_TEMPORARY, msgid);
109 }
110 
111 int
112 queue_init(const char *name, int server)
113 {
114 	struct passwd	*pwq;
115 	int		 r;
116 
117 	pwq = getpwnam(SMTPD_QUEUE_USER);
118 	if (pwq == NULL)
119 		errx(1, "unknown user %s", SMTPD_QUEUE_USER);
120 
121 	tree_init(&evpcache_tree);
122 	TAILQ_INIT(&evpcache_list);
123 
124 	if (!strcmp(name, "fs"))
125 		backend = &queue_backend_fs;
126 	else if (!strcmp(name, "null"))
127 		backend = &queue_backend_null;
128 	else if (!strcmp(name, "ram"))
129 		backend = &queue_backend_ram;
130 	else
131 		backend = &queue_backend_proc;
132 
133 	if (server) {
134 		if (ckdir(PATH_SPOOL, 0711, 0, 0, 1) == 0)
135 			errx(1, "error in spool directory setup");
136 		if (ckdir(PATH_SPOOL PATH_OFFLINE, 01777, 0, 0, 1) == 0)
137 			errx(1, "error in offline directory setup");
138 		if (ckdir(PATH_SPOOL PATH_PURGE, 0700, pwq->pw_uid, 0, 1) == 0)
139 			errx(1, "error in purge directory setup");
140 
141 		mvpurge(PATH_SPOOL PATH_TEMPORARY, PATH_SPOOL PATH_PURGE);
142 
143 		if (ckdir(PATH_SPOOL PATH_TEMPORARY, 0700, pwq->pw_uid, 0, 1) == 0)
144 			errx(1, "error in purge directory setup");
145 	}
146 
147 	r = backend->init(pwq, server, name);
148 
149 	log_trace(TRACE_QUEUE, "queue-backend: queue_init(%d) -> %d", server, r);
150 
151 	return (r);
152 }
153 
154 int
155 queue_close(void)
156 {
157 	if (handler_close)
158 		return (handler_close());
159 
160 	return (1);
161 }
162 
163 int
164 queue_message_create(uint32_t *msgid)
165 {
166 	int	r;
167 
168 	profile_enter("queue_message_create");
169 	r = handler_message_create(msgid);
170 	profile_leave();
171 
172 	log_trace(TRACE_QUEUE,
173 	    "queue-backend: queue_message_create() -> %d (%08"PRIx32")",
174 	    r, *msgid);
175 
176 	return (r);
177 }
178 
179 int
180 queue_message_delete(uint32_t msgid)
181 {
182 	char	msgpath[MAXPATHLEN];
183 	int	r;
184 
185 	profile_enter("queue_message_delete");
186 	r = handler_message_delete(msgid);
187 	profile_leave();
188 
189 	/* in case the message is incoming */
190 	queue_message_path(msgid, msgpath, sizeof(msgpath));
191 	unlink(msgpath);
192 
193 	log_trace(TRACE_QUEUE,
194 	    "queue-backend: queue_message_delete(%08"PRIx32") -> %d", msgid, r);
195 
196 	return (r);
197 }
198 
199 int
200 queue_message_commit(uint32_t msgid)
201 {
202 	int	r;
203 	char	msgpath[MAXPATHLEN];
204 	char	tmppath[MAXPATHLEN];
205 	FILE	*ifp = NULL;
206 	FILE	*ofp = NULL;
207 
208 	profile_enter("queue_message_commit");
209 
210 	queue_message_path(msgid, msgpath, sizeof(msgpath));
211 
212 	if (env->sc_queue_flags & QUEUE_COMPRESSION) {
213 		bsnprintf(tmppath, sizeof tmppath, "%s.comp", msgpath);
214 		ifp = fopen(msgpath, "r");
215 		ofp = fopen(tmppath, "w+");
216 		if (ifp == NULL || ofp == NULL)
217 			goto err;
218 		if (! compress_file(ifp, ofp))
219 			goto err;
220 		fclose(ifp);
221 		fclose(ofp);
222 		ifp = NULL;
223 		ofp = NULL;
224 
225 		if (rename(tmppath, msgpath) == -1) {
226 			if (errno == ENOSPC)
227 				return (0);
228 			unlink(tmppath);
229 			log_warn("rename");
230 			return (0);
231 		}
232 	}
233 
234 	if (env->sc_queue_flags & QUEUE_ENCRYPTION) {
235 		bsnprintf(tmppath, sizeof tmppath, "%s.enc", msgpath);
236 		ifp = fopen(msgpath, "r");
237 		ofp = fopen(tmppath, "w+");
238 		if (ifp == NULL || ofp == NULL)
239 			goto err;
240 		if (! crypto_encrypt_file(ifp, ofp))
241 			goto err;
242 		fclose(ifp);
243 		fclose(ofp);
244 		ifp = NULL;
245 		ofp = NULL;
246 
247 		if (rename(tmppath, msgpath) == -1) {
248 			if (errno == ENOSPC)
249 				return (0);
250 			unlink(tmppath);
251 			log_warn("rename");
252 			return (0);
253 		}
254 	}
255 
256 	r = handler_message_commit(msgid, msgpath);
257 	profile_leave();
258 
259 	/* in case it's not done by the backend */
260 	unlink(msgpath);
261 
262 	log_trace(TRACE_QUEUE,
263 	    "queue-backend: queue_message_commit(%08"PRIx32") -> %d",
264 	    msgid, r);
265 
266 	return (r);
267 
268 err:
269 	if (ifp)
270 		fclose(ifp);
271 	if (ofp)
272 		fclose(ofp);
273 	return 0;
274 }
275 
276 int
277 queue_message_corrupt(uint32_t msgid)
278 {
279 	int	r;
280 
281 	profile_enter("queue_message_corrupt");
282 	r = handler_message_corrupt(msgid);
283 	profile_leave();
284 
285 	log_trace(TRACE_QUEUE,
286 	    "queue-backend: queue_message_corrupt(%08"PRIx32") -> %d", msgid, r);
287 
288 	return (r);
289 }
290 
291 int
292 queue_message_fd_r(uint32_t msgid)
293 {
294 	int	fdin = -1, fdout = -1, fd = -1;
295 	FILE	*ifp = NULL;
296 	FILE	*ofp = NULL;
297 
298 	profile_enter("queue_message_fd_r");
299 	fdin = handler_message_fd_r(msgid);
300 	profile_leave();
301 
302 	log_trace(TRACE_QUEUE,
303 	    "queue-backend: queue_message_fd_r(%08"PRIx32") -> %d", msgid, fdin);
304 
305 	if (fdin == -1)
306 		return (-1);
307 
308 	if (env->sc_queue_flags & QUEUE_ENCRYPTION) {
309 		if ((fdout = mktmpfile()) == -1)
310 			goto err;
311 		if ((fd = dup(fdout)) == -1)
312 			goto err;
313 		if ((ifp = fdopen(fdin, "r")) == NULL)
314 			goto err;
315 		fdin = fd;
316 		fd = -1;
317 		if ((ofp = fdopen(fdout, "w+")) == NULL)
318 			goto err;
319 
320 		if (! crypto_decrypt_file(ifp, ofp))
321 			goto err;
322 
323 		fclose(ifp);
324 		ifp = NULL;
325 		fclose(ofp);
326 		ofp = NULL;
327 		lseek(fdin, SEEK_SET, 0);
328 	}
329 
330 	if (env->sc_queue_flags & QUEUE_COMPRESSION) {
331 		if ((fdout = mktmpfile()) == -1)
332 			goto err;
333 		if ((fd = dup(fdout)) == -1)
334 			goto err;
335 		if ((ifp = fdopen(fdin, "r")) == NULL)
336 			goto err;
337 		fdin = fd;
338 		fd = -1;
339 		if ((ofp = fdopen(fdout, "w+")) == NULL)
340 			goto err;
341 
342 		if (! uncompress_file(ifp, ofp))
343 			goto err;
344 
345 		fclose(ifp);
346 		ifp = NULL;
347 		fclose(ofp);
348 		ofp = NULL;
349 		lseek(fdin, SEEK_SET, 0);
350 	}
351 
352 	return (fdin);
353 
354 err:
355 	if (fd != -1)
356 		close(fd);
357 	if (fdin != -1)
358 		close(fdin);
359 	if (fdout != -1)
360 		close(fdout);
361 	if (ifp)
362 		fclose(ifp);
363 	if (ofp)
364 		fclose(ofp);
365 	return -1;
366 }
367 
368 int
369 queue_message_fd_rw(uint32_t msgid)
370 {
371 	char buf[SMTPD_MAXPATHLEN];
372 
373 	queue_message_path(msgid, buf, sizeof(buf));
374 
375 	return open(buf, O_RDWR | O_CREAT | O_EXCL, 0600);
376 }
377 
378 static int
379 queue_envelope_dump_buffer(struct envelope *ep, char *evpbuf, size_t evpbufsize)
380 {
381 	char   *evp;
382 	size_t	evplen;
383 	size_t	complen;
384 	char	compbuf[sizeof(struct envelope)];
385 	size_t	enclen;
386 	char	encbuf[sizeof(struct envelope)];
387 
388 	evp = evpbuf;
389 	evplen = envelope_dump_buffer(ep, evpbuf, evpbufsize);
390 	if (evplen == 0)
391 		return (0);
392 
393 	if (env->sc_queue_flags & QUEUE_COMPRESSION) {
394 		complen = compress_chunk(evp, evplen, compbuf, sizeof compbuf);
395 		if (complen == 0)
396 			return (0);
397 		evp = compbuf;
398 		evplen = complen;
399 	}
400 
401 	if (env->sc_queue_flags & QUEUE_ENCRYPTION) {
402 		enclen = crypto_encrypt_buffer(evp, evplen, encbuf, sizeof encbuf);
403 		if (enclen == 0)
404 			return (0);
405 		evp = encbuf;
406 		evplen = enclen;
407 	}
408 
409 	memmove(evpbuf, evp, evplen);
410 
411 	return (evplen);
412 }
413 
414 static int
415 queue_envelope_load_buffer(struct envelope *ep, char *evpbuf, size_t evpbufsize)
416 {
417 	char		*evp;
418 	size_t		 evplen;
419 	char		 compbuf[sizeof(struct envelope)];
420 	size_t		 complen;
421 	char		 encbuf[sizeof(struct envelope)];
422 	size_t		 enclen;
423 
424 	evp = evpbuf;
425 	evplen = evpbufsize;
426 
427 	if (env->sc_queue_flags & QUEUE_ENCRYPTION) {
428 		enclen = crypto_decrypt_buffer(evp, evplen, encbuf, sizeof encbuf);
429 		if (enclen == 0)
430 			return (0);
431 		evp = encbuf;
432 		evplen = enclen;
433 	}
434 
435 	if (env->sc_queue_flags & QUEUE_COMPRESSION) {
436 		complen = uncompress_chunk(evp, evplen, compbuf, sizeof compbuf);
437 		if (complen == 0)
438 			return (0);
439 		evp = compbuf;
440 		evplen = complen;
441 	}
442 
443 	return (envelope_load_buffer(ep, evp, evplen));
444 }
445 
446 static void
447 queue_envelope_cache_add(struct envelope *e)
448 {
449 	struct envelope *cached;
450 
451 	while (tree_count(&evpcache_tree) >= env->sc_queue_evpcache_size)
452 		queue_envelope_cache_del(TAILQ_LAST(&evpcache_list, evplst)->id);
453 
454 	cached = xcalloc(1, sizeof *cached, "queue_envelope_cache_add");
455 	*cached = *e;
456 	TAILQ_INSERT_HEAD(&evpcache_list, cached, entry);
457 	tree_xset(&evpcache_tree, e->id, cached);
458 	stat_increment("queue.evpcache.size", 1);
459 }
460 
461 static void
462 queue_envelope_cache_update(struct envelope *e)
463 {
464 	struct envelope *cached;
465 
466 	if ((cached = tree_get(&evpcache_tree, e->id)) == NULL) {
467 		queue_envelope_cache_add(e);
468 		stat_increment("queue.evpcache.update.missed", 1);
469 	} else {
470 		TAILQ_REMOVE(&evpcache_list, cached, entry);
471 		*cached = *e;
472 		TAILQ_INSERT_HEAD(&evpcache_list, cached, entry);
473 		stat_increment("queue.evpcache.update.hit", 1);
474 	}
475 }
476 
477 static void
478 queue_envelope_cache_del(uint64_t evpid)
479 {
480 	struct envelope *cached;
481 
482 	if ((cached = tree_pop(&evpcache_tree, evpid)) == NULL)
483 		return;
484 
485 	TAILQ_REMOVE(&evpcache_list, cached, entry);
486 	free(cached);
487 	stat_decrement("queue.evpcache.size", 1);
488 }
489 
490 int
491 queue_envelope_create(struct envelope *ep)
492 {
493 	int		 r;
494 	char		 evpbuf[sizeof(struct envelope)];
495 	size_t		 evplen;
496 	uint64_t	 evpid;
497 	uint32_t	 msgid;
498 
499 	ep->creation = time(NULL);
500 	evplen = queue_envelope_dump_buffer(ep, evpbuf, sizeof evpbuf);
501 	if (evplen == 0)
502 		return (0);
503 
504 	evpid = ep->id;
505 	msgid = evpid_to_msgid(evpid);
506 
507 	profile_enter("queue_envelope_create");
508 	r = handler_envelope_create(msgid, evpbuf, evplen, &ep->id);
509 	profile_leave();
510 
511 	log_trace(TRACE_QUEUE,
512 	    "queue-backend: queue_envelope_create(%016"PRIx64", %zu) -> %d (%016"PRIx64")",
513 	    evpid, evplen, r, ep->id);
514 
515 	if (!r) {
516 		ep->creation = 0;
517 		ep->id = 0;
518 	}
519 
520 	if (r && env->sc_queue_flags & QUEUE_EVPCACHE)
521 		queue_envelope_cache_add(ep);
522 
523 	return (r);
524 }
525 
526 int
527 queue_envelope_delete(uint64_t evpid)
528 {
529 	int	r;
530 
531 	if (env->sc_queue_flags & QUEUE_EVPCACHE)
532 		queue_envelope_cache_del(evpid);
533 
534 	profile_enter("queue_envelope_delete");
535 	r = handler_envelope_delete(evpid);
536 	profile_leave();
537 
538 	log_trace(TRACE_QUEUE,
539 	    "queue-backend: queue_envelope_delete(%016"PRIx64") -> %d",
540 	    evpid, r);
541 
542 	return (r);
543 }
544 
545 int
546 queue_envelope_load(uint64_t evpid, struct envelope *ep)
547 {
548 	const char	*e;
549 	char		 evpbuf[sizeof(struct envelope)];
550 	size_t		 evplen;
551 	struct envelope	*cached;
552 
553 	if ((env->sc_queue_flags & QUEUE_EVPCACHE) &&
554 	    (cached = tree_get(&evpcache_tree, evpid))) {
555 		*ep = *cached;
556 		stat_increment("queue.evpcache.load.hit", 1);
557 		return (1);
558 	}
559 
560 	ep->id = evpid;
561 	profile_enter("queue_envelope_load");
562 	evplen = handler_envelope_load(ep->id, evpbuf, sizeof evpbuf);
563 	profile_leave();
564 
565 	log_trace(TRACE_QUEUE,
566 	    "queue-backend: queue_envelope_load(%016"PRIx64") -> %zu",
567 	    evpid, evplen);
568 
569 	if (evplen == 0)
570 		return (0);
571 
572 	if (queue_envelope_load_buffer(ep, evpbuf, evplen)) {
573 		if ((e = envelope_validate(ep)) == NULL) {
574 			ep->id = evpid;
575 			if (env->sc_queue_flags & QUEUE_EVPCACHE) {
576 				queue_envelope_cache_add(ep);
577 				stat_increment("queue.evpcache.load.missed", 1);
578 			}
579 			return (1);
580 		}
581 		log_debug("debug: invalid envelope %016" PRIx64 ": %s",
582 		    ep->id, e);
583 	}
584 
585 	(void)queue_message_corrupt(evpid_to_msgid(evpid));
586 	return (0);
587 }
588 
589 int
590 queue_envelope_update(struct envelope *ep)
591 {
592 	char	evpbuf[sizeof(struct envelope)];
593 	size_t	evplen;
594 	int	r;
595 
596 	evplen = queue_envelope_dump_buffer(ep, evpbuf, sizeof evpbuf);
597 	if (evplen == 0)
598 		return (0);
599 
600 	profile_enter("queue_envelope_update");
601 	r = handler_envelope_update(ep->id, evpbuf, evplen);
602 	profile_leave();
603 
604 	if (r && env->sc_queue_flags & QUEUE_EVPCACHE)
605 		queue_envelope_cache_update(ep);
606 
607 	log_trace(TRACE_QUEUE,
608 	    "queue-backend: queue_envelope_update(%016"PRIx64") -> %d",
609 	    ep->id, r);
610 
611 	return (r);
612 }
613 
614 int
615 queue_envelope_walk(struct envelope *ep)
616 {
617 	const char	*e;
618 	uint64_t	 evpid;
619 	char		 evpbuf[sizeof(struct envelope)];
620 	int		 r;
621 
622 	profile_enter("queue_envelope_walk");
623 	r = handler_envelope_walk(&evpid, evpbuf, sizeof evpbuf);
624 	profile_leave();
625 
626 	log_trace(TRACE_QUEUE,
627 	    "queue-backend: queue_envelope_walk() -> %d (%016"PRIx64")",
628 	    r, evpid);
629 
630 	if (r == -1)
631 		return (r);
632 
633 	if (r && queue_envelope_load_buffer(ep, evpbuf, (size_t)r)) {
634 		if ((e = envelope_validate(ep)) == NULL) {
635 			ep->id = evpid;
636 			if (env->sc_queue_flags & QUEUE_EVPCACHE)
637 				queue_envelope_cache_add(ep);
638 			return (1);
639 		}
640 		log_debug("debug: invalid envelope %016" PRIx64 ": %s",
641 		    ep->id, e);
642 		(void)queue_message_corrupt(evpid_to_msgid(evpid));
643 	}
644 
645 	return (0);
646 }
647 
/*
 * Return a random non-zero message id drawn with
 * arc4random_uniform(0xffffffff); zero results are redrawn.
 */
uint32_t
queue_generate_msgid(void)
{
	uint32_t id;

	do {
		id = arc4random_uniform(0xffffffff);
	} while (id == 0);

	return id;
}
658 
/*
 * Build an envelope id for message `msgid`: the message id occupies the
 * upper 32 bits, a random non-zero value drawn with
 * arc4random_uniform(0xffffffff) the lower 32 bits.
 */
uint64_t
queue_generate_evpid(uint32_t msgid)
{
	uint32_t low;

	do {
		low = arc4random_uniform(0xffffffff);
	} while (low == 0);

	return ((uint64_t)msgid << 32) | low;
}
674 
675 static const char*
676 envelope_validate(struct envelope *ep)
677 {
678 	if (ep->version != SMTPD_ENVELOPE_VERSION)
679 		return "version mismatch";
680 
681 	if (memchr(ep->helo, '\0', sizeof(ep->helo)) == NULL)
682 		return "invalid helo";
683 	if (ep->helo[0] == '\0')
684 		return "empty helo";
685 
686 	if (memchr(ep->hostname, '\0', sizeof(ep->hostname)) == NULL)
687 		return "invalid hostname";
688 	if (ep->hostname[0] == '\0')
689 		return "empty hostname";
690 
691 	if (memchr(ep->errorline, '\0', sizeof(ep->errorline)) == NULL)
692 		return "invalid error line";
693 
694 	return NULL;
695 }
696 
697 void
698 queue_api_on_close(int(*cb)(void))
699 {
700 	handler_close = cb;
701 }
702 
703 void
704 queue_api_on_message_create(int(*cb)(uint32_t *))
705 {
706 	handler_message_create = cb;
707 }
708 
709 void
710 queue_api_on_message_commit(int(*cb)(uint32_t, const char *))
711 {
712 	handler_message_commit = cb;
713 }
714 
715 void
716 queue_api_on_message_delete(int(*cb)(uint32_t))
717 {
718 	handler_message_delete = cb;
719 }
720 
721 void
722 queue_api_on_message_fd_r(int(*cb)(uint32_t))
723 {
724 	handler_message_fd_r = cb;
725 }
726 
727 void
728 queue_api_on_message_corrupt(int(*cb)(uint32_t))
729 {
730 	handler_message_corrupt = cb;
731 }
732 
733 void
734 queue_api_on_envelope_create(int(*cb)(uint32_t, const char *, size_t, uint64_t *))
735 {
736 	handler_envelope_create = cb;
737 }
738 
739 void
740 queue_api_on_envelope_delete(int(*cb)(uint64_t))
741 {
742 	handler_envelope_delete = cb;
743 }
744 
745 void
746 queue_api_on_envelope_update(int(*cb)(uint64_t, const char *, size_t))
747 {
748 	handler_envelope_update = cb;
749 }
750 
751 void
752 queue_api_on_envelope_load(int(*cb)(uint64_t, char *, size_t))
753 {
754 	handler_envelope_load = cb;
755 }
756 
757 void
758 queue_api_on_envelope_walk(int(*cb)(uint64_t *, char *, size_t))
759 {
760 	handler_envelope_walk = cb;
761 }
762