xref: /openbsd-src/usr.sbin/smtpd/queue_backend.c (revision d0fc3bb68efd6c434b4053cd7adb29023cbec341)
1 /*	$OpenBSD: queue_backend.c,v 1.68 2021/06/14 17:58:16 eric Exp $	*/
2 
3 /*
4  * Copyright (c) 2011 Gilles Chehade <gilles@poolp.org>
5  *
6  * Permission to use, copy, modify, and distribute this software for any
7  * purpose with or without fee is hereby granted, provided that the above
8  * copyright notice and this permission notice appear in all copies.
9  *
10  * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
11  * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
12  * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
13  * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
14  * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
15  * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
16  * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
17  */
18 
19 #include <errno.h>
20 #include <fcntl.h>
21 #include <grp.h>
22 #include <inttypes.h>
23 #include <pwd.h>
24 #include <stdlib.h>
25 #include <string.h>
26 #include <unistd.h>
27 
28 #include "smtpd.h"
29 #include "log.h"
30 
31 static const char* envelope_validate(struct envelope *);
32 
33 extern struct queue_backend	queue_backend_fs;
34 extern struct queue_backend	queue_backend_null;
35 extern struct queue_backend	queue_backend_proc;
36 extern struct queue_backend	queue_backend_ram;
37 
38 static void queue_envelope_cache_add(struct envelope *);
39 static void queue_envelope_cache_update(struct envelope *);
40 static void queue_envelope_cache_del(uint64_t evpid);
41 
42 TAILQ_HEAD(evplst, envelope);
43 
44 static struct tree		evpcache_tree;
45 static struct evplst		evpcache_list;
46 static struct queue_backend	*backend;
47 
48 static int (*handler_close)(void);
49 static int (*handler_message_create)(uint32_t *);
50 static int (*handler_message_commit)(uint32_t, const char*);
51 static int (*handler_message_delete)(uint32_t);
52 static int (*handler_message_fd_r)(uint32_t);
53 static int (*handler_envelope_create)(uint32_t, const char *, size_t, uint64_t *);
54 static int (*handler_envelope_delete)(uint64_t);
55 static int (*handler_envelope_update)(uint64_t, const char *, size_t);
56 static int (*handler_envelope_load)(uint64_t, char *, size_t);
57 static int (*handler_envelope_walk)(uint64_t *, char *, size_t);
58 static int (*handler_message_walk)(uint64_t *, char *, size_t,
59     uint32_t, int *, void **);
60 
61 #ifdef QUEUE_PROFILING
62 
63 static struct {
64 	struct timespec	 t0;
65 	const char	*name;
66 } profile;
67 
68 static inline void profile_enter(const char *name)
69 {
70 	if ((profiling & PROFILE_QUEUE) == 0)
71 		return;
72 
73 	profile.name = name;
74 	clock_gettime(CLOCK_MONOTONIC, &profile.t0);
75 }
76 
77 static inline void profile_leave(void)
78 {
79 	struct timespec	 t1, dt;
80 
81 	if ((profiling & PROFILE_QUEUE) == 0)
82 		return;
83 
84 	clock_gettime(CLOCK_MONOTONIC, &t1);
85 	timespecsub(&t1, &profile.t0, &dt);
86 	log_debug("profile-queue: %s %lld.%09ld", profile.name,
87 	    (long long)dt.tv_sec, dt.tv_nsec);
88 }
89 #else
90 #define profile_enter(x)	do {} while (0)
91 #define profile_leave()		do {} while (0)
92 #endif
93 
94 static int
95 queue_message_path(uint32_t msgid, char *buf, size_t len)
96 {
97 	return bsnprintf(buf, len, "%s/%08"PRIx32, PATH_TEMPORARY, msgid);
98 }
99 
100 int
101 queue_init(const char *name, int server)
102 {
103 	struct passwd	*pwq;
104 	struct group	*gr;
105 	int		 r;
106 
107 	pwq = getpwnam(SMTPD_QUEUE_USER);
108 	if (pwq == NULL)
109 		fatalx("unknown user %s", SMTPD_QUEUE_USER);
110 
111 	gr = getgrnam(SMTPD_QUEUE_GROUP);
112 	if (gr == NULL)
113 		fatalx("unknown group %s", SMTPD_QUEUE_GROUP);
114 
115 	tree_init(&evpcache_tree);
116 	TAILQ_INIT(&evpcache_list);
117 
118 	if (!strcmp(name, "fs"))
119 		backend = &queue_backend_fs;
120 	else if (!strcmp(name, "null"))
121 		backend = &queue_backend_null;
122 	else if (!strcmp(name, "ram"))
123 		backend = &queue_backend_ram;
124 	else
125 		backend = &queue_backend_proc;
126 
127 	if (server) {
128 		if (ckdir(PATH_SPOOL, 0711, 0, 0, 1) == 0)
129 			fatalx("error in spool directory setup");
130 		if (ckdir(PATH_SPOOL PATH_OFFLINE, 0770, 0, gr->gr_gid, 1) == 0)
131 			fatalx("error in offline directory setup");
132 		if (ckdir(PATH_SPOOL PATH_PURGE, 0700, pwq->pw_uid, 0, 1) == 0)
133 			fatalx("error in purge directory setup");
134 
135 		mvpurge(PATH_SPOOL PATH_TEMPORARY, PATH_SPOOL PATH_PURGE);
136 
137 		if (ckdir(PATH_SPOOL PATH_TEMPORARY, 0700, pwq->pw_uid, 0, 1) == 0)
138 			fatalx("error in purge directory setup");
139 	}
140 
141 	r = backend->init(pwq, server, name);
142 
143 	log_trace(TRACE_QUEUE, "queue-backend: queue_init(%d) -> %d", server, r);
144 
145 	return (r);
146 }
147 
148 int
149 queue_close(void)
150 {
151 	if (handler_close)
152 		return (handler_close());
153 
154 	return (1);
155 }
156 
157 int
158 queue_message_create(uint32_t *msgid)
159 {
160 	int	r;
161 
162 	profile_enter("queue_message_create");
163 	r = handler_message_create(msgid);
164 	profile_leave();
165 
166 	log_trace(TRACE_QUEUE,
167 	    "queue-backend: queue_message_create() -> %d (%08"PRIx32")",
168 	    r, *msgid);
169 
170 	return (r);
171 }
172 
173 int
174 queue_message_delete(uint32_t msgid)
175 {
176 	char	msgpath[PATH_MAX];
177 	uint64_t evpid;
178 	void   *iter;
179 	int	r;
180 
181 	profile_enter("queue_message_delete");
182 	r = handler_message_delete(msgid);
183 	profile_leave();
184 
185 	/* in case the message is incoming */
186 	queue_message_path(msgid, msgpath, sizeof(msgpath));
187 	unlink(msgpath);
188 
189 	/* remove remaining envelopes from the cache if any (on rollback) */
190 	evpid = msgid_to_evpid(msgid);
191 	for (;;) {
192 		iter = NULL;
193 		if (!tree_iterfrom(&evpcache_tree, &iter, evpid, &evpid, NULL))
194 			break;
195 		if (evpid_to_msgid(evpid) != msgid)
196 			break;
197 		queue_envelope_cache_del(evpid);
198 	}
199 
200 	log_trace(TRACE_QUEUE,
201 	    "queue-backend: queue_message_delete(%08"PRIx32") -> %d", msgid, r);
202 
203 	return (r);
204 }
205 
206 int
207 queue_message_commit(uint32_t msgid)
208 {
209 	int	r;
210 	char	msgpath[PATH_MAX];
211 	char	tmppath[PATH_MAX];
212 	FILE	*ifp = NULL;
213 	FILE	*ofp = NULL;
214 
215 	profile_enter("queue_message_commit");
216 
217 	queue_message_path(msgid, msgpath, sizeof(msgpath));
218 
219 	if (env->sc_queue_flags & QUEUE_COMPRESSION) {
220 		bsnprintf(tmppath, sizeof tmppath, "%s.comp", msgpath);
221 		ifp = fopen(msgpath, "r");
222 		ofp = fopen(tmppath, "w+");
223 		if (ifp == NULL || ofp == NULL)
224 			goto err;
225 		if (!compress_file(ifp, ofp))
226 			goto err;
227 		fclose(ifp);
228 		fclose(ofp);
229 		ifp = NULL;
230 		ofp = NULL;
231 
232 		if (rename(tmppath, msgpath) == -1) {
233 			if (errno == ENOSPC)
234 				return (0);
235 			unlink(tmppath);
236 			log_warn("rename");
237 			return (0);
238 		}
239 	}
240 
241 	if (env->sc_queue_flags & QUEUE_ENCRYPTION) {
242 		bsnprintf(tmppath, sizeof tmppath, "%s.enc", msgpath);
243 		ifp = fopen(msgpath, "r");
244 		ofp = fopen(tmppath, "w+");
245 		if (ifp == NULL || ofp == NULL)
246 			goto err;
247 		if (!crypto_encrypt_file(ifp, ofp))
248 			goto err;
249 		fclose(ifp);
250 		fclose(ofp);
251 		ifp = NULL;
252 		ofp = NULL;
253 
254 		if (rename(tmppath, msgpath) == -1) {
255 			if (errno == ENOSPC)
256 				return (0);
257 			unlink(tmppath);
258 			log_warn("rename");
259 			return (0);
260 		}
261 	}
262 
263 	r = handler_message_commit(msgid, msgpath);
264 	profile_leave();
265 
266 	/* in case it's not done by the backend */
267 	unlink(msgpath);
268 
269 	log_trace(TRACE_QUEUE,
270 	    "queue-backend: queue_message_commit(%08"PRIx32") -> %d",
271 	    msgid, r);
272 
273 	return (r);
274 
275 err:
276 	if (ifp)
277 		fclose(ifp);
278 	if (ofp)
279 		fclose(ofp);
280 	return 0;
281 }
282 
283 int
284 queue_message_fd_r(uint32_t msgid)
285 {
286 	int	fdin = -1, fdout = -1, fd = -1;
287 	FILE	*ifp = NULL;
288 	FILE	*ofp = NULL;
289 
290 	profile_enter("queue_message_fd_r");
291 	fdin = handler_message_fd_r(msgid);
292 	profile_leave();
293 
294 	log_trace(TRACE_QUEUE,
295 	    "queue-backend: queue_message_fd_r(%08"PRIx32") -> %d", msgid, fdin);
296 
297 	if (fdin == -1)
298 		return (-1);
299 
300 	if (env->sc_queue_flags & QUEUE_ENCRYPTION) {
301 		if ((fdout = mktmpfile()) == -1)
302 			goto err;
303 		if ((fd = dup(fdout)) == -1)
304 			goto err;
305 		if ((ifp = fdopen(fdin, "r")) == NULL)
306 			goto err;
307 		fdin = fd;
308 		fd = -1;
309 		if ((ofp = fdopen(fdout, "w+")) == NULL)
310 			goto err;
311 
312 		if (!crypto_decrypt_file(ifp, ofp))
313 			goto err;
314 
315 		fclose(ifp);
316 		ifp = NULL;
317 		fclose(ofp);
318 		ofp = NULL;
319 		lseek(fdin, SEEK_SET, 0);
320 	}
321 
322 	if (env->sc_queue_flags & QUEUE_COMPRESSION) {
323 		if ((fdout = mktmpfile()) == -1)
324 			goto err;
325 		if ((fd = dup(fdout)) == -1)
326 			goto err;
327 		if ((ifp = fdopen(fdin, "r")) == NULL)
328 			goto err;
329 		fdin = fd;
330 		fd = -1;
331 		if ((ofp = fdopen(fdout, "w+")) == NULL)
332 			goto err;
333 
334 		if (!uncompress_file(ifp, ofp))
335 			goto err;
336 
337 		fclose(ifp);
338 		ifp = NULL;
339 		fclose(ofp);
340 		ofp = NULL;
341 		lseek(fdin, SEEK_SET, 0);
342 	}
343 
344 	return (fdin);
345 
346 err:
347 	if (fd != -1)
348 		close(fd);
349 	if (fdin != -1)
350 		close(fdin);
351 	if (fdout != -1)
352 		close(fdout);
353 	if (ifp)
354 		fclose(ifp);
355 	if (ofp)
356 		fclose(ofp);
357 	return -1;
358 }
359 
360 int
361 queue_message_fd_rw(uint32_t msgid)
362 {
363 	char buf[PATH_MAX];
364 
365 	queue_message_path(msgid, buf, sizeof(buf));
366 
367 	return open(buf, O_RDWR | O_CREAT | O_EXCL, 0600);
368 }
369 
370 static int
371 queue_envelope_dump_buffer(struct envelope *ep, char *evpbuf, size_t evpbufsize)
372 {
373 	char   *evp;
374 	size_t	evplen;
375 	size_t	complen;
376 	char	compbuf[sizeof(struct envelope)];
377 	size_t	enclen;
378 	char	encbuf[sizeof(struct envelope)];
379 
380 	evp = evpbuf;
381 	evplen = envelope_dump_buffer(ep, evpbuf, evpbufsize);
382 	if (evplen == 0)
383 		return (0);
384 
385 	if (env->sc_queue_flags & QUEUE_COMPRESSION) {
386 		complen = compress_chunk(evp, evplen, compbuf, sizeof compbuf);
387 		if (complen == 0)
388 			return (0);
389 		evp = compbuf;
390 		evplen = complen;
391 	}
392 
393 	if (env->sc_queue_flags & QUEUE_ENCRYPTION) {
394 		enclen = crypto_encrypt_buffer(evp, evplen, encbuf, sizeof encbuf);
395 		if (enclen == 0)
396 			return (0);
397 		evp = encbuf;
398 		evplen = enclen;
399 	}
400 
401 	memmove(evpbuf, evp, evplen);
402 
403 	return (evplen);
404 }
405 
406 static int
407 queue_envelope_load_buffer(struct envelope *ep, char *evpbuf, size_t evpbufsize)
408 {
409 	char		*evp;
410 	size_t		 evplen;
411 	char		 compbuf[sizeof(struct envelope)];
412 	size_t		 complen;
413 	char		 encbuf[sizeof(struct envelope)];
414 	size_t		 enclen;
415 
416 	evp = evpbuf;
417 	evplen = evpbufsize;
418 
419 	if (env->sc_queue_flags & QUEUE_ENCRYPTION) {
420 		enclen = crypto_decrypt_buffer(evp, evplen, encbuf, sizeof encbuf);
421 		if (enclen == 0)
422 			return (0);
423 		evp = encbuf;
424 		evplen = enclen;
425 	}
426 
427 	if (env->sc_queue_flags & QUEUE_COMPRESSION) {
428 		complen = uncompress_chunk(evp, evplen, compbuf, sizeof compbuf);
429 		if (complen == 0)
430 			return (0);
431 		evp = compbuf;
432 		evplen = complen;
433 	}
434 
435 	return (envelope_load_buffer(ep, evp, evplen));
436 }
437 
438 static void
439 queue_envelope_cache_add(struct envelope *e)
440 {
441 	struct envelope *cached;
442 
443 	while (tree_count(&evpcache_tree) >= env->sc_queue_evpcache_size)
444 		queue_envelope_cache_del(TAILQ_LAST(&evpcache_list, evplst)->id);
445 
446 	cached = xcalloc(1, sizeof *cached);
447 	*cached = *e;
448 	TAILQ_INSERT_HEAD(&evpcache_list, cached, entry);
449 	tree_xset(&evpcache_tree, e->id, cached);
450 	stat_increment("queue.evpcache.size", 1);
451 }
452 
453 static void
454 queue_envelope_cache_update(struct envelope *e)
455 {
456 	struct envelope *cached;
457 
458 	if ((cached = tree_get(&evpcache_tree, e->id)) == NULL) {
459 		queue_envelope_cache_add(e);
460 		stat_increment("queue.evpcache.update.missed", 1);
461 	} else {
462 		TAILQ_REMOVE(&evpcache_list, cached, entry);
463 		*cached = *e;
464 		TAILQ_INSERT_HEAD(&evpcache_list, cached, entry);
465 		stat_increment("queue.evpcache.update.hit", 1);
466 	}
467 }
468 
469 static void
470 queue_envelope_cache_del(uint64_t evpid)
471 {
472 	struct envelope *cached;
473 
474 	if ((cached = tree_pop(&evpcache_tree, evpid)) == NULL)
475 		return;
476 
477 	TAILQ_REMOVE(&evpcache_list, cached, entry);
478 	free(cached);
479 	stat_decrement("queue.evpcache.size", 1);
480 }
481 
482 int
483 queue_envelope_create(struct envelope *ep)
484 {
485 	int		 r;
486 	char		 evpbuf[sizeof(struct envelope)];
487 	size_t		 evplen;
488 	uint64_t	 evpid;
489 	uint32_t	 msgid;
490 
491 	ep->creation = time(NULL);
492 	evplen = queue_envelope_dump_buffer(ep, evpbuf, sizeof evpbuf);
493 	if (evplen == 0)
494 		return (0);
495 
496 	evpid = ep->id;
497 	msgid = evpid_to_msgid(evpid);
498 
499 	profile_enter("queue_envelope_create");
500 	r = handler_envelope_create(msgid, evpbuf, evplen, &ep->id);
501 	profile_leave();
502 
503 	log_trace(TRACE_QUEUE,
504 	    "queue-backend: queue_envelope_create(%016"PRIx64", %zu) -> %d (%016"PRIx64")",
505 	    evpid, evplen, r, ep->id);
506 
507 	if (!r) {
508 		ep->creation = 0;
509 		ep->id = 0;
510 	}
511 
512 	if (r && env->sc_queue_flags & QUEUE_EVPCACHE)
513 		queue_envelope_cache_add(ep);
514 
515 	return (r);
516 }
517 
518 int
519 queue_envelope_delete(uint64_t evpid)
520 {
521 	int	r;
522 
523 	if (env->sc_queue_flags & QUEUE_EVPCACHE)
524 		queue_envelope_cache_del(evpid);
525 
526 	profile_enter("queue_envelope_delete");
527 	r = handler_envelope_delete(evpid);
528 	profile_leave();
529 
530 	log_trace(TRACE_QUEUE,
531 	    "queue-backend: queue_envelope_delete(%016"PRIx64") -> %d",
532 	    evpid, r);
533 
534 	return (r);
535 }
536 
537 int
538 queue_envelope_load(uint64_t evpid, struct envelope *ep)
539 {
540 	const char	*e;
541 	char		 evpbuf[sizeof(struct envelope)];
542 	size_t		 evplen;
543 	struct envelope	*cached;
544 
545 	if ((env->sc_queue_flags & QUEUE_EVPCACHE) &&
546 	    (cached = tree_get(&evpcache_tree, evpid))) {
547 		*ep = *cached;
548 		stat_increment("queue.evpcache.load.hit", 1);
549 		return (1);
550 	}
551 
552 	ep->id = evpid;
553 	profile_enter("queue_envelope_load");
554 	evplen = handler_envelope_load(ep->id, evpbuf, sizeof evpbuf);
555 	profile_leave();
556 
557 	log_trace(TRACE_QUEUE,
558 	    "queue-backend: queue_envelope_load(%016"PRIx64") -> %zu",
559 	    evpid, evplen);
560 
561 	if (evplen == 0)
562 		return (0);
563 
564 	if (queue_envelope_load_buffer(ep, evpbuf, evplen)) {
565 		if ((e = envelope_validate(ep)) == NULL) {
566 			ep->id = evpid;
567 			if (env->sc_queue_flags & QUEUE_EVPCACHE) {
568 				queue_envelope_cache_add(ep);
569 				stat_increment("queue.evpcache.load.missed", 1);
570 			}
571 			return (1);
572 		}
573 		log_warnx("warn: invalid envelope %016" PRIx64 ": %s",
574 		    evpid, e);
575 	}
576 	return (0);
577 }
578 
579 int
580 queue_envelope_update(struct envelope *ep)
581 {
582 	char	evpbuf[sizeof(struct envelope)];
583 	size_t	evplen;
584 	int	r;
585 
586 	evplen = queue_envelope_dump_buffer(ep, evpbuf, sizeof evpbuf);
587 	if (evplen == 0)
588 		return (0);
589 
590 	profile_enter("queue_envelope_update");
591 	r = handler_envelope_update(ep->id, evpbuf, evplen);
592 	profile_leave();
593 
594 	if (r && env->sc_queue_flags & QUEUE_EVPCACHE)
595 		queue_envelope_cache_update(ep);
596 
597 	log_trace(TRACE_QUEUE,
598 	    "queue-backend: queue_envelope_update(%016"PRIx64") -> %d",
599 	    ep->id, r);
600 
601 	return (r);
602 }
603 
604 int
605 queue_message_walk(struct envelope *ep, uint32_t msgid, int *done, void **data)
606 {
607 	char		 evpbuf[sizeof(struct envelope)];
608 	uint64_t	 evpid;
609 	int		 r;
610 	const char	*e;
611 
612 	profile_enter("queue_message_walk");
613 	r = handler_message_walk(&evpid, evpbuf, sizeof evpbuf,
614 	    msgid, done, data);
615 	profile_leave();
616 
617 	log_trace(TRACE_QUEUE,
618 	    "queue-backend: queue_message_walk() -> %d (%016"PRIx64")",
619 	    r, evpid);
620 
621 	if (r == -1)
622 		return (r);
623 
624 	if (r && queue_envelope_load_buffer(ep, evpbuf, (size_t)r)) {
625 		if ((e = envelope_validate(ep)) == NULL) {
626 			ep->id = evpid;
627 			/*
628 			 * do not cache the envelope here, while discovering
629 			 * envelopes one could re-run discover on already
630 			 * scheduled envelopes which leads to triggering of
631 			 * strict checks in caching. Envelopes could anyway
632 			 * be loaded from backend if it isn't cached.
633 			 */
634 			return (1);
635 		}
636 		log_warnx("warn: invalid envelope %016" PRIx64 ": %s",
637 		    evpid, e);
638 	}
639 	return (0);
640 }
641 
642 int
643 queue_envelope_walk(struct envelope *ep)
644 {
645 	const char	*e;
646 	uint64_t	 evpid;
647 	char		 evpbuf[sizeof(struct envelope)];
648 	int		 r;
649 
650 	profile_enter("queue_envelope_walk");
651 	r = handler_envelope_walk(&evpid, evpbuf, sizeof evpbuf);
652 	profile_leave();
653 
654 	log_trace(TRACE_QUEUE,
655 	    "queue-backend: queue_envelope_walk() -> %d (%016"PRIx64")",
656 	    r, evpid);
657 
658 	if (r == -1)
659 		return (r);
660 
661 	if (r && queue_envelope_load_buffer(ep, evpbuf, (size_t)r)) {
662 		if ((e = envelope_validate(ep)) == NULL) {
663 			ep->id = evpid;
664 			if (env->sc_queue_flags & QUEUE_EVPCACHE)
665 				queue_envelope_cache_add(ep);
666 			return (1);
667 		}
668 		log_warnx("warn: invalid envelope %016" PRIx64 ": %s",
669 		    evpid, e);
670 	}
671 	return (0);
672 }
673 
674 uint32_t
675 queue_generate_msgid(void)
676 {
677 	uint32_t msgid;
678 
679 	while ((msgid = arc4random()) == 0)
680 		;
681 
682 	return msgid;
683 }
684 
685 uint64_t
686 queue_generate_evpid(uint32_t msgid)
687 {
688 	uint32_t rnd;
689 	uint64_t evpid;
690 
691 	while ((rnd = arc4random()) == 0)
692 		;
693 
694 	evpid = msgid;
695 	evpid <<= 32;
696 	evpid |= rnd;
697 
698 	return evpid;
699 }
700 
701 static const char*
702 envelope_validate(struct envelope *ep)
703 {
704 	if (ep->version != SMTPD_ENVELOPE_VERSION)
705 		return "version mismatch";
706 
707 	if (memchr(ep->helo, '\0', sizeof(ep->helo)) == NULL)
708 		return "invalid helo";
709 	if (ep->helo[0] == '\0')
710 		return "empty helo";
711 
712 	if (memchr(ep->hostname, '\0', sizeof(ep->hostname)) == NULL)
713 		return "invalid hostname";
714 	if (ep->hostname[0] == '\0')
715 		return "empty hostname";
716 
717 	if (memchr(ep->errorline, '\0', sizeof(ep->errorline)) == NULL)
718 		return "invalid error line";
719 
720 	if (dict_get(env->sc_dispatchers, ep->dispatcher) == NULL)
721 		return "unknown dispatcher";
722 
723 	return NULL;
724 }
725 
726 void
727 queue_api_on_close(int(*cb)(void))
728 {
729 	handler_close = cb;
730 }
731 
732 void
733 queue_api_on_message_create(int(*cb)(uint32_t *))
734 {
735 	handler_message_create = cb;
736 }
737 
738 void
739 queue_api_on_message_commit(int(*cb)(uint32_t, const char *))
740 {
741 	handler_message_commit = cb;
742 }
743 
744 void
745 queue_api_on_message_delete(int(*cb)(uint32_t))
746 {
747 	handler_message_delete = cb;
748 }
749 
750 void
751 queue_api_on_message_fd_r(int(*cb)(uint32_t))
752 {
753 	handler_message_fd_r = cb;
754 }
755 
756 void
757 queue_api_on_envelope_create(int(*cb)(uint32_t, const char *, size_t, uint64_t *))
758 {
759 	handler_envelope_create = cb;
760 }
761 
762 void
763 queue_api_on_envelope_delete(int(*cb)(uint64_t))
764 {
765 	handler_envelope_delete = cb;
766 }
767 
768 void
769 queue_api_on_envelope_update(int(*cb)(uint64_t, const char *, size_t))
770 {
771 	handler_envelope_update = cb;
772 }
773 
774 void
775 queue_api_on_envelope_load(int(*cb)(uint64_t, char *, size_t))
776 {
777 	handler_envelope_load = cb;
778 }
779 
780 void
781 queue_api_on_envelope_walk(int(*cb)(uint64_t *, char *, size_t))
782 {
783 	handler_envelope_walk = cb;
784 }
785 
786 void
787 queue_api_on_message_walk(int(*cb)(uint64_t *, char *, size_t,
788     uint32_t, int *, void **))
789 {
790 	handler_message_walk = cb;
791 }
792