xref: /openbsd-src/usr.sbin/smtpd/queue_backend.c (revision 0b7734b3d77bb9b21afec6f4621cae6c805dbd45)
1 /*	$OpenBSD: queue_backend.c,v 1.62 2016/02/04 12:46:28 eric Exp $	*/
2 
3 /*
4  * Copyright (c) 2011 Gilles Chehade <gilles@poolp.org>
5  *
6  * Permission to use, copy, modify, and distribute this software for any
7  * purpose with or without fee is hereby granted, provided that the above
8  * copyright notice and this permission notice appear in all copies.
9  *
10  * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
11  * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
12  * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
13  * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
14  * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
15  * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
16  * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
17  */
18 
19 #include <sys/types.h>
20 #include <sys/queue.h>
21 #include <sys/tree.h>
22 #include <sys/socket.h>
23 #include <sys/stat.h>
24 
25 #include <ctype.h>
26 #include <err.h>
27 #include <errno.h>
28 #include <event.h>
29 #include <fcntl.h>
30 #include <grp.h>
31 #include <imsg.h>
32 #include <limits.h>
33 #include <inttypes.h>
34 #include <libgen.h>
35 #include <pwd.h>
36 #include <stdio.h>
37 #include <stdlib.h>
38 #include <string.h>
39 #include <time.h>
40 #include <unistd.h>
41 
42 #include "smtpd.h"
43 #include "log.h"
44 
45 static const char* envelope_validate(struct envelope *);
46 
47 extern struct queue_backend	queue_backend_fs;
48 extern struct queue_backend	queue_backend_null;
49 extern struct queue_backend	queue_backend_proc;
50 extern struct queue_backend	queue_backend_ram;
51 
52 static void queue_envelope_cache_add(struct envelope *);
53 static void queue_envelope_cache_update(struct envelope *);
54 static void queue_envelope_cache_del(uint64_t evpid);
55 
56 TAILQ_HEAD(evplst, envelope);
57 
58 static struct tree		evpcache_tree;
59 static struct evplst		evpcache_list;
60 static struct queue_backend	*backend;
61 
62 static int (*handler_close)(void);
63 static int (*handler_message_create)(uint32_t *);
64 static int (*handler_message_commit)(uint32_t, const char*);
65 static int (*handler_message_delete)(uint32_t);
66 static int (*handler_message_fd_r)(uint32_t);
67 static int (*handler_message_corrupt)(uint32_t);
68 static int (*handler_message_uncorrupt)(uint32_t);
69 static int (*handler_envelope_create)(uint32_t, const char *, size_t, uint64_t *);
70 static int (*handler_envelope_delete)(uint64_t);
71 static int (*handler_envelope_update)(uint64_t, const char *, size_t);
72 static int (*handler_envelope_load)(uint64_t, char *, size_t);
73 static int (*handler_envelope_walk)(uint64_t *, char *, size_t);
74 static int (*handler_message_walk)(uint64_t *, char *, size_t,
75     uint32_t, int *, void **);
76 
77 #ifdef QUEUE_PROFILING
78 
79 static struct {
80 	struct timespec	 t0;
81 	const char	*name;
82 } profile;
83 
84 static inline void profile_enter(const char *name)
85 {
86 	if ((profiling & PROFILE_QUEUE) == 0)
87 		return;
88 
89 	profile.name = name;
90 	clock_gettime(CLOCK_MONOTONIC, &profile.t0);
91 }
92 
93 static inline void profile_leave(void)
94 {
95 	struct timespec	 t1, dt;
96 
97 	if ((profiling & PROFILE_QUEUE) == 0)
98 		return;
99 
100 	clock_gettime(CLOCK_MONOTONIC, &t1);
101 	timespecsub(&t1, &profile.t0, &dt);
102 	log_debug("profile-queue: %s %lld.%09ld", profile.name,
103 	    (long long)dt.tv_sec, dt.tv_nsec);
104 }
105 #else
106 #define profile_enter(x)	do {} while (0)
107 #define profile_leave()		do {} while (0)
108 #endif
109 
110 static int
111 queue_message_path(uint32_t msgid, char *buf, size_t len)
112 {
113 	return bsnprintf(buf, len, "%s/%08"PRIx32, PATH_TEMPORARY, msgid);
114 }
115 
116 int
117 queue_init(const char *name, int server)
118 {
119 	struct passwd	*pwq;
120 	struct group	*gr;
121 	int		 r;
122 
123 	pwq = getpwnam(SMTPD_QUEUE_USER);
124 	if (pwq == NULL)
125 		errx(1, "unknown user %s", SMTPD_QUEUE_USER);
126 
127 	gr = getgrnam(SMTPD_QUEUE_GROUP);
128 	if (gr == NULL)
129 		errx(1, "unknown group %s", SMTPD_QUEUE_GROUP);
130 
131 	tree_init(&evpcache_tree);
132 	TAILQ_INIT(&evpcache_list);
133 
134 	if (!strcmp(name, "fs"))
135 		backend = &queue_backend_fs;
136 	else if (!strcmp(name, "null"))
137 		backend = &queue_backend_null;
138 	else if (!strcmp(name, "ram"))
139 		backend = &queue_backend_ram;
140 	else
141 		backend = &queue_backend_proc;
142 
143 	if (server) {
144 		if (ckdir(PATH_SPOOL, 0711, 0, 0, 1) == 0)
145 			errx(1, "error in spool directory setup");
146 		if (ckdir(PATH_SPOOL PATH_OFFLINE, 0770, 0, gr->gr_gid, 1) == 0)
147 			errx(1, "error in offline directory setup");
148 		if (ckdir(PATH_SPOOL PATH_PURGE, 0700, pwq->pw_uid, 0, 1) == 0)
149 			errx(1, "error in purge directory setup");
150 
151 		mvpurge(PATH_SPOOL PATH_TEMPORARY, PATH_SPOOL PATH_PURGE);
152 
153 		if (ckdir(PATH_SPOOL PATH_TEMPORARY, 0700, pwq->pw_uid, 0, 1) == 0)
154 			errx(1, "error in purge directory setup");
155 	}
156 
157 	r = backend->init(pwq, server, name);
158 
159 	log_trace(TRACE_QUEUE, "queue-backend: queue_init(%d) -> %d", server, r);
160 
161 	return (r);
162 }
163 
164 int
165 queue_close(void)
166 {
167 	if (handler_close)
168 		return (handler_close());
169 
170 	return (1);
171 }
172 
173 int
174 queue_message_create(uint32_t *msgid)
175 {
176 	int	r;
177 
178 	profile_enter("queue_message_create");
179 	r = handler_message_create(msgid);
180 	profile_leave();
181 
182 	log_trace(TRACE_QUEUE,
183 	    "queue-backend: queue_message_create() -> %d (%08"PRIx32")",
184 	    r, *msgid);
185 
186 	return (r);
187 }
188 
189 int
190 queue_message_delete(uint32_t msgid)
191 {
192 	char	msgpath[PATH_MAX];
193 	uint64_t evpid;
194 	void   *iter;
195 	int	r;
196 
197 	profile_enter("queue_message_delete");
198 	r = handler_message_delete(msgid);
199 	profile_leave();
200 
201 	/* in case the message is incoming */
202 	queue_message_path(msgid, msgpath, sizeof(msgpath));
203 	unlink(msgpath);
204 
205 	/* remove remaining envelopes from the cache if any (on rollback) */
206 	evpid = msgid_to_evpid(msgid);
207 	for (;;) {
208 		iter = NULL;
209 		if (!tree_iterfrom(&evpcache_tree, &iter, evpid, &evpid, NULL))
210 			break;
211 		if (evpid_to_msgid(evpid) != msgid)
212 			break;
213 		queue_envelope_cache_del(evpid);
214 	}
215 
216 	log_trace(TRACE_QUEUE,
217 	    "queue-backend: queue_message_delete(%08"PRIx32") -> %d", msgid, r);
218 
219 	return (r);
220 }
221 
222 int
223 queue_message_commit(uint32_t msgid)
224 {
225 	int	r;
226 	char	msgpath[PATH_MAX];
227 	char	tmppath[PATH_MAX];
228 	FILE	*ifp = NULL;
229 	FILE	*ofp = NULL;
230 
231 	profile_enter("queue_message_commit");
232 
233 	queue_message_path(msgid, msgpath, sizeof(msgpath));
234 
235 	if (env->sc_queue_flags & QUEUE_COMPRESSION) {
236 		bsnprintf(tmppath, sizeof tmppath, "%s.comp", msgpath);
237 		ifp = fopen(msgpath, "r");
238 		ofp = fopen(tmppath, "w+");
239 		if (ifp == NULL || ofp == NULL)
240 			goto err;
241 		if (!compress_file(ifp, ofp))
242 			goto err;
243 		fclose(ifp);
244 		fclose(ofp);
245 		ifp = NULL;
246 		ofp = NULL;
247 
248 		if (rename(tmppath, msgpath) == -1) {
249 			if (errno == ENOSPC)
250 				return (0);
251 			unlink(tmppath);
252 			log_warn("rename");
253 			return (0);
254 		}
255 	}
256 
257 	if (env->sc_queue_flags & QUEUE_ENCRYPTION) {
258 		bsnprintf(tmppath, sizeof tmppath, "%s.enc", msgpath);
259 		ifp = fopen(msgpath, "r");
260 		ofp = fopen(tmppath, "w+");
261 		if (ifp == NULL || ofp == NULL)
262 			goto err;
263 		if (!crypto_encrypt_file(ifp, ofp))
264 			goto err;
265 		fclose(ifp);
266 		fclose(ofp);
267 		ifp = NULL;
268 		ofp = NULL;
269 
270 		if (rename(tmppath, msgpath) == -1) {
271 			if (errno == ENOSPC)
272 				return (0);
273 			unlink(tmppath);
274 			log_warn("rename");
275 			return (0);
276 		}
277 	}
278 
279 	r = handler_message_commit(msgid, msgpath);
280 	profile_leave();
281 
282 	/* in case it's not done by the backend */
283 	unlink(msgpath);
284 
285 	log_trace(TRACE_QUEUE,
286 	    "queue-backend: queue_message_commit(%08"PRIx32") -> %d",
287 	    msgid, r);
288 
289 	return (r);
290 
291 err:
292 	if (ifp)
293 		fclose(ifp);
294 	if (ofp)
295 		fclose(ofp);
296 	return 0;
297 }
298 
299 int
300 queue_message_corrupt(uint32_t msgid)
301 {
302 	int	r;
303 
304 	profile_enter("queue_message_corrupt");
305 	r = handler_message_corrupt(msgid);
306 	profile_leave();
307 
308 	log_trace(TRACE_QUEUE,
309 	    "queue-backend: queue_message_corrupt(%08"PRIx32") -> %d", msgid, r);
310 
311 	return (r);
312 }
313 
314 int
315 queue_message_uncorrupt(uint32_t msgid)
316 {
317 	return handler_message_uncorrupt(msgid);
318 }
319 
320 int
321 queue_message_fd_r(uint32_t msgid)
322 {
323 	int	fdin = -1, fdout = -1, fd = -1;
324 	FILE	*ifp = NULL;
325 	FILE	*ofp = NULL;
326 
327 	profile_enter("queue_message_fd_r");
328 	fdin = handler_message_fd_r(msgid);
329 	profile_leave();
330 
331 	log_trace(TRACE_QUEUE,
332 	    "queue-backend: queue_message_fd_r(%08"PRIx32") -> %d", msgid, fdin);
333 
334 	if (fdin == -1)
335 		return (-1);
336 
337 	if (env->sc_queue_flags & QUEUE_ENCRYPTION) {
338 		if ((fdout = mktmpfile()) == -1)
339 			goto err;
340 		if ((fd = dup(fdout)) == -1)
341 			goto err;
342 		if ((ifp = fdopen(fdin, "r")) == NULL)
343 			goto err;
344 		fdin = fd;
345 		fd = -1;
346 		if ((ofp = fdopen(fdout, "w+")) == NULL)
347 			goto err;
348 
349 		if (!crypto_decrypt_file(ifp, ofp))
350 			goto err;
351 
352 		fclose(ifp);
353 		ifp = NULL;
354 		fclose(ofp);
355 		ofp = NULL;
356 		lseek(fdin, SEEK_SET, 0);
357 	}
358 
359 	if (env->sc_queue_flags & QUEUE_COMPRESSION) {
360 		if ((fdout = mktmpfile()) == -1)
361 			goto err;
362 		if ((fd = dup(fdout)) == -1)
363 			goto err;
364 		if ((ifp = fdopen(fdin, "r")) == NULL)
365 			goto err;
366 		fdin = fd;
367 		fd = -1;
368 		if ((ofp = fdopen(fdout, "w+")) == NULL)
369 			goto err;
370 
371 		if (!uncompress_file(ifp, ofp))
372 			goto err;
373 
374 		fclose(ifp);
375 		ifp = NULL;
376 		fclose(ofp);
377 		ofp = NULL;
378 		lseek(fdin, SEEK_SET, 0);
379 	}
380 
381 	return (fdin);
382 
383 err:
384 	if (fd != -1)
385 		close(fd);
386 	if (fdin != -1)
387 		close(fdin);
388 	if (fdout != -1)
389 		close(fdout);
390 	if (ifp)
391 		fclose(ifp);
392 	if (ofp)
393 		fclose(ofp);
394 	return -1;
395 }
396 
397 int
398 queue_message_fd_rw(uint32_t msgid)
399 {
400 	char buf[PATH_MAX];
401 
402 	queue_message_path(msgid, buf, sizeof(buf));
403 
404 	return open(buf, O_RDWR | O_CREAT | O_EXCL, 0600);
405 }
406 
407 static int
408 queue_envelope_dump_buffer(struct envelope *ep, char *evpbuf, size_t evpbufsize)
409 {
410 	char   *evp;
411 	size_t	evplen;
412 	size_t	complen;
413 	char	compbuf[sizeof(struct envelope)];
414 	size_t	enclen;
415 	char	encbuf[sizeof(struct envelope)];
416 
417 	evp = evpbuf;
418 	evplen = envelope_dump_buffer(ep, evpbuf, evpbufsize);
419 	if (evplen == 0)
420 		return (0);
421 
422 	if (env->sc_queue_flags & QUEUE_COMPRESSION) {
423 		complen = compress_chunk(evp, evplen, compbuf, sizeof compbuf);
424 		if (complen == 0)
425 			return (0);
426 		evp = compbuf;
427 		evplen = complen;
428 	}
429 
430 	if (env->sc_queue_flags & QUEUE_ENCRYPTION) {
431 		enclen = crypto_encrypt_buffer(evp, evplen, encbuf, sizeof encbuf);
432 		if (enclen == 0)
433 			return (0);
434 		evp = encbuf;
435 		evplen = enclen;
436 	}
437 
438 	memmove(evpbuf, evp, evplen);
439 
440 	return (evplen);
441 }
442 
443 static int
444 queue_envelope_load_buffer(struct envelope *ep, char *evpbuf, size_t evpbufsize)
445 {
446 	char		*evp;
447 	size_t		 evplen;
448 	char		 compbuf[sizeof(struct envelope)];
449 	size_t		 complen;
450 	char		 encbuf[sizeof(struct envelope)];
451 	size_t		 enclen;
452 
453 	evp = evpbuf;
454 	evplen = evpbufsize;
455 
456 	if (env->sc_queue_flags & QUEUE_ENCRYPTION) {
457 		enclen = crypto_decrypt_buffer(evp, evplen, encbuf, sizeof encbuf);
458 		if (enclen == 0)
459 			return (0);
460 		evp = encbuf;
461 		evplen = enclen;
462 	}
463 
464 	if (env->sc_queue_flags & QUEUE_COMPRESSION) {
465 		complen = uncompress_chunk(evp, evplen, compbuf, sizeof compbuf);
466 		if (complen == 0)
467 			return (0);
468 		evp = compbuf;
469 		evplen = complen;
470 	}
471 
472 	return (envelope_load_buffer(ep, evp, evplen));
473 }
474 
475 static void
476 queue_envelope_cache_add(struct envelope *e)
477 {
478 	struct envelope *cached;
479 
480 	while (tree_count(&evpcache_tree) >= env->sc_queue_evpcache_size)
481 		queue_envelope_cache_del(TAILQ_LAST(&evpcache_list, evplst)->id);
482 
483 	cached = xcalloc(1, sizeof *cached, "queue_envelope_cache_add");
484 	*cached = *e;
485 	TAILQ_INSERT_HEAD(&evpcache_list, cached, entry);
486 	tree_xset(&evpcache_tree, e->id, cached);
487 	stat_increment("queue.evpcache.size", 1);
488 }
489 
490 static void
491 queue_envelope_cache_update(struct envelope *e)
492 {
493 	struct envelope *cached;
494 
495 	if ((cached = tree_get(&evpcache_tree, e->id)) == NULL) {
496 		queue_envelope_cache_add(e);
497 		stat_increment("queue.evpcache.update.missed", 1);
498 	} else {
499 		TAILQ_REMOVE(&evpcache_list, cached, entry);
500 		*cached = *e;
501 		TAILQ_INSERT_HEAD(&evpcache_list, cached, entry);
502 		stat_increment("queue.evpcache.update.hit", 1);
503 	}
504 }
505 
506 static void
507 queue_envelope_cache_del(uint64_t evpid)
508 {
509 	struct envelope *cached;
510 
511 	if ((cached = tree_pop(&evpcache_tree, evpid)) == NULL)
512 		return;
513 
514 	TAILQ_REMOVE(&evpcache_list, cached, entry);
515 	free(cached);
516 	stat_decrement("queue.evpcache.size", 1);
517 }
518 
519 int
520 queue_envelope_create(struct envelope *ep)
521 {
522 	int		 r;
523 	char		 evpbuf[sizeof(struct envelope)];
524 	size_t		 evplen;
525 	uint64_t	 evpid;
526 	uint32_t	 msgid;
527 
528 	ep->creation = time(NULL);
529 	evplen = queue_envelope_dump_buffer(ep, evpbuf, sizeof evpbuf);
530 	if (evplen == 0)
531 		return (0);
532 
533 	evpid = ep->id;
534 	msgid = evpid_to_msgid(evpid);
535 
536 	profile_enter("queue_envelope_create");
537 	r = handler_envelope_create(msgid, evpbuf, evplen, &ep->id);
538 	profile_leave();
539 
540 	log_trace(TRACE_QUEUE,
541 	    "queue-backend: queue_envelope_create(%016"PRIx64", %zu) -> %d (%016"PRIx64")",
542 	    evpid, evplen, r, ep->id);
543 
544 	if (!r) {
545 		ep->creation = 0;
546 		ep->id = 0;
547 	}
548 
549 	if (r && env->sc_queue_flags & QUEUE_EVPCACHE)
550 		queue_envelope_cache_add(ep);
551 
552 	return (r);
553 }
554 
555 int
556 queue_envelope_delete(uint64_t evpid)
557 {
558 	int	r;
559 
560 	if (env->sc_queue_flags & QUEUE_EVPCACHE)
561 		queue_envelope_cache_del(evpid);
562 
563 	profile_enter("queue_envelope_delete");
564 	r = handler_envelope_delete(evpid);
565 	profile_leave();
566 
567 	log_trace(TRACE_QUEUE,
568 	    "queue-backend: queue_envelope_delete(%016"PRIx64") -> %d",
569 	    evpid, r);
570 
571 	return (r);
572 }
573 
574 int
575 queue_envelope_load(uint64_t evpid, struct envelope *ep)
576 {
577 	const char	*e;
578 	char		 evpbuf[sizeof(struct envelope)];
579 	size_t		 evplen;
580 	struct envelope	*cached;
581 
582 	if ((env->sc_queue_flags & QUEUE_EVPCACHE) &&
583 	    (cached = tree_get(&evpcache_tree, evpid))) {
584 		*ep = *cached;
585 		stat_increment("queue.evpcache.load.hit", 1);
586 		return (1);
587 	}
588 
589 	ep->id = evpid;
590 	profile_enter("queue_envelope_load");
591 	evplen = handler_envelope_load(ep->id, evpbuf, sizeof evpbuf);
592 	profile_leave();
593 
594 	log_trace(TRACE_QUEUE,
595 	    "queue-backend: queue_envelope_load(%016"PRIx64") -> %zu",
596 	    evpid, evplen);
597 
598 	if (evplen == 0)
599 		return (0);
600 
601 	if (queue_envelope_load_buffer(ep, evpbuf, evplen)) {
602 		if ((e = envelope_validate(ep)) == NULL) {
603 			ep->id = evpid;
604 			if (env->sc_queue_flags & QUEUE_EVPCACHE) {
605 				queue_envelope_cache_add(ep);
606 				stat_increment("queue.evpcache.load.missed", 1);
607 			}
608 			return (1);
609 		}
610 		log_debug("debug: invalid envelope %016" PRIx64 ": %s",
611 		    ep->id, e);
612 	}
613 
614 	(void)queue_message_corrupt(evpid_to_msgid(evpid));
615 	return (0);
616 }
617 
618 int
619 queue_envelope_update(struct envelope *ep)
620 {
621 	char	evpbuf[sizeof(struct envelope)];
622 	size_t	evplen;
623 	int	r;
624 
625 	evplen = queue_envelope_dump_buffer(ep, evpbuf, sizeof evpbuf);
626 	if (evplen == 0)
627 		return (0);
628 
629 	profile_enter("queue_envelope_update");
630 	r = handler_envelope_update(ep->id, evpbuf, evplen);
631 	profile_leave();
632 
633 	if (r && env->sc_queue_flags & QUEUE_EVPCACHE)
634 		queue_envelope_cache_update(ep);
635 
636 	log_trace(TRACE_QUEUE,
637 	    "queue-backend: queue_envelope_update(%016"PRIx64") -> %d",
638 	    ep->id, r);
639 
640 	return (r);
641 }
642 
643 int
644 queue_message_walk(struct envelope *ep, uint32_t msgid, int *done, void **data)
645 {
646 	char		 evpbuf[sizeof(struct envelope)];
647 	uint64_t	 evpid;
648 	int		 r;
649 	const char	*e;
650 
651 	profile_enter("queue_message_walk");
652 	r = handler_message_walk(&evpid, evpbuf, sizeof evpbuf,
653 	    msgid, done, data);
654 	profile_leave();
655 
656 	log_trace(TRACE_QUEUE,
657 	    "queue-backend: queue_message_walk() -> %d (%016"PRIx64")",
658 	    r, evpid);
659 
660 	if (r == -1)
661 		return (r);
662 
663 	if (r && queue_envelope_load_buffer(ep, evpbuf, (size_t)r)) {
664 		if ((e = envelope_validate(ep)) == NULL) {
665 			ep->id = evpid;
666 			/*
667 			 * do not cache the envelope here, while discovering
668 			 * envelopes one could re-run discover on already
669 			 * scheduled envelopes which leads to triggering of
670 			 * strict checks in caching. Envelopes could anyway
671 			 * be loaded from backend if it isn't cached.
672 			 */
673 			return (1);
674 		}
675 		log_debug("debug: invalid envelope %016" PRIx64 ": %s",
676 		    ep->id, e);
677 		(void)queue_message_corrupt(evpid_to_msgid(evpid));
678 	}
679 
680 	return (0);
681 }
682 
683 int
684 queue_envelope_walk(struct envelope *ep)
685 {
686 	const char	*e;
687 	uint64_t	 evpid;
688 	char		 evpbuf[sizeof(struct envelope)];
689 	int		 r;
690 
691 	profile_enter("queue_envelope_walk");
692 	r = handler_envelope_walk(&evpid, evpbuf, sizeof evpbuf);
693 	profile_leave();
694 
695 	log_trace(TRACE_QUEUE,
696 	    "queue-backend: queue_envelope_walk() -> %d (%016"PRIx64")",
697 	    r, evpid);
698 
699 	if (r == -1)
700 		return (r);
701 
702 	if (r && queue_envelope_load_buffer(ep, evpbuf, (size_t)r)) {
703 		if ((e = envelope_validate(ep)) == NULL) {
704 			ep->id = evpid;
705 			if (env->sc_queue_flags & QUEUE_EVPCACHE)
706 				queue_envelope_cache_add(ep);
707 			return (1);
708 		}
709 		log_debug("debug: invalid envelope %016" PRIx64 ": %s",
710 		    ep->id, e);
711 		(void)queue_message_corrupt(evpid_to_msgid(evpid));
712 	}
713 
714 	return (0);
715 }
716 
717 uint32_t
718 queue_generate_msgid(void)
719 {
720 	uint32_t msgid;
721 
722 	while ((msgid = arc4random()) == 0)
723 		;
724 
725 	return msgid;
726 }
727 
728 uint64_t
729 queue_generate_evpid(uint32_t msgid)
730 {
731 	uint32_t rnd;
732 	uint64_t evpid;
733 
734 	while ((rnd = arc4random()) == 0)
735 		;
736 
737 	evpid = msgid;
738 	evpid <<= 32;
739 	evpid |= rnd;
740 
741 	return evpid;
742 }
743 
744 static const char*
745 envelope_validate(struct envelope *ep)
746 {
747 	if (ep->version != SMTPD_ENVELOPE_VERSION)
748 		return "version mismatch";
749 
750 	if (memchr(ep->helo, '\0', sizeof(ep->helo)) == NULL)
751 		return "invalid helo";
752 	if (ep->helo[0] == '\0')
753 		return "empty helo";
754 
755 	if (memchr(ep->hostname, '\0', sizeof(ep->hostname)) == NULL)
756 		return "invalid hostname";
757 	if (ep->hostname[0] == '\0')
758 		return "empty hostname";
759 
760 	if (memchr(ep->errorline, '\0', sizeof(ep->errorline)) == NULL)
761 		return "invalid error line";
762 
763 	return NULL;
764 }
765 
766 void
767 queue_api_on_close(int(*cb)(void))
768 {
769 	handler_close = cb;
770 }
771 
772 void
773 queue_api_on_message_create(int(*cb)(uint32_t *))
774 {
775 	handler_message_create = cb;
776 }
777 
778 void
779 queue_api_on_message_commit(int(*cb)(uint32_t, const char *))
780 {
781 	handler_message_commit = cb;
782 }
783 
784 void
785 queue_api_on_message_delete(int(*cb)(uint32_t))
786 {
787 	handler_message_delete = cb;
788 }
789 
790 void
791 queue_api_on_message_fd_r(int(*cb)(uint32_t))
792 {
793 	handler_message_fd_r = cb;
794 }
795 
796 void
797 queue_api_on_message_corrupt(int(*cb)(uint32_t))
798 {
799 	handler_message_corrupt = cb;
800 }
801 
802 void
803 queue_api_on_message_uncorrupt(int(*cb)(uint32_t))
804 {
805 	handler_message_uncorrupt = cb;
806 }
807 
808 void
809 queue_api_on_envelope_create(int(*cb)(uint32_t, const char *, size_t, uint64_t *))
810 {
811 	handler_envelope_create = cb;
812 }
813 
814 void
815 queue_api_on_envelope_delete(int(*cb)(uint64_t))
816 {
817 	handler_envelope_delete = cb;
818 }
819 
820 void
821 queue_api_on_envelope_update(int(*cb)(uint64_t, const char *, size_t))
822 {
823 	handler_envelope_update = cb;
824 }
825 
826 void
827 queue_api_on_envelope_load(int(*cb)(uint64_t, char *, size_t))
828 {
829 	handler_envelope_load = cb;
830 }
831 
832 void
833 queue_api_on_envelope_walk(int(*cb)(uint64_t *, char *, size_t))
834 {
835 	handler_envelope_walk = cb;
836 }
837 
838 void
839 queue_api_on_message_walk(int(*cb)(uint64_t *, char *, size_t,
840     uint32_t, int *, void **))
841 {
842 	handler_message_walk = cb;
843 }
844