xref: /openbsd-src/usr.sbin/smtpd/queue_backend.c (revision 48950c12d106c85f315112191a0228d7b83b9510)
1 /*	$OpenBSD: queue_backend.c,v 1.42 2013/01/26 09:37:23 gilles Exp $	*/
2 
3 /*
4  * Copyright (c) 2011 Gilles Chehade <gilles@poolp.org>
5  *
6  * Permission to use, copy, modify, and distribute this software for any
7  * purpose with or without fee is hereby granted, provided that the above
8  * copyright notice and this permission notice appear in all copies.
9  *
10  * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
11  * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
12  * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
13  * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
14  * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
15  * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
16  * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
17  */
18 
19 #include <sys/types.h>
20 #include <sys/queue.h>
21 #include <sys/tree.h>
22 #include <sys/param.h>
23 #include <sys/socket.h>
24 #include <sys/stat.h>
25 
26 #include <ctype.h>
27 #include <errno.h>
28 #include <event.h>
29 #include <fcntl.h>
30 #include <imsg.h>
31 #include <inttypes.h>
32 #include <libgen.h>
33 #include <pwd.h>
34 #include <stdio.h>
35 #include <stdlib.h>
36 #include <string.h>
37 #include <time.h>
38 #include <unistd.h>
39 
40 #include "smtpd.h"
41 #include "log.h"
42 
43 static const char* envelope_validate(struct envelope *);
44 
45 extern struct queue_backend	queue_backend_fs;
46 extern struct queue_backend	queue_backend_null;
47 extern struct queue_backend	queue_backend_ram;
48 
49 static struct queue_backend	*backend;
50 
51 #ifdef QUEUE_PROFILING
52 
53 static struct {
54 	struct timespec	 t0;
55 	const char	*name;
56 } profile;
57 
58 static inline void profile_enter(const char *name)
59 {
60 	if ((profiling & PROFILE_QUEUE) == 0)
61 		return;
62 
63 	profile.name = name;
64 	clock_gettime(CLOCK_MONOTONIC, &profile.t0);
65 }
66 
67 static inline void profile_leave(void)
68 {
69 	struct timespec	 t1, dt;
70 
71 	if ((profiling & PROFILE_QUEUE) == 0)
72 		return;
73 
74 	clock_gettime(CLOCK_MONOTONIC, &t1);
75 	timespecsub(&t1, &profile.t0, &dt);
76 	log_debug("profile-queue: %s %li.%06li", profile.name,
77 	    dt.tv_sec * 1000000 + dt.tv_nsec / 1000000,
78 	    dt.tv_nsec % 1000000);
79 }
80 #else
81 #define profile_enter(x)	do {} while (0)
82 #define profile_leave()		do {} while (0)
83 #endif
84 
85 int
86 queue_message_incoming_path(uint32_t msgid, char *buf, size_t len)
87 {
88 	return bsnprintf(buf, len, "%s/%08x",
89 	    PATH_INCOMING,
90 	    msgid);
91 }
92 
93 int
94 queue_init(const char *name, int server)
95 {
96 	if (!strcmp(name, "fs"))
97 		backend = &queue_backend_fs;
98 	if (!strcmp(name, "null"))
99 		backend = &queue_backend_null;
100 	if (!strcmp(name, "ram"))
101 		backend = &queue_backend_ram;
102 
103 	if (backend == NULL) {
104 		log_warn("could not find queue backend \"%s\"", name);
105 		return (0);
106 	}
107 
108 	return backend->init(server);
109 }
110 
111 int
112 queue_message_create(uint32_t *msgid)
113 {
114 	int	r;
115 
116 	profile_enter("queue_message_create");
117 	r = backend->message(QOP_CREATE, msgid);
118 	profile_leave();
119 
120 	return (r);
121 }
122 
123 int
124 queue_message_delete(uint32_t msgid)
125 {
126 	int	r;
127 
128 	profile_enter("queue_message_delete");
129 	r = backend->message(QOP_DELETE, &msgid);
130 	profile_leave();
131 
132 	return (r);
133 }
134 
135 int
136 queue_message_commit(uint32_t msgid)
137 {
138 	int	r;
139 
140 	profile_enter("queue_message_commit");
141 	r = backend->message(QOP_COMMIT, &msgid);
142 	profile_leave();
143 
144 	return (r);
145 }
146 
147 int
148 queue_message_corrupt(uint32_t msgid)
149 {
150 	int	r;
151 
152 	profile_enter("queue_message_corrupt");
153 	r = backend->message(QOP_CORRUPT, &msgid);
154 	profile_leave();
155 
156 	return (r);
157 }
158 
159 int
160 queue_message_fd_r(uint32_t msgid)
161 {
162 	int	fdin = -1, fdout = -1, fd = -1;
163 	FILE	*ifp = NULL;
164 	FILE	*ofp = NULL;
165 
166 	profile_enter("queue_message_fd_r");
167 	fdin = backend->message(QOP_FD_R, &msgid);
168 	profile_leave();
169 
170 	if (fdin == -1)
171 		return (-1);
172 
173 	if (env->sc_queue_flags & QUEUE_COMPRESS) {
174 		if ((fdout = mktmpfile()) == -1)
175 			goto err;
176 		if ((fd = dup(fdout)) == -1)
177 			goto err;
178 		if ((ifp = fdopen(fdin, "r")) == NULL)
179 			goto err;
180 		fdin = fd;
181 		fd = -1;
182 		if ((ofp = fdopen(fdout, "w+")) == NULL)
183 			goto err;
184 		if (! uncompress_file(ifp, ofp))
185 			goto err;
186 		fclose(ifp);
187 		fclose(ofp);
188 		lseek(fdin, SEEK_SET, 0);
189 	}
190 
191 	return (fdin);
192 
193 err:
194 	if (fd != -1)
195 		close(fd);
196 	if (fdin != -1)
197 		close(fdin);
198 	if (fdout != -1)
199 		close(fdout);
200 	if (ifp)
201 		fclose(ifp);
202 	if (ofp)
203 		fclose(ofp);
204 	return -1;
205 }
206 
207 int
208 queue_message_fd_rw(uint32_t msgid)
209 {
210 	int	r;
211 
212 	profile_enter("queue_message_fd_rw");
213 	r = backend->message(QOP_FD_RW, &msgid);
214 	profile_leave();
215 
216 	return (r);
217 }
218 
219 static int
220 queue_envelope_dump_buffer(struct envelope *ep, char *evpbuf, size_t evpbufsize)
221 {
222 	char		 evpbufcom[sizeof(struct envelope)];
223 	char		*evp;
224 	size_t		 evplen;
225 
226 	evp = evpbuf;
227 	evplen = envelope_dump_buffer(ep, evpbuf, evpbufsize);
228 	if (evplen == 0)
229 		return (0);
230 
231 	if (env->sc_queue_flags & QUEUE_COMPRESS) {
232 		evplen = compress_buffer(evp, evplen, evpbufcom,
233 		    sizeof evpbufcom);
234 		if (evplen == 0)
235 			return (0);
236 		evp = evpbufcom;
237 	}
238 
239 	memmove(evpbuf, evp, evplen);
240 
241 	return (evplen);
242 }
243 
244 static int
245 queue_envelope_load_buffer(struct envelope *ep, char *evpbuf, size_t evpbufsize)
246 {
247 	char		 evpbufcom[sizeof(struct envelope)];
248 	char		*evp;
249 	size_t		 evplen;
250 
251 	evp = evpbuf;
252 	evplen = evpbufsize;
253 
254 	if (env->sc_queue_flags & QUEUE_COMPRESS) {
255 		evplen = uncompress_buffer(evp, evplen, evpbufcom,
256 		    sizeof evpbufcom);
257 		if (evplen == 0)
258 			return (0);
259 		evp = evpbufcom;
260 	}
261 
262 	return (envelope_load_buffer(ep, evp, evplen));
263 }
264 
265 int
266 queue_envelope_create(struct envelope *ep)
267 {
268 	int		 r;
269 	char		 evpbuf[sizeof(struct envelope)];
270 	size_t		 evplen;
271 
272 	ep->creation = time(NULL);
273 	evplen = queue_envelope_dump_buffer(ep, evpbuf, sizeof evpbuf);
274 	if (evplen == 0)
275 		return (0);
276 
277 	profile_enter("queue_envelope_create");
278 	r = backend->envelope(QOP_CREATE, &ep->id, evpbuf, evplen);
279 	profile_leave();
280 	if (!r) {
281 		ep->creation = 0;
282 		ep->id = 0;
283 	}
284 	return (r);
285 }
286 
287 int
288 queue_envelope_delete(uint64_t evpid)
289 {
290 	int	r;
291 
292 	profile_enter("queue_envelope_delete");
293 	r = backend->envelope(QOP_DELETE, &evpid, NULL, 0);
294 	profile_leave();
295 
296 	return (r);
297 }
298 
299 int
300 queue_envelope_load(uint64_t evpid, struct envelope *ep)
301 {
302 	const char	*e;
303 	char		 evpbuf[sizeof(struct envelope)];
304 	size_t		 evplen;
305 
306 	ep->id = evpid;
307 	profile_enter("queue_envelope_load");
308 	evplen = backend->envelope(QOP_LOAD, &ep->id, evpbuf, sizeof evpbuf);
309 	profile_leave();
310 	if (evplen == 0)
311 		return (0);
312 
313 	if (queue_envelope_load_buffer(ep, evpbuf, evplen)) {
314 		if ((e = envelope_validate(ep)) == NULL) {
315 			ep->id = evpid;
316 			return (1);
317 		}
318 		log_debug("debug: invalid envelope %016" PRIx64 ": %s",
319 		    ep->id, e);
320 	}
321 	return (0);
322 }
323 
324 int
325 queue_envelope_update(struct envelope *ep)
326 {
327 	char	evpbuf[sizeof(struct envelope)];
328 	size_t	evplen;
329 	int	r;
330 
331 	evplen = queue_envelope_dump_buffer(ep, evpbuf, sizeof evpbuf);
332 	if (evplen == 0)
333 		return (0);
334 
335 	profile_enter("queue_envelope_update");
336 	r = backend->envelope(QOP_UPDATE, &ep->id, evpbuf, evplen);
337 	profile_leave();
338 
339 	return (r);
340 }
341 
342 int
343 queue_envelope_walk(struct envelope *ep)
344 {
345 	const char	*e;
346 	uint64_t	 evpid;
347 	char		 evpbuf[sizeof(struct envelope)];
348 	int		 r;
349 
350 	profile_enter("queue_envelope_walk");
351 	r = backend->envelope(QOP_WALK, &evpid, evpbuf, sizeof evpbuf);
352 	profile_leave();
353 	if (r == -1 || r == 0)
354 		return (r);
355 
356 	if (queue_envelope_load_buffer(ep, evpbuf, (size_t)r)) {
357 		if ((e = envelope_validate(ep)) == NULL) {
358 			ep->id = evpid;
359 			return (1);
360 		}
361 		log_debug("debug: invalid envelope %016" PRIx64 ": %s",
362 		    ep->id, e);
363 	}
364 	return (0);
365 }
366 
367 uint32_t
368 queue_generate_msgid(void)
369 {
370 	uint32_t msgid;
371 
372 	while ((msgid = arc4random_uniform(0xffffffff)) == 0)
373 		;
374 
375 	return msgid;
376 }
377 
378 uint64_t
379 queue_generate_evpid(uint32_t msgid)
380 {
381 	uint32_t rnd;
382 	uint64_t evpid;
383 
384 	while ((rnd = arc4random_uniform(0xffffffff)) == 0)
385 		;
386 
387 	evpid = msgid;
388 	evpid <<= 32;
389 	evpid |= rnd;
390 
391 	return evpid;
392 }
393 
394 static const char*
395 envelope_validate(struct envelope *ep)
396 {
397 	if (ep->version != SMTPD_ENVELOPE_VERSION)
398 		return "version mismatch";
399 
400 	if (memchr(ep->helo, '\0', sizeof(ep->helo)) == NULL)
401 		return "invalid helo";
402 	if (ep->helo[0] == '\0')
403 		return "empty helo";
404 
405 	if (memchr(ep->hostname, '\0', sizeof(ep->hostname)) == NULL)
406 		return "invalid hostname";
407 	if (ep->hostname[0] == '\0')
408 		return "empty hostname";
409 
410 	if (memchr(ep->errorline, '\0', sizeof(ep->errorline)) == NULL)
411 		return "invalid error line";
412 
413 	return NULL;
414 }
415