xref: /openbsd-src/usr.sbin/smtpd/queue_backend.c (revision 8351d18b7f05448e5b0f3db78cda27c47849c49e)
1 /*	$OpenBSD: queue_backend.c,v 1.56 2015/10/09 14:37:38 gilles Exp $	*/
2 
3 /*
4  * Copyright (c) 2011 Gilles Chehade <gilles@poolp.org>
5  *
6  * Permission to use, copy, modify, and distribute this software for any
7  * purpose with or without fee is hereby granted, provided that the above
8  * copyright notice and this permission notice appear in all copies.
9  *
10  * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
11  * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
12  * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
13  * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
14  * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
15  * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
16  * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
17  */
18 
19 #include <sys/types.h>
20 #include <sys/queue.h>
21 #include <sys/tree.h>
22 #include <sys/socket.h>
23 #include <sys/stat.h>
24 
25 #include <ctype.h>
26 #include <err.h>
27 #include <errno.h>
28 #include <event.h>
29 #include <fcntl.h>
30 #include <grp.h>
31 #include <imsg.h>
32 #include <limits.h>
33 #include <inttypes.h>
34 #include <libgen.h>
35 #include <pwd.h>
36 #include <stdio.h>
37 #include <stdlib.h>
38 #include <string.h>
39 #include <time.h>
40 #include <unistd.h>
41 
42 #include "smtpd.h"
43 #include "log.h"
44 
45 static const char* envelope_validate(struct envelope *);
46 
47 extern struct queue_backend	queue_backend_fs;
48 extern struct queue_backend	queue_backend_null;
49 extern struct queue_backend	queue_backend_proc;
50 extern struct queue_backend	queue_backend_ram;
51 
52 static void queue_envelope_cache_add(struct envelope *);
53 static void queue_envelope_cache_update(struct envelope *);
54 static void queue_envelope_cache_del(uint64_t evpid);
55 
56 TAILQ_HEAD(evplst, envelope);
57 
58 static struct tree		evpcache_tree;
59 static struct evplst		evpcache_list;
60 static struct queue_backend	*backend;
61 
62 static int (*handler_close)(void);
63 static int (*handler_message_create)(uint32_t *);
64 static int (*handler_message_commit)(uint32_t, const char*);
65 static int (*handler_message_delete)(uint32_t);
66 static int (*handler_message_fd_r)(uint32_t);
67 static int (*handler_message_corrupt)(uint32_t);
68 static int (*handler_envelope_create)(uint32_t, const char *, size_t, uint64_t *);
69 static int (*handler_envelope_delete)(uint64_t);
70 static int (*handler_envelope_update)(uint64_t, const char *, size_t);
71 static int (*handler_envelope_load)(uint64_t, char *, size_t);
72 static int (*handler_envelope_walk)(uint64_t *, char *, size_t);
73 
74 #ifdef QUEUE_PROFILING
75 
76 static struct {
77 	struct timespec	 t0;
78 	const char	*name;
79 } profile;
80 
81 static inline void profile_enter(const char *name)
82 {
83 	if ((profiling & PROFILE_QUEUE) == 0)
84 		return;
85 
86 	profile.name = name;
87 	clock_gettime(CLOCK_MONOTONIC, &profile.t0);
88 }
89 
90 static inline void profile_leave(void)
91 {
92 	struct timespec	 t1, dt;
93 
94 	if ((profiling & PROFILE_QUEUE) == 0)
95 		return;
96 
97 	clock_gettime(CLOCK_MONOTONIC, &t1);
98 	timespecsub(&t1, &profile.t0, &dt);
99 	log_debug("profile-queue: %s %lld.%09ld", profile.name,
100 	    (long long)dt.tv_sec, dt.tv_nsec);
101 }
102 #else
103 #define profile_enter(x)	do {} while (0)
104 #define profile_leave()		do {} while (0)
105 #endif
106 
107 static int
108 queue_message_path(uint32_t msgid, char *buf, size_t len)
109 {
110 	return bsnprintf(buf, len, "%s/%08"PRIx32, PATH_TEMPORARY, msgid);
111 }
112 
113 int
114 queue_init(const char *name, int server)
115 {
116 	struct passwd	*pwq;
117 	struct group	*gr;
118 	int		 r;
119 
120 	pwq = getpwnam(SMTPD_QUEUE_USER);
121 	if (pwq == NULL)
122 		errx(1, "unknown user %s", SMTPD_QUEUE_USER);
123 
124 	gr = getgrnam(SMTPD_QUEUE_GROUP);
125 	if (gr == NULL)
126 		errx(1, "unknown group %s", SMTPD_QUEUE_GROUP);
127 
128 	tree_init(&evpcache_tree);
129 	TAILQ_INIT(&evpcache_list);
130 
131 	if (!strcmp(name, "fs"))
132 		backend = &queue_backend_fs;
133 	else if (!strcmp(name, "null"))
134 		backend = &queue_backend_null;
135 	else if (!strcmp(name, "ram"))
136 		backend = &queue_backend_ram;
137 	else
138 		backend = &queue_backend_proc;
139 
140 	if (server) {
141 		if (ckdir(PATH_SPOOL, 0711, 0, 0, 1) == 0)
142 			errx(1, "error in spool directory setup");
143 		if (ckdir(PATH_SPOOL PATH_OFFLINE, 0770, 0, gr->gr_gid, 1) == 0)
144 			errx(1, "error in offline directory setup");
145 		if (ckdir(PATH_SPOOL PATH_PURGE, 0700, pwq->pw_uid, 0, 1) == 0)
146 			errx(1, "error in purge directory setup");
147 
148 		mvpurge(PATH_SPOOL PATH_TEMPORARY, PATH_SPOOL PATH_PURGE);
149 
150 		if (ckdir(PATH_SPOOL PATH_TEMPORARY, 0700, pwq->pw_uid, 0, 1) == 0)
151 			errx(1, "error in purge directory setup");
152 	}
153 
154 	r = backend->init(pwq, server, name);
155 
156 	log_trace(TRACE_QUEUE, "queue-backend: queue_init(%d) -> %d", server, r);
157 
158 	return (r);
159 }
160 
161 int
162 queue_close(void)
163 {
164 	if (handler_close)
165 		return (handler_close());
166 
167 	return (1);
168 }
169 
170 int
171 queue_message_create(uint32_t *msgid)
172 {
173 	int	r;
174 
175 	profile_enter("queue_message_create");
176 	r = handler_message_create(msgid);
177 	profile_leave();
178 
179 	log_trace(TRACE_QUEUE,
180 	    "queue-backend: queue_message_create() -> %d (%08"PRIx32")",
181 	    r, *msgid);
182 
183 	return (r);
184 }
185 
186 int
187 queue_message_delete(uint32_t msgid)
188 {
189 	char	msgpath[PATH_MAX];
190 	int	r;
191 
192 	profile_enter("queue_message_delete");
193 	r = handler_message_delete(msgid);
194 	profile_leave();
195 
196 	/* in case the message is incoming */
197 	queue_message_path(msgid, msgpath, sizeof(msgpath));
198 	unlink(msgpath);
199 
200 	log_trace(TRACE_QUEUE,
201 	    "queue-backend: queue_message_delete(%08"PRIx32") -> %d", msgid, r);
202 
203 	return (r);
204 }
205 
206 int
207 queue_message_commit(uint32_t msgid)
208 {
209 	int	r;
210 	char	msgpath[PATH_MAX];
211 	char	tmppath[PATH_MAX];
212 	FILE	*ifp = NULL;
213 	FILE	*ofp = NULL;
214 
215 	profile_enter("queue_message_commit");
216 
217 	queue_message_path(msgid, msgpath, sizeof(msgpath));
218 
219 	if (env->sc_queue_flags & QUEUE_COMPRESSION) {
220 		bsnprintf(tmppath, sizeof tmppath, "%s.comp", msgpath);
221 		ifp = fopen(msgpath, "r");
222 		ofp = fopen(tmppath, "w+");
223 		if (ifp == NULL || ofp == NULL)
224 			goto err;
225 		if (! compress_file(ifp, ofp))
226 			goto err;
227 		fclose(ifp);
228 		fclose(ofp);
229 		ifp = NULL;
230 		ofp = NULL;
231 
232 		if (rename(tmppath, msgpath) == -1) {
233 			if (errno == ENOSPC)
234 				return (0);
235 			unlink(tmppath);
236 			log_warn("rename");
237 			return (0);
238 		}
239 	}
240 
241 	if (env->sc_queue_flags & QUEUE_ENCRYPTION) {
242 		bsnprintf(tmppath, sizeof tmppath, "%s.enc", msgpath);
243 		ifp = fopen(msgpath, "r");
244 		ofp = fopen(tmppath, "w+");
245 		if (ifp == NULL || ofp == NULL)
246 			goto err;
247 		if (! crypto_encrypt_file(ifp, ofp))
248 			goto err;
249 		fclose(ifp);
250 		fclose(ofp);
251 		ifp = NULL;
252 		ofp = NULL;
253 
254 		if (rename(tmppath, msgpath) == -1) {
255 			if (errno == ENOSPC)
256 				return (0);
257 			unlink(tmppath);
258 			log_warn("rename");
259 			return (0);
260 		}
261 	}
262 
263 	r = handler_message_commit(msgid, msgpath);
264 	profile_leave();
265 
266 	/* in case it's not done by the backend */
267 	unlink(msgpath);
268 
269 	log_trace(TRACE_QUEUE,
270 	    "queue-backend: queue_message_commit(%08"PRIx32") -> %d",
271 	    msgid, r);
272 
273 	return (r);
274 
275 err:
276 	if (ifp)
277 		fclose(ifp);
278 	if (ofp)
279 		fclose(ofp);
280 	return 0;
281 }
282 
283 int
284 queue_message_corrupt(uint32_t msgid)
285 {
286 	int	r;
287 
288 	profile_enter("queue_message_corrupt");
289 	r = handler_message_corrupt(msgid);
290 	profile_leave();
291 
292 	log_trace(TRACE_QUEUE,
293 	    "queue-backend: queue_message_corrupt(%08"PRIx32") -> %d", msgid, r);
294 
295 	return (r);
296 }
297 
298 int
299 queue_message_fd_r(uint32_t msgid)
300 {
301 	int	fdin = -1, fdout = -1, fd = -1;
302 	FILE	*ifp = NULL;
303 	FILE	*ofp = NULL;
304 
305 	profile_enter("queue_message_fd_r");
306 	fdin = handler_message_fd_r(msgid);
307 	profile_leave();
308 
309 	log_trace(TRACE_QUEUE,
310 	    "queue-backend: queue_message_fd_r(%08"PRIx32") -> %d", msgid, fdin);
311 
312 	if (fdin == -1)
313 		return (-1);
314 
315 	if (env->sc_queue_flags & QUEUE_ENCRYPTION) {
316 		if ((fdout = mktmpfile()) == -1)
317 			goto err;
318 		if ((fd = dup(fdout)) == -1)
319 			goto err;
320 		if ((ifp = fdopen(fdin, "r")) == NULL)
321 			goto err;
322 		fdin = fd;
323 		fd = -1;
324 		if ((ofp = fdopen(fdout, "w+")) == NULL)
325 			goto err;
326 
327 		if (! crypto_decrypt_file(ifp, ofp))
328 			goto err;
329 
330 		fclose(ifp);
331 		ifp = NULL;
332 		fclose(ofp);
333 		ofp = NULL;
334 		lseek(fdin, SEEK_SET, 0);
335 	}
336 
337 	if (env->sc_queue_flags & QUEUE_COMPRESSION) {
338 		if ((fdout = mktmpfile()) == -1)
339 			goto err;
340 		if ((fd = dup(fdout)) == -1)
341 			goto err;
342 		if ((ifp = fdopen(fdin, "r")) == NULL)
343 			goto err;
344 		fdin = fd;
345 		fd = -1;
346 		if ((ofp = fdopen(fdout, "w+")) == NULL)
347 			goto err;
348 
349 		if (! uncompress_file(ifp, ofp))
350 			goto err;
351 
352 		fclose(ifp);
353 		ifp = NULL;
354 		fclose(ofp);
355 		ofp = NULL;
356 		lseek(fdin, SEEK_SET, 0);
357 	}
358 
359 	return (fdin);
360 
361 err:
362 	if (fd != -1)
363 		close(fd);
364 	if (fdin != -1)
365 		close(fdin);
366 	if (fdout != -1)
367 		close(fdout);
368 	if (ifp)
369 		fclose(ifp);
370 	if (ofp)
371 		fclose(ofp);
372 	return -1;
373 }
374 
375 int
376 queue_message_fd_rw(uint32_t msgid)
377 {
378 	char buf[PATH_MAX];
379 
380 	queue_message_path(msgid, buf, sizeof(buf));
381 
382 	return open(buf, O_RDWR | O_CREAT | O_EXCL, 0600);
383 }
384 
385 static int
386 queue_envelope_dump_buffer(struct envelope *ep, char *evpbuf, size_t evpbufsize)
387 {
388 	char   *evp;
389 	size_t	evplen;
390 	size_t	complen;
391 	char	compbuf[sizeof(struct envelope)];
392 	size_t	enclen;
393 	char	encbuf[sizeof(struct envelope)];
394 
395 	evp = evpbuf;
396 	evplen = envelope_dump_buffer(ep, evpbuf, evpbufsize);
397 	if (evplen == 0)
398 		return (0);
399 
400 	if (env->sc_queue_flags & QUEUE_COMPRESSION) {
401 		complen = compress_chunk(evp, evplen, compbuf, sizeof compbuf);
402 		if (complen == 0)
403 			return (0);
404 		evp = compbuf;
405 		evplen = complen;
406 	}
407 
408 	if (env->sc_queue_flags & QUEUE_ENCRYPTION) {
409 		enclen = crypto_encrypt_buffer(evp, evplen, encbuf, sizeof encbuf);
410 		if (enclen == 0)
411 			return (0);
412 		evp = encbuf;
413 		evplen = enclen;
414 	}
415 
416 	memmove(evpbuf, evp, evplen);
417 
418 	return (evplen);
419 }
420 
421 static int
422 queue_envelope_load_buffer(struct envelope *ep, char *evpbuf, size_t evpbufsize)
423 {
424 	char		*evp;
425 	size_t		 evplen;
426 	char		 compbuf[sizeof(struct envelope)];
427 	size_t		 complen;
428 	char		 encbuf[sizeof(struct envelope)];
429 	size_t		 enclen;
430 
431 	evp = evpbuf;
432 	evplen = evpbufsize;
433 
434 	if (env->sc_queue_flags & QUEUE_ENCRYPTION) {
435 		enclen = crypto_decrypt_buffer(evp, evplen, encbuf, sizeof encbuf);
436 		if (enclen == 0)
437 			return (0);
438 		evp = encbuf;
439 		evplen = enclen;
440 	}
441 
442 	if (env->sc_queue_flags & QUEUE_COMPRESSION) {
443 		complen = uncompress_chunk(evp, evplen, compbuf, sizeof compbuf);
444 		if (complen == 0)
445 			return (0);
446 		evp = compbuf;
447 		evplen = complen;
448 	}
449 
450 	return (envelope_load_buffer(ep, evp, evplen));
451 }
452 
453 static void
454 queue_envelope_cache_add(struct envelope *e)
455 {
456 	struct envelope *cached;
457 
458 	while (tree_count(&evpcache_tree) >= env->sc_queue_evpcache_size)
459 		queue_envelope_cache_del(TAILQ_LAST(&evpcache_list, evplst)->id);
460 
461 	cached = xcalloc(1, sizeof *cached, "queue_envelope_cache_add");
462 	*cached = *e;
463 	TAILQ_INSERT_HEAD(&evpcache_list, cached, entry);
464 	tree_xset(&evpcache_tree, e->id, cached);
465 	stat_increment("queue.evpcache.size", 1);
466 }
467 
468 static void
469 queue_envelope_cache_update(struct envelope *e)
470 {
471 	struct envelope *cached;
472 
473 	if ((cached = tree_get(&evpcache_tree, e->id)) == NULL) {
474 		queue_envelope_cache_add(e);
475 		stat_increment("queue.evpcache.update.missed", 1);
476 	} else {
477 		TAILQ_REMOVE(&evpcache_list, cached, entry);
478 		*cached = *e;
479 		TAILQ_INSERT_HEAD(&evpcache_list, cached, entry);
480 		stat_increment("queue.evpcache.update.hit", 1);
481 	}
482 }
483 
484 static void
485 queue_envelope_cache_del(uint64_t evpid)
486 {
487 	struct envelope *cached;
488 
489 	if ((cached = tree_pop(&evpcache_tree, evpid)) == NULL)
490 		return;
491 
492 	TAILQ_REMOVE(&evpcache_list, cached, entry);
493 	free(cached);
494 	stat_decrement("queue.evpcache.size", 1);
495 }
496 
497 int
498 queue_envelope_create(struct envelope *ep)
499 {
500 	int		 r;
501 	char		 evpbuf[sizeof(struct envelope)];
502 	size_t		 evplen;
503 	uint64_t	 evpid;
504 	uint32_t	 msgid;
505 
506 	ep->creation = time(NULL);
507 	evplen = queue_envelope_dump_buffer(ep, evpbuf, sizeof evpbuf);
508 	if (evplen == 0)
509 		return (0);
510 
511 	evpid = ep->id;
512 	msgid = evpid_to_msgid(evpid);
513 
514 	profile_enter("queue_envelope_create");
515 	r = handler_envelope_create(msgid, evpbuf, evplen, &ep->id);
516 	profile_leave();
517 
518 	log_trace(TRACE_QUEUE,
519 	    "queue-backend: queue_envelope_create(%016"PRIx64", %zu) -> %d (%016"PRIx64")",
520 	    evpid, evplen, r, ep->id);
521 
522 	if (!r) {
523 		ep->creation = 0;
524 		ep->id = 0;
525 	}
526 
527 	if (r && env->sc_queue_flags & QUEUE_EVPCACHE)
528 		queue_envelope_cache_add(ep);
529 
530 	return (r);
531 }
532 
533 int
534 queue_envelope_delete(uint64_t evpid)
535 {
536 	int	r;
537 
538 	if (env->sc_queue_flags & QUEUE_EVPCACHE)
539 		queue_envelope_cache_del(evpid);
540 
541 	profile_enter("queue_envelope_delete");
542 	r = handler_envelope_delete(evpid);
543 	profile_leave();
544 
545 	log_trace(TRACE_QUEUE,
546 	    "queue-backend: queue_envelope_delete(%016"PRIx64") -> %d",
547 	    evpid, r);
548 
549 	return (r);
550 }
551 
552 int
553 queue_envelope_load(uint64_t evpid, struct envelope *ep)
554 {
555 	const char	*e;
556 	char		 evpbuf[sizeof(struct envelope)];
557 	size_t		 evplen;
558 	struct envelope	*cached;
559 
560 	if ((env->sc_queue_flags & QUEUE_EVPCACHE) &&
561 	    (cached = tree_get(&evpcache_tree, evpid))) {
562 		*ep = *cached;
563 		stat_increment("queue.evpcache.load.hit", 1);
564 		return (1);
565 	}
566 
567 	ep->id = evpid;
568 	profile_enter("queue_envelope_load");
569 	evplen = handler_envelope_load(ep->id, evpbuf, sizeof evpbuf);
570 	profile_leave();
571 
572 	log_trace(TRACE_QUEUE,
573 	    "queue-backend: queue_envelope_load(%016"PRIx64") -> %zu",
574 	    evpid, evplen);
575 
576 	if (evplen == 0)
577 		return (0);
578 
579 	if (queue_envelope_load_buffer(ep, evpbuf, evplen)) {
580 		if ((e = envelope_validate(ep)) == NULL) {
581 			ep->id = evpid;
582 			if (env->sc_queue_flags & QUEUE_EVPCACHE) {
583 				queue_envelope_cache_add(ep);
584 				stat_increment("queue.evpcache.load.missed", 1);
585 			}
586 			return (1);
587 		}
588 		log_debug("debug: invalid envelope %016" PRIx64 ": %s",
589 		    ep->id, e);
590 	}
591 
592 	(void)queue_message_corrupt(evpid_to_msgid(evpid));
593 	return (0);
594 }
595 
596 int
597 queue_envelope_update(struct envelope *ep)
598 {
599 	char	evpbuf[sizeof(struct envelope)];
600 	size_t	evplen;
601 	int	r;
602 
603 	evplen = queue_envelope_dump_buffer(ep, evpbuf, sizeof evpbuf);
604 	if (evplen == 0)
605 		return (0);
606 
607 	profile_enter("queue_envelope_update");
608 	r = handler_envelope_update(ep->id, evpbuf, evplen);
609 	profile_leave();
610 
611 	if (r && env->sc_queue_flags & QUEUE_EVPCACHE)
612 		queue_envelope_cache_update(ep);
613 
614 	log_trace(TRACE_QUEUE,
615 	    "queue-backend: queue_envelope_update(%016"PRIx64") -> %d",
616 	    ep->id, r);
617 
618 	return (r);
619 }
620 
621 int
622 queue_envelope_walk(struct envelope *ep)
623 {
624 	const char	*e;
625 	uint64_t	 evpid;
626 	char		 evpbuf[sizeof(struct envelope)];
627 	int		 r;
628 
629 	profile_enter("queue_envelope_walk");
630 	r = handler_envelope_walk(&evpid, evpbuf, sizeof evpbuf);
631 	profile_leave();
632 
633 	log_trace(TRACE_QUEUE,
634 	    "queue-backend: queue_envelope_walk() -> %d (%016"PRIx64")",
635 	    r, evpid);
636 
637 	if (r == -1)
638 		return (r);
639 
640 	if (r && queue_envelope_load_buffer(ep, evpbuf, (size_t)r)) {
641 		if ((e = envelope_validate(ep)) == NULL) {
642 			ep->id = evpid;
643 			if (env->sc_queue_flags & QUEUE_EVPCACHE)
644 				queue_envelope_cache_add(ep);
645 			return (1);
646 		}
647 		log_debug("debug: invalid envelope %016" PRIx64 ": %s",
648 		    ep->id, e);
649 		(void)queue_message_corrupt(evpid_to_msgid(evpid));
650 	}
651 
652 	return (0);
653 }
654 
655 uint32_t
656 queue_generate_msgid(void)
657 {
658 	uint32_t msgid;
659 
660 	while ((msgid = arc4random()) == 0)
661 		;
662 
663 	return msgid;
664 }
665 
666 uint64_t
667 queue_generate_evpid(uint32_t msgid)
668 {
669 	uint32_t rnd;
670 	uint64_t evpid;
671 
672 	while ((rnd = arc4random()) == 0)
673 		;
674 
675 	evpid = msgid;
676 	evpid <<= 32;
677 	evpid |= rnd;
678 
679 	return evpid;
680 }
681 
682 static const char*
683 envelope_validate(struct envelope *ep)
684 {
685 	if (ep->version != SMTPD_ENVELOPE_VERSION)
686 		return "version mismatch";
687 
688 	if (memchr(ep->helo, '\0', sizeof(ep->helo)) == NULL)
689 		return "invalid helo";
690 	if (ep->helo[0] == '\0')
691 		return "empty helo";
692 
693 	if (memchr(ep->hostname, '\0', sizeof(ep->hostname)) == NULL)
694 		return "invalid hostname";
695 	if (ep->hostname[0] == '\0')
696 		return "empty hostname";
697 
698 	if (memchr(ep->errorline, '\0', sizeof(ep->errorline)) == NULL)
699 		return "invalid error line";
700 
701 	return NULL;
702 }
703 
704 void
705 queue_api_on_close(int(*cb)(void))
706 {
707 	handler_close = cb;
708 }
709 
710 void
711 queue_api_on_message_create(int(*cb)(uint32_t *))
712 {
713 	handler_message_create = cb;
714 }
715 
716 void
717 queue_api_on_message_commit(int(*cb)(uint32_t, const char *))
718 {
719 	handler_message_commit = cb;
720 }
721 
722 void
723 queue_api_on_message_delete(int(*cb)(uint32_t))
724 {
725 	handler_message_delete = cb;
726 }
727 
728 void
729 queue_api_on_message_fd_r(int(*cb)(uint32_t))
730 {
731 	handler_message_fd_r = cb;
732 }
733 
734 void
735 queue_api_on_message_corrupt(int(*cb)(uint32_t))
736 {
737 	handler_message_corrupt = cb;
738 }
739 
740 void
741 queue_api_on_envelope_create(int(*cb)(uint32_t, const char *, size_t, uint64_t *))
742 {
743 	handler_envelope_create = cb;
744 }
745 
746 void
747 queue_api_on_envelope_delete(int(*cb)(uint64_t))
748 {
749 	handler_envelope_delete = cb;
750 }
751 
752 void
753 queue_api_on_envelope_update(int(*cb)(uint64_t, const char *, size_t))
754 {
755 	handler_envelope_update = cb;
756 }
757 
758 void
759 queue_api_on_envelope_load(int(*cb)(uint64_t, char *, size_t))
760 {
761 	handler_envelope_load = cb;
762 }
763 
764 void
765 queue_api_on_envelope_walk(int(*cb)(uint64_t *, char *, size_t))
766 {
767 	handler_envelope_walk = cb;
768 }
769