1*0dcffd0dSop /* $OpenBSD: queue_backend.c,v 1.69 2023/05/31 16:51:46 op Exp $ */
22a81c1f8Sgilles
32a81c1f8Sgilles /*
465c4fdfbSgilles * Copyright (c) 2011 Gilles Chehade <gilles@poolp.org>
52a81c1f8Sgilles *
62a81c1f8Sgilles * Permission to use, copy, modify, and distribute this software for any
72a81c1f8Sgilles * purpose with or without fee is hereby granted, provided that the above
82a81c1f8Sgilles * copyright notice and this permission notice appear in all copies.
92a81c1f8Sgilles *
102a81c1f8Sgilles * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
112a81c1f8Sgilles * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
122a81c1f8Sgilles * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
132a81c1f8Sgilles * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
142a81c1f8Sgilles * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
152a81c1f8Sgilles * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
162a81c1f8Sgilles * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
172a81c1f8Sgilles */
182a81c1f8Sgilles
190e8cc8ecSchl #include <errno.h>
20d1c74f7bSchl #include <fcntl.h>
218351d18bSgilles #include <grp.h>
22b5c4b6b5Schl #include <inttypes.h>
232a81c1f8Sgilles #include <pwd.h>
242a81c1f8Sgilles #include <stdlib.h>
252a81c1f8Sgilles #include <string.h>
26*0dcffd0dSop #include <time.h>
272a81c1f8Sgilles #include <unistd.h>
282a81c1f8Sgilles
292a81c1f8Sgilles #include "smtpd.h"
302a81c1f8Sgilles #include "log.h"
312a81c1f8Sgilles
32f29e586fSchl static const char* envelope_validate(struct envelope *);
33148c5951Sgilles
34c91bb5c0Seric extern struct queue_backend queue_backend_fs;
3565c4fdfbSgilles extern struct queue_backend queue_backend_null;
363f70ecafSeric extern struct queue_backend queue_backend_proc;
374d64cfa3Seric extern struct queue_backend queue_backend_ram;
387b5d776dSgilles
393f70ecafSeric static void queue_envelope_cache_add(struct envelope *);
403f70ecafSeric static void queue_envelope_cache_update(struct envelope *);
413f70ecafSeric static void queue_envelope_cache_del(uint64_t evpid);
423f70ecafSeric
433f70ecafSeric TAILQ_HEAD(evplst, envelope);
443f70ecafSeric
453f70ecafSeric static struct tree evpcache_tree;
463f70ecafSeric static struct evplst evpcache_list;
4765c4fdfbSgilles static struct queue_backend *backend;
4865c4fdfbSgilles
4998f67d16Seric static int (*handler_close)(void);
503f70ecafSeric static int (*handler_message_create)(uint32_t *);
513f70ecafSeric static int (*handler_message_commit)(uint32_t, const char*);
523f70ecafSeric static int (*handler_message_delete)(uint32_t);
533f70ecafSeric static int (*handler_message_fd_r)(uint32_t);
543f70ecafSeric static int (*handler_envelope_create)(uint32_t, const char *, size_t, uint64_t *);
553f70ecafSeric static int (*handler_envelope_delete)(uint64_t);
563f70ecafSeric static int (*handler_envelope_update)(uint64_t, const char *, size_t);
573f70ecafSeric static int (*handler_envelope_load)(uint64_t, char *, size_t);
583f70ecafSeric static int (*handler_envelope_walk)(uint64_t *, char *, size_t);
59a9835440Ssunil static int (*handler_message_walk)(uint64_t *, char *, size_t,
60a9835440Ssunil uint32_t, int *, void **);
613f70ecafSeric
6265c4fdfbSgilles #ifdef QUEUE_PROFILING
6365c4fdfbSgilles
6465c4fdfbSgilles static struct {
6565c4fdfbSgilles struct timespec t0;
6665c4fdfbSgilles const char *name;
6765c4fdfbSgilles } profile;
6865c4fdfbSgilles
profile_enter(const char * name)6965c4fdfbSgilles static inline void profile_enter(const char *name)
7065c4fdfbSgilles {
7165c4fdfbSgilles if ((profiling & PROFILE_QUEUE) == 0)
7265c4fdfbSgilles return;
7365c4fdfbSgilles
7465c4fdfbSgilles profile.name = name;
7565c4fdfbSgilles clock_gettime(CLOCK_MONOTONIC, &profile.t0);
7665c4fdfbSgilles }
7765c4fdfbSgilles
profile_leave(void)7865c4fdfbSgilles static inline void profile_leave(void)
7965c4fdfbSgilles {
8065c4fdfbSgilles struct timespec t1, dt;
8165c4fdfbSgilles
8265c4fdfbSgilles if ((profiling & PROFILE_QUEUE) == 0)
8365c4fdfbSgilles return;
8465c4fdfbSgilles
8565c4fdfbSgilles clock_gettime(CLOCK_MONOTONIC, &t1);
8665c4fdfbSgilles timespecsub(&t1, &profile.t0, &dt);
87e267740eSeric log_debug("profile-queue: %s %lld.%09ld", profile.name,
88e267740eSeric (long long)dt.tv_sec, dt.tv_nsec);
8965c4fdfbSgilles }
9065c4fdfbSgilles #else
9165c4fdfbSgilles #define profile_enter(x) do {} while (0)
9265c4fdfbSgilles #define profile_leave() do {} while (0)
9365c4fdfbSgilles #endif
9465c4fdfbSgilles
953f70ecafSeric static int
queue_message_path(uint32_t msgid,char * buf,size_t len)963f70ecafSeric queue_message_path(uint32_t msgid, char *buf, size_t len)
97d1c74f7bSchl {
983f70ecafSeric return bsnprintf(buf, len, "%s/%08"PRIx32, PATH_TEMPORARY, msgid);
99d1c74f7bSchl }
100d1c74f7bSchl
101d1c74f7bSchl int
queue_init(const char * name,int server)10265c4fdfbSgilles queue_init(const char *name, int server)
1032a81c1f8Sgilles {
10411d04e02Seric struct passwd *pwq;
1058351d18bSgilles struct group *gr;
106299c4efeSeric int r;
107299c4efeSeric
10811d04e02Seric pwq = getpwnam(SMTPD_QUEUE_USER);
10911d04e02Seric if (pwq == NULL)
110ff01b044Seric fatalx("unknown user %s", SMTPD_QUEUE_USER);
11111d04e02Seric
1128351d18bSgilles gr = getgrnam(SMTPD_QUEUE_GROUP);
1138351d18bSgilles if (gr == NULL)
114ff01b044Seric fatalx("unknown group %s", SMTPD_QUEUE_GROUP);
1158351d18bSgilles
1163f70ecafSeric tree_init(&evpcache_tree);
1173f70ecafSeric TAILQ_INIT(&evpcache_list);
1183f70ecafSeric
119945f2d46Seric if (!strcmp(name, "fs"))
12065c4fdfbSgilles backend = &queue_backend_fs;
12198f67d16Seric else if (!strcmp(name, "null"))
12265c4fdfbSgilles backend = &queue_backend_null;
12398f67d16Seric else if (!strcmp(name, "ram"))
12465c4fdfbSgilles backend = &queue_backend_ram;
12598f67d16Seric else
12698f67d16Seric backend = &queue_backend_proc;
12765c4fdfbSgilles
12811d04e02Seric if (server) {
12911d04e02Seric if (ckdir(PATH_SPOOL, 0711, 0, 0, 1) == 0)
130ff01b044Seric fatalx("error in spool directory setup");
1318351d18bSgilles if (ckdir(PATH_SPOOL PATH_OFFLINE, 0770, 0, gr->gr_gid, 1) == 0)
132ff01b044Seric fatalx("error in offline directory setup");
13311d04e02Seric if (ckdir(PATH_SPOOL PATH_PURGE, 0700, pwq->pw_uid, 0, 1) == 0)
134ff01b044Seric fatalx("error in purge directory setup");
13511d04e02Seric
13611d04e02Seric mvpurge(PATH_SPOOL PATH_TEMPORARY, PATH_SPOOL PATH_PURGE);
13711d04e02Seric
13811d04e02Seric if (ckdir(PATH_SPOOL PATH_TEMPORARY, 0700, pwq->pw_uid, 0, 1) == 0)
139ff01b044Seric fatalx("error in purge directory setup");
14011d04e02Seric }
14111d04e02Seric
14298f67d16Seric r = backend->init(pwq, server, name);
143299c4efeSeric
144d7bcae4dSeric log_trace(TRACE_QUEUE, "queue-backend: queue_init(%d) -> %d", server, r);
145299c4efeSeric
146299c4efeSeric return (r);
1472a81c1f8Sgilles }
1482a81c1f8Sgilles
1492a81c1f8Sgilles int
queue_close(void)15098f67d16Seric queue_close(void)
15198f67d16Seric {
15298f67d16Seric if (handler_close)
15398f67d16Seric return (handler_close());
15498f67d16Seric
15598f67d16Seric return (1);
15698f67d16Seric }
15798f67d16Seric
15898f67d16Seric int
queue_message_create(uint32_t * msgid)159d2241734Schl queue_message_create(uint32_t *msgid)
1602a81c1f8Sgilles {
16165c4fdfbSgilles int r;
16265c4fdfbSgilles
16365c4fdfbSgilles profile_enter("queue_message_create");
1643f70ecafSeric r = handler_message_create(msgid);
16565c4fdfbSgilles profile_leave();
16665c4fdfbSgilles
167299c4efeSeric log_trace(TRACE_QUEUE,
168d7bcae4dSeric "queue-backend: queue_message_create() -> %d (%08"PRIx32")",
169299c4efeSeric r, *msgid);
170299c4efeSeric
17165c4fdfbSgilles return (r);
1722a81c1f8Sgilles }
1732a81c1f8Sgilles
1742a81c1f8Sgilles int
queue_message_delete(uint32_t msgid)175d2241734Schl queue_message_delete(uint32_t msgid)
1762a81c1f8Sgilles {
177b9fc9a72Sderaadt char msgpath[PATH_MAX];
1787119aab2Seric uint64_t evpid;
1797119aab2Seric void *iter;
18065c4fdfbSgilles int r;
18165c4fdfbSgilles
18265c4fdfbSgilles profile_enter("queue_message_delete");
1833f70ecafSeric r = handler_message_delete(msgid);
18465c4fdfbSgilles profile_leave();
18565c4fdfbSgilles
1863f70ecafSeric /* in case the message is incoming */
1873f70ecafSeric queue_message_path(msgid, msgpath, sizeof(msgpath));
1883f70ecafSeric unlink(msgpath);
1893f70ecafSeric
1907119aab2Seric /* remove remaining envelopes from the cache if any (on rollback) */
1917119aab2Seric evpid = msgid_to_evpid(msgid);
1927119aab2Seric for (;;) {
1937119aab2Seric iter = NULL;
1947119aab2Seric if (!tree_iterfrom(&evpcache_tree, &iter, evpid, &evpid, NULL))
1957119aab2Seric break;
1967119aab2Seric if (evpid_to_msgid(evpid) != msgid)
1977119aab2Seric break;
1987119aab2Seric queue_envelope_cache_del(evpid);
1997119aab2Seric }
2007119aab2Seric
201299c4efeSeric log_trace(TRACE_QUEUE,
202d7bcae4dSeric "queue-backend: queue_message_delete(%08"PRIx32") -> %d", msgid, r);
203299c4efeSeric
20465c4fdfbSgilles return (r);
2052a81c1f8Sgilles }
2062a81c1f8Sgilles
2072a81c1f8Sgilles int
queue_message_commit(uint32_t msgid)208d2241734Schl queue_message_commit(uint32_t msgid)
2092a81c1f8Sgilles {
21065c4fdfbSgilles int r;
211b9fc9a72Sderaadt char msgpath[PATH_MAX];
212b9fc9a72Sderaadt char tmppath[PATH_MAX];
213299c4efeSeric FILE *ifp = NULL;
214299c4efeSeric FILE *ofp = NULL;
2150e8cc8ecSchl
21665c4fdfbSgilles profile_enter("queue_message_commit");
2173f70ecafSeric
2183f70ecafSeric queue_message_path(msgid, msgpath, sizeof(msgpath));
219299c4efeSeric
220299c4efeSeric if (env->sc_queue_flags & QUEUE_COMPRESSION) {
221299c4efeSeric bsnprintf(tmppath, sizeof tmppath, "%s.comp", msgpath);
222299c4efeSeric ifp = fopen(msgpath, "r");
223299c4efeSeric ofp = fopen(tmppath, "w+");
224299c4efeSeric if (ifp == NULL || ofp == NULL)
225299c4efeSeric goto err;
226299c4efeSeric if (!compress_file(ifp, ofp))
227299c4efeSeric goto err;
228299c4efeSeric fclose(ifp);
229299c4efeSeric fclose(ofp);
230299c4efeSeric ifp = NULL;
231299c4efeSeric ofp = NULL;
232299c4efeSeric
233299c4efeSeric if (rename(tmppath, msgpath) == -1) {
234299c4efeSeric if (errno == ENOSPC)
235299c4efeSeric return (0);
2363f70ecafSeric unlink(tmppath);
2373f70ecafSeric log_warn("rename");
2383f70ecafSeric return (0);
239299c4efeSeric }
240299c4efeSeric }
241299c4efeSeric
2423f70ecafSeric if (env->sc_queue_flags & QUEUE_ENCRYPTION) {
2433f70ecafSeric bsnprintf(tmppath, sizeof tmppath, "%s.enc", msgpath);
2443f70ecafSeric ifp = fopen(msgpath, "r");
2453f70ecafSeric ofp = fopen(tmppath, "w+");
2463f70ecafSeric if (ifp == NULL || ofp == NULL)
2473f70ecafSeric goto err;
2483f70ecafSeric if (!crypto_encrypt_file(ifp, ofp))
2493f70ecafSeric goto err;
2503f70ecafSeric fclose(ifp);
2513f70ecafSeric fclose(ofp);
2523f70ecafSeric ifp = NULL;
2533f70ecafSeric ofp = NULL;
2543f70ecafSeric
2553f70ecafSeric if (rename(tmppath, msgpath) == -1) {
2563f70ecafSeric if (errno == ENOSPC)
2573f70ecafSeric return (0);
2583f70ecafSeric unlink(tmppath);
2593f70ecafSeric log_warn("rename");
2603f70ecafSeric return (0);
2613f70ecafSeric }
2623f70ecafSeric }
2633f70ecafSeric
2643f70ecafSeric r = handler_message_commit(msgid, msgpath);
26565c4fdfbSgilles profile_leave();
2660e8cc8ecSchl
2673f70ecafSeric /* in case it's not done by the backend */
2683f70ecafSeric unlink(msgpath);
2693f70ecafSeric
270299c4efeSeric log_trace(TRACE_QUEUE,
271d7bcae4dSeric "queue-backend: queue_message_commit(%08"PRIx32") -> %d",
272299c4efeSeric msgid, r);
273299c4efeSeric
27465c4fdfbSgilles return (r);
275299c4efeSeric
276299c4efeSeric err:
277299c4efeSeric if (ifp)
278299c4efeSeric fclose(ifp);
279299c4efeSeric if (ofp)
280299c4efeSeric fclose(ofp);
281299c4efeSeric return 0;
2822a81c1f8Sgilles }
2832a81c1f8Sgilles
2842a81c1f8Sgilles int
queue_message_fd_r(uint32_t msgid)285d2241734Schl queue_message_fd_r(uint32_t msgid)
2862a81c1f8Sgilles {
28706cd614cSeric int fdin = -1, fdout = -1, fd = -1;
288e1829a2fSgilles FILE *ifp = NULL;
289e1829a2fSgilles FILE *ofp = NULL;
2900e8cc8ecSchl
29165c4fdfbSgilles profile_enter("queue_message_fd_r");
2923f70ecafSeric fdin = handler_message_fd_r(msgid);
29365c4fdfbSgilles profile_leave();
29465c4fdfbSgilles
295299c4efeSeric log_trace(TRACE_QUEUE,
296d7bcae4dSeric "queue-backend: queue_message_fd_r(%08"PRIx32") -> %d", msgid, fdin);
297299c4efeSeric
29865c4fdfbSgilles if (fdin == -1)
29906cd614cSeric return (-1);
3000e8cc8ecSchl
3013f70ecafSeric if (env->sc_queue_flags & QUEUE_ENCRYPTION) {
3023f70ecafSeric if ((fdout = mktmpfile()) == -1)
3033f70ecafSeric goto err;
3043f70ecafSeric if ((fd = dup(fdout)) == -1)
3053f70ecafSeric goto err;
3063f70ecafSeric if ((ifp = fdopen(fdin, "r")) == NULL)
3073f70ecafSeric goto err;
3083f70ecafSeric fdin = fd;
3093f70ecafSeric fd = -1;
3103f70ecafSeric if ((ofp = fdopen(fdout, "w+")) == NULL)
3113f70ecafSeric goto err;
3123f70ecafSeric
3133f70ecafSeric if (!crypto_decrypt_file(ifp, ofp))
3143f70ecafSeric goto err;
3153f70ecafSeric
3163f70ecafSeric fclose(ifp);
317598a04a7Sjsg ifp = NULL;
3183f70ecafSeric fclose(ofp);
319598a04a7Sjsg ofp = NULL;
3203f70ecafSeric lseek(fdin, SEEK_SET, 0);
3213f70ecafSeric }
3223f70ecafSeric
323299c4efeSeric if (env->sc_queue_flags & QUEUE_COMPRESSION) {
32406cd614cSeric if ((fdout = mktmpfile()) == -1)
32506cd614cSeric goto err;
32606cd614cSeric if ((fd = dup(fdout)) == -1)
32706cd614cSeric goto err;
32806cd614cSeric if ((ifp = fdopen(fdin, "r")) == NULL)
32906cd614cSeric goto err;
33006cd614cSeric fdin = fd;
33106cd614cSeric fd = -1;
33206cd614cSeric if ((ofp = fdopen(fdout, "w+")) == NULL)
3330e8cc8ecSchl goto err;
334299c4efeSeric
3357a9b97dfSchl if (!uncompress_file(ifp, ofp))
3367a9b97dfSchl goto err;
337299c4efeSeric
3387a9b97dfSchl fclose(ifp);
339598a04a7Sjsg ifp = NULL;
34006cd614cSeric fclose(ofp);
341598a04a7Sjsg ofp = NULL;
34206cd614cSeric lseek(fdin, SEEK_SET, 0);
3430e8cc8ecSchl }
3440e8cc8ecSchl
3450e8cc8ecSchl return (fdin);
3460e8cc8ecSchl
3470e8cc8ecSchl err:
34806cd614cSeric if (fd != -1)
34906cd614cSeric close(fd);
3500e8cc8ecSchl if (fdin != -1)
3510e8cc8ecSchl close(fdin);
3520e8cc8ecSchl if (fdout != -1)
3530e8cc8ecSchl close(fdout);
354e1829a2fSgilles if (ifp)
355e1829a2fSgilles fclose(ifp);
356e1829a2fSgilles if (ofp)
357e1829a2fSgilles fclose(ofp);
3580e8cc8ecSchl return -1;
3592a81c1f8Sgilles }
3602a81c1f8Sgilles
3612a81c1f8Sgilles int
queue_message_fd_rw(uint32_t msgid)362d2241734Schl queue_message_fd_rw(uint32_t msgid)
3632a81c1f8Sgilles {
364953aae25Sderaadt char buf[PATH_MAX];
365d1c74f7bSchl
3663f70ecafSeric queue_message_path(msgid, buf, sizeof(buf));
367d1c74f7bSchl
3683f70ecafSeric return open(buf, O_RDWR | O_CREAT | O_EXCL, 0600);
3692a81c1f8Sgilles }
3702a81c1f8Sgilles
37133d46017Schl static int
queue_envelope_dump_buffer(struct envelope * ep,char * evpbuf,size_t evpbufsize)37233d46017Schl queue_envelope_dump_buffer(struct envelope *ep, char *evpbuf, size_t evpbufsize)
37333d46017Schl {
3740e8cc8ecSchl char *evp;
3750e8cc8ecSchl size_t evplen;
376299c4efeSeric size_t complen;
377299c4efeSeric char compbuf[sizeof(struct envelope)];
3783f70ecafSeric size_t enclen;
3793f70ecafSeric char encbuf[sizeof(struct envelope)];
3800e8cc8ecSchl
3810e8cc8ecSchl evp = evpbuf;
3820e8cc8ecSchl evplen = envelope_dump_buffer(ep, evpbuf, evpbufsize);
3830e8cc8ecSchl if (evplen == 0)
3840e8cc8ecSchl return (0);
3850e8cc8ecSchl
386299c4efeSeric if (env->sc_queue_flags & QUEUE_COMPRESSION) {
387299c4efeSeric complen = compress_chunk(evp, evplen, compbuf, sizeof compbuf);
388299c4efeSeric if (complen == 0)
3890e8cc8ecSchl return (0);
390299c4efeSeric evp = compbuf;
391299c4efeSeric evplen = complen;
3920e8cc8ecSchl }
393fb3e4771Sgilles
3943f70ecafSeric if (env->sc_queue_flags & QUEUE_ENCRYPTION) {
3953f70ecafSeric enclen = crypto_encrypt_buffer(evp, evplen, encbuf, sizeof encbuf);
3963f70ecafSeric if (enclen == 0)
3973f70ecafSeric return (0);
3983f70ecafSeric evp = encbuf;
3993f70ecafSeric evplen = enclen;
4003f70ecafSeric }
4013f70ecafSeric
4020e8cc8ecSchl memmove(evpbuf, evp, evplen);
4030e8cc8ecSchl
4040e8cc8ecSchl return (evplen);
40533d46017Schl }
40633d46017Schl
40733d46017Schl static int
queue_envelope_load_buffer(struct envelope * ep,char * evpbuf,size_t evpbufsize)40833d46017Schl queue_envelope_load_buffer(struct envelope *ep, char *evpbuf, size_t evpbufsize)
40933d46017Schl {
4100e8cc8ecSchl char *evp;
4110e8cc8ecSchl size_t evplen;
412299c4efeSeric char compbuf[sizeof(struct envelope)];
413299c4efeSeric size_t complen;
4143f70ecafSeric char encbuf[sizeof(struct envelope)];
4153f70ecafSeric size_t enclen;
4160e8cc8ecSchl
4170e8cc8ecSchl evp = evpbuf;
4180e8cc8ecSchl evplen = evpbufsize;
4190e8cc8ecSchl
4203f70ecafSeric if (env->sc_queue_flags & QUEUE_ENCRYPTION) {
4213f70ecafSeric enclen = crypto_decrypt_buffer(evp, evplen, encbuf, sizeof encbuf);
4223f70ecafSeric if (enclen == 0)
4233f70ecafSeric return (0);
4243f70ecafSeric evp = encbuf;
4253f70ecafSeric evplen = enclen;
4263f70ecafSeric }
4273f70ecafSeric
428299c4efeSeric if (env->sc_queue_flags & QUEUE_COMPRESSION) {
429299c4efeSeric complen = uncompress_chunk(evp, evplen, compbuf, sizeof compbuf);
430299c4efeSeric if (complen == 0)
4310e8cc8ecSchl return (0);
432299c4efeSeric evp = compbuf;
433299c4efeSeric evplen = complen;
43433d46017Schl }
43533d46017Schl
4360e8cc8ecSchl return (envelope_load_buffer(ep, evp, evplen));
4370e8cc8ecSchl }
43833d46017Schl
4393f70ecafSeric static void
queue_envelope_cache_add(struct envelope * e)4403f70ecafSeric queue_envelope_cache_add(struct envelope *e)
4413f70ecafSeric {
4423f70ecafSeric struct envelope *cached;
4433f70ecafSeric
4443f70ecafSeric while (tree_count(&evpcache_tree) >= env->sc_queue_evpcache_size)
4453f70ecafSeric queue_envelope_cache_del(TAILQ_LAST(&evpcache_list, evplst)->id);
4463f70ecafSeric
447118c16f3Sgilles cached = xcalloc(1, sizeof *cached);
4483f70ecafSeric *cached = *e;
4493f70ecafSeric TAILQ_INSERT_HEAD(&evpcache_list, cached, entry);
4503f70ecafSeric tree_xset(&evpcache_tree, e->id, cached);
4513f70ecafSeric stat_increment("queue.evpcache.size", 1);
4523f70ecafSeric }
4533f70ecafSeric
4543f70ecafSeric static void
queue_envelope_cache_update(struct envelope * e)4553f70ecafSeric queue_envelope_cache_update(struct envelope *e)
4563f70ecafSeric {
4573f70ecafSeric struct envelope *cached;
4583f70ecafSeric
4593f70ecafSeric if ((cached = tree_get(&evpcache_tree, e->id)) == NULL) {
4603f70ecafSeric queue_envelope_cache_add(e);
4613f70ecafSeric stat_increment("queue.evpcache.update.missed", 1);
4623f70ecafSeric } else {
4633f70ecafSeric TAILQ_REMOVE(&evpcache_list, cached, entry);
4643f70ecafSeric *cached = *e;
4653f70ecafSeric TAILQ_INSERT_HEAD(&evpcache_list, cached, entry);
4663f70ecafSeric stat_increment("queue.evpcache.update.hit", 1);
4673f70ecafSeric }
4683f70ecafSeric }
4693f70ecafSeric
4703f70ecafSeric static void
queue_envelope_cache_del(uint64_t evpid)4713f70ecafSeric queue_envelope_cache_del(uint64_t evpid)
4723f70ecafSeric {
4733f70ecafSeric struct envelope *cached;
4743f70ecafSeric
4753f70ecafSeric if ((cached = tree_pop(&evpcache_tree, evpid)) == NULL)
4763f70ecafSeric return;
4773f70ecafSeric
4783f70ecafSeric TAILQ_REMOVE(&evpcache_list, cached, entry);
4793f70ecafSeric free(cached);
4803f70ecafSeric stat_decrement("queue.evpcache.size", 1);
4813f70ecafSeric }
4823f70ecafSeric
4832a81c1f8Sgilles int
queue_envelope_create(struct envelope * ep)4840bd12636Seric queue_envelope_create(struct envelope *ep)
4852a81c1f8Sgilles {
4868dbeaf78Seric int r;
48733d46017Schl char evpbuf[sizeof(struct envelope)];
48833d46017Schl size_t evplen;
489299c4efeSeric uint64_t evpid;
4903f70ecafSeric uint32_t msgid;
4918dbeaf78Seric
4928dbeaf78Seric ep->creation = time(NULL);
49333d46017Schl evplen = queue_envelope_dump_buffer(ep, evpbuf, sizeof evpbuf);
49433d46017Schl if (evplen == 0)
49533d46017Schl return (0);
49633d46017Schl
497299c4efeSeric evpid = ep->id;
4983f70ecafSeric msgid = evpid_to_msgid(evpid);
499299c4efeSeric
50065c4fdfbSgilles profile_enter("queue_envelope_create");
5013f70ecafSeric r = handler_envelope_create(msgid, evpbuf, evplen, &ep->id);
50265c4fdfbSgilles profile_leave();
503299c4efeSeric
504299c4efeSeric log_trace(TRACE_QUEUE,
505d7bcae4dSeric "queue-backend: queue_envelope_create(%016"PRIx64", %zu) -> %d (%016"PRIx64")",
506299c4efeSeric evpid, evplen, r, ep->id);
507299c4efeSeric
5088dbeaf78Seric if (!r) {
5098dbeaf78Seric ep->creation = 0;
5108dbeaf78Seric ep->id = 0;
5118dbeaf78Seric }
512299c4efeSeric
5133f70ecafSeric if (r && env->sc_queue_flags & QUEUE_EVPCACHE)
5143f70ecafSeric queue_envelope_cache_add(ep);
5153f70ecafSeric
5168dbeaf78Seric return (r);
5172a81c1f8Sgilles }
5182a81c1f8Sgilles
5192a81c1f8Sgilles int
queue_envelope_delete(uint64_t evpid)52065c4fdfbSgilles queue_envelope_delete(uint64_t evpid)
5212a81c1f8Sgilles {
52265c4fdfbSgilles int r;
52365c4fdfbSgilles
5243f70ecafSeric if (env->sc_queue_flags & QUEUE_EVPCACHE)
5253f70ecafSeric queue_envelope_cache_del(evpid);
5263f70ecafSeric
52765c4fdfbSgilles profile_enter("queue_envelope_delete");
5283f70ecafSeric r = handler_envelope_delete(evpid);
52965c4fdfbSgilles profile_leave();
53065c4fdfbSgilles
531299c4efeSeric log_trace(TRACE_QUEUE,
532d7bcae4dSeric "queue-backend: queue_envelope_delete(%016"PRIx64") -> %d",
533299c4efeSeric evpid, r);
534299c4efeSeric
53565c4fdfbSgilles return (r);
5362a81c1f8Sgilles }
5372a81c1f8Sgilles
5382a81c1f8Sgilles int
queue_envelope_load(uint64_t evpid,struct envelope * ep)539d2241734Schl queue_envelope_load(uint64_t evpid, struct envelope *ep)
5402a81c1f8Sgilles {
541d97aaa5bSeric const char *e;
54233d46017Schl char evpbuf[sizeof(struct envelope)];
54333d46017Schl size_t evplen;
5443f70ecafSeric struct envelope *cached;
5453f70ecafSeric
5463f70ecafSeric if ((env->sc_queue_flags & QUEUE_EVPCACHE) &&
5473f70ecafSeric (cached = tree_get(&evpcache_tree, evpid))) {
5483f70ecafSeric *ep = *cached;
5493f70ecafSeric stat_increment("queue.evpcache.load.hit", 1);
5503f70ecafSeric return (1);
5513f70ecafSeric }
552d97aaa5bSeric
553148c5951Sgilles ep->id = evpid;
55465c4fdfbSgilles profile_enter("queue_envelope_load");
5553f70ecafSeric evplen = handler_envelope_load(ep->id, evpbuf, sizeof evpbuf);
55665c4fdfbSgilles profile_leave();
557299c4efeSeric
558299c4efeSeric log_trace(TRACE_QUEUE,
559299c4efeSeric "queue-backend: queue_envelope_load(%016"PRIx64") -> %zu",
560299c4efeSeric evpid, evplen);
561299c4efeSeric
56233d46017Schl if (evplen == 0)
56333d46017Schl return (0);
56433d46017Schl
56533d46017Schl if (queue_envelope_load_buffer(ep, evpbuf, evplen)) {
56610fd240eSeric if ((e = envelope_validate(ep)) == NULL) {
567cc6d91c9Seric ep->id = evpid;
5683f70ecafSeric if (env->sc_queue_flags & QUEUE_EVPCACHE) {
5693f70ecafSeric queue_envelope_cache_add(ep);
5703f70ecafSeric stat_increment("queue.evpcache.load.missed", 1);
5713f70ecafSeric }
572cc6d91c9Seric return (1);
573cc6d91c9Seric }
574ef15259eSgilles log_warnx("warn: invalid envelope %016" PRIx64 ": %s",
575ef15259eSgilles evpid, e);
576d97aaa5bSeric }
577cc6d91c9Seric return (0);
5782a81c1f8Sgilles }
5792a81c1f8Sgilles
5802a81c1f8Sgilles int
queue_envelope_update(struct envelope * ep)5810bd12636Seric queue_envelope_update(struct envelope *ep)
5822a81c1f8Sgilles {
58333d46017Schl char evpbuf[sizeof(struct envelope)];
58433d46017Schl size_t evplen;
58565c4fdfbSgilles int r;
58633d46017Schl
58733d46017Schl evplen = queue_envelope_dump_buffer(ep, evpbuf, sizeof evpbuf);
58833d46017Schl if (evplen == 0)
58933d46017Schl return (0);
59033d46017Schl
59165c4fdfbSgilles profile_enter("queue_envelope_update");
5923f70ecafSeric r = handler_envelope_update(ep->id, evpbuf, evplen);
59365c4fdfbSgilles profile_leave();
59465c4fdfbSgilles
5953f70ecafSeric if (r && env->sc_queue_flags & QUEUE_EVPCACHE)
5963f70ecafSeric queue_envelope_cache_update(ep);
5973f70ecafSeric
598299c4efeSeric log_trace(TRACE_QUEUE,
599d7bcae4dSeric "queue-backend: queue_envelope_update(%016"PRIx64") -> %d",
600299c4efeSeric ep->id, r);
601299c4efeSeric
60265c4fdfbSgilles return (r);
6032a81c1f8Sgilles }
604148c5951Sgilles
6057b5d776dSgilles int
queue_message_walk(struct envelope * ep,uint32_t msgid,int * done,void ** data)606a9835440Ssunil queue_message_walk(struct envelope *ep, uint32_t msgid, int *done, void **data)
607a9835440Ssunil {
608a9835440Ssunil char evpbuf[sizeof(struct envelope)];
609a9835440Ssunil uint64_t evpid;
610a9835440Ssunil int r;
611a9835440Ssunil const char *e;
612a9835440Ssunil
613a9835440Ssunil profile_enter("queue_message_walk");
614a9835440Ssunil r = handler_message_walk(&evpid, evpbuf, sizeof evpbuf,
615a9835440Ssunil msgid, done, data);
616a9835440Ssunil profile_leave();
617a9835440Ssunil
618a9835440Ssunil log_trace(TRACE_QUEUE,
619a9835440Ssunil "queue-backend: queue_message_walk() -> %d (%016"PRIx64")",
620a9835440Ssunil r, evpid);
621a9835440Ssunil
622a9835440Ssunil if (r == -1)
623a9835440Ssunil return (r);
624a9835440Ssunil
625a9835440Ssunil if (r && queue_envelope_load_buffer(ep, evpbuf, (size_t)r)) {
626a9835440Ssunil if ((e = envelope_validate(ep)) == NULL) {
627a9835440Ssunil ep->id = evpid;
628a9835440Ssunil /*
629a9835440Ssunil * do not cache the envelope here, while discovering
630a9835440Ssunil * envelopes one could re-run discover on already
631a9835440Ssunil * scheduled envelopes which leads to triggering of
632a9835440Ssunil * strict checks in caching. Envelopes could anyway
633a9835440Ssunil * be loaded from backend if it isn't cached.
634a9835440Ssunil */
635a9835440Ssunil return (1);
636a9835440Ssunil }
637ef15259eSgilles log_warnx("warn: invalid envelope %016" PRIx64 ": %s",
638ef15259eSgilles evpid, e);
639a9835440Ssunil }
640e65271f7Ssunil return (0);
641a9835440Ssunil }
642a9835440Ssunil
643a9835440Ssunil int
queue_envelope_walk(struct envelope * ep)6444d64cfa3Seric queue_envelope_walk(struct envelope *ep)
6457b5d776dSgilles {
6464d64cfa3Seric const char *e;
6474d64cfa3Seric uint64_t evpid;
6484d64cfa3Seric char evpbuf[sizeof(struct envelope)];
6494d64cfa3Seric int r;
6507b5d776dSgilles
65165c4fdfbSgilles profile_enter("queue_envelope_walk");
6523f70ecafSeric r = handler_envelope_walk(&evpid, evpbuf, sizeof evpbuf);
65365c4fdfbSgilles profile_leave();
654299c4efeSeric
655299c4efeSeric log_trace(TRACE_QUEUE,
656d7bcae4dSeric "queue-backend: queue_envelope_walk() -> %d (%016"PRIx64")",
657299c4efeSeric r, evpid);
658299c4efeSeric
659299c4efeSeric if (r == -1)
6604d64cfa3Seric return (r);
6614d64cfa3Seric
662299c4efeSeric if (r && queue_envelope_load_buffer(ep, evpbuf, (size_t)r)) {
6634d64cfa3Seric if ((e = envelope_validate(ep)) == NULL) {
6644d64cfa3Seric ep->id = evpid;
6653f70ecafSeric if (env->sc_queue_flags & QUEUE_EVPCACHE)
6663f70ecafSeric queue_envelope_cache_add(ep);
6674d64cfa3Seric return (1);
6684d64cfa3Seric }
669ef15259eSgilles log_warnx("warn: invalid envelope %016" PRIx64 ": %s",
670ef15259eSgilles evpid, e);
6714d64cfa3Seric }
6724d64cfa3Seric return (0);
6737b5d776dSgilles }
6747b5d776dSgilles
675d2241734Schl uint32_t
queue_generate_msgid(void)6767b5d776dSgilles queue_generate_msgid(void)
6777b5d776dSgilles {
678d2241734Schl uint32_t msgid;
6797b5d776dSgilles
6806351c710Sgilles while ((msgid = arc4random()) == 0)
6819f0761fdSeric ;
6827b5d776dSgilles
6837b5d776dSgilles return msgid;
6847b5d776dSgilles }
6857b5d776dSgilles
686d2241734Schl uint64_t
queue_generate_evpid(uint32_t msgid)687d2241734Schl queue_generate_evpid(uint32_t msgid)
6887b5d776dSgilles {
689d2241734Schl uint32_t rnd;
690d2241734Schl uint64_t evpid;
6917b5d776dSgilles
6926351c710Sgilles while ((rnd = arc4random()) == 0)
6939f0761fdSeric ;
6947b5d776dSgilles
6957b5d776dSgilles evpid = msgid;
6967b5d776dSgilles evpid <<= 32;
6977b5d776dSgilles evpid |= rnd;
6987b5d776dSgilles
6997b5d776dSgilles return evpid;
7007b5d776dSgilles }
7017b5d776dSgilles
702d97aaa5bSeric static const char*
envelope_validate(struct envelope * ep)703f29e586fSchl envelope_validate(struct envelope *ep)
704148c5951Sgilles {
705148c5951Sgilles if (ep->version != SMTPD_ENVELOPE_VERSION)
706d97aaa5bSeric return "version mismatch";
707148c5951Sgilles
708d97aaa5bSeric if (memchr(ep->helo, '\0', sizeof(ep->helo)) == NULL)
709d97aaa5bSeric return "invalid helo";
710148c5951Sgilles if (ep->helo[0] == '\0')
711d97aaa5bSeric return "empty helo";
712148c5951Sgilles
713d97aaa5bSeric if (memchr(ep->hostname, '\0', sizeof(ep->hostname)) == NULL)
714d97aaa5bSeric return "invalid hostname";
715148c5951Sgilles if (ep->hostname[0] == '\0')
716d97aaa5bSeric return "empty hostname";
717148c5951Sgilles
718d97aaa5bSeric if (memchr(ep->errorline, '\0', sizeof(ep->errorline)) == NULL)
719d97aaa5bSeric return "invalid error line";
720148c5951Sgilles
72138b53eb3Seric if (dict_get(env->sc_dispatchers, ep->dispatcher) == NULL)
72238b53eb3Seric return "unknown dispatcher";
72338b53eb3Seric
724d97aaa5bSeric return NULL;
725148c5951Sgilles }
7263f70ecafSeric
7273f70ecafSeric void
queue_api_on_close(int (* cb)(void))72898f67d16Seric queue_api_on_close(int(*cb)(void))
72998f67d16Seric {
73098f67d16Seric handler_close = cb;
73198f67d16Seric }
73298f67d16Seric
73398f67d16Seric void
queue_api_on_message_create(int (* cb)(uint32_t *))7343f70ecafSeric queue_api_on_message_create(int(*cb)(uint32_t *))
7353f70ecafSeric {
7363f70ecafSeric handler_message_create = cb;
7373f70ecafSeric }
7383f70ecafSeric
7393f70ecafSeric void
queue_api_on_message_commit(int (* cb)(uint32_t,const char *))7403f70ecafSeric queue_api_on_message_commit(int(*cb)(uint32_t, const char *))
7413f70ecafSeric {
7423f70ecafSeric handler_message_commit = cb;
7433f70ecafSeric }
7443f70ecafSeric
7453f70ecafSeric void
queue_api_on_message_delete(int (* cb)(uint32_t))7463f70ecafSeric queue_api_on_message_delete(int(*cb)(uint32_t))
7473f70ecafSeric {
7483f70ecafSeric handler_message_delete = cb;
7493f70ecafSeric }
7503f70ecafSeric
7513f70ecafSeric void
queue_api_on_message_fd_r(int (* cb)(uint32_t))7523f70ecafSeric queue_api_on_message_fd_r(int(*cb)(uint32_t))
7533f70ecafSeric {
7543f70ecafSeric handler_message_fd_r = cb;
7553f70ecafSeric }
7563f70ecafSeric
7573f70ecafSeric void
queue_api_on_envelope_create(int (* cb)(uint32_t,const char *,size_t,uint64_t *))7583f70ecafSeric queue_api_on_envelope_create(int(*cb)(uint32_t, const char *, size_t, uint64_t *))
7593f70ecafSeric {
7603f70ecafSeric handler_envelope_create = cb;
7613f70ecafSeric }
7623f70ecafSeric
7633f70ecafSeric void
queue_api_on_envelope_delete(int (* cb)(uint64_t))7643f70ecafSeric queue_api_on_envelope_delete(int(*cb)(uint64_t))
7653f70ecafSeric {
7663f70ecafSeric handler_envelope_delete = cb;
7673f70ecafSeric }
7683f70ecafSeric
7693f70ecafSeric void
queue_api_on_envelope_update(int (* cb)(uint64_t,const char *,size_t))7703f70ecafSeric queue_api_on_envelope_update(int(*cb)(uint64_t, const char *, size_t))
7713f70ecafSeric {
7723f70ecafSeric handler_envelope_update = cb;
7733f70ecafSeric }
7743f70ecafSeric
7753f70ecafSeric void
queue_api_on_envelope_load(int (* cb)(uint64_t,char *,size_t))7763f70ecafSeric queue_api_on_envelope_load(int(*cb)(uint64_t, char *, size_t))
7773f70ecafSeric {
7783f70ecafSeric handler_envelope_load = cb;
7793f70ecafSeric }
7803f70ecafSeric
7813f70ecafSeric void
queue_api_on_envelope_walk(int (* cb)(uint64_t *,char *,size_t))7823f70ecafSeric queue_api_on_envelope_walk(int(*cb)(uint64_t *, char *, size_t))
7833f70ecafSeric {
7843f70ecafSeric handler_envelope_walk = cb;
7853f70ecafSeric }
786a9835440Ssunil
787a9835440Ssunil void
queue_api_on_message_walk(int (* cb)(uint64_t *,char *,size_t,uint32_t,int *,void **))788a9835440Ssunil queue_api_on_message_walk(int(*cb)(uint64_t *, char *, size_t,
789a9835440Ssunil uint32_t, int *, void **))
790a9835440Ssunil {
791a9835440Ssunil handler_message_walk = cb;
792a9835440Ssunil }
793