xref: /openbsd-src/usr.sbin/smtpd/queue_proc.c (revision d1df930ffab53da22f3324c32bed7ac5709915e6)
1 /*	$OpenBSD: queue_proc.c,v 1.7 2018/05/14 15:23:05 gilles Exp $	*/
2 
3 /*
4  * Copyright (c) 2013 Eric Faurot <eric@openbsd.org>
5  *
6  * Permission to use, copy, modify, and distribute this software for any
7  * purpose with or without fee is hereby granted, provided that the above
8  * copyright notice and this permission notice appear in all copies.
9  *
10  * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
11  * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
12  * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
13  * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
14  * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
15  * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
16  * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
17  */
18 
19 #include <sys/types.h>
20 #include <sys/queue.h>
21 #include <sys/tree.h>
22 #include <sys/socket.h>
23 #include <sys/stat.h>
24 
25 #include <ctype.h>
26 #include <errno.h>
27 #include <event.h>
28 #include <fcntl.h>
29 #include <imsg.h>
30 #include <inttypes.h>
31 #include <libgen.h>
32 #include <pwd.h>
33 #include <stdio.h>
34 #include <stdlib.h>
35 #include <string.h>
36 #include <time.h>
37 #include <unistd.h>
38 #include <limits.h>
39 
40 #include "smtpd.h"
41 #include "log.h"
42 
43 static struct imsgbuf	 ibuf;
44 static struct imsg	 imsg;
45 static size_t		 rlen;
46 static char		*rdata;
47 
48 static void
49 queue_proc_call(void)
50 {
51 	ssize_t	n;
52 
53 	if (imsg_flush(&ibuf) == -1) {
54 		log_warn("warn: queue-proc: imsg_flush");
55 		fatalx("queue-proc: exiting");
56 	}
57 
58 	while (1) {
59 		if ((n = imsg_get(&ibuf, &imsg)) == -1) {
60 			log_warn("warn: queue-proc: imsg_get");
61 			break;
62 		}
63 		if (n) {
64 			rlen = imsg.hdr.len - IMSG_HEADER_SIZE;
65 			rdata = imsg.data;
66 
67 			if (imsg.hdr.type != PROC_QUEUE_OK) {
68 				log_warnx("warn: queue-proc: bad response");
69 				break;
70 			}
71 			return;
72 		}
73 
74 		if ((n = imsg_read(&ibuf)) == -1 && errno != EAGAIN) {
75 			log_warn("warn: queue-proc: imsg_read");
76 			break;
77 		}
78 
79 		if (n == 0) {
80 			log_warnx("warn: queue-proc: pipe closed");
81 			break;
82 		}
83 	}
84 
85 	fatalx("queue-proc: exiting");
86 }
87 
88 static void
89 queue_proc_read(void *dst, size_t len)
90 {
91 	if (len > rlen) {
92 		log_warnx("warn: queue-proc: bad msg len");
93 		fatalx("queue-proc: exiting");
94 	}
95 
96 	memmove(dst, rdata, len);
97 	rlen -= len;
98 	rdata += len;
99 }
100 
101 static void
102 queue_proc_end(void)
103 {
104 	if (rlen) {
105 		log_warnx("warn: queue-proc: bogus data");
106 		fatalx("queue-proc: exiting");
107 	}
108 	imsg_free(&imsg);
109 }
110 
111 /*
112  * API
113  */
114 
115 static int
116 queue_proc_close(void)
117 {
118 	int	r;
119 
120 	imsg_compose(&ibuf, PROC_QUEUE_CLOSE, 0, 0, -1, NULL, 0);
121 
122 	queue_proc_call();
123 	queue_proc_read(&r, sizeof(r));
124 	queue_proc_end();
125 
126 	return (r);
127 }
128 
129 static int
130 queue_proc_message_create(uint32_t *msgid)
131 {
132 	int	r;
133 
134 	imsg_compose(&ibuf, PROC_QUEUE_MESSAGE_CREATE, 0, 0, -1, NULL, 0);
135 
136 	queue_proc_call();
137 	queue_proc_read(&r, sizeof(r));
138 	if (r == 1)
139 		queue_proc_read(msgid, sizeof(*msgid));
140 	queue_proc_end();
141 
142 	return (r);
143 }
144 
145 static int
146 queue_proc_message_commit(uint32_t msgid, const char *path)
147 {
148 	int	r, fd;
149 
150 	fd = open(path, O_RDONLY);
151 	if (fd == -1) {
152 		log_warn("queue-proc: open: %s", path);
153 		return (0);
154 	}
155 
156 	imsg_compose(&ibuf, PROC_QUEUE_MESSAGE_COMMIT, 0, 0, fd, &msgid,
157 	    sizeof(msgid));
158 
159 	queue_proc_call();
160 	queue_proc_read(&r, sizeof(r));
161 	queue_proc_end();
162 
163 	return (r);
164 }
165 
166 static int
167 queue_proc_message_delete(uint32_t msgid)
168 {
169 	int	r;
170 
171 	imsg_compose(&ibuf, PROC_QUEUE_MESSAGE_DELETE, 0, 0, -1, &msgid,
172 	    sizeof(msgid));
173 
174 	queue_proc_call();
175 	queue_proc_read(&r, sizeof(r));
176 	queue_proc_end();
177 
178 	return (r);
179 }
180 
181 static int
182 queue_proc_message_fd_r(uint32_t msgid)
183 {
184 	imsg_compose(&ibuf, PROC_QUEUE_MESSAGE_FD_R, 0, 0, -1, &msgid,
185 	    sizeof(msgid));
186 
187 	queue_proc_call();
188 	queue_proc_end();
189 
190 	return (imsg.fd);
191 }
192 
193 static int
194 queue_proc_envelope_create(uint32_t msgid, const char *buf, size_t len,
195     uint64_t *evpid)
196 {
197 	struct ibuf	*b;
198 	int		 r;
199 
200 	msgid = evpid_to_msgid(*evpid);
201 	b = imsg_create(&ibuf, PROC_QUEUE_ENVELOPE_CREATE, 0, 0,
202 	    sizeof(msgid) + len);
203 	if (imsg_add(b, &msgid, sizeof(msgid)) == -1 ||
204 	    imsg_add(b, buf, len) == -1)
205 		return (0);
206 	imsg_close(&ibuf, b);
207 
208 	queue_proc_call();
209 	queue_proc_read(&r, sizeof(r));
210 	if (r == 1)
211 		queue_proc_read(evpid, sizeof(*evpid));
212 	queue_proc_end();
213 
214 	return (r);
215 }
216 
217 static int
218 queue_proc_envelope_delete(uint64_t evpid)
219 {
220 	int	r;
221 
222 	imsg_compose(&ibuf, PROC_QUEUE_ENVELOPE_DELETE, 0, 0, -1, &evpid,
223 	    sizeof(evpid));
224 
225 	queue_proc_call();
226 	queue_proc_read(&r, sizeof(r));
227 	queue_proc_end();
228 
229 	return (r);
230 }
231 
232 static int
233 queue_proc_envelope_update(uint64_t evpid, const char *buf, size_t len)
234 {
235 	struct ibuf	*b;
236 	int		 r;
237 
238 	b = imsg_create(&ibuf, PROC_QUEUE_ENVELOPE_UPDATE, 0, 0,
239 	    len + sizeof(evpid));
240 	if (imsg_add(b, &evpid, sizeof(evpid)) == -1 ||
241 	    imsg_add(b, buf, len) == -1)
242 		return (0);
243 	imsg_close(&ibuf, b);
244 
245 	queue_proc_call();
246 	queue_proc_read(&r, sizeof(r));
247 	queue_proc_end();
248 
249 	return (r);
250 }
251 
252 static int
253 queue_proc_envelope_load(uint64_t evpid, char *buf, size_t len)
254 {
255 	int	r;
256 
257 	imsg_compose(&ibuf, PROC_QUEUE_ENVELOPE_LOAD, 0, 0, -1, &evpid,
258 	    sizeof(evpid));
259 
260 	queue_proc_call();
261 
262 	if (rlen > len) {
263 		log_warnx("warn: queue-proc: buf too small");
264 		fatalx("queue-proc: exiting");
265 	}
266 
267 	r = rlen;
268 	queue_proc_read(buf, rlen);
269 	queue_proc_end();
270 
271 	return (r);
272 }
273 
274 static int
275 queue_proc_envelope_walk(uint64_t *evpid, char *buf, size_t len)
276 {
277 	int	r;
278 
279 	imsg_compose(&ibuf, PROC_QUEUE_ENVELOPE_WALK, 0, 0, -1, NULL, 0);
280 
281 	queue_proc_call();
282 	queue_proc_read(&r, sizeof(r));
283 
284 	if (r > 0) {
285 		queue_proc_read(evpid, sizeof(*evpid));
286 		if (rlen > len) {
287 			log_warnx("warn: queue-proc: buf too small");
288 			fatalx("queue-proc: exiting");
289 		}
290 		if (r != (int)rlen) {
291 			log_warnx("warn: queue-proc: len mismatch");
292 			fatalx("queue-proc: exiting");
293 		}
294 		queue_proc_read(buf, rlen);
295 	}
296 	queue_proc_end();
297 
298 	return (r);
299 }
300 
301 static int
302 queue_proc_init(struct passwd *pw, int server, const char *conf)
303 {
304 	uint32_t	version;
305 	int		fd;
306 
307 	fd = fork_proc_backend("queue", conf, "queue-proc");
308 	if (fd == -1)
309 		fatalx("queue-proc: exiting");
310 
311 	imsg_init(&ibuf, fd);
312 
313 	version = PROC_QUEUE_API_VERSION;
314 	imsg_compose(&ibuf, PROC_QUEUE_INIT, 0, 0, -1,
315 	    &version, sizeof(version));
316 
317 	queue_api_on_close(queue_proc_close);
318 	queue_api_on_message_create(queue_proc_message_create);
319 	queue_api_on_message_commit(queue_proc_message_commit);
320 	queue_api_on_message_delete(queue_proc_message_delete);
321 	queue_api_on_message_fd_r(queue_proc_message_fd_r);
322 	queue_api_on_envelope_create(queue_proc_envelope_create);
323 	queue_api_on_envelope_delete(queue_proc_envelope_delete);
324 	queue_api_on_envelope_update(queue_proc_envelope_update);
325 	queue_api_on_envelope_load(queue_proc_envelope_load);
326 	queue_api_on_envelope_walk(queue_proc_envelope_walk);
327 
328 	queue_proc_call();
329 	queue_proc_end();
330 
331 	return (1);
332 }
333 
334 struct queue_backend	queue_backend_proc = {
335 	queue_proc_init,
336 };
337