/*	$OpenBSD: queue_backend.c,v 1.67 2021/05/26 18:08:55 eric Exp $	*/

/*
 * Copyright (c) 2011 Gilles Chehade <gilles@poolp.org>
 *
 * Permission to use, copy, modify, and distribute this software for any
 * purpose with or without fee is hereby granted, provided that the above
 * copyright notice and this permission notice appear in all copies.
 *
 * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
 * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
 * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
 * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
 * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
 * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
 * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
 */

#include <sys/types.h>
#include <sys/queue.h>
#include <sys/tree.h>
#include <sys/socket.h>
#include <sys/stat.h>

#include <ctype.h>
#include <errno.h>
#include <event.h>
#include <fcntl.h>
#include <grp.h>
#include <imsg.h>
#include <limits.h>
#include <inttypes.h>
#include <pwd.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>
#include <unistd.h>

#include "smtpd.h"
#include "log.h"

static const char* envelope_validate(struct envelope *);

extern struct queue_backend	queue_backend_fs;
extern struct queue_backend	queue_backend_null;
extern struct queue_backend	queue_backend_proc;
extern struct queue_backend	queue_backend_ram;

static void queue_envelope_cache_add(struct envelope *);
static void queue_envelope_cache_update(struct envelope *);
static void queue_envelope_cache_del(uint64_t evpid);

TAILQ_HEAD(evplst, envelope);

static struct tree		evpcache_tree;
static struct evplst		evpcache_list;
static struct queue_backend	*backend;

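/*
 * Handlers registered by the active queue backend through the
 * queue_api_on_*() functions below.
 */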
static int (*handler_close)(void);
static int (*handler_message_create)(uint32_t *);
static int (*handler_message_commit)(uint32_t, const char*);
static int (*handler_message_delete)(uint32_t);
static int (*handler_message_fd_r)(uint32_t);
static int (*handler_envelope_create)(uint32_t, const char *, size_t, uint64_t *);
static int (*handler_envelope_delete)(uint64_t);
static int (*handler_envelope_update)(uint64_t, const char *, size_t);
static int (*handler_envelope_load)(uint64_t, char *, size_t);
static int (*handler_envelope_walk)(uint64_t *, char *, size_t);
static int (*handler_message_walk)(uint64_t *, char *, size_t,
    uint32_t, int *, void **);

#ifdef QUEUE_PROFILING

static struct {
	struct timespec	 t0;
	const char	*name;
} profile;

static inline void profile_enter(const char *name)
{
	if ((profiling & PROFILE_QUEUE) == 0)
		return;

	profile.name = name;
	clock_gettime(CLOCK_MONOTONIC, &profile.t0);
}

static inline void profile_leave(void)
{
	struct timespec	 t1, dt;

	if ((profiling & PROFILE_QUEUE) == 0)
		return;

	clock_gettime(CLOCK_MONOTONIC, &t1);
	timespecsub(&t1, &profile.t0, &dt);
	log_debug("profile-queue: %s %lld.%09ld", profile.name,
	    (long long)dt.tv_sec, dt.tv_nsec);
}
#else
#define profile_enter(x)	do {} while (0)
#define profile_leave()		do {} while (0)
#endif

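/* Build the path of the temporary (incoming) file for message "msgid". */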
static int
queue_message_path(uint32_t msgid, char *buf, size_t len)
{
	return bsnprintf(buf, len, "%s/%08"PRIx32, PATH_TEMPORARY, msgid);
}

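/*
 * Initialize the queue subsystem: look up the queue user and group,
 * select the backend by name ("fs", "null", "ram", or an external proc
 * backend), and, when running as the server, set up the spool
 * directories and move leftover temporary files to the purge directory
 * before calling the backend's init handler.
 */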
int
queue_init(const char *name, int server)
{
	struct passwd	*pwq;
	struct group	*gr;
	int		 r;

	pwq = getpwnam(SMTPD_QUEUE_USER);
	if (pwq == NULL)
		fatalx("unknown user %s", SMTPD_QUEUE_USER);

	gr = getgrnam(SMTPD_QUEUE_GROUP);
	if (gr == NULL)
		fatalx("unknown group %s", SMTPD_QUEUE_GROUP);

	tree_init(&evpcache_tree);
	TAILQ_INIT(&evpcache_list);

	if (!strcmp(name, "fs"))
		backend = &queue_backend_fs;
	else if (!strcmp(name, "null"))
		backend = &queue_backend_null;
	else if (!strcmp(name, "ram"))
		backend = &queue_backend_ram;
	else
		backend = &queue_backend_proc;

	if (server) {
		if (ckdir(PATH_SPOOL, 0711, 0, 0, 1) == 0)
			fatalx("error in spool directory setup");
		if (ckdir(PATH_SPOOL PATH_OFFLINE, 0770, 0, gr->gr_gid, 1) == 0)
			fatalx("error in offline directory setup");
		if (ckdir(PATH_SPOOL PATH_PURGE, 0700, pwq->pw_uid, 0, 1) == 0)
			fatalx("error in purge directory setup");

		mvpurge(PATH_SPOOL PATH_TEMPORARY, PATH_SPOOL PATH_PURGE);

		if (ckdir(PATH_SPOOL PATH_TEMPORARY, 0700, pwq->pw_uid, 0, 1) == 0)
			fatalx("error in temporary directory setup");
	}

	r = backend->init(pwq, server, name);

	log_trace(TRACE_QUEUE, "queue-backend: queue_init(%d) -> %d", server, r);

	return (r);
}

int
queue_close(void)
{
	if (handler_close)
		return (handler_close());

	return (1);
}

int
queue_message_create(uint32_t *msgid)
{
	int	r;

	profile_enter("queue_message_create");
	r = handler_message_create(msgid);
	profile_leave();

	log_trace(TRACE_QUEUE,
	    "queue-backend: queue_message_create() -> %d (%08"PRIx32")",
	    r, *msgid);

	return (r);
}

int
queue_message_delete(uint32_t msgid)
{
	char	msgpath[PATH_MAX];
	uint64_t evpid;
	void   *iter;
	int	r;

	profile_enter("queue_message_delete");
	r = handler_message_delete(msgid);
	profile_leave();

	/* in case the message is incoming */
	queue_message_path(msgid, msgpath, sizeof(msgpath));
	unlink(msgpath);

	/* remove remaining envelopes from the cache if any (on rollback) */
	evpid = msgid_to_evpid(msgid);
	for (;;) {
		iter = NULL;
		if (!tree_iterfrom(&evpcache_tree, &iter, evpid, &evpid, NULL))
			break;
		if (evpid_to_msgid(evpid) != msgid)
			break;
		queue_envelope_cache_del(evpid);
	}

	log_trace(TRACE_QUEUE,
	    "queue-backend: queue_message_delete(%08"PRIx32") -> %d", msgid, r);

	return (r);
}

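/*
 * Commit a message: if compression and/or encryption are enabled, the
 * temporary message file is rewritten in place before it is handed to
 * the backend's commit handler.
 */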
int
queue_message_commit(uint32_t msgid)
{
	int	r;
	char	msgpath[PATH_MAX];
	char	tmppath[PATH_MAX];
	FILE	*ifp = NULL;
	FILE	*ofp = NULL;

	profile_enter("queue_message_commit");

	queue_message_path(msgid, msgpath, sizeof(msgpath));

	if (env->sc_queue_flags & QUEUE_COMPRESSION) {
		bsnprintf(tmppath, sizeof tmppath, "%s.comp", msgpath);
		ifp = fopen(msgpath, "r");
		ofp = fopen(tmppath, "w+");
		if (ifp == NULL || ofp == NULL)
			goto err;
		if (!compress_file(ifp, ofp))
			goto err;
		fclose(ifp);
		fclose(ofp);
		ifp = NULL;
		ofp = NULL;

		if (rename(tmppath, msgpath) == -1) {
			if (errno == ENOSPC)
				return (0);
			unlink(tmppath);
			log_warn("rename");
			return (0);
		}
	}

	if (env->sc_queue_flags & QUEUE_ENCRYPTION) {
		bsnprintf(tmppath, sizeof tmppath, "%s.enc", msgpath);
		ifp = fopen(msgpath, "r");
		ofp = fopen(tmppath, "w+");
		if (ifp == NULL || ofp == NULL)
			goto err;
		if (!crypto_encrypt_file(ifp, ofp))
			goto err;
		fclose(ifp);
		fclose(ofp);
		ifp = NULL;
		ofp = NULL;

		if (rename(tmppath, msgpath) == -1) {
			if (errno == ENOSPC)
				return (0);
			unlink(tmppath);
			log_warn("rename");
			return (0);
		}
	}

	r = handler_message_commit(msgid, msgpath);
	profile_leave();

	/* in case it's not done by the backend */
	unlink(msgpath);

	log_trace(TRACE_QUEUE,
	    "queue-backend: queue_message_commit(%08"PRIx32") -> %d",
	    msgid, r);

	return (r);

err:
	if (ifp)
		fclose(ifp);
	if (ofp)
		fclose(ofp);
	return 0;
}

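/*
 * Return a read-only descriptor for the message content.  If encryption
 * or compression is enabled, the content is decrypted/uncompressed into
 * a temporary file created with mktmpfile() and a descriptor to that
 * file is returned instead.
 */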
int
queue_message_fd_r(uint32_t msgid)
{
	int	fdin = -1, fdout = -1, fd = -1;
	FILE	*ifp = NULL;
	FILE	*ofp = NULL;

	profile_enter("queue_message_fd_r");
	fdin = handler_message_fd_r(msgid);
	profile_leave();

	log_trace(TRACE_QUEUE,
	    "queue-backend: queue_message_fd_r(%08"PRIx32") -> %d", msgid, fdin);

	if (fdin == -1)
		return (-1);

	if (env->sc_queue_flags & QUEUE_ENCRYPTION) {
		if ((fdout = mktmpfile()) == -1)
			goto err;
		if ((fd = dup(fdout)) == -1)
			goto err;
		if ((ifp = fdopen(fdin, "r")) == NULL)
			goto err;
		fdin = fd;
		fd = -1;
		if ((ofp = fdopen(fdout, "w+")) == NULL)
			goto err;

		if (!crypto_decrypt_file(ifp, ofp))
			goto err;

		fclose(ifp);
		ifp = NULL;
		fclose(ofp);
		ofp = NULL;
		lseek(fdin, 0, SEEK_SET);
	}

	if (env->sc_queue_flags & QUEUE_COMPRESSION) {
		if ((fdout = mktmpfile()) == -1)
			goto err;
		if ((fd = dup(fdout)) == -1)
			goto err;
		if ((ifp = fdopen(fdin, "r")) == NULL)
			goto err;
		fdin = fd;
		fd = -1;
		if ((ofp = fdopen(fdout, "w+")) == NULL)
			goto err;

		if (!uncompress_file(ifp, ofp))
			goto err;

		fclose(ifp);
		ifp = NULL;
		fclose(ofp);
		ofp = NULL;
		lseek(fdin, 0, SEEK_SET);
	}

	return (fdin);

err:
	if (fd != -1)
		close(fd);
	if (fdin != -1)
		close(fdin);
	if (fdout != -1)
		close(fdout);
	if (ifp)
		fclose(ifp);
	if (ofp)
		fclose(ofp);
	return -1;
}

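/*
 * Create the temporary (incoming) file for a new message and return a
 * read-write descriptor to it.
 */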
int
queue_message_fd_rw(uint32_t msgid)
{
	char buf[PATH_MAX];

	queue_message_path(msgid, buf, sizeof(buf));

	return open(buf, O_RDWR | O_CREAT | O_EXCL, 0600);
}

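/*
 * Serialize an envelope into evpbuf, then optionally compress and
 * encrypt the result, depending on the queue flags.
 */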
static int
queue_envelope_dump_buffer(struct envelope *ep, char *evpbuf, size_t evpbufsize)
{
	char   *evp;
	size_t	evplen;
	size_t	complen;
	char	compbuf[sizeof(struct envelope)];
	size_t	enclen;
	char	encbuf[sizeof(struct envelope)];

	evp = evpbuf;
	evplen = envelope_dump_buffer(ep, evpbuf, evpbufsize);
	if (evplen == 0)
		return (0);

	if (env->sc_queue_flags & QUEUE_COMPRESSION) {
		complen = compress_chunk(evp, evplen, compbuf, sizeof compbuf);
		if (complen == 0)
			return (0);
		evp = compbuf;
		evplen = complen;
	}

	if (env->sc_queue_flags & QUEUE_ENCRYPTION) {
		enclen = crypto_encrypt_buffer(evp, evplen, encbuf, sizeof encbuf);
		if (enclen == 0)
			return (0);
		evp = encbuf;
		evplen = enclen;
	}

	memmove(evpbuf, evp, evplen);

	return (evplen);
}

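/*
 * Reverse of queue_envelope_dump_buffer(): decrypt and uncompress the
 * buffer as needed, then parse it into an envelope structure.
 */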
static int
queue_envelope_load_buffer(struct envelope *ep, char *evpbuf, size_t evpbufsize)
{
	char		*evp;
	size_t		 evplen;
	char		 compbuf[sizeof(struct envelope)];
	size_t		 complen;
	char		 encbuf[sizeof(struct envelope)];
	size_t		 enclen;

	evp = evpbuf;
	evplen = evpbufsize;

	if (env->sc_queue_flags & QUEUE_ENCRYPTION) {
		enclen = crypto_decrypt_buffer(evp, evplen, encbuf, sizeof encbuf);
		if (enclen == 0)
			return (0);
		evp = encbuf;
		evplen = enclen;
	}

	if (env->sc_queue_flags & QUEUE_COMPRESSION) {
		complen = uncompress_chunk(evp, evplen, compbuf, sizeof compbuf);
		if (complen == 0)
			return (0);
		evp = compbuf;
		evplen = complen;
	}

	return (envelope_load_buffer(ep, evp, evplen));
}

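/*
 * Envelope cache: a tree indexed by evpid plus a list with the most
 * recently added or updated envelopes at the head.  When the cache is
 * full, the entry at the tail of the list is evicted.
 */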
static void
queue_envelope_cache_add(struct envelope *e)
{
	struct envelope *cached;

	while (tree_count(&evpcache_tree) >= env->sc_queue_evpcache_size)
		queue_envelope_cache_del(TAILQ_LAST(&evpcache_list, evplst)->id);

	cached = xcalloc(1, sizeof *cached);
	*cached = *e;
	TAILQ_INSERT_HEAD(&evpcache_list, cached, entry);
	tree_xset(&evpcache_tree, e->id, cached);
	stat_increment("queue.evpcache.size", 1);
}

static void
queue_envelope_cache_update(struct envelope *e)
{
	struct envelope *cached;

	if ((cached = tree_get(&evpcache_tree, e->id)) == NULL) {
		queue_envelope_cache_add(e);
		stat_increment("queue.evpcache.update.missed", 1);
	} else {
		TAILQ_REMOVE(&evpcache_list, cached, entry);
		*cached = *e;
		TAILQ_INSERT_HEAD(&evpcache_list, cached, entry);
		stat_increment("queue.evpcache.update.hit", 1);
	}
}

static void
queue_envelope_cache_del(uint64_t evpid)
{
	struct envelope *cached;

	if ((cached = tree_pop(&evpcache_tree, evpid)) == NULL)
		return;

	TAILQ_REMOVE(&evpcache_list, cached, entry);
	free(cached);
	stat_decrement("queue.evpcache.size", 1);
}

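/*
 * Create an envelope through the backend.  The creation time is set
 * here and the backend assigns the final envelope id; both are reset
 * if the backend fails.
 */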
int
queue_envelope_create(struct envelope *ep)
{
	int		 r;
	char		 evpbuf[sizeof(struct envelope)];
	size_t		 evplen;
	uint64_t	 evpid;
	uint32_t	 msgid;

	ep->creation = time(NULL);
	evplen = queue_envelope_dump_buffer(ep, evpbuf, sizeof evpbuf);
	if (evplen == 0)
		return (0);

	evpid = ep->id;
	msgid = evpid_to_msgid(evpid);

	profile_enter("queue_envelope_create");
	r = handler_envelope_create(msgid, evpbuf, evplen, &ep->id);
	profile_leave();

	log_trace(TRACE_QUEUE,
	    "queue-backend: queue_envelope_create(%016"PRIx64", %zu) -> %d (%016"PRIx64")",
	    evpid, evplen, r, ep->id);

	if (!r) {
		ep->creation = 0;
		ep->id = 0;
	}

	if (r && env->sc_queue_flags & QUEUE_EVPCACHE)
		queue_envelope_cache_add(ep);

	return (r);
}

int
queue_envelope_delete(uint64_t evpid)
{
	int	r;

	if (env->sc_queue_flags & QUEUE_EVPCACHE)
		queue_envelope_cache_del(evpid);

	profile_enter("queue_envelope_delete");
	r = handler_envelope_delete(evpid);
	profile_leave();

	log_trace(TRACE_QUEUE,
	    "queue-backend: queue_envelope_delete(%016"PRIx64") -> %d",
	    evpid, r);

	return (r);
}

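/*
 * Load an envelope, from the cache when possible, otherwise through the
 * backend.  A loaded envelope is validated before being returned and,
 * on success, added to the cache when caching is enabled.
 */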
int
queue_envelope_load(uint64_t evpid, struct envelope *ep)
{
	const char	*e;
	char		 evpbuf[sizeof(struct envelope)];
	size_t		 evplen;
	struct envelope	*cached;

	if ((env->sc_queue_flags & QUEUE_EVPCACHE) &&
	    (cached = tree_get(&evpcache_tree, evpid))) {
		*ep = *cached;
		stat_increment("queue.evpcache.load.hit", 1);
		return (1);
	}

	ep->id = evpid;
	profile_enter("queue_envelope_load");
	evplen = handler_envelope_load(ep->id, evpbuf, sizeof evpbuf);
	profile_leave();

	log_trace(TRACE_QUEUE,
	    "queue-backend: queue_envelope_load(%016"PRIx64") -> %zu",
	    evpid, evplen);

	if (evplen == 0)
		return (0);

	if (queue_envelope_load_buffer(ep, evpbuf, evplen)) {
		if ((e = envelope_validate(ep)) == NULL) {
			ep->id = evpid;
			if (env->sc_queue_flags & QUEUE_EVPCACHE) {
				queue_envelope_cache_add(ep);
				stat_increment("queue.evpcache.load.missed", 1);
			}
			return (1);
		}
		log_warnx("warn: invalid envelope %016" PRIx64 ": %s",
		    evpid, e);
	}
	return (0);
}

int
queue_envelope_update(struct envelope *ep)
{
	char	evpbuf[sizeof(struct envelope)];
	size_t	evplen;
	int	r;

	evplen = queue_envelope_dump_buffer(ep, evpbuf, sizeof evpbuf);
	if (evplen == 0)
		return (0);

	profile_enter("queue_envelope_update");
	r = handler_envelope_update(ep->id, evpbuf, evplen);
	profile_leave();

	if (r && env->sc_queue_flags & QUEUE_EVPCACHE)
		queue_envelope_cache_update(ep);

	log_trace(TRACE_QUEUE,
	    "queue-backend: queue_envelope_update(%016"PRIx64") -> %d",
	    ep->id, r);

	return (r);
}

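/*
 * Walk the envelopes of a single message through the backend.  The
 * handler's return value is propagated when it is -1; otherwise 1 is
 * returned for a valid envelope loaded into "ep" and 0 for an entry
 * that could not be loaded or validated.
 */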
int
queue_message_walk(struct envelope *ep, uint32_t msgid, int *done, void **data)
{
	char		 evpbuf[sizeof(struct envelope)];
	uint64_t	 evpid;
	int		 r;
	const char	*e;

	profile_enter("queue_message_walk");
	r = handler_message_walk(&evpid, evpbuf, sizeof evpbuf,
	    msgid, done, data);
	profile_leave();

	log_trace(TRACE_QUEUE,
	    "queue-backend: queue_message_walk() -> %d (%016"PRIx64")",
	    r, evpid);

	if (r == -1)
		return (r);

	if (r && queue_envelope_load_buffer(ep, evpbuf, (size_t)r)) {
		if ((e = envelope_validate(ep)) == NULL) {
			ep->id = evpid;
			/*
			 * Do not cache the envelope here: the discovery
			 * walk may be re-run on envelopes that are already
			 * scheduled, which would trigger the strict checks
			 * in the cache.  An envelope that is not cached can
			 * still be loaded from the backend when needed.
			 */
			return (1);
		}
		log_warnx("warn: invalid envelope %016" PRIx64 ": %s",
		    evpid, e);
	}
	return (0);
}

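/*
 * Walk all envelopes in the queue through the backend, loading and
 * validating each one, and caching valid envelopes when caching is
 * enabled.
 */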
int
queue_envelope_walk(struct envelope *ep)
{
	const char	*e;
	uint64_t	 evpid;
	char		 evpbuf[sizeof(struct envelope)];
	int		 r;

	profile_enter("queue_envelope_walk");
	r = handler_envelope_walk(&evpid, evpbuf, sizeof evpbuf);
	profile_leave();

	log_trace(TRACE_QUEUE,
	    "queue-backend: queue_envelope_walk() -> %d (%016"PRIx64")",
	    r, evpid);

	if (r == -1)
		return (r);

	if (r && queue_envelope_load_buffer(ep, evpbuf, (size_t)r)) {
		if ((e = envelope_validate(ep)) == NULL) {
			ep->id = evpid;
			if (env->sc_queue_flags & QUEUE_EVPCACHE)
				queue_envelope_cache_add(ep);
			return (1);
		}
		log_warnx("warn: invalid envelope %016" PRIx64 ": %s",
		    evpid, e);
	}
	return (0);
}

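/*
 * Generate random, non-zero identifiers: a message id is a 32-bit
 * value, and an envelope id is the message id in the upper 32 bits
 * combined with a random non-zero lower half.
 */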
uint32_t
queue_generate_msgid(void)
{
	uint32_t msgid;

	while ((msgid = arc4random()) == 0)
		;

	return msgid;
}

uint64_t
queue_generate_evpid(uint32_t msgid)
{
	uint32_t rnd;
	uint64_t evpid;

	while ((rnd = arc4random()) == 0)
		;

	evpid = msgid;
	evpid <<= 32;
	evpid |= rnd;

	return evpid;
}

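/*
 * Check that an envelope loaded from the queue is usable: correct
 * version, NUL-terminated and non-empty helo and hostname, terminated
 * error line, and a dispatcher that exists in the current
 * configuration.  Returns NULL if the envelope is valid, or an error
 * string describing the problem.
 */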
static const char*
envelope_validate(struct envelope *ep)
{
	if (ep->version != SMTPD_ENVELOPE_VERSION)
		return "version mismatch";

	if (memchr(ep->helo, '\0', sizeof(ep->helo)) == NULL)
		return "invalid helo";
	if (ep->helo[0] == '\0')
		return "empty helo";

	if (memchr(ep->hostname, '\0', sizeof(ep->hostname)) == NULL)
		return "invalid hostname";
	if (ep->hostname[0] == '\0')
		return "empty hostname";

	if (memchr(ep->errorline, '\0', sizeof(ep->errorline)) == NULL)
		return "invalid error line";

	if (dict_get(env->sc_dispatchers, ep->dispatcher) == NULL)
		return "unknown dispatcher";

	return NULL;
}

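/*
 * Registration functions used by queue backends to install their
 * handlers for the operations above.
 */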
void
queue_api_on_close(int(*cb)(void))
{
	handler_close = cb;
}

void
queue_api_on_message_create(int(*cb)(uint32_t *))
{
	handler_message_create = cb;
}

void
queue_api_on_message_commit(int(*cb)(uint32_t, const char *))
{
	handler_message_commit = cb;
}

void
queue_api_on_message_delete(int(*cb)(uint32_t))
{
	handler_message_delete = cb;
}

void
queue_api_on_message_fd_r(int(*cb)(uint32_t))
{
	handler_message_fd_r = cb;
}

void
queue_api_on_envelope_create(int(*cb)(uint32_t, const char *, size_t, uint64_t *))
{
	handler_envelope_create = cb;
}

void
queue_api_on_envelope_delete(int(*cb)(uint64_t))
{
	handler_envelope_delete = cb;
}

void
queue_api_on_envelope_update(int(*cb)(uint64_t, const char *, size_t))
{
	handler_envelope_update = cb;
}

void
queue_api_on_envelope_load(int(*cb)(uint64_t, char *, size_t))
{
	handler_envelope_load = cb;
}

void
queue_api_on_envelope_walk(int(*cb)(uint64_t *, char *, size_t))
{
	handler_envelope_walk = cb;
}

void
queue_api_on_message_walk(int(*cb)(uint64_t *, char *, size_t,
    uint32_t, int *, void **))
{
	handler_message_walk = cb;
}