xref: /openbsd-src/usr.sbin/smtpd/queue_backend.c (revision d7bcae4d6d4d5ecc8bada62c46fe63bb4ddc3d0c)
1 /*	$OpenBSD: queue_backend.c,v 1.47 2013/10/26 12:27:59 eric Exp $	*/
2 
3 /*
4  * Copyright (c) 2011 Gilles Chehade <gilles@poolp.org>
5  *
6  * Permission to use, copy, modify, and distribute this software for any
7  * purpose with or without fee is hereby granted, provided that the above
8  * copyright notice and this permission notice appear in all copies.
9  *
10  * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
11  * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
12  * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
13  * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
14  * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
15  * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
16  * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
17  */
18 
19 #include <sys/types.h>
20 #include <sys/queue.h>
21 #include <sys/tree.h>
22 #include <sys/socket.h>
23 #include <sys/stat.h>
24 
25 #include <ctype.h>
26 #include <err.h>
27 #include <errno.h>
28 #include <event.h>
29 #include <fcntl.h>
30 #include <imsg.h>
31 #include <inttypes.h>
32 #include <libgen.h>
33 #include <pwd.h>
34 #include <stdio.h>
35 #include <stdlib.h>
36 #include <string.h>
37 #include <time.h>
38 #include <unistd.h>
39 
40 #include "smtpd.h"
41 #include "log.h"
42 
43 static const char* envelope_validate(struct envelope *);
44 
45 extern struct queue_backend	queue_backend_fs;
46 extern struct queue_backend	queue_backend_null;
47 extern struct queue_backend	queue_backend_proc;
48 extern struct queue_backend	queue_backend_ram;
49 
50 static void queue_envelope_cache_add(struct envelope *);
51 static void queue_envelope_cache_update(struct envelope *);
52 static void queue_envelope_cache_del(uint64_t evpid);
53 
54 TAILQ_HEAD(evplst, envelope);
55 
56 static struct tree		evpcache_tree;
57 static struct evplst		evpcache_list;
58 static struct queue_backend	*backend;
59 
60 static int (*handler_message_create)(uint32_t *);
61 static int (*handler_message_commit)(uint32_t, const char*);
62 static int (*handler_message_delete)(uint32_t);
63 static int (*handler_message_fd_r)(uint32_t);
64 static int (*handler_message_corrupt)(uint32_t);
65 static int (*handler_envelope_create)(uint32_t, const char *, size_t, uint64_t *);
66 static int (*handler_envelope_delete)(uint64_t);
67 static int (*handler_envelope_update)(uint64_t, const char *, size_t);
68 static int (*handler_envelope_load)(uint64_t, char *, size_t);
69 static int (*handler_envelope_walk)(uint64_t *, char *, size_t);
70 
71 #ifdef QUEUE_PROFILING
72 
73 static struct {
74 	struct timespec	 t0;
75 	const char	*name;
76 } profile;
77 
78 static inline void profile_enter(const char *name)
79 {
80 	if ((profiling & PROFILE_QUEUE) == 0)
81 		return;
82 
83 	profile.name = name;
84 	clock_gettime(CLOCK_MONOTONIC, &profile.t0);
85 }
86 
87 static inline void profile_leave(void)
88 {
89 	struct timespec	 t1, dt;
90 
91 	if ((profiling & PROFILE_QUEUE) == 0)
92 		return;
93 
94 	clock_gettime(CLOCK_MONOTONIC, &t1);
95 	timespecsub(&t1, &profile.t0, &dt);
96 	log_debug("profile-queue: %s %lld.%06ld", profile.name,
97 	    (long long)dt.tv_sec * 1000000 + dt.tv_nsec / 1000000,
98 	    dt.tv_nsec % 1000000);
99 }
100 #else
101 #define profile_enter(x)	do {} while (0)
102 #define profile_leave()		do {} while (0)
103 #endif
104 
105 static int
106 queue_message_path(uint32_t msgid, char *buf, size_t len)
107 {
108 	return bsnprintf(buf, len, "%s/%08"PRIx32, PATH_TEMPORARY, msgid);
109 }
110 
111 int
112 queue_init(const char *name, int server)
113 {
114 	struct passwd	*pwq;
115 	int		 r;
116 
117 	pwq = getpwnam(SMTPD_QUEUE_USER);
118 	if (pwq == NULL)
119 		pwq = getpwnam(SMTPD_USER);
120 	if (pwq == NULL)
121 		errx(1, "unknown user %s", SMTPD_USER);
122 
123 	tree_init(&evpcache_tree);
124 	TAILQ_INIT(&evpcache_list);
125 
126 	if (!strcmp(name, "fs"))
127 		backend = &queue_backend_fs;
128 	if (!strcmp(name, "null"))
129 		backend = &queue_backend_null;
130 	if (!strcmp(name, "proc"))
131 		backend = &queue_backend_proc;
132 	if (!strcmp(name, "ram"))
133 		backend = &queue_backend_ram;
134 
135 	if (backend == NULL) {
136 		log_warn("could not find queue backend \"%s\"", name);
137 		return (0);
138 	}
139 
140 	if (server) {
141 		if (ckdir(PATH_SPOOL, 0711, 0, 0, 1) == 0)
142 			errx(1, "error in spool directory setup");
143 		if (ckdir(PATH_SPOOL PATH_OFFLINE, 01777, 0, 0, 1) == 0)
144 			errx(1, "error in offline directory setup");
145 		if (ckdir(PATH_SPOOL PATH_PURGE, 0700, pwq->pw_uid, 0, 1) == 0)
146 			errx(1, "error in purge directory setup");
147 
148 		mvpurge(PATH_SPOOL PATH_TEMPORARY, PATH_SPOOL PATH_PURGE);
149 
150 		if (ckdir(PATH_SPOOL PATH_TEMPORARY, 0700, pwq->pw_uid, 0, 1) == 0)
151 			errx(1, "error in purge directory setup");
152 	}
153 
154 	r = backend->init(pwq, server);
155 
156 	log_trace(TRACE_QUEUE, "queue-backend: queue_init(%d) -> %d", server, r);
157 
158 	return (r);
159 }
160 
161 int
162 queue_message_create(uint32_t *msgid)
163 {
164 	int	r;
165 
166 	profile_enter("queue_message_create");
167 	r = handler_message_create(msgid);
168 	profile_leave();
169 
170 	log_trace(TRACE_QUEUE,
171 	    "queue-backend: queue_message_create() -> %d (%08"PRIx32")",
172 	    r, *msgid);
173 
174 	return (r);
175 }
176 
177 int
178 queue_message_delete(uint32_t msgid)
179 {
180 	char	msgpath[MAXPATHLEN];
181 	int	r;
182 
183 	profile_enter("queue_message_delete");
184 	r = handler_message_delete(msgid);
185 	profile_leave();
186 
187 	/* in case the message is incoming */
188 	queue_message_path(msgid, msgpath, sizeof(msgpath));
189 	unlink(msgpath);
190 
191 	log_trace(TRACE_QUEUE,
192 	    "queue-backend: queue_message_delete(%08"PRIx32") -> %d", msgid, r);
193 
194 	return (r);
195 }
196 
197 int
198 queue_message_commit(uint32_t msgid)
199 {
200 	int	r;
201 	char	msgpath[MAXPATHLEN];
202 	char	tmppath[MAXPATHLEN];
203 	FILE	*ifp = NULL;
204 	FILE	*ofp = NULL;
205 
206 	profile_enter("queue_message_commit");
207 
208 	queue_message_path(msgid, msgpath, sizeof(msgpath));
209 
210 	if (env->sc_queue_flags & QUEUE_COMPRESSION) {
211 		bsnprintf(tmppath, sizeof tmppath, "%s.comp", msgpath);
212 		ifp = fopen(msgpath, "r");
213 		ofp = fopen(tmppath, "w+");
214 		if (ifp == NULL || ofp == NULL)
215 			goto err;
216 		if (! compress_file(ifp, ofp))
217 			goto err;
218 		fclose(ifp);
219 		fclose(ofp);
220 		ifp = NULL;
221 		ofp = NULL;
222 
223 		if (rename(tmppath, msgpath) == -1) {
224 			if (errno == ENOSPC)
225 				return (0);
226 			unlink(tmppath);
227 			log_warn("rename");
228 			return (0);
229 		}
230 	}
231 
232 	if (env->sc_queue_flags & QUEUE_ENCRYPTION) {
233 		bsnprintf(tmppath, sizeof tmppath, "%s.enc", msgpath);
234 		ifp = fopen(msgpath, "r");
235 		ofp = fopen(tmppath, "w+");
236 		if (ifp == NULL || ofp == NULL)
237 			goto err;
238 		if (! crypto_encrypt_file(ifp, ofp))
239 			goto err;
240 		fclose(ifp);
241 		fclose(ofp);
242 		ifp = NULL;
243 		ofp = NULL;
244 
245 		if (rename(tmppath, msgpath) == -1) {
246 			if (errno == ENOSPC)
247 				return (0);
248 			unlink(tmppath);
249 			log_warn("rename");
250 			return (0);
251 		}
252 	}
253 
254 	r = handler_message_commit(msgid, msgpath);
255 	profile_leave();
256 
257 	/* in case it's not done by the backend */
258 	unlink(msgpath);
259 
260 	log_trace(TRACE_QUEUE,
261 	    "queue-backend: queue_message_commit(%08"PRIx32") -> %d",
262 	    msgid, r);
263 
264 	return (r);
265 
266 err:
267 	if (ifp)
268 		fclose(ifp);
269 	if (ofp)
270 		fclose(ofp);
271 	return 0;
272 }
273 
274 int
275 queue_message_corrupt(uint32_t msgid)
276 {
277 	int	r;
278 
279 	profile_enter("queue_message_corrupt");
280 	r = handler_message_corrupt(msgid);
281 	profile_leave();
282 
283 	log_trace(TRACE_QUEUE,
284 	    "queue-backend: queue_message_corrupt(%08"PRIx32") -> %d", msgid, r);
285 
286 	return (r);
287 }
288 
289 int
290 queue_message_fd_r(uint32_t msgid)
291 {
292 	int	fdin = -1, fdout = -1, fd = -1;
293 	FILE	*ifp = NULL;
294 	FILE	*ofp = NULL;
295 
296 	profile_enter("queue_message_fd_r");
297 	fdin = handler_message_fd_r(msgid);
298 	profile_leave();
299 
300 	log_trace(TRACE_QUEUE,
301 	    "queue-backend: queue_message_fd_r(%08"PRIx32") -> %d", msgid, fdin);
302 
303 	if (fdin == -1)
304 		return (-1);
305 
306 	if (env->sc_queue_flags & QUEUE_ENCRYPTION) {
307 		if ((fdout = mktmpfile()) == -1)
308 			goto err;
309 		if ((fd = dup(fdout)) == -1)
310 			goto err;
311 		if ((ifp = fdopen(fdin, "r")) == NULL)
312 			goto err;
313 		fdin = fd;
314 		fd = -1;
315 		if ((ofp = fdopen(fdout, "w+")) == NULL)
316 			goto err;
317 
318 		if (! crypto_decrypt_file(ifp, ofp))
319 			goto err;
320 
321 		fclose(ifp);
322 		fclose(ofp);
323 		lseek(fdin, SEEK_SET, 0);
324 	}
325 
326 	if (env->sc_queue_flags & QUEUE_COMPRESSION) {
327 		if ((fdout = mktmpfile()) == -1)
328 			goto err;
329 		if ((fd = dup(fdout)) == -1)
330 			goto err;
331 		if ((ifp = fdopen(fdin, "r")) == NULL)
332 			goto err;
333 		fdin = fd;
334 		fd = -1;
335 		if ((ofp = fdopen(fdout, "w+")) == NULL)
336 			goto err;
337 
338 		if (! uncompress_file(ifp, ofp))
339 			goto err;
340 
341 		fclose(ifp);
342 		fclose(ofp);
343 		lseek(fdin, SEEK_SET, 0);
344 	}
345 
346 	return (fdin);
347 
348 err:
349 	if (fd != -1)
350 		close(fd);
351 	if (fdin != -1)
352 		close(fdin);
353 	if (fdout != -1)
354 		close(fdout);
355 	if (ifp)
356 		fclose(ifp);
357 	if (ofp)
358 		fclose(ofp);
359 	return -1;
360 }
361 
362 int
363 queue_message_fd_rw(uint32_t msgid)
364 {
365 	char buf[SMTPD_MAXPATHLEN];
366 
367 	queue_message_path(msgid, buf, sizeof(buf));
368 
369 	return open(buf, O_RDWR | O_CREAT | O_EXCL, 0600);
370 }
371 
372 static int
373 queue_envelope_dump_buffer(struct envelope *ep, char *evpbuf, size_t evpbufsize)
374 {
375 	char   *evp;
376 	size_t	evplen;
377 	size_t	complen;
378 	char	compbuf[sizeof(struct envelope)];
379 	size_t	enclen;
380 	char	encbuf[sizeof(struct envelope)];
381 
382 	evp = evpbuf;
383 	evplen = envelope_dump_buffer(ep, evpbuf, evpbufsize);
384 	if (evplen == 0)
385 		return (0);
386 
387 	if (env->sc_queue_flags & QUEUE_COMPRESSION) {
388 		complen = compress_chunk(evp, evplen, compbuf, sizeof compbuf);
389 		if (complen == 0)
390 			return (0);
391 		evp = compbuf;
392 		evplen = complen;
393 	}
394 
395 	if (env->sc_queue_flags & QUEUE_ENCRYPTION) {
396 		enclen = crypto_encrypt_buffer(evp, evplen, encbuf, sizeof encbuf);
397 		if (enclen == 0)
398 			return (0);
399 		evp = encbuf;
400 		evplen = enclen;
401 	}
402 
403 	memmove(evpbuf, evp, evplen);
404 
405 	return (evplen);
406 }
407 
408 static int
409 queue_envelope_load_buffer(struct envelope *ep, char *evpbuf, size_t evpbufsize)
410 {
411 	char		*evp;
412 	size_t		 evplen;
413 	char		 compbuf[sizeof(struct envelope)];
414 	size_t		 complen;
415 	char		 encbuf[sizeof(struct envelope)];
416 	size_t		 enclen;
417 
418 	evp = evpbuf;
419 	evplen = evpbufsize;
420 
421 	if (env->sc_queue_flags & QUEUE_ENCRYPTION) {
422 		enclen = crypto_decrypt_buffer(evp, evplen, encbuf, sizeof encbuf);
423 		if (enclen == 0)
424 			return (0);
425 		evp = encbuf;
426 		evplen = enclen;
427 	}
428 
429 	if (env->sc_queue_flags & QUEUE_COMPRESSION) {
430 		complen = uncompress_chunk(evp, evplen, compbuf, sizeof compbuf);
431 		if (complen == 0)
432 			return (0);
433 		evp = compbuf;
434 		evplen = complen;
435 	}
436 
437 	return (envelope_load_buffer(ep, evp, evplen));
438 }
439 
440 static void
441 queue_envelope_cache_add(struct envelope *e)
442 {
443 	struct envelope *cached;
444 
445 	while (tree_count(&evpcache_tree) >= env->sc_queue_evpcache_size)
446 		queue_envelope_cache_del(TAILQ_LAST(&evpcache_list, evplst)->id);
447 
448 	cached = xcalloc(1, sizeof *cached, "queue_envelope_cache_add");
449 	*cached = *e;
450 	TAILQ_INSERT_HEAD(&evpcache_list, cached, entry);
451 	tree_xset(&evpcache_tree, e->id, cached);
452 	stat_increment("queue.evpcache.size", 1);
453 }
454 
455 static void
456 queue_envelope_cache_update(struct envelope *e)
457 {
458 	struct envelope *cached;
459 
460 	if ((cached = tree_get(&evpcache_tree, e->id)) == NULL) {
461 		queue_envelope_cache_add(e);
462 		stat_increment("queue.evpcache.update.missed", 1);
463 	} else {
464 		TAILQ_REMOVE(&evpcache_list, cached, entry);
465 		*cached = *e;
466 		TAILQ_INSERT_HEAD(&evpcache_list, cached, entry);
467 		stat_increment("queue.evpcache.update.hit", 1);
468 	}
469 }
470 
471 static void
472 queue_envelope_cache_del(uint64_t evpid)
473 {
474 	struct envelope *cached;
475 
476 	if ((cached = tree_pop(&evpcache_tree, evpid)) == NULL)
477 		return;
478 
479 	TAILQ_REMOVE(&evpcache_list, cached, entry);
480 	free(cached);
481 	stat_decrement("queue.evpcache.size", 1);
482 }
483 
484 int
485 queue_envelope_create(struct envelope *ep)
486 {
487 	int		 r;
488 	char		 evpbuf[sizeof(struct envelope)];
489 	size_t		 evplen;
490 	uint64_t	 evpid;
491 	uint32_t	 msgid;
492 
493 	ep->creation = time(NULL);
494 	evplen = queue_envelope_dump_buffer(ep, evpbuf, sizeof evpbuf);
495 	if (evplen == 0)
496 		return (0);
497 
498 	evpid = ep->id;
499 	msgid = evpid_to_msgid(evpid);
500 
501 	profile_enter("queue_envelope_create");
502 	r = handler_envelope_create(msgid, evpbuf, evplen, &ep->id);
503 	profile_leave();
504 
505 	log_trace(TRACE_QUEUE,
506 	    "queue-backend: queue_envelope_create(%016"PRIx64", %zu) -> %d (%016"PRIx64")",
507 	    evpid, evplen, r, ep->id);
508 
509 	if (!r) {
510 		ep->creation = 0;
511 		ep->id = 0;
512 	}
513 
514 	if (r && env->sc_queue_flags & QUEUE_EVPCACHE)
515 		queue_envelope_cache_add(ep);
516 
517 	return (r);
518 }
519 
520 int
521 queue_envelope_delete(uint64_t evpid)
522 {
523 	int	r;
524 
525 	if (env->sc_queue_flags & QUEUE_EVPCACHE)
526 		queue_envelope_cache_del(evpid);
527 
528 	profile_enter("queue_envelope_delete");
529 	r = handler_envelope_delete(evpid);
530 	profile_leave();
531 
532 	log_trace(TRACE_QUEUE,
533 	    "queue-backend: queue_envelope_delete(%016"PRIx64") -> %d",
534 	    evpid, r);
535 
536 	return (r);
537 }
538 
539 int
540 queue_envelope_load(uint64_t evpid, struct envelope *ep)
541 {
542 	const char	*e;
543 	char		 evpbuf[sizeof(struct envelope)];
544 	size_t		 evplen;
545 	struct envelope	*cached;
546 
547 	if ((env->sc_queue_flags & QUEUE_EVPCACHE) &&
548 	    (cached = tree_get(&evpcache_tree, evpid))) {
549 		*ep = *cached;
550 		stat_increment("queue.evpcache.load.hit", 1);
551 		return (1);
552 	}
553 
554 	ep->id = evpid;
555 	profile_enter("queue_envelope_load");
556 	evplen = handler_envelope_load(ep->id, evpbuf, sizeof evpbuf);
557 	profile_leave();
558 
559 	log_trace(TRACE_QUEUE,
560 	    "queue-backend: queue_envelope_load(%016"PRIx64") -> %zu",
561 	    evpid, evplen);
562 
563 	if (evplen == 0)
564 		return (0);
565 
566 	if (queue_envelope_load_buffer(ep, evpbuf, evplen)) {
567 		if ((e = envelope_validate(ep)) == NULL) {
568 			ep->id = evpid;
569 			if (env->sc_queue_flags & QUEUE_EVPCACHE) {
570 				queue_envelope_cache_add(ep);
571 				stat_increment("queue.evpcache.load.missed", 1);
572 			}
573 			return (1);
574 		}
575 		log_debug("debug: invalid envelope %016" PRIx64 ": %s",
576 		    ep->id, e);
577 	}
578 
579 	(void)queue_message_corrupt(evpid_to_msgid(evpid));
580 	return (0);
581 }
582 
583 int
584 queue_envelope_update(struct envelope *ep)
585 {
586 	char	evpbuf[sizeof(struct envelope)];
587 	size_t	evplen;
588 	int	r;
589 
590 	evplen = queue_envelope_dump_buffer(ep, evpbuf, sizeof evpbuf);
591 	if (evplen == 0)
592 		return (0);
593 
594 	profile_enter("queue_envelope_update");
595 	r = handler_envelope_update(ep->id, evpbuf, evplen);
596 	profile_leave();
597 
598 	if (r && env->sc_queue_flags & QUEUE_EVPCACHE)
599 		queue_envelope_cache_update(ep);
600 
601 	log_trace(TRACE_QUEUE,
602 	    "queue-backend: queue_envelope_update(%016"PRIx64") -> %d",
603 	    ep->id, r);
604 
605 	return (r);
606 }
607 
608 int
609 queue_envelope_walk(struct envelope *ep)
610 {
611 	const char	*e;
612 	uint64_t	 evpid;
613 	char		 evpbuf[sizeof(struct envelope)];
614 	int		 r;
615 
616 	profile_enter("queue_envelope_walk");
617 	r = handler_envelope_walk(&evpid, evpbuf, sizeof evpbuf);
618 	profile_leave();
619 
620 	log_trace(TRACE_QUEUE,
621 	    "queue-backend: queue_envelope_walk() -> %d (%016"PRIx64")",
622 	    r, evpid);
623 
624 	if (r == -1)
625 		return (r);
626 
627 	if (r && queue_envelope_load_buffer(ep, evpbuf, (size_t)r)) {
628 		if ((e = envelope_validate(ep)) == NULL) {
629 			ep->id = evpid;
630 			if (env->sc_queue_flags & QUEUE_EVPCACHE)
631 				queue_envelope_cache_add(ep);
632 			return (1);
633 		}
634 		log_debug("debug: invalid envelope %016" PRIx64 ": %s",
635 		    ep->id, e);
636 	}
637 
638 	(void)queue_message_corrupt(evpid_to_msgid(evpid));
639 	return (0);
640 }
641 
642 uint32_t
643 queue_generate_msgid(void)
644 {
645 	uint32_t msgid;
646 
647 	while ((msgid = arc4random_uniform(0xffffffff)) == 0)
648 		;
649 
650 	return msgid;
651 }
652 
653 uint64_t
654 queue_generate_evpid(uint32_t msgid)
655 {
656 	uint32_t rnd;
657 	uint64_t evpid;
658 
659 	while ((rnd = arc4random_uniform(0xffffffff)) == 0)
660 		;
661 
662 	evpid = msgid;
663 	evpid <<= 32;
664 	evpid |= rnd;
665 
666 	return evpid;
667 }
668 
669 static const char*
670 envelope_validate(struct envelope *ep)
671 {
672 	if (ep->version != SMTPD_ENVELOPE_VERSION)
673 		return "version mismatch";
674 
675 	if (memchr(ep->helo, '\0', sizeof(ep->helo)) == NULL)
676 		return "invalid helo";
677 	if (ep->helo[0] == '\0')
678 		return "empty helo";
679 
680 	if (memchr(ep->hostname, '\0', sizeof(ep->hostname)) == NULL)
681 		return "invalid hostname";
682 	if (ep->hostname[0] == '\0')
683 		return "empty hostname";
684 
685 	if (memchr(ep->errorline, '\0', sizeof(ep->errorline)) == NULL)
686 		return "invalid error line";
687 
688 	return NULL;
689 }
690 
691 void
692 queue_api_on_message_create(int(*cb)(uint32_t *))
693 {
694 	handler_message_create = cb;
695 }
696 
697 void
698 queue_api_on_message_commit(int(*cb)(uint32_t, const char *))
699 {
700 	handler_message_commit = cb;
701 }
702 
703 void
704 queue_api_on_message_delete(int(*cb)(uint32_t))
705 {
706 	handler_message_delete = cb;
707 }
708 
709 void
710 queue_api_on_message_fd_r(int(*cb)(uint32_t))
711 {
712 	handler_message_fd_r = cb;
713 }
714 
715 void
716 queue_api_on_message_corrupt(int(*cb)(uint32_t))
717 {
718 	handler_message_corrupt = cb;
719 }
720 
721 void
722 queue_api_on_envelope_create(int(*cb)(uint32_t, const char *, size_t, uint64_t *))
723 {
724 	handler_envelope_create = cb;
725 }
726 
727 void
728 queue_api_on_envelope_delete(int(*cb)(uint64_t))
729 {
730 	handler_envelope_delete = cb;
731 }
732 
733 void
734 queue_api_on_envelope_update(int(*cb)(uint64_t, const char *, size_t))
735 {
736 	handler_envelope_update = cb;
737 }
738 
739 void
740 queue_api_on_envelope_load(int(*cb)(uint64_t, char *, size_t))
741 {
742 	handler_envelope_load = cb;
743 }
744 
745 void
746 queue_api_on_envelope_walk(int(*cb)(uint64_t *, char *, size_t))
747 {
748 	handler_envelope_walk = cb;
749 }
750