1 /* $OpenBSD: queue_proc.c,v 1.3 2014/07/08 15:45:32 eric Exp $ */ 2 3 /* 4 * Copyright (c) 2013 Eric Faurot <eric@openbsd.org> 5 * 6 * Permission to use, copy, modify, and distribute this software for any 7 * purpose with or without fee is hereby granted, provided that the above 8 * copyright notice and this permission notice appear in all copies. 9 * 10 * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES 11 * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF 12 * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR 13 * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES 14 * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN 15 * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF 16 * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. 17 */ 18 19 #include <sys/types.h> 20 #include <sys/queue.h> 21 #include <sys/tree.h> 22 #include <sys/param.h> 23 #include <sys/socket.h> 24 #include <sys/stat.h> 25 26 #include <ctype.h> 27 #include <event.h> 28 #include <fcntl.h> 29 #include <imsg.h> 30 #include <inttypes.h> 31 #include <libgen.h> 32 #include <pwd.h> 33 #include <stdio.h> 34 #include <stdlib.h> 35 #include <string.h> 36 #include <time.h> 37 #include <unistd.h> 38 39 #include "smtpd.h" 40 #include "log.h" 41 42 static struct imsgbuf ibuf; 43 static struct imsg imsg; 44 static size_t rlen; 45 static char *rdata; 46 47 static void 48 queue_proc_call(void) 49 { 50 ssize_t n; 51 52 if (imsg_flush(&ibuf) == -1) { 53 log_warn("warn: queue-proc: imsg_flush"); 54 fatalx("queue-proc: exiting"); 55 } 56 57 while (1) { 58 if ((n = imsg_get(&ibuf, &imsg)) == -1) { 59 log_warn("warn: queue-proc: imsg_get"); 60 break; 61 } 62 if (n) { 63 rlen = imsg.hdr.len - IMSG_HEADER_SIZE; 64 rdata = imsg.data; 65 66 if (imsg.hdr.type != PROC_QUEUE_OK) { 67 log_warnx("warn: queue-proc: bad response"); 68 break; 69 } 70 return; 71 } 72 73 if ((n = imsg_read(&ibuf)) == -1) { 74 log_warn("warn: queue-proc: imsg_read"); 75 break; 76 } 77 78 if (n == 0) { 79 log_warnx("warn: queue-proc: pipe closed"); 80 break; 81 } 82 } 83 84 fatalx("queue-proc: exiting"); 85 } 86 87 static void 88 queue_proc_read(void *dst, size_t len) 89 { 90 if (len > rlen) { 91 log_warnx("warn: queue-proc: bad msg len"); 92 fatalx("queue-proc: exiting"); 93 } 94 95 memmove(dst, rdata, len); 96 rlen -= len; 97 rdata += len; 98 } 99 100 static void 101 queue_proc_end(void) 102 { 103 if (rlen) { 104 log_warnx("warn: queue-proc: bogus data"); 105 fatalx("queue-proc: exiting"); 106 } 107 imsg_free(&imsg); 108 } 109 110 /* 111 * API 112 */ 113 114 static int 115 queue_proc_close(void) 116 { 117 int r; 118 119 imsg_compose(&ibuf, PROC_QUEUE_MESSAGE_CORRUPT, 0, 0, -1, NULL, 0); 120 121 queue_proc_call(); 122 queue_proc_read(&r, sizeof(r)); 123 queue_proc_end(); 124 125 return (r); 126 } 127 128 static int 129 queue_proc_message_create(uint32_t *msgid) 130 { 131 int r; 132 133 imsg_compose(&ibuf, PROC_QUEUE_MESSAGE_CREATE, 0, 0, -1, NULL, 0); 134 135 queue_proc_call(); 136 queue_proc_read(&r, sizeof(r)); 137 if (r == 1) 138 queue_proc_read(msgid, sizeof(*msgid)); 139 queue_proc_end(); 140 141 return (r); 142 } 143 144 static int 145 queue_proc_message_commit(uint32_t msgid, const char *path) 146 { 147 int r, fd; 148 149 fd = open(path, O_RDONLY); 150 if (fd == -1) { 151 log_warn("queue-proc: open: %s", path); 152 return (0); 153 } 154 155 imsg_compose(&ibuf, PROC_QUEUE_MESSAGE_COMMIT, 0, 0, fd, &msgid, 156 sizeof(msgid)); 157 158 queue_proc_call(); 159 queue_proc_read(&r, sizeof(r)); 160 queue_proc_end(); 161 162 return (r); 163 } 164 165 static int 166 queue_proc_message_delete(uint32_t msgid) 167 { 168 int r; 169 170 imsg_compose(&ibuf, PROC_QUEUE_MESSAGE_DELETE, 0, 0, -1, &msgid, 171 sizeof(msgid)); 172 173 queue_proc_call(); 174 queue_proc_read(&r, sizeof(r)); 175 queue_proc_end(); 176 177 return (r); 178 } 179 180 static int 181 queue_proc_message_fd_r(uint32_t msgid) 182 { 183 imsg_compose(&ibuf, PROC_QUEUE_MESSAGE_FD_R, 0, 0, -1, &msgid, 184 sizeof(msgid)); 185 186 queue_proc_call(); 187 queue_proc_end(); 188 189 return (imsg.fd); 190 } 191 192 static int 193 queue_proc_message_corrupt(uint32_t msgid) 194 { 195 int r; 196 197 imsg_compose(&ibuf, PROC_QUEUE_MESSAGE_CORRUPT, 0, 0, -1, &msgid, 198 sizeof(msgid)); 199 200 queue_proc_call(); 201 queue_proc_read(&r, sizeof(r)); 202 queue_proc_end(); 203 204 return (r); 205 } 206 207 static int 208 queue_proc_envelope_create(uint32_t msgid, const char *buf, size_t len, 209 uint64_t *evpid) 210 { 211 struct ibuf *b; 212 int r; 213 214 msgid = evpid_to_msgid(*evpid); 215 b = imsg_create(&ibuf, PROC_QUEUE_ENVELOPE_CREATE, 0, 0, 216 sizeof(msgid) + len); 217 if (imsg_add(b, &msgid, sizeof(msgid)) == -1 || 218 imsg_add(b, buf, len) == -1) 219 return (0); 220 imsg_close(&ibuf, b); 221 222 queue_proc_call(); 223 queue_proc_read(&r, sizeof(r)); 224 if (r == 1) 225 queue_proc_read(evpid, sizeof(*evpid)); 226 queue_proc_end(); 227 228 return (r); 229 } 230 231 static int 232 queue_proc_envelope_delete(uint64_t evpid) 233 { 234 int r; 235 236 imsg_compose(&ibuf, PROC_QUEUE_ENVELOPE_DELETE, 0, 0, -1, &evpid, 237 sizeof(evpid)); 238 239 queue_proc_call(); 240 queue_proc_read(&r, sizeof(r)); 241 queue_proc_end(); 242 243 return (r); 244 } 245 246 static int 247 queue_proc_envelope_update(uint64_t evpid, const char *buf, size_t len) 248 { 249 struct ibuf *b; 250 int r; 251 252 b = imsg_create(&ibuf, PROC_QUEUE_ENVELOPE_UPDATE, 0, 0, 253 len + sizeof(evpid)); 254 if (imsg_add(b, &evpid, sizeof(evpid)) == -1 || 255 imsg_add(b, buf, len) == -1) 256 return (0); 257 imsg_close(&ibuf, b); 258 259 queue_proc_call(); 260 queue_proc_read(&r, sizeof(r)); 261 queue_proc_end(); 262 263 return (r); 264 } 265 266 static int 267 queue_proc_envelope_load(uint64_t evpid, char *buf, size_t len) 268 { 269 int r; 270 271 imsg_compose(&ibuf, PROC_QUEUE_ENVELOPE_LOAD, 0, 0, -1, &evpid, 272 sizeof(evpid)); 273 274 queue_proc_call(); 275 276 if (rlen > len) { 277 log_warnx("warn: queue-proc: buf too small"); 278 fatalx("queue-proc: exiting"); 279 } 280 281 r = rlen; 282 queue_proc_read(buf, rlen); 283 queue_proc_end(); 284 285 return (r); 286 } 287 288 static int 289 queue_proc_envelope_walk(uint64_t *evpid, char *buf, size_t len) 290 { 291 int r; 292 293 imsg_compose(&ibuf, PROC_QUEUE_ENVELOPE_WALK, 0, 0, -1, NULL, 0); 294 295 queue_proc_call(); 296 queue_proc_read(&r, sizeof(r)); 297 298 if (r > 0) { 299 queue_proc_read(evpid, sizeof(*evpid)); 300 if (rlen > len) { 301 log_warnx("warn: queue-proc: buf too small"); 302 fatalx("queue-proc: exiting"); 303 } 304 if (r != (int)rlen) { 305 log_warnx("warn: queue-proc: len mismatch"); 306 fatalx("queue-proc: exiting"); 307 } 308 queue_proc_read(buf, rlen); 309 } 310 queue_proc_end(); 311 312 return (r); 313 } 314 315 static int 316 queue_proc_init(struct passwd *pw, int server, const char *conf) 317 { 318 uint32_t version; 319 int fd; 320 321 fd = fork_proc_backend("queue", conf, "queue-proc"); 322 if (fd == -1) 323 fatalx("queue-proc: exiting"); 324 325 imsg_init(&ibuf, fd); 326 327 version = PROC_QUEUE_API_VERSION; 328 imsg_compose(&ibuf, PROC_QUEUE_INIT, 0, 0, -1, 329 &version, sizeof(version)); 330 331 queue_api_on_close(queue_proc_close); 332 queue_api_on_message_create(queue_proc_message_create); 333 queue_api_on_message_commit(queue_proc_message_commit); 334 queue_api_on_message_delete(queue_proc_message_delete); 335 queue_api_on_message_fd_r(queue_proc_message_fd_r); 336 queue_api_on_message_corrupt(queue_proc_message_corrupt); 337 queue_api_on_envelope_create(queue_proc_envelope_create); 338 queue_api_on_envelope_delete(queue_proc_envelope_delete); 339 queue_api_on_envelope_update(queue_proc_envelope_update); 340 queue_api_on_envelope_load(queue_proc_envelope_load); 341 queue_api_on_envelope_walk(queue_proc_envelope_walk); 342 343 queue_proc_call(); 344 queue_proc_end(); 345 346 return (1); 347 } 348 349 struct queue_backend queue_backend_proc = { 350 queue_proc_init, 351 }; 352