1 /* $OpenBSD: queue_proc.c,v 1.7 2018/05/14 15:23:05 gilles Exp $ */ 2 3 /* 4 * Copyright (c) 2013 Eric Faurot <eric@openbsd.org> 5 * 6 * Permission to use, copy, modify, and distribute this software for any 7 * purpose with or without fee is hereby granted, provided that the above 8 * copyright notice and this permission notice appear in all copies. 9 * 10 * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES 11 * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF 12 * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR 13 * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES 14 * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN 15 * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF 16 * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. 17 */ 18 19 #include <sys/types.h> 20 #include <sys/queue.h> 21 #include <sys/tree.h> 22 #include <sys/socket.h> 23 #include <sys/stat.h> 24 25 #include <ctype.h> 26 #include <errno.h> 27 #include <event.h> 28 #include <fcntl.h> 29 #include <imsg.h> 30 #include <inttypes.h> 31 #include <libgen.h> 32 #include <pwd.h> 33 #include <stdio.h> 34 #include <stdlib.h> 35 #include <string.h> 36 #include <time.h> 37 #include <unistd.h> 38 #include <limits.h> 39 40 #include "smtpd.h" 41 #include "log.h" 42 43 static struct imsgbuf ibuf; 44 static struct imsg imsg; 45 static size_t rlen; 46 static char *rdata; 47 48 static void 49 queue_proc_call(void) 50 { 51 ssize_t n; 52 53 if (imsg_flush(&ibuf) == -1) { 54 log_warn("warn: queue-proc: imsg_flush"); 55 fatalx("queue-proc: exiting"); 56 } 57 58 while (1) { 59 if ((n = imsg_get(&ibuf, &imsg)) == -1) { 60 log_warn("warn: queue-proc: imsg_get"); 61 break; 62 } 63 if (n) { 64 rlen = imsg.hdr.len - IMSG_HEADER_SIZE; 65 rdata = imsg.data; 66 67 if (imsg.hdr.type != PROC_QUEUE_OK) { 68 log_warnx("warn: queue-proc: bad response"); 69 break; 70 } 71 return; 72 } 73 74 if ((n = imsg_read(&ibuf)) == -1 && errno != EAGAIN) { 75 log_warn("warn: queue-proc: imsg_read"); 76 break; 77 } 78 79 if (n == 0) { 80 log_warnx("warn: queue-proc: pipe closed"); 81 break; 82 } 83 } 84 85 fatalx("queue-proc: exiting"); 86 } 87 88 static void 89 queue_proc_read(void *dst, size_t len) 90 { 91 if (len > rlen) { 92 log_warnx("warn: queue-proc: bad msg len"); 93 fatalx("queue-proc: exiting"); 94 } 95 96 memmove(dst, rdata, len); 97 rlen -= len; 98 rdata += len; 99 } 100 101 static void 102 queue_proc_end(void) 103 { 104 if (rlen) { 105 log_warnx("warn: queue-proc: bogus data"); 106 fatalx("queue-proc: exiting"); 107 } 108 imsg_free(&imsg); 109 } 110 111 /* 112 * API 113 */ 114 115 static int 116 queue_proc_close(void) 117 { 118 int r; 119 120 imsg_compose(&ibuf, PROC_QUEUE_CLOSE, 0, 0, -1, NULL, 0); 121 122 queue_proc_call(); 123 queue_proc_read(&r, sizeof(r)); 124 queue_proc_end(); 125 126 return (r); 127 } 128 129 static int 130 queue_proc_message_create(uint32_t *msgid) 131 { 132 int r; 133 134 imsg_compose(&ibuf, PROC_QUEUE_MESSAGE_CREATE, 0, 0, -1, NULL, 0); 135 136 queue_proc_call(); 137 queue_proc_read(&r, sizeof(r)); 138 if (r == 1) 139 queue_proc_read(msgid, sizeof(*msgid)); 140 queue_proc_end(); 141 142 return (r); 143 } 144 145 static int 146 queue_proc_message_commit(uint32_t msgid, const char *path) 147 { 148 int r, fd; 149 150 fd = open(path, O_RDONLY); 151 if (fd == -1) { 152 log_warn("queue-proc: open: %s", path); 153 return (0); 154 } 155 156 imsg_compose(&ibuf, PROC_QUEUE_MESSAGE_COMMIT, 0, 0, fd, &msgid, 157 sizeof(msgid)); 158 159 queue_proc_call(); 160 queue_proc_read(&r, sizeof(r)); 161 queue_proc_end(); 162 163 return (r); 164 } 165 166 static int 167 queue_proc_message_delete(uint32_t msgid) 168 { 169 int r; 170 171 imsg_compose(&ibuf, PROC_QUEUE_MESSAGE_DELETE, 0, 0, -1, &msgid, 172 sizeof(msgid)); 173 174 queue_proc_call(); 175 queue_proc_read(&r, sizeof(r)); 176 queue_proc_end(); 177 178 return (r); 179 } 180 181 static int 182 queue_proc_message_fd_r(uint32_t msgid) 183 { 184 imsg_compose(&ibuf, PROC_QUEUE_MESSAGE_FD_R, 0, 0, -1, &msgid, 185 sizeof(msgid)); 186 187 queue_proc_call(); 188 queue_proc_end(); 189 190 return (imsg.fd); 191 } 192 193 static int 194 queue_proc_envelope_create(uint32_t msgid, const char *buf, size_t len, 195 uint64_t *evpid) 196 { 197 struct ibuf *b; 198 int r; 199 200 msgid = evpid_to_msgid(*evpid); 201 b = imsg_create(&ibuf, PROC_QUEUE_ENVELOPE_CREATE, 0, 0, 202 sizeof(msgid) + len); 203 if (imsg_add(b, &msgid, sizeof(msgid)) == -1 || 204 imsg_add(b, buf, len) == -1) 205 return (0); 206 imsg_close(&ibuf, b); 207 208 queue_proc_call(); 209 queue_proc_read(&r, sizeof(r)); 210 if (r == 1) 211 queue_proc_read(evpid, sizeof(*evpid)); 212 queue_proc_end(); 213 214 return (r); 215 } 216 217 static int 218 queue_proc_envelope_delete(uint64_t evpid) 219 { 220 int r; 221 222 imsg_compose(&ibuf, PROC_QUEUE_ENVELOPE_DELETE, 0, 0, -1, &evpid, 223 sizeof(evpid)); 224 225 queue_proc_call(); 226 queue_proc_read(&r, sizeof(r)); 227 queue_proc_end(); 228 229 return (r); 230 } 231 232 static int 233 queue_proc_envelope_update(uint64_t evpid, const char *buf, size_t len) 234 { 235 struct ibuf *b; 236 int r; 237 238 b = imsg_create(&ibuf, PROC_QUEUE_ENVELOPE_UPDATE, 0, 0, 239 len + sizeof(evpid)); 240 if (imsg_add(b, &evpid, sizeof(evpid)) == -1 || 241 imsg_add(b, buf, len) == -1) 242 return (0); 243 imsg_close(&ibuf, b); 244 245 queue_proc_call(); 246 queue_proc_read(&r, sizeof(r)); 247 queue_proc_end(); 248 249 return (r); 250 } 251 252 static int 253 queue_proc_envelope_load(uint64_t evpid, char *buf, size_t len) 254 { 255 int r; 256 257 imsg_compose(&ibuf, PROC_QUEUE_ENVELOPE_LOAD, 0, 0, -1, &evpid, 258 sizeof(evpid)); 259 260 queue_proc_call(); 261 262 if (rlen > len) { 263 log_warnx("warn: queue-proc: buf too small"); 264 fatalx("queue-proc: exiting"); 265 } 266 267 r = rlen; 268 queue_proc_read(buf, rlen); 269 queue_proc_end(); 270 271 return (r); 272 } 273 274 static int 275 queue_proc_envelope_walk(uint64_t *evpid, char *buf, size_t len) 276 { 277 int r; 278 279 imsg_compose(&ibuf, PROC_QUEUE_ENVELOPE_WALK, 0, 0, -1, NULL, 0); 280 281 queue_proc_call(); 282 queue_proc_read(&r, sizeof(r)); 283 284 if (r > 0) { 285 queue_proc_read(evpid, sizeof(*evpid)); 286 if (rlen > len) { 287 log_warnx("warn: queue-proc: buf too small"); 288 fatalx("queue-proc: exiting"); 289 } 290 if (r != (int)rlen) { 291 log_warnx("warn: queue-proc: len mismatch"); 292 fatalx("queue-proc: exiting"); 293 } 294 queue_proc_read(buf, rlen); 295 } 296 queue_proc_end(); 297 298 return (r); 299 } 300 301 static int 302 queue_proc_init(struct passwd *pw, int server, const char *conf) 303 { 304 uint32_t version; 305 int fd; 306 307 fd = fork_proc_backend("queue", conf, "queue-proc"); 308 if (fd == -1) 309 fatalx("queue-proc: exiting"); 310 311 imsg_init(&ibuf, fd); 312 313 version = PROC_QUEUE_API_VERSION; 314 imsg_compose(&ibuf, PROC_QUEUE_INIT, 0, 0, -1, 315 &version, sizeof(version)); 316 317 queue_api_on_close(queue_proc_close); 318 queue_api_on_message_create(queue_proc_message_create); 319 queue_api_on_message_commit(queue_proc_message_commit); 320 queue_api_on_message_delete(queue_proc_message_delete); 321 queue_api_on_message_fd_r(queue_proc_message_fd_r); 322 queue_api_on_envelope_create(queue_proc_envelope_create); 323 queue_api_on_envelope_delete(queue_proc_envelope_delete); 324 queue_api_on_envelope_update(queue_proc_envelope_update); 325 queue_api_on_envelope_load(queue_proc_envelope_load); 326 queue_api_on_envelope_walk(queue_proc_envelope_walk); 327 328 queue_proc_call(); 329 queue_proc_end(); 330 331 return (1); 332 } 333 334 struct queue_backend queue_backend_proc = { 335 queue_proc_init, 336 }; 337