xref: /openbsd-src/usr.sbin/smtpd/queue_backend.c (revision ef15259e1ab8c01faa1ca1fa308a6f92fdb3f80a)
1 /*	$OpenBSD: queue_backend.c,v 1.63 2018/05/14 15:23:05 gilles Exp $	*/
2 
3 /*
4  * Copyright (c) 2011 Gilles Chehade <gilles@poolp.org>
5  *
6  * Permission to use, copy, modify, and distribute this software for any
7  * purpose with or without fee is hereby granted, provided that the above
8  * copyright notice and this permission notice appear in all copies.
9  *
10  * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
11  * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
12  * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
13  * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
14  * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
15  * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
16  * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
17  */
18 
19 #include <sys/types.h>
20 #include <sys/queue.h>
21 #include <sys/tree.h>
22 #include <sys/socket.h>
23 #include <sys/stat.h>
24 
25 #include <ctype.h>
26 #include <err.h>
27 #include <errno.h>
28 #include <event.h>
29 #include <fcntl.h>
30 #include <grp.h>
31 #include <imsg.h>
32 #include <limits.h>
33 #include <inttypes.h>
34 #include <libgen.h>
35 #include <pwd.h>
36 #include <stdio.h>
37 #include <stdlib.h>
38 #include <string.h>
39 #include <time.h>
40 #include <unistd.h>
41 
42 #include "smtpd.h"
43 #include "log.h"
44 
/* Sanity-check a decoded envelope; returns NULL when valid, else a reason. */
static const char* envelope_validate(struct envelope *);

/* Backend implementations that queue_init() chooses between. */
extern struct queue_backend	queue_backend_fs;
extern struct queue_backend	queue_backend_null;
extern struct queue_backend	queue_backend_proc;
extern struct queue_backend	queue_backend_ram;

/* Envelope cache maintenance helpers, defined further down in this file. */
static void queue_envelope_cache_add(struct envelope *);
static void queue_envelope_cache_update(struct envelope *);
static void queue_envelope_cache_del(uint64_t evpid);

/* List head type used to keep cached envelopes ordered by recency. */
TAILQ_HEAD(evplst, envelope);

/* Cached envelopes, indexed by envelope id. */
static struct tree		evpcache_tree;
/* The same cached envelopes; new and updated entries go to the head. */
static struct evplst		evpcache_list;
/* Backend selected by queue_init(). */
static struct queue_backend	*backend;

/*
 * Backend callbacks.  Each pointer is assigned by the matching
 * queue_api_on_*() setter at the bottom of this file.
 */
static int (*handler_close)(void);
static int (*handler_message_create)(uint32_t *);
static int (*handler_message_commit)(uint32_t, const char*);
static int (*handler_message_delete)(uint32_t);
static int (*handler_message_fd_r)(uint32_t);
static int (*handler_envelope_create)(uint32_t, const char *, size_t, uint64_t *);
static int (*handler_envelope_delete)(uint64_t);
static int (*handler_envelope_update)(uint64_t, const char *, size_t);
static int (*handler_envelope_load)(uint64_t, char *, size_t);
static int (*handler_envelope_walk)(uint64_t *, char *, size_t);
static int (*handler_message_walk)(uint64_t *, char *, size_t,
    uint32_t, int *, void **);
74 
#ifdef QUEUE_PROFILING

/* Bookkeeping for the section currently being timed. */
static struct {
	struct timespec	 t0;	/* start time recorded by profile_enter() */
	const char	*name;	/* label recorded by profile_enter() */
} profile;

/*
 * Remember the label and the monotonic start time of a timed section.
 * Does nothing unless PROFILE_QUEUE is set in the profiling mask.
 */
static inline void profile_enter(const char *name)
{
	if (profiling & PROFILE_QUEUE) {
		profile.name = name;
		clock_gettime(CLOCK_MONOTONIC, &profile.t0);
	}
}

/*
 * Log the time elapsed since the last profile_enter() together with its
 * label.  Does nothing unless PROFILE_QUEUE is set in the profiling mask.
 */
static inline void profile_leave(void)
{
	struct timespec	 now, elapsed;

	if (!(profiling & PROFILE_QUEUE))
		return;

	clock_gettime(CLOCK_MONOTONIC, &now);
	timespecsub(&now, &profile.t0, &elapsed);
	log_debug("profile-queue: %s %lld.%09ld", profile.name,
	    (long long)elapsed.tv_sec, elapsed.tv_nsec);
}
#else
/* Profiling compiled out: both hooks expand to empty statements. */
#define profile_enter(x)	do {} while (0)
#define profile_leave()		do {} while (0)
#endif
107 
108 static int
109 queue_message_path(uint32_t msgid, char *buf, size_t len)
110 {
111 	return bsnprintf(buf, len, "%s/%08"PRIx32, PATH_TEMPORARY, msgid);
112 }
113 
114 int
115 queue_init(const char *name, int server)
116 {
117 	struct passwd	*pwq;
118 	struct group	*gr;
119 	int		 r;
120 
121 	pwq = getpwnam(SMTPD_QUEUE_USER);
122 	if (pwq == NULL)
123 		errx(1, "unknown user %s", SMTPD_QUEUE_USER);
124 
125 	gr = getgrnam(SMTPD_QUEUE_GROUP);
126 	if (gr == NULL)
127 		errx(1, "unknown group %s", SMTPD_QUEUE_GROUP);
128 
129 	tree_init(&evpcache_tree);
130 	TAILQ_INIT(&evpcache_list);
131 
132 	if (!strcmp(name, "fs"))
133 		backend = &queue_backend_fs;
134 	else if (!strcmp(name, "null"))
135 		backend = &queue_backend_null;
136 	else if (!strcmp(name, "ram"))
137 		backend = &queue_backend_ram;
138 	else
139 		backend = &queue_backend_proc;
140 
141 	if (server) {
142 		if (ckdir(PATH_SPOOL, 0711, 0, 0, 1) == 0)
143 			errx(1, "error in spool directory setup");
144 		if (ckdir(PATH_SPOOL PATH_OFFLINE, 0770, 0, gr->gr_gid, 1) == 0)
145 			errx(1, "error in offline directory setup");
146 		if (ckdir(PATH_SPOOL PATH_PURGE, 0700, pwq->pw_uid, 0, 1) == 0)
147 			errx(1, "error in purge directory setup");
148 
149 		mvpurge(PATH_SPOOL PATH_TEMPORARY, PATH_SPOOL PATH_PURGE);
150 
151 		if (ckdir(PATH_SPOOL PATH_TEMPORARY, 0700, pwq->pw_uid, 0, 1) == 0)
152 			errx(1, "error in purge directory setup");
153 	}
154 
155 	r = backend->init(pwq, server, name);
156 
157 	log_trace(TRACE_QUEUE, "queue-backend: queue_init(%d) -> %d", server, r);
158 
159 	return (r);
160 }
161 
162 int
163 queue_close(void)
164 {
165 	if (handler_close)
166 		return (handler_close());
167 
168 	return (1);
169 }
170 
171 int
172 queue_message_create(uint32_t *msgid)
173 {
174 	int	r;
175 
176 	profile_enter("queue_message_create");
177 	r = handler_message_create(msgid);
178 	profile_leave();
179 
180 	log_trace(TRACE_QUEUE,
181 	    "queue-backend: queue_message_create() -> %d (%08"PRIx32")",
182 	    r, *msgid);
183 
184 	return (r);
185 }
186 
187 int
188 queue_message_delete(uint32_t msgid)
189 {
190 	char	msgpath[PATH_MAX];
191 	uint64_t evpid;
192 	void   *iter;
193 	int	r;
194 
195 	profile_enter("queue_message_delete");
196 	r = handler_message_delete(msgid);
197 	profile_leave();
198 
199 	/* in case the message is incoming */
200 	queue_message_path(msgid, msgpath, sizeof(msgpath));
201 	unlink(msgpath);
202 
203 	/* remove remaining envelopes from the cache if any (on rollback) */
204 	evpid = msgid_to_evpid(msgid);
205 	for (;;) {
206 		iter = NULL;
207 		if (!tree_iterfrom(&evpcache_tree, &iter, evpid, &evpid, NULL))
208 			break;
209 		if (evpid_to_msgid(evpid) != msgid)
210 			break;
211 		queue_envelope_cache_del(evpid);
212 	}
213 
214 	log_trace(TRACE_QUEUE,
215 	    "queue-backend: queue_message_delete(%08"PRIx32") -> %d", msgid, r);
216 
217 	return (r);
218 }
219 
220 int
221 queue_message_commit(uint32_t msgid)
222 {
223 	int	r;
224 	char	msgpath[PATH_MAX];
225 	char	tmppath[PATH_MAX];
226 	FILE	*ifp = NULL;
227 	FILE	*ofp = NULL;
228 
229 	profile_enter("queue_message_commit");
230 
231 	queue_message_path(msgid, msgpath, sizeof(msgpath));
232 
233 	if (env->sc_queue_flags & QUEUE_COMPRESSION) {
234 		bsnprintf(tmppath, sizeof tmppath, "%s.comp", msgpath);
235 		ifp = fopen(msgpath, "r");
236 		ofp = fopen(tmppath, "w+");
237 		if (ifp == NULL || ofp == NULL)
238 			goto err;
239 		if (!compress_file(ifp, ofp))
240 			goto err;
241 		fclose(ifp);
242 		fclose(ofp);
243 		ifp = NULL;
244 		ofp = NULL;
245 
246 		if (rename(tmppath, msgpath) == -1) {
247 			if (errno == ENOSPC)
248 				return (0);
249 			unlink(tmppath);
250 			log_warn("rename");
251 			return (0);
252 		}
253 	}
254 
255 	if (env->sc_queue_flags & QUEUE_ENCRYPTION) {
256 		bsnprintf(tmppath, sizeof tmppath, "%s.enc", msgpath);
257 		ifp = fopen(msgpath, "r");
258 		ofp = fopen(tmppath, "w+");
259 		if (ifp == NULL || ofp == NULL)
260 			goto err;
261 		if (!crypto_encrypt_file(ifp, ofp))
262 			goto err;
263 		fclose(ifp);
264 		fclose(ofp);
265 		ifp = NULL;
266 		ofp = NULL;
267 
268 		if (rename(tmppath, msgpath) == -1) {
269 			if (errno == ENOSPC)
270 				return (0);
271 			unlink(tmppath);
272 			log_warn("rename");
273 			return (0);
274 		}
275 	}
276 
277 	r = handler_message_commit(msgid, msgpath);
278 	profile_leave();
279 
280 	/* in case it's not done by the backend */
281 	unlink(msgpath);
282 
283 	log_trace(TRACE_QUEUE,
284 	    "queue-backend: queue_message_commit(%08"PRIx32") -> %d",
285 	    msgid, r);
286 
287 	return (r);
288 
289 err:
290 	if (ifp)
291 		fclose(ifp);
292 	if (ofp)
293 		fclose(ofp);
294 	return 0;
295 }
296 
297 int
298 queue_message_fd_r(uint32_t msgid)
299 {
300 	int	fdin = -1, fdout = -1, fd = -1;
301 	FILE	*ifp = NULL;
302 	FILE	*ofp = NULL;
303 
304 	profile_enter("queue_message_fd_r");
305 	fdin = handler_message_fd_r(msgid);
306 	profile_leave();
307 
308 	log_trace(TRACE_QUEUE,
309 	    "queue-backend: queue_message_fd_r(%08"PRIx32") -> %d", msgid, fdin);
310 
311 	if (fdin == -1)
312 		return (-1);
313 
314 	if (env->sc_queue_flags & QUEUE_ENCRYPTION) {
315 		if ((fdout = mktmpfile()) == -1)
316 			goto err;
317 		if ((fd = dup(fdout)) == -1)
318 			goto err;
319 		if ((ifp = fdopen(fdin, "r")) == NULL)
320 			goto err;
321 		fdin = fd;
322 		fd = -1;
323 		if ((ofp = fdopen(fdout, "w+")) == NULL)
324 			goto err;
325 
326 		if (!crypto_decrypt_file(ifp, ofp))
327 			goto err;
328 
329 		fclose(ifp);
330 		ifp = NULL;
331 		fclose(ofp);
332 		ofp = NULL;
333 		lseek(fdin, SEEK_SET, 0);
334 	}
335 
336 	if (env->sc_queue_flags & QUEUE_COMPRESSION) {
337 		if ((fdout = mktmpfile()) == -1)
338 			goto err;
339 		if ((fd = dup(fdout)) == -1)
340 			goto err;
341 		if ((ifp = fdopen(fdin, "r")) == NULL)
342 			goto err;
343 		fdin = fd;
344 		fd = -1;
345 		if ((ofp = fdopen(fdout, "w+")) == NULL)
346 			goto err;
347 
348 		if (!uncompress_file(ifp, ofp))
349 			goto err;
350 
351 		fclose(ifp);
352 		ifp = NULL;
353 		fclose(ofp);
354 		ofp = NULL;
355 		lseek(fdin, SEEK_SET, 0);
356 	}
357 
358 	return (fdin);
359 
360 err:
361 	if (fd != -1)
362 		close(fd);
363 	if (fdin != -1)
364 		close(fdin);
365 	if (fdout != -1)
366 		close(fdout);
367 	if (ifp)
368 		fclose(ifp);
369 	if (ofp)
370 		fclose(ofp);
371 	return -1;
372 }
373 
374 int
375 queue_message_fd_rw(uint32_t msgid)
376 {
377 	char buf[PATH_MAX];
378 
379 	queue_message_path(msgid, buf, sizeof(buf));
380 
381 	return open(buf, O_RDWR | O_CREAT | O_EXCL, 0600);
382 }
383 
384 static int
385 queue_envelope_dump_buffer(struct envelope *ep, char *evpbuf, size_t evpbufsize)
386 {
387 	char   *evp;
388 	size_t	evplen;
389 	size_t	complen;
390 	char	compbuf[sizeof(struct envelope)];
391 	size_t	enclen;
392 	char	encbuf[sizeof(struct envelope)];
393 
394 	evp = evpbuf;
395 	evplen = envelope_dump_buffer(ep, evpbuf, evpbufsize);
396 	if (evplen == 0)
397 		return (0);
398 
399 	if (env->sc_queue_flags & QUEUE_COMPRESSION) {
400 		complen = compress_chunk(evp, evplen, compbuf, sizeof compbuf);
401 		if (complen == 0)
402 			return (0);
403 		evp = compbuf;
404 		evplen = complen;
405 	}
406 
407 	if (env->sc_queue_flags & QUEUE_ENCRYPTION) {
408 		enclen = crypto_encrypt_buffer(evp, evplen, encbuf, sizeof encbuf);
409 		if (enclen == 0)
410 			return (0);
411 		evp = encbuf;
412 		evplen = enclen;
413 	}
414 
415 	memmove(evpbuf, evp, evplen);
416 
417 	return (evplen);
418 }
419 
420 static int
421 queue_envelope_load_buffer(struct envelope *ep, char *evpbuf, size_t evpbufsize)
422 {
423 	char		*evp;
424 	size_t		 evplen;
425 	char		 compbuf[sizeof(struct envelope)];
426 	size_t		 complen;
427 	char		 encbuf[sizeof(struct envelope)];
428 	size_t		 enclen;
429 
430 	evp = evpbuf;
431 	evplen = evpbufsize;
432 
433 	if (env->sc_queue_flags & QUEUE_ENCRYPTION) {
434 		enclen = crypto_decrypt_buffer(evp, evplen, encbuf, sizeof encbuf);
435 		if (enclen == 0)
436 			return (0);
437 		evp = encbuf;
438 		evplen = enclen;
439 	}
440 
441 	if (env->sc_queue_flags & QUEUE_COMPRESSION) {
442 		complen = uncompress_chunk(evp, evplen, compbuf, sizeof compbuf);
443 		if (complen == 0)
444 			return (0);
445 		evp = compbuf;
446 		evplen = complen;
447 	}
448 
449 	return (envelope_load_buffer(ep, evp, evplen));
450 }
451 
452 static void
453 queue_envelope_cache_add(struct envelope *e)
454 {
455 	struct envelope *cached;
456 
457 	while (tree_count(&evpcache_tree) >= env->sc_queue_evpcache_size)
458 		queue_envelope_cache_del(TAILQ_LAST(&evpcache_list, evplst)->id);
459 
460 	cached = xcalloc(1, sizeof *cached, "queue_envelope_cache_add");
461 	*cached = *e;
462 	TAILQ_INSERT_HEAD(&evpcache_list, cached, entry);
463 	tree_xset(&evpcache_tree, e->id, cached);
464 	stat_increment("queue.evpcache.size", 1);
465 }
466 
467 static void
468 queue_envelope_cache_update(struct envelope *e)
469 {
470 	struct envelope *cached;
471 
472 	if ((cached = tree_get(&evpcache_tree, e->id)) == NULL) {
473 		queue_envelope_cache_add(e);
474 		stat_increment("queue.evpcache.update.missed", 1);
475 	} else {
476 		TAILQ_REMOVE(&evpcache_list, cached, entry);
477 		*cached = *e;
478 		TAILQ_INSERT_HEAD(&evpcache_list, cached, entry);
479 		stat_increment("queue.evpcache.update.hit", 1);
480 	}
481 }
482 
483 static void
484 queue_envelope_cache_del(uint64_t evpid)
485 {
486 	struct envelope *cached;
487 
488 	if ((cached = tree_pop(&evpcache_tree, evpid)) == NULL)
489 		return;
490 
491 	TAILQ_REMOVE(&evpcache_list, cached, entry);
492 	free(cached);
493 	stat_decrement("queue.evpcache.size", 1);
494 }
495 
496 int
497 queue_envelope_create(struct envelope *ep)
498 {
499 	int		 r;
500 	char		 evpbuf[sizeof(struct envelope)];
501 	size_t		 evplen;
502 	uint64_t	 evpid;
503 	uint32_t	 msgid;
504 
505 	ep->creation = time(NULL);
506 	evplen = queue_envelope_dump_buffer(ep, evpbuf, sizeof evpbuf);
507 	if (evplen == 0)
508 		return (0);
509 
510 	evpid = ep->id;
511 	msgid = evpid_to_msgid(evpid);
512 
513 	profile_enter("queue_envelope_create");
514 	r = handler_envelope_create(msgid, evpbuf, evplen, &ep->id);
515 	profile_leave();
516 
517 	log_trace(TRACE_QUEUE,
518 	    "queue-backend: queue_envelope_create(%016"PRIx64", %zu) -> %d (%016"PRIx64")",
519 	    evpid, evplen, r, ep->id);
520 
521 	if (!r) {
522 		ep->creation = 0;
523 		ep->id = 0;
524 	}
525 
526 	if (r && env->sc_queue_flags & QUEUE_EVPCACHE)
527 		queue_envelope_cache_add(ep);
528 
529 	return (r);
530 }
531 
532 int
533 queue_envelope_delete(uint64_t evpid)
534 {
535 	int	r;
536 
537 	if (env->sc_queue_flags & QUEUE_EVPCACHE)
538 		queue_envelope_cache_del(evpid);
539 
540 	profile_enter("queue_envelope_delete");
541 	r = handler_envelope_delete(evpid);
542 	profile_leave();
543 
544 	log_trace(TRACE_QUEUE,
545 	    "queue-backend: queue_envelope_delete(%016"PRIx64") -> %d",
546 	    evpid, r);
547 
548 	return (r);
549 }
550 
551 int
552 queue_envelope_load(uint64_t evpid, struct envelope *ep)
553 {
554 	const char	*e;
555 	char		 evpbuf[sizeof(struct envelope)];
556 	size_t		 evplen;
557 	struct envelope	*cached;
558 
559 	if ((env->sc_queue_flags & QUEUE_EVPCACHE) &&
560 	    (cached = tree_get(&evpcache_tree, evpid))) {
561 		*ep = *cached;
562 		stat_increment("queue.evpcache.load.hit", 1);
563 		return (1);
564 	}
565 
566 	ep->id = evpid;
567 	profile_enter("queue_envelope_load");
568 	evplen = handler_envelope_load(ep->id, evpbuf, sizeof evpbuf);
569 	profile_leave();
570 
571 	log_trace(TRACE_QUEUE,
572 	    "queue-backend: queue_envelope_load(%016"PRIx64") -> %zu",
573 	    evpid, evplen);
574 
575 	if (evplen == 0)
576 		return (0);
577 
578 	if (queue_envelope_load_buffer(ep, evpbuf, evplen)) {
579 		if ((e = envelope_validate(ep)) == NULL) {
580 			ep->id = evpid;
581 			if (env->sc_queue_flags & QUEUE_EVPCACHE) {
582 				queue_envelope_cache_add(ep);
583 				stat_increment("queue.evpcache.load.missed", 1);
584 			}
585 			return (1);
586 		}
587 		log_warnx("warn: invalid envelope %016" PRIx64 ": %s",
588 		    evpid, e);
589 	}
590 	return (0);
591 }
592 
593 int
594 queue_envelope_update(struct envelope *ep)
595 {
596 	char	evpbuf[sizeof(struct envelope)];
597 	size_t	evplen;
598 	int	r;
599 
600 	evplen = queue_envelope_dump_buffer(ep, evpbuf, sizeof evpbuf);
601 	if (evplen == 0)
602 		return (0);
603 
604 	profile_enter("queue_envelope_update");
605 	r = handler_envelope_update(ep->id, evpbuf, evplen);
606 	profile_leave();
607 
608 	if (r && env->sc_queue_flags & QUEUE_EVPCACHE)
609 		queue_envelope_cache_update(ep);
610 
611 	log_trace(TRACE_QUEUE,
612 	    "queue-backend: queue_envelope_update(%016"PRIx64") -> %d",
613 	    ep->id, r);
614 
615 	return (r);
616 }
617 
618 int
619 queue_message_walk(struct envelope *ep, uint32_t msgid, int *done, void **data)
620 {
621 	char		 evpbuf[sizeof(struct envelope)];
622 	uint64_t	 evpid;
623 	int		 r;
624 	const char	*e;
625 
626 	profile_enter("queue_message_walk");
627 	r = handler_message_walk(&evpid, evpbuf, sizeof evpbuf,
628 	    msgid, done, data);
629 	profile_leave();
630 
631 	log_trace(TRACE_QUEUE,
632 	    "queue-backend: queue_message_walk() -> %d (%016"PRIx64")",
633 	    r, evpid);
634 
635 	if (r == -1)
636 		return (r);
637 
638 	if (r && queue_envelope_load_buffer(ep, evpbuf, (size_t)r)) {
639 		if ((e = envelope_validate(ep)) == NULL) {
640 			ep->id = evpid;
641 			/*
642 			 * do not cache the envelope here, while discovering
643 			 * envelopes one could re-run discover on already
644 			 * scheduled envelopes which leads to triggering of
645 			 * strict checks in caching. Envelopes could anyway
646 			 * be loaded from backend if it isn't cached.
647 			 */
648 			return (1);
649 		}
650 		log_warnx("warn: invalid envelope %016" PRIx64 ": %s",
651 		    evpid, e);
652 	}
653 	return (0);
654 }
655 
656 int
657 queue_envelope_walk(struct envelope *ep)
658 {
659 	const char	*e;
660 	uint64_t	 evpid;
661 	char		 evpbuf[sizeof(struct envelope)];
662 	int		 r;
663 
664 	profile_enter("queue_envelope_walk");
665 	r = handler_envelope_walk(&evpid, evpbuf, sizeof evpbuf);
666 	profile_leave();
667 
668 	log_trace(TRACE_QUEUE,
669 	    "queue-backend: queue_envelope_walk() -> %d (%016"PRIx64")",
670 	    r, evpid);
671 
672 	if (r == -1)
673 		return (r);
674 
675 	if (r && queue_envelope_load_buffer(ep, evpbuf, (size_t)r)) {
676 		if ((e = envelope_validate(ep)) == NULL) {
677 			ep->id = evpid;
678 			if (env->sc_queue_flags & QUEUE_EVPCACHE)
679 				queue_envelope_cache_add(ep);
680 			return (1);
681 		}
682 		log_warnx("warn: invalid envelope %016" PRIx64 ": %s",
683 		    evpid, e);
684 	}
685 	return (0);
686 }
687 
/* Return a random message id; 0 is never returned. */
uint32_t
queue_generate_msgid(void)
{
	uint32_t candidate;

	do {
		candidate = arc4random();
	} while (candidate == 0);

	return candidate;
}
698 
/*
 * Build an envelope id whose upper 32 bits are `msgid' and whose lower
 * 32 bits are a random value that is never 0.
 */
uint64_t
queue_generate_evpid(uint32_t msgid)
{
	uint32_t low;

	do {
		low = arc4random();
	} while (low == 0);

	return ((uint64_t)msgid << 32) | low;
}
714 
715 static const char*
716 envelope_validate(struct envelope *ep)
717 {
718 	if (ep->version != SMTPD_ENVELOPE_VERSION)
719 		return "version mismatch";
720 
721 	if (memchr(ep->helo, '\0', sizeof(ep->helo)) == NULL)
722 		return "invalid helo";
723 	if (ep->helo[0] == '\0')
724 		return "empty helo";
725 
726 	if (memchr(ep->hostname, '\0', sizeof(ep->hostname)) == NULL)
727 		return "invalid hostname";
728 	if (ep->hostname[0] == '\0')
729 		return "empty hostname";
730 
731 	if (memchr(ep->errorline, '\0', sizeof(ep->errorline)) == NULL)
732 		return "invalid error line";
733 
734 	return NULL;
735 }
736 
737 void
738 queue_api_on_close(int(*cb)(void))
739 {
740 	handler_close = cb;
741 }
742 
743 void
744 queue_api_on_message_create(int(*cb)(uint32_t *))
745 {
746 	handler_message_create = cb;
747 }
748 
749 void
750 queue_api_on_message_commit(int(*cb)(uint32_t, const char *))
751 {
752 	handler_message_commit = cb;
753 }
754 
755 void
756 queue_api_on_message_delete(int(*cb)(uint32_t))
757 {
758 	handler_message_delete = cb;
759 }
760 
761 void
762 queue_api_on_message_fd_r(int(*cb)(uint32_t))
763 {
764 	handler_message_fd_r = cb;
765 }
766 
767 void
768 queue_api_on_envelope_create(int(*cb)(uint32_t, const char *, size_t, uint64_t *))
769 {
770 	handler_envelope_create = cb;
771 }
772 
773 void
774 queue_api_on_envelope_delete(int(*cb)(uint64_t))
775 {
776 	handler_envelope_delete = cb;
777 }
778 
779 void
780 queue_api_on_envelope_update(int(*cb)(uint64_t, const char *, size_t))
781 {
782 	handler_envelope_update = cb;
783 }
784 
785 void
786 queue_api_on_envelope_load(int(*cb)(uint64_t, char *, size_t))
787 {
788 	handler_envelope_load = cb;
789 }
790 
791 void
792 queue_api_on_envelope_walk(int(*cb)(uint64_t *, char *, size_t))
793 {
794 	handler_envelope_walk = cb;
795 }
796 
797 void
798 queue_api_on_message_walk(int(*cb)(uint64_t *, char *, size_t,
799     uint32_t, int *, void **))
800 {
801 	handler_message_walk = cb;
802 }
803