xref: /openbsd-src/usr.sbin/smtpd/queue_backend.c (revision a98354405973edac7b33f88ec765e376c1b5fb8a)
1 /*	$OpenBSD: queue_backend.c,v 1.57 2015/10/29 10:25:36 sunil Exp $	*/
2 
3 /*
4  * Copyright (c) 2011 Gilles Chehade <gilles@poolp.org>
5  *
6  * Permission to use, copy, modify, and distribute this software for any
7  * purpose with or without fee is hereby granted, provided that the above
8  * copyright notice and this permission notice appear in all copies.
9  *
10  * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
11  * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
12  * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
13  * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
14  * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
15  * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
16  * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
17  */
18 
19 #include <sys/types.h>
20 #include <sys/queue.h>
21 #include <sys/tree.h>
22 #include <sys/socket.h>
23 #include <sys/stat.h>
24 
25 #include <ctype.h>
26 #include <err.h>
27 #include <errno.h>
28 #include <event.h>
29 #include <fcntl.h>
30 #include <grp.h>
31 #include <imsg.h>
32 #include <limits.h>
33 #include <inttypes.h>
34 #include <libgen.h>
35 #include <pwd.h>
36 #include <stdio.h>
37 #include <stdlib.h>
38 #include <string.h>
39 #include <time.h>
40 #include <unistd.h>
41 
42 #include "smtpd.h"
43 #include "log.h"
44 
45 static const char* envelope_validate(struct envelope *);
46 
47 extern struct queue_backend	queue_backend_fs;
48 extern struct queue_backend	queue_backend_null;
49 extern struct queue_backend	queue_backend_proc;
50 extern struct queue_backend	queue_backend_ram;
51 
52 static void queue_envelope_cache_add(struct envelope *);
53 static void queue_envelope_cache_update(struct envelope *);
54 static void queue_envelope_cache_del(uint64_t evpid);
55 
56 TAILQ_HEAD(evplst, envelope);
57 
58 static struct tree		evpcache_tree;
59 static struct evplst		evpcache_list;
60 static struct queue_backend	*backend;
61 
62 static int (*handler_close)(void);
63 static int (*handler_message_create)(uint32_t *);
64 static int (*handler_message_commit)(uint32_t, const char*);
65 static int (*handler_message_delete)(uint32_t);
66 static int (*handler_message_fd_r)(uint32_t);
67 static int (*handler_message_corrupt)(uint32_t);
68 static int (*handler_envelope_create)(uint32_t, const char *, size_t, uint64_t *);
69 static int (*handler_envelope_delete)(uint64_t);
70 static int (*handler_envelope_update)(uint64_t, const char *, size_t);
71 static int (*handler_envelope_load)(uint64_t, char *, size_t);
72 static int (*handler_envelope_walk)(uint64_t *, char *, size_t);
73 static int (*handler_message_walk)(uint64_t *, char *, size_t,
74     uint32_t, int *, void **);
75 
#ifdef QUEUE_PROFILING

/* Name and start time of the backend call currently being timed. */
static struct {
	struct timespec	 t0;
	const char	*name;
} profile;

/* Remember the call name and start time when queue profiling is enabled. */
static inline void
profile_enter(const char *name)
{
	if (profiling & PROFILE_QUEUE) {
		profile.name = name;
		clock_gettime(CLOCK_MONOTONIC, &profile.t0);
	}
}

/* Log the time elapsed since the last profile_enter(). */
static inline void
profile_leave(void)
{
	struct timespec	 now, elapsed;

	if (!(profiling & PROFILE_QUEUE))
		return;

	clock_gettime(CLOCK_MONOTONIC, &now);
	timespecsub(&now, &profile.t0, &elapsed);
	log_debug("profile-queue: %s %lld.%09ld", profile.name,
	    (long long)elapsed.tv_sec, elapsed.tv_nsec);
}
#else
/* Profiling compiled out: both hooks expand to empty statements. */
#define profile_enter(x)	do {} while (0)
#define profile_leave()		do {} while (0)
#endif
108 
109 static int
110 queue_message_path(uint32_t msgid, char *buf, size_t len)
111 {
112 	return bsnprintf(buf, len, "%s/%08"PRIx32, PATH_TEMPORARY, msgid);
113 }
114 
115 int
116 queue_init(const char *name, int server)
117 {
118 	struct passwd	*pwq;
119 	struct group	*gr;
120 	int		 r;
121 
122 	pwq = getpwnam(SMTPD_QUEUE_USER);
123 	if (pwq == NULL)
124 		errx(1, "unknown user %s", SMTPD_QUEUE_USER);
125 
126 	gr = getgrnam(SMTPD_QUEUE_GROUP);
127 	if (gr == NULL)
128 		errx(1, "unknown group %s", SMTPD_QUEUE_GROUP);
129 
130 	tree_init(&evpcache_tree);
131 	TAILQ_INIT(&evpcache_list);
132 
133 	if (!strcmp(name, "fs"))
134 		backend = &queue_backend_fs;
135 	else if (!strcmp(name, "null"))
136 		backend = &queue_backend_null;
137 	else if (!strcmp(name, "ram"))
138 		backend = &queue_backend_ram;
139 	else
140 		backend = &queue_backend_proc;
141 
142 	if (server) {
143 		if (ckdir(PATH_SPOOL, 0711, 0, 0, 1) == 0)
144 			errx(1, "error in spool directory setup");
145 		if (ckdir(PATH_SPOOL PATH_OFFLINE, 0770, 0, gr->gr_gid, 1) == 0)
146 			errx(1, "error in offline directory setup");
147 		if (ckdir(PATH_SPOOL PATH_PURGE, 0700, pwq->pw_uid, 0, 1) == 0)
148 			errx(1, "error in purge directory setup");
149 
150 		mvpurge(PATH_SPOOL PATH_TEMPORARY, PATH_SPOOL PATH_PURGE);
151 
152 		if (ckdir(PATH_SPOOL PATH_TEMPORARY, 0700, pwq->pw_uid, 0, 1) == 0)
153 			errx(1, "error in purge directory setup");
154 	}
155 
156 	r = backend->init(pwq, server, name);
157 
158 	log_trace(TRACE_QUEUE, "queue-backend: queue_init(%d) -> %d", server, r);
159 
160 	return (r);
161 }
162 
163 int
164 queue_close(void)
165 {
166 	if (handler_close)
167 		return (handler_close());
168 
169 	return (1);
170 }
171 
172 int
173 queue_message_create(uint32_t *msgid)
174 {
175 	int	r;
176 
177 	profile_enter("queue_message_create");
178 	r = handler_message_create(msgid);
179 	profile_leave();
180 
181 	log_trace(TRACE_QUEUE,
182 	    "queue-backend: queue_message_create() -> %d (%08"PRIx32")",
183 	    r, *msgid);
184 
185 	return (r);
186 }
187 
188 int
189 queue_message_delete(uint32_t msgid)
190 {
191 	char	msgpath[PATH_MAX];
192 	int	r;
193 
194 	profile_enter("queue_message_delete");
195 	r = handler_message_delete(msgid);
196 	profile_leave();
197 
198 	/* in case the message is incoming */
199 	queue_message_path(msgid, msgpath, sizeof(msgpath));
200 	unlink(msgpath);
201 
202 	log_trace(TRACE_QUEUE,
203 	    "queue-backend: queue_message_delete(%08"PRIx32") -> %d", msgid, r);
204 
205 	return (r);
206 }
207 
208 int
209 queue_message_commit(uint32_t msgid)
210 {
211 	int	r;
212 	char	msgpath[PATH_MAX];
213 	char	tmppath[PATH_MAX];
214 	FILE	*ifp = NULL;
215 	FILE	*ofp = NULL;
216 
217 	profile_enter("queue_message_commit");
218 
219 	queue_message_path(msgid, msgpath, sizeof(msgpath));
220 
221 	if (env->sc_queue_flags & QUEUE_COMPRESSION) {
222 		bsnprintf(tmppath, sizeof tmppath, "%s.comp", msgpath);
223 		ifp = fopen(msgpath, "r");
224 		ofp = fopen(tmppath, "w+");
225 		if (ifp == NULL || ofp == NULL)
226 			goto err;
227 		if (! compress_file(ifp, ofp))
228 			goto err;
229 		fclose(ifp);
230 		fclose(ofp);
231 		ifp = NULL;
232 		ofp = NULL;
233 
234 		if (rename(tmppath, msgpath) == -1) {
235 			if (errno == ENOSPC)
236 				return (0);
237 			unlink(tmppath);
238 			log_warn("rename");
239 			return (0);
240 		}
241 	}
242 
243 	if (env->sc_queue_flags & QUEUE_ENCRYPTION) {
244 		bsnprintf(tmppath, sizeof tmppath, "%s.enc", msgpath);
245 		ifp = fopen(msgpath, "r");
246 		ofp = fopen(tmppath, "w+");
247 		if (ifp == NULL || ofp == NULL)
248 			goto err;
249 		if (! crypto_encrypt_file(ifp, ofp))
250 			goto err;
251 		fclose(ifp);
252 		fclose(ofp);
253 		ifp = NULL;
254 		ofp = NULL;
255 
256 		if (rename(tmppath, msgpath) == -1) {
257 			if (errno == ENOSPC)
258 				return (0);
259 			unlink(tmppath);
260 			log_warn("rename");
261 			return (0);
262 		}
263 	}
264 
265 	r = handler_message_commit(msgid, msgpath);
266 	profile_leave();
267 
268 	/* in case it's not done by the backend */
269 	unlink(msgpath);
270 
271 	log_trace(TRACE_QUEUE,
272 	    "queue-backend: queue_message_commit(%08"PRIx32") -> %d",
273 	    msgid, r);
274 
275 	return (r);
276 
277 err:
278 	if (ifp)
279 		fclose(ifp);
280 	if (ofp)
281 		fclose(ofp);
282 	return 0;
283 }
284 
285 int
286 queue_message_corrupt(uint32_t msgid)
287 {
288 	int	r;
289 
290 	profile_enter("queue_message_corrupt");
291 	r = handler_message_corrupt(msgid);
292 	profile_leave();
293 
294 	log_trace(TRACE_QUEUE,
295 	    "queue-backend: queue_message_corrupt(%08"PRIx32") -> %d", msgid, r);
296 
297 	return (r);
298 }
299 
300 int
301 queue_message_fd_r(uint32_t msgid)
302 {
303 	int	fdin = -1, fdout = -1, fd = -1;
304 	FILE	*ifp = NULL;
305 	FILE	*ofp = NULL;
306 
307 	profile_enter("queue_message_fd_r");
308 	fdin = handler_message_fd_r(msgid);
309 	profile_leave();
310 
311 	log_trace(TRACE_QUEUE,
312 	    "queue-backend: queue_message_fd_r(%08"PRIx32") -> %d", msgid, fdin);
313 
314 	if (fdin == -1)
315 		return (-1);
316 
317 	if (env->sc_queue_flags & QUEUE_ENCRYPTION) {
318 		if ((fdout = mktmpfile()) == -1)
319 			goto err;
320 		if ((fd = dup(fdout)) == -1)
321 			goto err;
322 		if ((ifp = fdopen(fdin, "r")) == NULL)
323 			goto err;
324 		fdin = fd;
325 		fd = -1;
326 		if ((ofp = fdopen(fdout, "w+")) == NULL)
327 			goto err;
328 
329 		if (! crypto_decrypt_file(ifp, ofp))
330 			goto err;
331 
332 		fclose(ifp);
333 		ifp = NULL;
334 		fclose(ofp);
335 		ofp = NULL;
336 		lseek(fdin, SEEK_SET, 0);
337 	}
338 
339 	if (env->sc_queue_flags & QUEUE_COMPRESSION) {
340 		if ((fdout = mktmpfile()) == -1)
341 			goto err;
342 		if ((fd = dup(fdout)) == -1)
343 			goto err;
344 		if ((ifp = fdopen(fdin, "r")) == NULL)
345 			goto err;
346 		fdin = fd;
347 		fd = -1;
348 		if ((ofp = fdopen(fdout, "w+")) == NULL)
349 			goto err;
350 
351 		if (! uncompress_file(ifp, ofp))
352 			goto err;
353 
354 		fclose(ifp);
355 		ifp = NULL;
356 		fclose(ofp);
357 		ofp = NULL;
358 		lseek(fdin, SEEK_SET, 0);
359 	}
360 
361 	return (fdin);
362 
363 err:
364 	if (fd != -1)
365 		close(fd);
366 	if (fdin != -1)
367 		close(fdin);
368 	if (fdout != -1)
369 		close(fdout);
370 	if (ifp)
371 		fclose(ifp);
372 	if (ofp)
373 		fclose(ofp);
374 	return -1;
375 }
376 
377 int
378 queue_message_fd_rw(uint32_t msgid)
379 {
380 	char buf[PATH_MAX];
381 
382 	queue_message_path(msgid, buf, sizeof(buf));
383 
384 	return open(buf, O_RDWR | O_CREAT | O_EXCL, 0600);
385 }
386 
387 static int
388 queue_envelope_dump_buffer(struct envelope *ep, char *evpbuf, size_t evpbufsize)
389 {
390 	char   *evp;
391 	size_t	evplen;
392 	size_t	complen;
393 	char	compbuf[sizeof(struct envelope)];
394 	size_t	enclen;
395 	char	encbuf[sizeof(struct envelope)];
396 
397 	evp = evpbuf;
398 	evplen = envelope_dump_buffer(ep, evpbuf, evpbufsize);
399 	if (evplen == 0)
400 		return (0);
401 
402 	if (env->sc_queue_flags & QUEUE_COMPRESSION) {
403 		complen = compress_chunk(evp, evplen, compbuf, sizeof compbuf);
404 		if (complen == 0)
405 			return (0);
406 		evp = compbuf;
407 		evplen = complen;
408 	}
409 
410 	if (env->sc_queue_flags & QUEUE_ENCRYPTION) {
411 		enclen = crypto_encrypt_buffer(evp, evplen, encbuf, sizeof encbuf);
412 		if (enclen == 0)
413 			return (0);
414 		evp = encbuf;
415 		evplen = enclen;
416 	}
417 
418 	memmove(evpbuf, evp, evplen);
419 
420 	return (evplen);
421 }
422 
423 static int
424 queue_envelope_load_buffer(struct envelope *ep, char *evpbuf, size_t evpbufsize)
425 {
426 	char		*evp;
427 	size_t		 evplen;
428 	char		 compbuf[sizeof(struct envelope)];
429 	size_t		 complen;
430 	char		 encbuf[sizeof(struct envelope)];
431 	size_t		 enclen;
432 
433 	evp = evpbuf;
434 	evplen = evpbufsize;
435 
436 	if (env->sc_queue_flags & QUEUE_ENCRYPTION) {
437 		enclen = crypto_decrypt_buffer(evp, evplen, encbuf, sizeof encbuf);
438 		if (enclen == 0)
439 			return (0);
440 		evp = encbuf;
441 		evplen = enclen;
442 	}
443 
444 	if (env->sc_queue_flags & QUEUE_COMPRESSION) {
445 		complen = uncompress_chunk(evp, evplen, compbuf, sizeof compbuf);
446 		if (complen == 0)
447 			return (0);
448 		evp = compbuf;
449 		evplen = complen;
450 	}
451 
452 	return (envelope_load_buffer(ep, evp, evplen));
453 }
454 
455 static void
456 queue_envelope_cache_add(struct envelope *e)
457 {
458 	struct envelope *cached;
459 
460 	while (tree_count(&evpcache_tree) >= env->sc_queue_evpcache_size)
461 		queue_envelope_cache_del(TAILQ_LAST(&evpcache_list, evplst)->id);
462 
463 	cached = xcalloc(1, sizeof *cached, "queue_envelope_cache_add");
464 	*cached = *e;
465 	TAILQ_INSERT_HEAD(&evpcache_list, cached, entry);
466 	tree_xset(&evpcache_tree, e->id, cached);
467 	stat_increment("queue.evpcache.size", 1);
468 }
469 
470 static void
471 queue_envelope_cache_update(struct envelope *e)
472 {
473 	struct envelope *cached;
474 
475 	if ((cached = tree_get(&evpcache_tree, e->id)) == NULL) {
476 		queue_envelope_cache_add(e);
477 		stat_increment("queue.evpcache.update.missed", 1);
478 	} else {
479 		TAILQ_REMOVE(&evpcache_list, cached, entry);
480 		*cached = *e;
481 		TAILQ_INSERT_HEAD(&evpcache_list, cached, entry);
482 		stat_increment("queue.evpcache.update.hit", 1);
483 	}
484 }
485 
486 static void
487 queue_envelope_cache_del(uint64_t evpid)
488 {
489 	struct envelope *cached;
490 
491 	if ((cached = tree_pop(&evpcache_tree, evpid)) == NULL)
492 		return;
493 
494 	TAILQ_REMOVE(&evpcache_list, cached, entry);
495 	free(cached);
496 	stat_decrement("queue.evpcache.size", 1);
497 }
498 
499 int
500 queue_envelope_create(struct envelope *ep)
501 {
502 	int		 r;
503 	char		 evpbuf[sizeof(struct envelope)];
504 	size_t		 evplen;
505 	uint64_t	 evpid;
506 	uint32_t	 msgid;
507 
508 	ep->creation = time(NULL);
509 	evplen = queue_envelope_dump_buffer(ep, evpbuf, sizeof evpbuf);
510 	if (evplen == 0)
511 		return (0);
512 
513 	evpid = ep->id;
514 	msgid = evpid_to_msgid(evpid);
515 
516 	profile_enter("queue_envelope_create");
517 	r = handler_envelope_create(msgid, evpbuf, evplen, &ep->id);
518 	profile_leave();
519 
520 	log_trace(TRACE_QUEUE,
521 	    "queue-backend: queue_envelope_create(%016"PRIx64", %zu) -> %d (%016"PRIx64")",
522 	    evpid, evplen, r, ep->id);
523 
524 	if (!r) {
525 		ep->creation = 0;
526 		ep->id = 0;
527 	}
528 
529 	if (r && env->sc_queue_flags & QUEUE_EVPCACHE)
530 		queue_envelope_cache_add(ep);
531 
532 	return (r);
533 }
534 
535 int
536 queue_envelope_delete(uint64_t evpid)
537 {
538 	int	r;
539 
540 	if (env->sc_queue_flags & QUEUE_EVPCACHE)
541 		queue_envelope_cache_del(evpid);
542 
543 	profile_enter("queue_envelope_delete");
544 	r = handler_envelope_delete(evpid);
545 	profile_leave();
546 
547 	log_trace(TRACE_QUEUE,
548 	    "queue-backend: queue_envelope_delete(%016"PRIx64") -> %d",
549 	    evpid, r);
550 
551 	return (r);
552 }
553 
554 int
555 queue_envelope_load(uint64_t evpid, struct envelope *ep)
556 {
557 	const char	*e;
558 	char		 evpbuf[sizeof(struct envelope)];
559 	size_t		 evplen;
560 	struct envelope	*cached;
561 
562 	if ((env->sc_queue_flags & QUEUE_EVPCACHE) &&
563 	    (cached = tree_get(&evpcache_tree, evpid))) {
564 		*ep = *cached;
565 		stat_increment("queue.evpcache.load.hit", 1);
566 		return (1);
567 	}
568 
569 	ep->id = evpid;
570 	profile_enter("queue_envelope_load");
571 	evplen = handler_envelope_load(ep->id, evpbuf, sizeof evpbuf);
572 	profile_leave();
573 
574 	log_trace(TRACE_QUEUE,
575 	    "queue-backend: queue_envelope_load(%016"PRIx64") -> %zu",
576 	    evpid, evplen);
577 
578 	if (evplen == 0)
579 		return (0);
580 
581 	if (queue_envelope_load_buffer(ep, evpbuf, evplen)) {
582 		if ((e = envelope_validate(ep)) == NULL) {
583 			ep->id = evpid;
584 			if (env->sc_queue_flags & QUEUE_EVPCACHE) {
585 				queue_envelope_cache_add(ep);
586 				stat_increment("queue.evpcache.load.missed", 1);
587 			}
588 			return (1);
589 		}
590 		log_debug("debug: invalid envelope %016" PRIx64 ": %s",
591 		    ep->id, e);
592 	}
593 
594 	(void)queue_message_corrupt(evpid_to_msgid(evpid));
595 	return (0);
596 }
597 
598 int
599 queue_envelope_update(struct envelope *ep)
600 {
601 	char	evpbuf[sizeof(struct envelope)];
602 	size_t	evplen;
603 	int	r;
604 
605 	evplen = queue_envelope_dump_buffer(ep, evpbuf, sizeof evpbuf);
606 	if (evplen == 0)
607 		return (0);
608 
609 	profile_enter("queue_envelope_update");
610 	r = handler_envelope_update(ep->id, evpbuf, evplen);
611 	profile_leave();
612 
613 	if (r && env->sc_queue_flags & QUEUE_EVPCACHE)
614 		queue_envelope_cache_update(ep);
615 
616 	log_trace(TRACE_QUEUE,
617 	    "queue-backend: queue_envelope_update(%016"PRIx64") -> %d",
618 	    ep->id, r);
619 
620 	return (r);
621 }
622 
623 int
624 queue_message_walk(struct envelope *ep, uint32_t msgid, int *done, void **data)
625 {
626 	char		 evpbuf[sizeof(struct envelope)];
627 	uint64_t	 evpid;
628 	int		 r;
629 	const char	*e;
630 
631 	profile_enter("queue_message_walk");
632 	r = handler_message_walk(&evpid, evpbuf, sizeof evpbuf,
633 	    msgid, done, data);
634 	profile_leave();
635 
636 	log_trace(TRACE_QUEUE,
637 	    "queue-backend: queue_message_walk() -> %d (%016"PRIx64")",
638 	    r, evpid);
639 
640 	if (r == -1)
641 		return (r);
642 
643 	if (r && queue_envelope_load_buffer(ep, evpbuf, (size_t)r)) {
644 		if ((e = envelope_validate(ep)) == NULL) {
645 			ep->id = evpid;
646 			/*
647 			 * do not cache the envelope here, while discovering
648 			 * envelopes one could re-run discover on already
649 			 * scheduled envelopes which leads to triggering of
650 			 * strict checks in caching. Envelopes could anyway
651 			 * be loaded from backend if it isn't cached.
652 			 */
653 			return (1);
654 		}
655 		log_debug("debug: invalid envelope %016" PRIx64 ": %s",
656 		    ep->id, e);
657 		(void)queue_message_corrupt(evpid_to_msgid(evpid));
658 	}
659 
660 	return (-1);
661 }
662 
663 int
664 queue_envelope_walk(struct envelope *ep)
665 {
666 	const char	*e;
667 	uint64_t	 evpid;
668 	char		 evpbuf[sizeof(struct envelope)];
669 	int		 r;
670 
671 	profile_enter("queue_envelope_walk");
672 	r = handler_envelope_walk(&evpid, evpbuf, sizeof evpbuf);
673 	profile_leave();
674 
675 	log_trace(TRACE_QUEUE,
676 	    "queue-backend: queue_envelope_walk() -> %d (%016"PRIx64")",
677 	    r, evpid);
678 
679 	if (r == -1)
680 		return (r);
681 
682 	if (r && queue_envelope_load_buffer(ep, evpbuf, (size_t)r)) {
683 		if ((e = envelope_validate(ep)) == NULL) {
684 			ep->id = evpid;
685 			if (env->sc_queue_flags & QUEUE_EVPCACHE)
686 				queue_envelope_cache_add(ep);
687 			return (1);
688 		}
689 		log_debug("debug: invalid envelope %016" PRIx64 ": %s",
690 		    ep->id, e);
691 		(void)queue_message_corrupt(evpid_to_msgid(evpid));
692 	}
693 
694 	return (0);
695 }
696 
697 uint32_t
698 queue_generate_msgid(void)
699 {
700 	uint32_t msgid;
701 
702 	while ((msgid = arc4random()) == 0)
703 		;
704 
705 	return msgid;
706 }
707 
708 uint64_t
709 queue_generate_evpid(uint32_t msgid)
710 {
711 	uint32_t rnd;
712 	uint64_t evpid;
713 
714 	while ((rnd = arc4random()) == 0)
715 		;
716 
717 	evpid = msgid;
718 	evpid <<= 32;
719 	evpid |= rnd;
720 
721 	return evpid;
722 }
723 
724 static const char*
725 envelope_validate(struct envelope *ep)
726 {
727 	if (ep->version != SMTPD_ENVELOPE_VERSION)
728 		return "version mismatch";
729 
730 	if (memchr(ep->helo, '\0', sizeof(ep->helo)) == NULL)
731 		return "invalid helo";
732 	if (ep->helo[0] == '\0')
733 		return "empty helo";
734 
735 	if (memchr(ep->hostname, '\0', sizeof(ep->hostname)) == NULL)
736 		return "invalid hostname";
737 	if (ep->hostname[0] == '\0')
738 		return "empty hostname";
739 
740 	if (memchr(ep->errorline, '\0', sizeof(ep->errorline)) == NULL)
741 		return "invalid error line";
742 
743 	return NULL;
744 }
745 
746 void
747 queue_api_on_close(int(*cb)(void))
748 {
749 	handler_close = cb;
750 }
751 
752 void
753 queue_api_on_message_create(int(*cb)(uint32_t *))
754 {
755 	handler_message_create = cb;
756 }
757 
758 void
759 queue_api_on_message_commit(int(*cb)(uint32_t, const char *))
760 {
761 	handler_message_commit = cb;
762 }
763 
764 void
765 queue_api_on_message_delete(int(*cb)(uint32_t))
766 {
767 	handler_message_delete = cb;
768 }
769 
770 void
771 queue_api_on_message_fd_r(int(*cb)(uint32_t))
772 {
773 	handler_message_fd_r = cb;
774 }
775 
776 void
777 queue_api_on_message_corrupt(int(*cb)(uint32_t))
778 {
779 	handler_message_corrupt = cb;
780 }
781 
782 void
783 queue_api_on_envelope_create(int(*cb)(uint32_t, const char *, size_t, uint64_t *))
784 {
785 	handler_envelope_create = cb;
786 }
787 
788 void
789 queue_api_on_envelope_delete(int(*cb)(uint64_t))
790 {
791 	handler_envelope_delete = cb;
792 }
793 
794 void
795 queue_api_on_envelope_update(int(*cb)(uint64_t, const char *, size_t))
796 {
797 	handler_envelope_update = cb;
798 }
799 
800 void
801 queue_api_on_envelope_load(int(*cb)(uint64_t, char *, size_t))
802 {
803 	handler_envelope_load = cb;
804 }
805 
806 void
807 queue_api_on_envelope_walk(int(*cb)(uint64_t *, char *, size_t))
808 {
809 	handler_envelope_walk = cb;
810 }
811 
812 void
813 queue_api_on_message_walk(int(*cb)(uint64_t *, char *, size_t,
814     uint32_t, int *, void **))
815 {
816 	handler_message_walk = cb;
817 }
818