xref: /openbsd-src/usr.sbin/smtpd/queue_backend.c (revision d05af802c82022e40b2b220b4c42e85cbc9a596e)
1 /*	$OpenBSD: queue_backend.c,v 1.59 2015/11/05 09:14:31 sunil Exp $	*/
2 
3 /*
4  * Copyright (c) 2011 Gilles Chehade <gilles@poolp.org>
5  *
6  * Permission to use, copy, modify, and distribute this software for any
7  * purpose with or without fee is hereby granted, provided that the above
8  * copyright notice and this permission notice appear in all copies.
9  *
10  * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
11  * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
12  * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
13  * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
14  * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
15  * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
16  * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
17  */
18 
19 #include <sys/types.h>
20 #include <sys/queue.h>
21 #include <sys/tree.h>
22 #include <sys/socket.h>
23 #include <sys/stat.h>
24 
25 #include <ctype.h>
26 #include <err.h>
27 #include <errno.h>
28 #include <event.h>
29 #include <fcntl.h>
30 #include <grp.h>
31 #include <imsg.h>
32 #include <limits.h>
33 #include <inttypes.h>
34 #include <libgen.h>
35 #include <pwd.h>
36 #include <stdio.h>
37 #include <stdlib.h>
38 #include <string.h>
39 #include <time.h>
40 #include <unistd.h>
41 
42 #include "smtpd.h"
43 #include "log.h"
44 
45 static const char* envelope_validate(struct envelope *);
46 
47 extern struct queue_backend	queue_backend_fs;
48 extern struct queue_backend	queue_backend_null;
49 extern struct queue_backend	queue_backend_proc;
50 extern struct queue_backend	queue_backend_ram;
51 
52 static void queue_envelope_cache_add(struct envelope *);
53 static void queue_envelope_cache_update(struct envelope *);
54 static void queue_envelope_cache_del(uint64_t evpid);
55 
56 TAILQ_HEAD(evplst, envelope);
57 
58 static struct tree		evpcache_tree;
59 static struct evplst		evpcache_list;
60 static struct queue_backend	*backend;
61 
62 static int (*handler_close)(void);
63 static int (*handler_message_create)(uint32_t *);
64 static int (*handler_message_commit)(uint32_t, const char*);
65 static int (*handler_message_delete)(uint32_t);
66 static int (*handler_message_fd_r)(uint32_t);
67 static int (*handler_message_corrupt)(uint32_t);
68 static int (*handler_message_uncorrupt)(uint32_t);
69 static int (*handler_envelope_create)(uint32_t, const char *, size_t, uint64_t *);
70 static int (*handler_envelope_delete)(uint64_t);
71 static int (*handler_envelope_update)(uint64_t, const char *, size_t);
72 static int (*handler_envelope_load)(uint64_t, char *, size_t);
73 static int (*handler_envelope_walk)(uint64_t *, char *, size_t);
74 static int (*handler_message_walk)(uint64_t *, char *, size_t,
75     uint32_t, int *, void **);
76 
77 #ifdef QUEUE_PROFILING
78 
79 static struct {
80 	struct timespec	 t0;
81 	const char	*name;
82 } profile;
83 
84 static inline void profile_enter(const char *name)
85 {
86 	if ((profiling & PROFILE_QUEUE) == 0)
87 		return;
88 
89 	profile.name = name;
90 	clock_gettime(CLOCK_MONOTONIC, &profile.t0);
91 }
92 
93 static inline void profile_leave(void)
94 {
95 	struct timespec	 t1, dt;
96 
97 	if ((profiling & PROFILE_QUEUE) == 0)
98 		return;
99 
100 	clock_gettime(CLOCK_MONOTONIC, &t1);
101 	timespecsub(&t1, &profile.t0, &dt);
102 	log_debug("profile-queue: %s %lld.%09ld", profile.name,
103 	    (long long)dt.tv_sec, dt.tv_nsec);
104 }
105 #else
106 #define profile_enter(x)	do {} while (0)
107 #define profile_leave()		do {} while (0)
108 #endif
109 
110 static int
111 queue_message_path(uint32_t msgid, char *buf, size_t len)
112 {
113 	return bsnprintf(buf, len, "%s/%08"PRIx32, PATH_TEMPORARY, msgid);
114 }
115 
116 int
117 queue_init(const char *name, int server)
118 {
119 	struct passwd	*pwq;
120 	struct group	*gr;
121 	int		 r;
122 
123 	pwq = getpwnam(SMTPD_QUEUE_USER);
124 	if (pwq == NULL)
125 		errx(1, "unknown user %s", SMTPD_QUEUE_USER);
126 
127 	gr = getgrnam(SMTPD_QUEUE_GROUP);
128 	if (gr == NULL)
129 		errx(1, "unknown group %s", SMTPD_QUEUE_GROUP);
130 
131 	tree_init(&evpcache_tree);
132 	TAILQ_INIT(&evpcache_list);
133 
134 	if (!strcmp(name, "fs"))
135 		backend = &queue_backend_fs;
136 	else if (!strcmp(name, "null"))
137 		backend = &queue_backend_null;
138 	else if (!strcmp(name, "ram"))
139 		backend = &queue_backend_ram;
140 	else
141 		backend = &queue_backend_proc;
142 
143 	if (server) {
144 		if (ckdir(PATH_SPOOL, 0711, 0, 0, 1) == 0)
145 			errx(1, "error in spool directory setup");
146 		if (ckdir(PATH_SPOOL PATH_OFFLINE, 0770, 0, gr->gr_gid, 1) == 0)
147 			errx(1, "error in offline directory setup");
148 		if (ckdir(PATH_SPOOL PATH_PURGE, 0700, pwq->pw_uid, 0, 1) == 0)
149 			errx(1, "error in purge directory setup");
150 
151 		mvpurge(PATH_SPOOL PATH_TEMPORARY, PATH_SPOOL PATH_PURGE);
152 
153 		if (ckdir(PATH_SPOOL PATH_TEMPORARY, 0700, pwq->pw_uid, 0, 1) == 0)
154 			errx(1, "error in purge directory setup");
155 	}
156 
157 	r = backend->init(pwq, server, name);
158 
159 	log_trace(TRACE_QUEUE, "queue-backend: queue_init(%d) -> %d", server, r);
160 
161 	return (r);
162 }
163 
164 int
165 queue_close(void)
166 {
167 	if (handler_close)
168 		return (handler_close());
169 
170 	return (1);
171 }
172 
173 int
174 queue_message_create(uint32_t *msgid)
175 {
176 	int	r;
177 
178 	profile_enter("queue_message_create");
179 	r = handler_message_create(msgid);
180 	profile_leave();
181 
182 	log_trace(TRACE_QUEUE,
183 	    "queue-backend: queue_message_create() -> %d (%08"PRIx32")",
184 	    r, *msgid);
185 
186 	return (r);
187 }
188 
189 int
190 queue_message_delete(uint32_t msgid)
191 {
192 	char	msgpath[PATH_MAX];
193 	int	r;
194 
195 	profile_enter("queue_message_delete");
196 	r = handler_message_delete(msgid);
197 	profile_leave();
198 
199 	/* in case the message is incoming */
200 	queue_message_path(msgid, msgpath, sizeof(msgpath));
201 	unlink(msgpath);
202 
203 	log_trace(TRACE_QUEUE,
204 	    "queue-backend: queue_message_delete(%08"PRIx32") -> %d", msgid, r);
205 
206 	return (r);
207 }
208 
209 int
210 queue_message_commit(uint32_t msgid)
211 {
212 	int	r;
213 	char	msgpath[PATH_MAX];
214 	char	tmppath[PATH_MAX];
215 	FILE	*ifp = NULL;
216 	FILE	*ofp = NULL;
217 
218 	profile_enter("queue_message_commit");
219 
220 	queue_message_path(msgid, msgpath, sizeof(msgpath));
221 
222 	if (env->sc_queue_flags & QUEUE_COMPRESSION) {
223 		bsnprintf(tmppath, sizeof tmppath, "%s.comp", msgpath);
224 		ifp = fopen(msgpath, "r");
225 		ofp = fopen(tmppath, "w+");
226 		if (ifp == NULL || ofp == NULL)
227 			goto err;
228 		if (! compress_file(ifp, ofp))
229 			goto err;
230 		fclose(ifp);
231 		fclose(ofp);
232 		ifp = NULL;
233 		ofp = NULL;
234 
235 		if (rename(tmppath, msgpath) == -1) {
236 			if (errno == ENOSPC)
237 				return (0);
238 			unlink(tmppath);
239 			log_warn("rename");
240 			return (0);
241 		}
242 	}
243 
244 	if (env->sc_queue_flags & QUEUE_ENCRYPTION) {
245 		bsnprintf(tmppath, sizeof tmppath, "%s.enc", msgpath);
246 		ifp = fopen(msgpath, "r");
247 		ofp = fopen(tmppath, "w+");
248 		if (ifp == NULL || ofp == NULL)
249 			goto err;
250 		if (! crypto_encrypt_file(ifp, ofp))
251 			goto err;
252 		fclose(ifp);
253 		fclose(ofp);
254 		ifp = NULL;
255 		ofp = NULL;
256 
257 		if (rename(tmppath, msgpath) == -1) {
258 			if (errno == ENOSPC)
259 				return (0);
260 			unlink(tmppath);
261 			log_warn("rename");
262 			return (0);
263 		}
264 	}
265 
266 	r = handler_message_commit(msgid, msgpath);
267 	profile_leave();
268 
269 	/* in case it's not done by the backend */
270 	unlink(msgpath);
271 
272 	log_trace(TRACE_QUEUE,
273 	    "queue-backend: queue_message_commit(%08"PRIx32") -> %d",
274 	    msgid, r);
275 
276 	return (r);
277 
278 err:
279 	if (ifp)
280 		fclose(ifp);
281 	if (ofp)
282 		fclose(ofp);
283 	return 0;
284 }
285 
286 int
287 queue_message_corrupt(uint32_t msgid)
288 {
289 	int	r;
290 
291 	profile_enter("queue_message_corrupt");
292 	r = handler_message_corrupt(msgid);
293 	profile_leave();
294 
295 	log_trace(TRACE_QUEUE,
296 	    "queue-backend: queue_message_corrupt(%08"PRIx32") -> %d", msgid, r);
297 
298 	return (r);
299 }
300 
301 int
302 queue_message_uncorrupt(uint32_t msgid)
303 {
304 	return handler_message_uncorrupt(msgid);
305 }
306 
307 int
308 queue_message_fd_r(uint32_t msgid)
309 {
310 	int	fdin = -1, fdout = -1, fd = -1;
311 	FILE	*ifp = NULL;
312 	FILE	*ofp = NULL;
313 
314 	profile_enter("queue_message_fd_r");
315 	fdin = handler_message_fd_r(msgid);
316 	profile_leave();
317 
318 	log_trace(TRACE_QUEUE,
319 	    "queue-backend: queue_message_fd_r(%08"PRIx32") -> %d", msgid, fdin);
320 
321 	if (fdin == -1)
322 		return (-1);
323 
324 	if (env->sc_queue_flags & QUEUE_ENCRYPTION) {
325 		if ((fdout = mktmpfile()) == -1)
326 			goto err;
327 		if ((fd = dup(fdout)) == -1)
328 			goto err;
329 		if ((ifp = fdopen(fdin, "r")) == NULL)
330 			goto err;
331 		fdin = fd;
332 		fd = -1;
333 		if ((ofp = fdopen(fdout, "w+")) == NULL)
334 			goto err;
335 
336 		if (! crypto_decrypt_file(ifp, ofp))
337 			goto err;
338 
339 		fclose(ifp);
340 		ifp = NULL;
341 		fclose(ofp);
342 		ofp = NULL;
343 		lseek(fdin, SEEK_SET, 0);
344 	}
345 
346 	if (env->sc_queue_flags & QUEUE_COMPRESSION) {
347 		if ((fdout = mktmpfile()) == -1)
348 			goto err;
349 		if ((fd = dup(fdout)) == -1)
350 			goto err;
351 		if ((ifp = fdopen(fdin, "r")) == NULL)
352 			goto err;
353 		fdin = fd;
354 		fd = -1;
355 		if ((ofp = fdopen(fdout, "w+")) == NULL)
356 			goto err;
357 
358 		if (! uncompress_file(ifp, ofp))
359 			goto err;
360 
361 		fclose(ifp);
362 		ifp = NULL;
363 		fclose(ofp);
364 		ofp = NULL;
365 		lseek(fdin, SEEK_SET, 0);
366 	}
367 
368 	return (fdin);
369 
370 err:
371 	if (fd != -1)
372 		close(fd);
373 	if (fdin != -1)
374 		close(fdin);
375 	if (fdout != -1)
376 		close(fdout);
377 	if (ifp)
378 		fclose(ifp);
379 	if (ofp)
380 		fclose(ofp);
381 	return -1;
382 }
383 
384 int
385 queue_message_fd_rw(uint32_t msgid)
386 {
387 	char buf[PATH_MAX];
388 
389 	queue_message_path(msgid, buf, sizeof(buf));
390 
391 	return open(buf, O_RDWR | O_CREAT | O_EXCL, 0600);
392 }
393 
394 static int
395 queue_envelope_dump_buffer(struct envelope *ep, char *evpbuf, size_t evpbufsize)
396 {
397 	char   *evp;
398 	size_t	evplen;
399 	size_t	complen;
400 	char	compbuf[sizeof(struct envelope)];
401 	size_t	enclen;
402 	char	encbuf[sizeof(struct envelope)];
403 
404 	evp = evpbuf;
405 	evplen = envelope_dump_buffer(ep, evpbuf, evpbufsize);
406 	if (evplen == 0)
407 		return (0);
408 
409 	if (env->sc_queue_flags & QUEUE_COMPRESSION) {
410 		complen = compress_chunk(evp, evplen, compbuf, sizeof compbuf);
411 		if (complen == 0)
412 			return (0);
413 		evp = compbuf;
414 		evplen = complen;
415 	}
416 
417 	if (env->sc_queue_flags & QUEUE_ENCRYPTION) {
418 		enclen = crypto_encrypt_buffer(evp, evplen, encbuf, sizeof encbuf);
419 		if (enclen == 0)
420 			return (0);
421 		evp = encbuf;
422 		evplen = enclen;
423 	}
424 
425 	memmove(evpbuf, evp, evplen);
426 
427 	return (evplen);
428 }
429 
430 static int
431 queue_envelope_load_buffer(struct envelope *ep, char *evpbuf, size_t evpbufsize)
432 {
433 	char		*evp;
434 	size_t		 evplen;
435 	char		 compbuf[sizeof(struct envelope)];
436 	size_t		 complen;
437 	char		 encbuf[sizeof(struct envelope)];
438 	size_t		 enclen;
439 
440 	evp = evpbuf;
441 	evplen = evpbufsize;
442 
443 	if (env->sc_queue_flags & QUEUE_ENCRYPTION) {
444 		enclen = crypto_decrypt_buffer(evp, evplen, encbuf, sizeof encbuf);
445 		if (enclen == 0)
446 			return (0);
447 		evp = encbuf;
448 		evplen = enclen;
449 	}
450 
451 	if (env->sc_queue_flags & QUEUE_COMPRESSION) {
452 		complen = uncompress_chunk(evp, evplen, compbuf, sizeof compbuf);
453 		if (complen == 0)
454 			return (0);
455 		evp = compbuf;
456 		evplen = complen;
457 	}
458 
459 	return (envelope_load_buffer(ep, evp, evplen));
460 }
461 
462 static void
463 queue_envelope_cache_add(struct envelope *e)
464 {
465 	struct envelope *cached;
466 
467 	while (tree_count(&evpcache_tree) >= env->sc_queue_evpcache_size)
468 		queue_envelope_cache_del(TAILQ_LAST(&evpcache_list, evplst)->id);
469 
470 	cached = xcalloc(1, sizeof *cached, "queue_envelope_cache_add");
471 	*cached = *e;
472 	TAILQ_INSERT_HEAD(&evpcache_list, cached, entry);
473 	tree_xset(&evpcache_tree, e->id, cached);
474 	stat_increment("queue.evpcache.size", 1);
475 }
476 
477 static void
478 queue_envelope_cache_update(struct envelope *e)
479 {
480 	struct envelope *cached;
481 
482 	if ((cached = tree_get(&evpcache_tree, e->id)) == NULL) {
483 		queue_envelope_cache_add(e);
484 		stat_increment("queue.evpcache.update.missed", 1);
485 	} else {
486 		TAILQ_REMOVE(&evpcache_list, cached, entry);
487 		*cached = *e;
488 		TAILQ_INSERT_HEAD(&evpcache_list, cached, entry);
489 		stat_increment("queue.evpcache.update.hit", 1);
490 	}
491 }
492 
493 static void
494 queue_envelope_cache_del(uint64_t evpid)
495 {
496 	struct envelope *cached;
497 
498 	if ((cached = tree_pop(&evpcache_tree, evpid)) == NULL)
499 		return;
500 
501 	TAILQ_REMOVE(&evpcache_list, cached, entry);
502 	free(cached);
503 	stat_decrement("queue.evpcache.size", 1);
504 }
505 
506 int
507 queue_envelope_create(struct envelope *ep)
508 {
509 	int		 r;
510 	char		 evpbuf[sizeof(struct envelope)];
511 	size_t		 evplen;
512 	uint64_t	 evpid;
513 	uint32_t	 msgid;
514 
515 	ep->creation = time(NULL);
516 	evplen = queue_envelope_dump_buffer(ep, evpbuf, sizeof evpbuf);
517 	if (evplen == 0)
518 		return (0);
519 
520 	evpid = ep->id;
521 	msgid = evpid_to_msgid(evpid);
522 
523 	profile_enter("queue_envelope_create");
524 	r = handler_envelope_create(msgid, evpbuf, evplen, &ep->id);
525 	profile_leave();
526 
527 	log_trace(TRACE_QUEUE,
528 	    "queue-backend: queue_envelope_create(%016"PRIx64", %zu) -> %d (%016"PRIx64")",
529 	    evpid, evplen, r, ep->id);
530 
531 	if (!r) {
532 		ep->creation = 0;
533 		ep->id = 0;
534 	}
535 
536 	if (r && env->sc_queue_flags & QUEUE_EVPCACHE)
537 		queue_envelope_cache_add(ep);
538 
539 	return (r);
540 }
541 
542 int
543 queue_envelope_delete(uint64_t evpid)
544 {
545 	int	r;
546 
547 	if (env->sc_queue_flags & QUEUE_EVPCACHE)
548 		queue_envelope_cache_del(evpid);
549 
550 	profile_enter("queue_envelope_delete");
551 	r = handler_envelope_delete(evpid);
552 	profile_leave();
553 
554 	log_trace(TRACE_QUEUE,
555 	    "queue-backend: queue_envelope_delete(%016"PRIx64") -> %d",
556 	    evpid, r);
557 
558 	return (r);
559 }
560 
561 int
562 queue_envelope_load(uint64_t evpid, struct envelope *ep)
563 {
564 	const char	*e;
565 	char		 evpbuf[sizeof(struct envelope)];
566 	size_t		 evplen;
567 	struct envelope	*cached;
568 
569 	if ((env->sc_queue_flags & QUEUE_EVPCACHE) &&
570 	    (cached = tree_get(&evpcache_tree, evpid))) {
571 		*ep = *cached;
572 		stat_increment("queue.evpcache.load.hit", 1);
573 		return (1);
574 	}
575 
576 	ep->id = evpid;
577 	profile_enter("queue_envelope_load");
578 	evplen = handler_envelope_load(ep->id, evpbuf, sizeof evpbuf);
579 	profile_leave();
580 
581 	log_trace(TRACE_QUEUE,
582 	    "queue-backend: queue_envelope_load(%016"PRIx64") -> %zu",
583 	    evpid, evplen);
584 
585 	if (evplen == 0)
586 		return (0);
587 
588 	if (queue_envelope_load_buffer(ep, evpbuf, evplen)) {
589 		if ((e = envelope_validate(ep)) == NULL) {
590 			ep->id = evpid;
591 			if (env->sc_queue_flags & QUEUE_EVPCACHE) {
592 				queue_envelope_cache_add(ep);
593 				stat_increment("queue.evpcache.load.missed", 1);
594 			}
595 			return (1);
596 		}
597 		log_debug("debug: invalid envelope %016" PRIx64 ": %s",
598 		    ep->id, e);
599 	}
600 
601 	(void)queue_message_corrupt(evpid_to_msgid(evpid));
602 	return (0);
603 }
604 
605 int
606 queue_envelope_update(struct envelope *ep)
607 {
608 	char	evpbuf[sizeof(struct envelope)];
609 	size_t	evplen;
610 	int	r;
611 
612 	evplen = queue_envelope_dump_buffer(ep, evpbuf, sizeof evpbuf);
613 	if (evplen == 0)
614 		return (0);
615 
616 	profile_enter("queue_envelope_update");
617 	r = handler_envelope_update(ep->id, evpbuf, evplen);
618 	profile_leave();
619 
620 	if (r && env->sc_queue_flags & QUEUE_EVPCACHE)
621 		queue_envelope_cache_update(ep);
622 
623 	log_trace(TRACE_QUEUE,
624 	    "queue-backend: queue_envelope_update(%016"PRIx64") -> %d",
625 	    ep->id, r);
626 
627 	return (r);
628 }
629 
630 int
631 queue_message_walk(struct envelope *ep, uint32_t msgid, int *done, void **data)
632 {
633 	char		 evpbuf[sizeof(struct envelope)];
634 	uint64_t	 evpid;
635 	int		 r;
636 	const char	*e;
637 
638 	profile_enter("queue_message_walk");
639 	r = handler_message_walk(&evpid, evpbuf, sizeof evpbuf,
640 	    msgid, done, data);
641 	profile_leave();
642 
643 	log_trace(TRACE_QUEUE,
644 	    "queue-backend: queue_message_walk() -> %d (%016"PRIx64")",
645 	    r, evpid);
646 
647 	if (r == -1)
648 		return (r);
649 
650 	if (r && queue_envelope_load_buffer(ep, evpbuf, (size_t)r)) {
651 		if ((e = envelope_validate(ep)) == NULL) {
652 			ep->id = evpid;
653 			/*
654 			 * do not cache the envelope here, while discovering
655 			 * envelopes one could re-run discover on already
656 			 * scheduled envelopes which leads to triggering of
657 			 * strict checks in caching. Envelopes could anyway
658 			 * be loaded from backend if it isn't cached.
659 			 */
660 			return (1);
661 		}
662 		log_debug("debug: invalid envelope %016" PRIx64 ": %s",
663 		    ep->id, e);
664 		(void)queue_message_corrupt(evpid_to_msgid(evpid));
665 	}
666 
667 	return (0);
668 }
669 
670 int
671 queue_envelope_walk(struct envelope *ep)
672 {
673 	const char	*e;
674 	uint64_t	 evpid;
675 	char		 evpbuf[sizeof(struct envelope)];
676 	int		 r;
677 
678 	profile_enter("queue_envelope_walk");
679 	r = handler_envelope_walk(&evpid, evpbuf, sizeof evpbuf);
680 	profile_leave();
681 
682 	log_trace(TRACE_QUEUE,
683 	    "queue-backend: queue_envelope_walk() -> %d (%016"PRIx64")",
684 	    r, evpid);
685 
686 	if (r == -1)
687 		return (r);
688 
689 	if (r && queue_envelope_load_buffer(ep, evpbuf, (size_t)r)) {
690 		if ((e = envelope_validate(ep)) == NULL) {
691 			ep->id = evpid;
692 			if (env->sc_queue_flags & QUEUE_EVPCACHE)
693 				queue_envelope_cache_add(ep);
694 			return (1);
695 		}
696 		log_debug("debug: invalid envelope %016" PRIx64 ": %s",
697 		    ep->id, e);
698 		(void)queue_message_corrupt(evpid_to_msgid(evpid));
699 	}
700 
701 	return (0);
702 }
703 
704 uint32_t
705 queue_generate_msgid(void)
706 {
707 	uint32_t msgid;
708 
709 	while ((msgid = arc4random()) == 0)
710 		;
711 
712 	return msgid;
713 }
714 
715 uint64_t
716 queue_generate_evpid(uint32_t msgid)
717 {
718 	uint32_t rnd;
719 	uint64_t evpid;
720 
721 	while ((rnd = arc4random()) == 0)
722 		;
723 
724 	evpid = msgid;
725 	evpid <<= 32;
726 	evpid |= rnd;
727 
728 	return evpid;
729 }
730 
731 static const char*
732 envelope_validate(struct envelope *ep)
733 {
734 	if (ep->version != SMTPD_ENVELOPE_VERSION)
735 		return "version mismatch";
736 
737 	if (memchr(ep->helo, '\0', sizeof(ep->helo)) == NULL)
738 		return "invalid helo";
739 	if (ep->helo[0] == '\0')
740 		return "empty helo";
741 
742 	if (memchr(ep->hostname, '\0', sizeof(ep->hostname)) == NULL)
743 		return "invalid hostname";
744 	if (ep->hostname[0] == '\0')
745 		return "empty hostname";
746 
747 	if (memchr(ep->errorline, '\0', sizeof(ep->errorline)) == NULL)
748 		return "invalid error line";
749 
750 	return NULL;
751 }
752 
753 void
754 queue_api_on_close(int(*cb)(void))
755 {
756 	handler_close = cb;
757 }
758 
759 void
760 queue_api_on_message_create(int(*cb)(uint32_t *))
761 {
762 	handler_message_create = cb;
763 }
764 
765 void
766 queue_api_on_message_commit(int(*cb)(uint32_t, const char *))
767 {
768 	handler_message_commit = cb;
769 }
770 
771 void
772 queue_api_on_message_delete(int(*cb)(uint32_t))
773 {
774 	handler_message_delete = cb;
775 }
776 
777 void
778 queue_api_on_message_fd_r(int(*cb)(uint32_t))
779 {
780 	handler_message_fd_r = cb;
781 }
782 
783 void
784 queue_api_on_message_corrupt(int(*cb)(uint32_t))
785 {
786 	handler_message_corrupt = cb;
787 }
788 
789 void
790 queue_api_on_message_uncorrupt(int(*cb)(uint32_t))
791 {
792 	handler_message_uncorrupt = cb;
793 }
794 
795 void
796 queue_api_on_envelope_create(int(*cb)(uint32_t, const char *, size_t, uint64_t *))
797 {
798 	handler_envelope_create = cb;
799 }
800 
801 void
802 queue_api_on_envelope_delete(int(*cb)(uint64_t))
803 {
804 	handler_envelope_delete = cb;
805 }
806 
807 void
808 queue_api_on_envelope_update(int(*cb)(uint64_t, const char *, size_t))
809 {
810 	handler_envelope_update = cb;
811 }
812 
813 void
814 queue_api_on_envelope_load(int(*cb)(uint64_t, char *, size_t))
815 {
816 	handler_envelope_load = cb;
817 }
818 
819 void
820 queue_api_on_envelope_walk(int(*cb)(uint64_t *, char *, size_t))
821 {
822 	handler_envelope_walk = cb;
823 }
824 
825 void
826 queue_api_on_message_walk(int(*cb)(uint64_t *, char *, size_t,
827     uint32_t, int *, void **))
828 {
829 	handler_message_walk = cb;
830 }
831