xref: /openbsd-src/usr.sbin/smtpd/queue_backend.c (revision e267740e69b5a7e14e88f9a79e9b78eae78b9ae7)
1 /*	$OpenBSD: queue_backend.c,v 1.51 2014/07/07 09:11:24 eric Exp $	*/
2 
3 /*
4  * Copyright (c) 2011 Gilles Chehade <gilles@poolp.org>
5  *
6  * Permission to use, copy, modify, and distribute this software for any
7  * purpose with or without fee is hereby granted, provided that the above
8  * copyright notice and this permission notice appear in all copies.
9  *
10  * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
11  * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
12  * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
13  * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
14  * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
15  * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
16  * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
17  */
18 
19 #include <sys/types.h>
20 #include <sys/queue.h>
21 #include <sys/tree.h>
22 #include <sys/socket.h>
23 #include <sys/stat.h>
24 
25 #include <ctype.h>
26 #include <err.h>
27 #include <errno.h>
28 #include <event.h>
29 #include <fcntl.h>
30 #include <imsg.h>
31 #include <inttypes.h>
32 #include <libgen.h>
33 #include <pwd.h>
34 #include <stdio.h>
35 #include <stdlib.h>
36 #include <string.h>
37 #include <time.h>
38 #include <unistd.h>
39 
40 #include "smtpd.h"
41 #include "log.h"
42 
43 static const char* envelope_validate(struct envelope *);
44 
45 extern struct queue_backend	queue_backend_fs;
46 extern struct queue_backend	queue_backend_null;
47 extern struct queue_backend	queue_backend_proc;
48 extern struct queue_backend	queue_backend_ram;
49 
50 static void queue_envelope_cache_add(struct envelope *);
51 static void queue_envelope_cache_update(struct envelope *);
52 static void queue_envelope_cache_del(uint64_t evpid);
53 
54 TAILQ_HEAD(evplst, envelope);
55 
56 static struct tree		evpcache_tree;
57 static struct evplst		evpcache_list;
58 static struct queue_backend	*backend;
59 
60 static int (*handler_message_create)(uint32_t *);
61 static int (*handler_message_commit)(uint32_t, const char*);
62 static int (*handler_message_delete)(uint32_t);
63 static int (*handler_message_fd_r)(uint32_t);
64 static int (*handler_message_corrupt)(uint32_t);
65 static int (*handler_envelope_create)(uint32_t, const char *, size_t, uint64_t *);
66 static int (*handler_envelope_delete)(uint64_t);
67 static int (*handler_envelope_update)(uint64_t, const char *, size_t);
68 static int (*handler_envelope_load)(uint64_t, char *, size_t);
69 static int (*handler_envelope_walk)(uint64_t *, char *, size_t);
70 
71 #ifdef QUEUE_PROFILING
72 
73 static struct {
74 	struct timespec	 t0;
75 	const char	*name;
76 } profile;
77 
78 static inline void profile_enter(const char *name)
79 {
80 	if ((profiling & PROFILE_QUEUE) == 0)
81 		return;
82 
83 	profile.name = name;
84 	clock_gettime(CLOCK_MONOTONIC, &profile.t0);
85 }
86 
87 static inline void profile_leave(void)
88 {
89 	struct timespec	 t1, dt;
90 
91 	if ((profiling & PROFILE_QUEUE) == 0)
92 		return;
93 
94 	clock_gettime(CLOCK_MONOTONIC, &t1);
95 	timespecsub(&t1, &profile.t0, &dt);
96 	log_debug("profile-queue: %s %lld.%09ld", profile.name,
97 	    (long long)dt.tv_sec, dt.tv_nsec);
98 }
99 #else
100 #define profile_enter(x)	do {} while (0)
101 #define profile_leave()		do {} while (0)
102 #endif
103 
104 static int
105 queue_message_path(uint32_t msgid, char *buf, size_t len)
106 {
107 	return bsnprintf(buf, len, "%s/%08"PRIx32, PATH_TEMPORARY, msgid);
108 }
109 
110 int
111 queue_init(const char *name, int server)
112 {
113 	struct passwd	*pwq;
114 	int		 r;
115 
116 	pwq = getpwnam(SMTPD_QUEUE_USER);
117 	if (pwq == NULL)
118 		errx(1, "unknown user %s", SMTPD_QUEUE_USER);
119 
120 	tree_init(&evpcache_tree);
121 	TAILQ_INIT(&evpcache_list);
122 
123 	if (!strcmp(name, "fs"))
124 		backend = &queue_backend_fs;
125 	if (!strcmp(name, "null"))
126 		backend = &queue_backend_null;
127 	if (!strcmp(name, "proc"))
128 		backend = &queue_backend_proc;
129 	if (!strcmp(name, "ram"))
130 		backend = &queue_backend_ram;
131 
132 	if (backend == NULL) {
133 		log_warn("could not find queue backend \"%s\"", name);
134 		return (0);
135 	}
136 
137 	if (server) {
138 		if (ckdir(PATH_SPOOL, 0711, 0, 0, 1) == 0)
139 			errx(1, "error in spool directory setup");
140 		if (ckdir(PATH_SPOOL PATH_OFFLINE, 01777, 0, 0, 1) == 0)
141 			errx(1, "error in offline directory setup");
142 		if (ckdir(PATH_SPOOL PATH_PURGE, 0700, pwq->pw_uid, 0, 1) == 0)
143 			errx(1, "error in purge directory setup");
144 
145 		mvpurge(PATH_SPOOL PATH_TEMPORARY, PATH_SPOOL PATH_PURGE);
146 
147 		if (ckdir(PATH_SPOOL PATH_TEMPORARY, 0700, pwq->pw_uid, 0, 1) == 0)
148 			errx(1, "error in purge directory setup");
149 	}
150 
151 	r = backend->init(pwq, server);
152 
153 	log_trace(TRACE_QUEUE, "queue-backend: queue_init(%d) -> %d", server, r);
154 
155 	return (r);
156 }
157 
158 int
159 queue_message_create(uint32_t *msgid)
160 {
161 	int	r;
162 
163 	profile_enter("queue_message_create");
164 	r = handler_message_create(msgid);
165 	profile_leave();
166 
167 	log_trace(TRACE_QUEUE,
168 	    "queue-backend: queue_message_create() -> %d (%08"PRIx32")",
169 	    r, *msgid);
170 
171 	return (r);
172 }
173 
174 int
175 queue_message_delete(uint32_t msgid)
176 {
177 	char	msgpath[MAXPATHLEN];
178 	int	r;
179 
180 	profile_enter("queue_message_delete");
181 	r = handler_message_delete(msgid);
182 	profile_leave();
183 
184 	/* in case the message is incoming */
185 	queue_message_path(msgid, msgpath, sizeof(msgpath));
186 	unlink(msgpath);
187 
188 	log_trace(TRACE_QUEUE,
189 	    "queue-backend: queue_message_delete(%08"PRIx32") -> %d", msgid, r);
190 
191 	return (r);
192 }
193 
194 int
195 queue_message_commit(uint32_t msgid)
196 {
197 	int	r;
198 	char	msgpath[MAXPATHLEN];
199 	char	tmppath[MAXPATHLEN];
200 	FILE	*ifp = NULL;
201 	FILE	*ofp = NULL;
202 
203 	profile_enter("queue_message_commit");
204 
205 	queue_message_path(msgid, msgpath, sizeof(msgpath));
206 
207 	if (env->sc_queue_flags & QUEUE_COMPRESSION) {
208 		bsnprintf(tmppath, sizeof tmppath, "%s.comp", msgpath);
209 		ifp = fopen(msgpath, "r");
210 		ofp = fopen(tmppath, "w+");
211 		if (ifp == NULL || ofp == NULL)
212 			goto err;
213 		if (! compress_file(ifp, ofp))
214 			goto err;
215 		fclose(ifp);
216 		fclose(ofp);
217 		ifp = NULL;
218 		ofp = NULL;
219 
220 		if (rename(tmppath, msgpath) == -1) {
221 			if (errno == ENOSPC)
222 				return (0);
223 			unlink(tmppath);
224 			log_warn("rename");
225 			return (0);
226 		}
227 	}
228 
229 	if (env->sc_queue_flags & QUEUE_ENCRYPTION) {
230 		bsnprintf(tmppath, sizeof tmppath, "%s.enc", msgpath);
231 		ifp = fopen(msgpath, "r");
232 		ofp = fopen(tmppath, "w+");
233 		if (ifp == NULL || ofp == NULL)
234 			goto err;
235 		if (! crypto_encrypt_file(ifp, ofp))
236 			goto err;
237 		fclose(ifp);
238 		fclose(ofp);
239 		ifp = NULL;
240 		ofp = NULL;
241 
242 		if (rename(tmppath, msgpath) == -1) {
243 			if (errno == ENOSPC)
244 				return (0);
245 			unlink(tmppath);
246 			log_warn("rename");
247 			return (0);
248 		}
249 	}
250 
251 	r = handler_message_commit(msgid, msgpath);
252 	profile_leave();
253 
254 	/* in case it's not done by the backend */
255 	unlink(msgpath);
256 
257 	log_trace(TRACE_QUEUE,
258 	    "queue-backend: queue_message_commit(%08"PRIx32") -> %d",
259 	    msgid, r);
260 
261 	return (r);
262 
263 err:
264 	if (ifp)
265 		fclose(ifp);
266 	if (ofp)
267 		fclose(ofp);
268 	return 0;
269 }
270 
271 int
272 queue_message_corrupt(uint32_t msgid)
273 {
274 	int	r;
275 
276 	profile_enter("queue_message_corrupt");
277 	r = handler_message_corrupt(msgid);
278 	profile_leave();
279 
280 	log_trace(TRACE_QUEUE,
281 	    "queue-backend: queue_message_corrupt(%08"PRIx32") -> %d", msgid, r);
282 
283 	return (r);
284 }
285 
286 int
287 queue_message_fd_r(uint32_t msgid)
288 {
289 	int	fdin = -1, fdout = -1, fd = -1;
290 	FILE	*ifp = NULL;
291 	FILE	*ofp = NULL;
292 
293 	profile_enter("queue_message_fd_r");
294 	fdin = handler_message_fd_r(msgid);
295 	profile_leave();
296 
297 	log_trace(TRACE_QUEUE,
298 	    "queue-backend: queue_message_fd_r(%08"PRIx32") -> %d", msgid, fdin);
299 
300 	if (fdin == -1)
301 		return (-1);
302 
303 	if (env->sc_queue_flags & QUEUE_ENCRYPTION) {
304 		if ((fdout = mktmpfile()) == -1)
305 			goto err;
306 		if ((fd = dup(fdout)) == -1)
307 			goto err;
308 		if ((ifp = fdopen(fdin, "r")) == NULL)
309 			goto err;
310 		fdin = fd;
311 		fd = -1;
312 		if ((ofp = fdopen(fdout, "w+")) == NULL)
313 			goto err;
314 
315 		if (! crypto_decrypt_file(ifp, ofp))
316 			goto err;
317 
318 		fclose(ifp);
319 		ifp = NULL;
320 		fclose(ofp);
321 		ofp = NULL;
322 		lseek(fdin, SEEK_SET, 0);
323 	}
324 
325 	if (env->sc_queue_flags & QUEUE_COMPRESSION) {
326 		if ((fdout = mktmpfile()) == -1)
327 			goto err;
328 		if ((fd = dup(fdout)) == -1)
329 			goto err;
330 		if ((ifp = fdopen(fdin, "r")) == NULL)
331 			goto err;
332 		fdin = fd;
333 		fd = -1;
334 		if ((ofp = fdopen(fdout, "w+")) == NULL)
335 			goto err;
336 
337 		if (! uncompress_file(ifp, ofp))
338 			goto err;
339 
340 		fclose(ifp);
341 		ifp = NULL;
342 		fclose(ofp);
343 		ofp = NULL;
344 		lseek(fdin, SEEK_SET, 0);
345 	}
346 
347 	return (fdin);
348 
349 err:
350 	if (fd != -1)
351 		close(fd);
352 	if (fdin != -1)
353 		close(fdin);
354 	if (fdout != -1)
355 		close(fdout);
356 	if (ifp)
357 		fclose(ifp);
358 	if (ofp)
359 		fclose(ofp);
360 	return -1;
361 }
362 
363 int
364 queue_message_fd_rw(uint32_t msgid)
365 {
366 	char buf[SMTPD_MAXPATHLEN];
367 
368 	queue_message_path(msgid, buf, sizeof(buf));
369 
370 	return open(buf, O_RDWR | O_CREAT | O_EXCL, 0600);
371 }
372 
373 static int
374 queue_envelope_dump_buffer(struct envelope *ep, char *evpbuf, size_t evpbufsize)
375 {
376 	char   *evp;
377 	size_t	evplen;
378 	size_t	complen;
379 	char	compbuf[sizeof(struct envelope)];
380 	size_t	enclen;
381 	char	encbuf[sizeof(struct envelope)];
382 
383 	evp = evpbuf;
384 	evplen = envelope_dump_buffer(ep, evpbuf, evpbufsize);
385 	if (evplen == 0)
386 		return (0);
387 
388 	if (env->sc_queue_flags & QUEUE_COMPRESSION) {
389 		complen = compress_chunk(evp, evplen, compbuf, sizeof compbuf);
390 		if (complen == 0)
391 			return (0);
392 		evp = compbuf;
393 		evplen = complen;
394 	}
395 
396 	if (env->sc_queue_flags & QUEUE_ENCRYPTION) {
397 		enclen = crypto_encrypt_buffer(evp, evplen, encbuf, sizeof encbuf);
398 		if (enclen == 0)
399 			return (0);
400 		evp = encbuf;
401 		evplen = enclen;
402 	}
403 
404 	memmove(evpbuf, evp, evplen);
405 
406 	return (evplen);
407 }
408 
409 static int
410 queue_envelope_load_buffer(struct envelope *ep, char *evpbuf, size_t evpbufsize)
411 {
412 	char		*evp;
413 	size_t		 evplen;
414 	char		 compbuf[sizeof(struct envelope)];
415 	size_t		 complen;
416 	char		 encbuf[sizeof(struct envelope)];
417 	size_t		 enclen;
418 
419 	evp = evpbuf;
420 	evplen = evpbufsize;
421 
422 	if (env->sc_queue_flags & QUEUE_ENCRYPTION) {
423 		enclen = crypto_decrypt_buffer(evp, evplen, encbuf, sizeof encbuf);
424 		if (enclen == 0)
425 			return (0);
426 		evp = encbuf;
427 		evplen = enclen;
428 	}
429 
430 	if (env->sc_queue_flags & QUEUE_COMPRESSION) {
431 		complen = uncompress_chunk(evp, evplen, compbuf, sizeof compbuf);
432 		if (complen == 0)
433 			return (0);
434 		evp = compbuf;
435 		evplen = complen;
436 	}
437 
438 	return (envelope_load_buffer(ep, evp, evplen));
439 }
440 
441 static void
442 queue_envelope_cache_add(struct envelope *e)
443 {
444 	struct envelope *cached;
445 
446 	while (tree_count(&evpcache_tree) >= env->sc_queue_evpcache_size)
447 		queue_envelope_cache_del(TAILQ_LAST(&evpcache_list, evplst)->id);
448 
449 	cached = xcalloc(1, sizeof *cached, "queue_envelope_cache_add");
450 	*cached = *e;
451 	TAILQ_INSERT_HEAD(&evpcache_list, cached, entry);
452 	tree_xset(&evpcache_tree, e->id, cached);
453 	stat_increment("queue.evpcache.size", 1);
454 }
455 
456 static void
457 queue_envelope_cache_update(struct envelope *e)
458 {
459 	struct envelope *cached;
460 
461 	if ((cached = tree_get(&evpcache_tree, e->id)) == NULL) {
462 		queue_envelope_cache_add(e);
463 		stat_increment("queue.evpcache.update.missed", 1);
464 	} else {
465 		TAILQ_REMOVE(&evpcache_list, cached, entry);
466 		*cached = *e;
467 		TAILQ_INSERT_HEAD(&evpcache_list, cached, entry);
468 		stat_increment("queue.evpcache.update.hit", 1);
469 	}
470 }
471 
472 static void
473 queue_envelope_cache_del(uint64_t evpid)
474 {
475 	struct envelope *cached;
476 
477 	if ((cached = tree_pop(&evpcache_tree, evpid)) == NULL)
478 		return;
479 
480 	TAILQ_REMOVE(&evpcache_list, cached, entry);
481 	free(cached);
482 	stat_decrement("queue.evpcache.size", 1);
483 }
484 
485 int
486 queue_envelope_create(struct envelope *ep)
487 {
488 	int		 r;
489 	char		 evpbuf[sizeof(struct envelope)];
490 	size_t		 evplen;
491 	uint64_t	 evpid;
492 	uint32_t	 msgid;
493 
494 	ep->creation = time(NULL);
495 	evplen = queue_envelope_dump_buffer(ep, evpbuf, sizeof evpbuf);
496 	if (evplen == 0)
497 		return (0);
498 
499 	evpid = ep->id;
500 	msgid = evpid_to_msgid(evpid);
501 
502 	profile_enter("queue_envelope_create");
503 	r = handler_envelope_create(msgid, evpbuf, evplen, &ep->id);
504 	profile_leave();
505 
506 	log_trace(TRACE_QUEUE,
507 	    "queue-backend: queue_envelope_create(%016"PRIx64", %zu) -> %d (%016"PRIx64")",
508 	    evpid, evplen, r, ep->id);
509 
510 	if (!r) {
511 		ep->creation = 0;
512 		ep->id = 0;
513 	}
514 
515 	if (r && env->sc_queue_flags & QUEUE_EVPCACHE)
516 		queue_envelope_cache_add(ep);
517 
518 	return (r);
519 }
520 
521 int
522 queue_envelope_delete(uint64_t evpid)
523 {
524 	int	r;
525 
526 	if (env->sc_queue_flags & QUEUE_EVPCACHE)
527 		queue_envelope_cache_del(evpid);
528 
529 	profile_enter("queue_envelope_delete");
530 	r = handler_envelope_delete(evpid);
531 	profile_leave();
532 
533 	log_trace(TRACE_QUEUE,
534 	    "queue-backend: queue_envelope_delete(%016"PRIx64") -> %d",
535 	    evpid, r);
536 
537 	return (r);
538 }
539 
540 int
541 queue_envelope_load(uint64_t evpid, struct envelope *ep)
542 {
543 	const char	*e;
544 	char		 evpbuf[sizeof(struct envelope)];
545 	size_t		 evplen;
546 	struct envelope	*cached;
547 
548 	if ((env->sc_queue_flags & QUEUE_EVPCACHE) &&
549 	    (cached = tree_get(&evpcache_tree, evpid))) {
550 		*ep = *cached;
551 		stat_increment("queue.evpcache.load.hit", 1);
552 		return (1);
553 	}
554 
555 	ep->id = evpid;
556 	profile_enter("queue_envelope_load");
557 	evplen = handler_envelope_load(ep->id, evpbuf, sizeof evpbuf);
558 	profile_leave();
559 
560 	log_trace(TRACE_QUEUE,
561 	    "queue-backend: queue_envelope_load(%016"PRIx64") -> %zu",
562 	    evpid, evplen);
563 
564 	if (evplen == 0)
565 		return (0);
566 
567 	if (queue_envelope_load_buffer(ep, evpbuf, evplen)) {
568 		if ((e = envelope_validate(ep)) == NULL) {
569 			ep->id = evpid;
570 			if (env->sc_queue_flags & QUEUE_EVPCACHE) {
571 				queue_envelope_cache_add(ep);
572 				stat_increment("queue.evpcache.load.missed", 1);
573 			}
574 			return (1);
575 		}
576 		log_debug("debug: invalid envelope %016" PRIx64 ": %s",
577 		    ep->id, e);
578 	}
579 
580 	(void)queue_message_corrupt(evpid_to_msgid(evpid));
581 	return (0);
582 }
583 
584 int
585 queue_envelope_update(struct envelope *ep)
586 {
587 	char	evpbuf[sizeof(struct envelope)];
588 	size_t	evplen;
589 	int	r;
590 
591 	evplen = queue_envelope_dump_buffer(ep, evpbuf, sizeof evpbuf);
592 	if (evplen == 0)
593 		return (0);
594 
595 	profile_enter("queue_envelope_update");
596 	r = handler_envelope_update(ep->id, evpbuf, evplen);
597 	profile_leave();
598 
599 	if (r && env->sc_queue_flags & QUEUE_EVPCACHE)
600 		queue_envelope_cache_update(ep);
601 
602 	log_trace(TRACE_QUEUE,
603 	    "queue-backend: queue_envelope_update(%016"PRIx64") -> %d",
604 	    ep->id, r);
605 
606 	return (r);
607 }
608 
609 int
610 queue_envelope_walk(struct envelope *ep)
611 {
612 	const char	*e;
613 	uint64_t	 evpid;
614 	char		 evpbuf[sizeof(struct envelope)];
615 	int		 r;
616 
617 	profile_enter("queue_envelope_walk");
618 	r = handler_envelope_walk(&evpid, evpbuf, sizeof evpbuf);
619 	profile_leave();
620 
621 	log_trace(TRACE_QUEUE,
622 	    "queue-backend: queue_envelope_walk() -> %d (%016"PRIx64")",
623 	    r, evpid);
624 
625 	if (r == -1)
626 		return (r);
627 
628 	if (r && queue_envelope_load_buffer(ep, evpbuf, (size_t)r)) {
629 		if ((e = envelope_validate(ep)) == NULL) {
630 			ep->id = evpid;
631 			if (env->sc_queue_flags & QUEUE_EVPCACHE)
632 				queue_envelope_cache_add(ep);
633 			return (1);
634 		}
635 		log_debug("debug: invalid envelope %016" PRIx64 ": %s",
636 		    ep->id, e);
637 	}
638 
639 	(void)queue_message_corrupt(evpid_to_msgid(evpid));
640 	return (0);
641 }
642 
643 uint32_t
644 queue_generate_msgid(void)
645 {
646 	uint32_t msgid;
647 
648 	while ((msgid = arc4random_uniform(0xffffffff)) == 0)
649 		;
650 
651 	return msgid;
652 }
653 
654 uint64_t
655 queue_generate_evpid(uint32_t msgid)
656 {
657 	uint32_t rnd;
658 	uint64_t evpid;
659 
660 	while ((rnd = arc4random_uniform(0xffffffff)) == 0)
661 		;
662 
663 	evpid = msgid;
664 	evpid <<= 32;
665 	evpid |= rnd;
666 
667 	return evpid;
668 }
669 
670 static const char*
671 envelope_validate(struct envelope *ep)
672 {
673 	if (ep->version != SMTPD_ENVELOPE_VERSION)
674 		return "version mismatch";
675 
676 	if (memchr(ep->helo, '\0', sizeof(ep->helo)) == NULL)
677 		return "invalid helo";
678 	if (ep->helo[0] == '\0')
679 		return "empty helo";
680 
681 	if (memchr(ep->hostname, '\0', sizeof(ep->hostname)) == NULL)
682 		return "invalid hostname";
683 	if (ep->hostname[0] == '\0')
684 		return "empty hostname";
685 
686 	if (memchr(ep->errorline, '\0', sizeof(ep->errorline)) == NULL)
687 		return "invalid error line";
688 
689 	return NULL;
690 }
691 
692 void
693 queue_api_on_message_create(int(*cb)(uint32_t *))
694 {
695 	handler_message_create = cb;
696 }
697 
698 void
699 queue_api_on_message_commit(int(*cb)(uint32_t, const char *))
700 {
701 	handler_message_commit = cb;
702 }
703 
704 void
705 queue_api_on_message_delete(int(*cb)(uint32_t))
706 {
707 	handler_message_delete = cb;
708 }
709 
710 void
711 queue_api_on_message_fd_r(int(*cb)(uint32_t))
712 {
713 	handler_message_fd_r = cb;
714 }
715 
716 void
717 queue_api_on_message_corrupt(int(*cb)(uint32_t))
718 {
719 	handler_message_corrupt = cb;
720 }
721 
722 void
723 queue_api_on_envelope_create(int(*cb)(uint32_t, const char *, size_t, uint64_t *))
724 {
725 	handler_envelope_create = cb;
726 }
727 
728 void
729 queue_api_on_envelope_delete(int(*cb)(uint64_t))
730 {
731 	handler_envelope_delete = cb;
732 }
733 
734 void
735 queue_api_on_envelope_update(int(*cb)(uint64_t, const char *, size_t))
736 {
737 	handler_envelope_update = cb;
738 }
739 
740 void
741 queue_api_on_envelope_load(int(*cb)(uint64_t, char *, size_t))
742 {
743 	handler_envelope_load = cb;
744 }
745 
746 void
747 queue_api_on_envelope_walk(int(*cb)(uint64_t *, char *, size_t))
748 {
749 	handler_envelope_walk = cb;
750 }
751