xref: /openbsd-src/usr.sbin/smtpd/queue_proc.c (revision 50b7afb2c2c0993b0894d4e34bf857cb13ed9c80)
1 /*	$OpenBSD: queue_proc.c,v 1.3 2014/07/08 15:45:32 eric Exp $	*/
2 
3 /*
4  * Copyright (c) 2013 Eric Faurot <eric@openbsd.org>
5  *
6  * Permission to use, copy, modify, and distribute this software for any
7  * purpose with or without fee is hereby granted, provided that the above
8  * copyright notice and this permission notice appear in all copies.
9  *
10  * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
11  * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
12  * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
13  * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
14  * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
15  * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
16  * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
17  */
18 
19 #include <sys/types.h>
20 #include <sys/queue.h>
21 #include <sys/tree.h>
22 #include <sys/param.h>
23 #include <sys/socket.h>
24 #include <sys/stat.h>
25 
26 #include <ctype.h>
27 #include <event.h>
28 #include <fcntl.h>
29 #include <imsg.h>
30 #include <inttypes.h>
31 #include <libgen.h>
32 #include <pwd.h>
33 #include <stdio.h>
34 #include <stdlib.h>
35 #include <string.h>
36 #include <time.h>
37 #include <unistd.h>
38 
39 #include "smtpd.h"
40 #include "log.h"
41 
/*
 * State shared by every request in this file.
 *
 * ibuf  - imsg channel to the backend; set up in queue_proc_init().
 * imsg  - the most recent message fetched by queue_proc_call().
 * rlen  - number of payload bytes of that message not yet consumed.
 * rdata - read cursor into that payload; advanced by queue_proc_read().
 */
42 static struct imsgbuf	 ibuf;
43 static struct imsg	 imsg;
44 static size_t		 rlen;
45 static char		*rdata;
46 
47 static void
48 queue_proc_call(void)
49 {
50 	ssize_t	n;
51 
52 	if (imsg_flush(&ibuf) == -1) {
53 		log_warn("warn: queue-proc: imsg_flush");
54 		fatalx("queue-proc: exiting");
55 	}
56 
57 	while (1) {
58 		if ((n = imsg_get(&ibuf, &imsg)) == -1) {
59 			log_warn("warn: queue-proc: imsg_get");
60 			break;
61 		}
62 		if (n) {
63 			rlen = imsg.hdr.len - IMSG_HEADER_SIZE;
64 			rdata = imsg.data;
65 
66 			if (imsg.hdr.type != PROC_QUEUE_OK) {
67 				log_warnx("warn: queue-proc: bad response");
68 				break;
69 			}
70 			return;
71 		}
72 
73 		if ((n = imsg_read(&ibuf)) == -1) {
74 			log_warn("warn: queue-proc: imsg_read");
75 			break;
76 		}
77 
78 		if (n == 0) {
79 			log_warnx("warn: queue-proc: pipe closed");
80 			break;
81 		}
82 	}
83 
84 	fatalx("queue-proc: exiting");
85 }
86 
87 static void
88 queue_proc_read(void *dst, size_t len)
89 {
90 	if (len > rlen) {
91 		log_warnx("warn: queue-proc: bad msg len");
92 		fatalx("queue-proc: exiting");
93 	}
94 
95 	memmove(dst, rdata, len);
96 	rlen -= len;
97 	rdata += len;
98 }
99 
100 static void
101 queue_proc_end(void)
102 {
103 	if (rlen) {
104 		log_warnx("warn: queue-proc: bogus data");
105 		fatalx("queue-proc: exiting");
106 	}
107 	imsg_free(&imsg);
108 }
109 
110 /*
111  * API
112  */
113 
114 static int
115 queue_proc_close(void)
116 {
117 	int	r;
118 
119 	imsg_compose(&ibuf, PROC_QUEUE_MESSAGE_CORRUPT, 0, 0, -1, NULL, 0);
120 
121 	queue_proc_call();
122 	queue_proc_read(&r, sizeof(r));
123 	queue_proc_end();
124 
125 	return (r);
126 }
127 
128 static int
129 queue_proc_message_create(uint32_t *msgid)
130 {
131 	int	r;
132 
133 	imsg_compose(&ibuf, PROC_QUEUE_MESSAGE_CREATE, 0, 0, -1, NULL, 0);
134 
135 	queue_proc_call();
136 	queue_proc_read(&r, sizeof(r));
137 	if (r == 1)
138 		queue_proc_read(msgid, sizeof(*msgid));
139 	queue_proc_end();
140 
141 	return (r);
142 }
143 
144 static int
145 queue_proc_message_commit(uint32_t msgid, const char *path)
146 {
147 	int	r, fd;
148 
149 	fd = open(path, O_RDONLY);
150 	if (fd == -1) {
151 		log_warn("queue-proc: open: %s", path);
152 		return (0);
153 	}
154 
155 	imsg_compose(&ibuf, PROC_QUEUE_MESSAGE_COMMIT, 0, 0, fd, &msgid,
156 	    sizeof(msgid));
157 
158 	queue_proc_call();
159 	queue_proc_read(&r, sizeof(r));
160 	queue_proc_end();
161 
162 	return (r);
163 }
164 
165 static int
166 queue_proc_message_delete(uint32_t msgid)
167 {
168 	int	r;
169 
170 	imsg_compose(&ibuf, PROC_QUEUE_MESSAGE_DELETE, 0, 0, -1, &msgid,
171 	    sizeof(msgid));
172 
173 	queue_proc_call();
174 	queue_proc_read(&r, sizeof(r));
175 	queue_proc_end();
176 
177 	return (r);
178 }
179 
180 static int
181 queue_proc_message_fd_r(uint32_t msgid)
182 {
183 	imsg_compose(&ibuf, PROC_QUEUE_MESSAGE_FD_R, 0, 0, -1, &msgid,
184 	    sizeof(msgid));
185 
186 	queue_proc_call();
187 	queue_proc_end();
188 
189 	return (imsg.fd);
190 }
191 
192 static int
193 queue_proc_message_corrupt(uint32_t msgid)
194 {
195 	int	r;
196 
197 	imsg_compose(&ibuf, PROC_QUEUE_MESSAGE_CORRUPT, 0, 0, -1, &msgid,
198 	    sizeof(msgid));
199 
200 	queue_proc_call();
201 	queue_proc_read(&r, sizeof(r));
202 	queue_proc_end();
203 
204 	return (r);
205 }
206 
207 static int
208 queue_proc_envelope_create(uint32_t msgid, const char *buf, size_t len,
209     uint64_t *evpid)
210 {
211 	struct ibuf	*b;
212 	int		 r;
213 
214 	msgid = evpid_to_msgid(*evpid);
215 	b = imsg_create(&ibuf, PROC_QUEUE_ENVELOPE_CREATE, 0, 0,
216 	    sizeof(msgid) + len);
217 	if (imsg_add(b, &msgid, sizeof(msgid)) == -1 ||
218 	    imsg_add(b, buf, len) == -1)
219 		return (0);
220 	imsg_close(&ibuf, b);
221 
222 	queue_proc_call();
223 	queue_proc_read(&r, sizeof(r));
224 	if (r == 1)
225 		queue_proc_read(evpid, sizeof(*evpid));
226 	queue_proc_end();
227 
228 	return (r);
229 }
230 
231 static int
232 queue_proc_envelope_delete(uint64_t evpid)
233 {
234 	int	r;
235 
236 	imsg_compose(&ibuf, PROC_QUEUE_ENVELOPE_DELETE, 0, 0, -1, &evpid,
237 	    sizeof(evpid));
238 
239 	queue_proc_call();
240 	queue_proc_read(&r, sizeof(r));
241 	queue_proc_end();
242 
243 	return (r);
244 }
245 
246 static int
247 queue_proc_envelope_update(uint64_t evpid, const char *buf, size_t len)
248 {
249 	struct ibuf	*b;
250 	int		 r;
251 
252 	b = imsg_create(&ibuf, PROC_QUEUE_ENVELOPE_UPDATE, 0, 0,
253 	    len + sizeof(evpid));
254 	if (imsg_add(b, &evpid, sizeof(evpid)) == -1 ||
255 	    imsg_add(b, buf, len) == -1)
256 		return (0);
257 	imsg_close(&ibuf, b);
258 
259 	queue_proc_call();
260 	queue_proc_read(&r, sizeof(r));
261 	queue_proc_end();
262 
263 	return (r);
264 }
265 
266 static int
267 queue_proc_envelope_load(uint64_t evpid, char *buf, size_t len)
268 {
269 	int	r;
270 
271 	imsg_compose(&ibuf, PROC_QUEUE_ENVELOPE_LOAD, 0, 0, -1, &evpid,
272 	    sizeof(evpid));
273 
274 	queue_proc_call();
275 
276 	if (rlen > len) {
277 		log_warnx("warn: queue-proc: buf too small");
278 		fatalx("queue-proc: exiting");
279 	}
280 
281 	r = rlen;
282 	queue_proc_read(buf, rlen);
283 	queue_proc_end();
284 
285 	return (r);
286 }
287 
288 static int
289 queue_proc_envelope_walk(uint64_t *evpid, char *buf, size_t len)
290 {
291 	int	r;
292 
293 	imsg_compose(&ibuf, PROC_QUEUE_ENVELOPE_WALK, 0, 0, -1, NULL, 0);
294 
295 	queue_proc_call();
296 	queue_proc_read(&r, sizeof(r));
297 
298 	if (r > 0) {
299 		queue_proc_read(evpid, sizeof(*evpid));
300 		if (rlen > len) {
301 			log_warnx("warn: queue-proc: buf too small");
302 			fatalx("queue-proc: exiting");
303 		}
304 		if (r != (int)rlen) {
305 			log_warnx("warn: queue-proc: len mismatch");
306 			fatalx("queue-proc: exiting");
307 		}
308 		queue_proc_read(buf, rlen);
309 	}
310 	queue_proc_end();
311 
312 	return (r);
313 }
314 
315 static int
316 queue_proc_init(struct passwd *pw, int server, const char *conf)
317 {
318 	uint32_t	version;
319 	int		fd;
320 
321 	fd = fork_proc_backend("queue", conf, "queue-proc");
322 	if (fd == -1)
323 		fatalx("queue-proc: exiting");
324 
325 	imsg_init(&ibuf, fd);
326 
327 	version = PROC_QUEUE_API_VERSION;
328 	imsg_compose(&ibuf, PROC_QUEUE_INIT, 0, 0, -1,
329 	    &version, sizeof(version));
330 
331 	queue_api_on_close(queue_proc_close);
332 	queue_api_on_message_create(queue_proc_message_create);
333 	queue_api_on_message_commit(queue_proc_message_commit);
334 	queue_api_on_message_delete(queue_proc_message_delete);
335 	queue_api_on_message_fd_r(queue_proc_message_fd_r);
336 	queue_api_on_message_corrupt(queue_proc_message_corrupt);
337 	queue_api_on_envelope_create(queue_proc_envelope_create);
338 	queue_api_on_envelope_delete(queue_proc_envelope_delete);
339 	queue_api_on_envelope_update(queue_proc_envelope_update);
340 	queue_api_on_envelope_load(queue_proc_envelope_load);
341 	queue_api_on_envelope_walk(queue_proc_envelope_walk);
342 
343 	queue_proc_call();
344 	queue_proc_end();
345 
346 	return (1);
347 }
348 
/*
 * Backend descriptor for the external-process queue backend.  Its only
 * initializer is queue_proc_init; presumably that fills the init hook
 * of struct queue_backend, which is declared outside this file.
 */
349 struct queue_backend	queue_backend_proc = {
350 	queue_proc_init,
351 };
352