1 /* $OpenBSD: queue_backend.c,v 1.42 2013/01/26 09:37:23 gilles Exp $ */ 2 3 /* 4 * Copyright (c) 2011 Gilles Chehade <gilles@poolp.org> 5 * 6 * Permission to use, copy, modify, and distribute this software for any 7 * purpose with or without fee is hereby granted, provided that the above 8 * copyright notice and this permission notice appear in all copies. 9 * 10 * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES 11 * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF 12 * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR 13 * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES 14 * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN 15 * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF 16 * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. 17 */ 18 19 #include <sys/types.h> 20 #include <sys/queue.h> 21 #include <sys/tree.h> 22 #include <sys/param.h> 23 #include <sys/socket.h> 24 #include <sys/stat.h> 25 26 #include <ctype.h> 27 #include <errno.h> 28 #include <event.h> 29 #include <fcntl.h> 30 #include <imsg.h> 31 #include <inttypes.h> 32 #include <libgen.h> 33 #include <pwd.h> 34 #include <stdio.h> 35 #include <stdlib.h> 36 #include <string.h> 37 #include <time.h> 38 #include <unistd.h> 39 40 #include "smtpd.h" 41 #include "log.h" 42 43 static const char* envelope_validate(struct envelope *); 44 45 extern struct queue_backend queue_backend_fs; 46 extern struct queue_backend queue_backend_null; 47 extern struct queue_backend queue_backend_ram; 48 49 static struct queue_backend *backend; 50 51 #ifdef QUEUE_PROFILING 52 53 static struct { 54 struct timespec t0; 55 const char *name; 56 } profile; 57 58 static inline void profile_enter(const char *name) 59 { 60 if ((profiling & PROFILE_QUEUE) == 0) 61 return; 62 63 profile.name = name; 64 clock_gettime(CLOCK_MONOTONIC, &profile.t0); 65 } 66 67 static inline void profile_leave(void) 68 { 69 struct timespec t1, dt; 70 71 if ((profiling & PROFILE_QUEUE) == 0) 72 return; 73 74 clock_gettime(CLOCK_MONOTONIC, &t1); 75 timespecsub(&t1, &profile.t0, &dt); 76 log_debug("profile-queue: %s %li.%06li", profile.name, 77 dt.tv_sec * 1000000 + dt.tv_nsec / 1000000, 78 dt.tv_nsec % 1000000); 79 } 80 #else 81 #define profile_enter(x) do {} while (0) 82 #define profile_leave() do {} while (0) 83 #endif 84 85 int 86 queue_message_incoming_path(uint32_t msgid, char *buf, size_t len) 87 { 88 return bsnprintf(buf, len, "%s/%08x", 89 PATH_INCOMING, 90 msgid); 91 } 92 93 int 94 queue_init(const char *name, int server) 95 { 96 if (!strcmp(name, "fs")) 97 backend = &queue_backend_fs; 98 if (!strcmp(name, "null")) 99 backend = &queue_backend_null; 100 if (!strcmp(name, "ram")) 101 backend = &queue_backend_ram; 102 103 if (backend == NULL) { 104 log_warn("could not find queue backend \"%s\"", name); 105 return (0); 106 } 107 108 return backend->init(server); 109 } 110 111 int 112 queue_message_create(uint32_t *msgid) 113 { 114 int r; 115 116 profile_enter("queue_message_create"); 117 r = backend->message(QOP_CREATE, msgid); 118 profile_leave(); 119 120 return (r); 121 } 122 123 int 124 queue_message_delete(uint32_t msgid) 125 { 126 int r; 127 128 profile_enter("queue_message_delete"); 129 r = backend->message(QOP_DELETE, &msgid); 130 profile_leave(); 131 132 return (r); 133 } 134 135 int 136 queue_message_commit(uint32_t msgid) 137 { 138 int r; 139 140 profile_enter("queue_message_commit"); 141 r = backend->message(QOP_COMMIT, &msgid); 142 profile_leave(); 143 144 return (r); 145 } 146 147 int 148 queue_message_corrupt(uint32_t msgid) 149 { 150 int r; 151 152 profile_enter("queue_message_corrupt"); 153 r = backend->message(QOP_CORRUPT, &msgid); 154 profile_leave(); 155 156 return (r); 157 } 158 159 int 160 queue_message_fd_r(uint32_t msgid) 161 { 162 int fdin = -1, fdout = -1, fd = -1; 163 FILE *ifp = NULL; 164 FILE *ofp = NULL; 165 166 profile_enter("queue_message_fd_r"); 167 fdin = backend->message(QOP_FD_R, &msgid); 168 profile_leave(); 169 170 if (fdin == -1) 171 return (-1); 172 173 if (env->sc_queue_flags & QUEUE_COMPRESS) { 174 if ((fdout = mktmpfile()) == -1) 175 goto err; 176 if ((fd = dup(fdout)) == -1) 177 goto err; 178 if ((ifp = fdopen(fdin, "r")) == NULL) 179 goto err; 180 fdin = fd; 181 fd = -1; 182 if ((ofp = fdopen(fdout, "w+")) == NULL) 183 goto err; 184 if (! uncompress_file(ifp, ofp)) 185 goto err; 186 fclose(ifp); 187 fclose(ofp); 188 lseek(fdin, SEEK_SET, 0); 189 } 190 191 return (fdin); 192 193 err: 194 if (fd != -1) 195 close(fd); 196 if (fdin != -1) 197 close(fdin); 198 if (fdout != -1) 199 close(fdout); 200 if (ifp) 201 fclose(ifp); 202 if (ofp) 203 fclose(ofp); 204 return -1; 205 } 206 207 int 208 queue_message_fd_rw(uint32_t msgid) 209 { 210 int r; 211 212 profile_enter("queue_message_fd_rw"); 213 r = backend->message(QOP_FD_RW, &msgid); 214 profile_leave(); 215 216 return (r); 217 } 218 219 static int 220 queue_envelope_dump_buffer(struct envelope *ep, char *evpbuf, size_t evpbufsize) 221 { 222 char evpbufcom[sizeof(struct envelope)]; 223 char *evp; 224 size_t evplen; 225 226 evp = evpbuf; 227 evplen = envelope_dump_buffer(ep, evpbuf, evpbufsize); 228 if (evplen == 0) 229 return (0); 230 231 if (env->sc_queue_flags & QUEUE_COMPRESS) { 232 evplen = compress_buffer(evp, evplen, evpbufcom, 233 sizeof evpbufcom); 234 if (evplen == 0) 235 return (0); 236 evp = evpbufcom; 237 } 238 239 memmove(evpbuf, evp, evplen); 240 241 return (evplen); 242 } 243 244 static int 245 queue_envelope_load_buffer(struct envelope *ep, char *evpbuf, size_t evpbufsize) 246 { 247 char evpbufcom[sizeof(struct envelope)]; 248 char *evp; 249 size_t evplen; 250 251 evp = evpbuf; 252 evplen = evpbufsize; 253 254 if (env->sc_queue_flags & QUEUE_COMPRESS) { 255 evplen = uncompress_buffer(evp, evplen, evpbufcom, 256 sizeof evpbufcom); 257 if (evplen == 0) 258 return (0); 259 evp = evpbufcom; 260 } 261 262 return (envelope_load_buffer(ep, evp, evplen)); 263 } 264 265 int 266 queue_envelope_create(struct envelope *ep) 267 { 268 int r; 269 char evpbuf[sizeof(struct envelope)]; 270 size_t evplen; 271 272 ep->creation = time(NULL); 273 evplen = queue_envelope_dump_buffer(ep, evpbuf, sizeof evpbuf); 274 if (evplen == 0) 275 return (0); 276 277 profile_enter("queue_envelope_create"); 278 r = backend->envelope(QOP_CREATE, &ep->id, evpbuf, evplen); 279 profile_leave(); 280 if (!r) { 281 ep->creation = 0; 282 ep->id = 0; 283 } 284 return (r); 285 } 286 287 int 288 queue_envelope_delete(uint64_t evpid) 289 { 290 int r; 291 292 profile_enter("queue_envelope_delete"); 293 r = backend->envelope(QOP_DELETE, &evpid, NULL, 0); 294 profile_leave(); 295 296 return (r); 297 } 298 299 int 300 queue_envelope_load(uint64_t evpid, struct envelope *ep) 301 { 302 const char *e; 303 char evpbuf[sizeof(struct envelope)]; 304 size_t evplen; 305 306 ep->id = evpid; 307 profile_enter("queue_envelope_load"); 308 evplen = backend->envelope(QOP_LOAD, &ep->id, evpbuf, sizeof evpbuf); 309 profile_leave(); 310 if (evplen == 0) 311 return (0); 312 313 if (queue_envelope_load_buffer(ep, evpbuf, evplen)) { 314 if ((e = envelope_validate(ep)) == NULL) { 315 ep->id = evpid; 316 return (1); 317 } 318 log_debug("debug: invalid envelope %016" PRIx64 ": %s", 319 ep->id, e); 320 } 321 return (0); 322 } 323 324 int 325 queue_envelope_update(struct envelope *ep) 326 { 327 char evpbuf[sizeof(struct envelope)]; 328 size_t evplen; 329 int r; 330 331 evplen = queue_envelope_dump_buffer(ep, evpbuf, sizeof evpbuf); 332 if (evplen == 0) 333 return (0); 334 335 profile_enter("queue_envelope_update"); 336 r = backend->envelope(QOP_UPDATE, &ep->id, evpbuf, evplen); 337 profile_leave(); 338 339 return (r); 340 } 341 342 int 343 queue_envelope_walk(struct envelope *ep) 344 { 345 const char *e; 346 uint64_t evpid; 347 char evpbuf[sizeof(struct envelope)]; 348 int r; 349 350 profile_enter("queue_envelope_walk"); 351 r = backend->envelope(QOP_WALK, &evpid, evpbuf, sizeof evpbuf); 352 profile_leave(); 353 if (r == -1 || r == 0) 354 return (r); 355 356 if (queue_envelope_load_buffer(ep, evpbuf, (size_t)r)) { 357 if ((e = envelope_validate(ep)) == NULL) { 358 ep->id = evpid; 359 return (1); 360 } 361 log_debug("debug: invalid envelope %016" PRIx64 ": %s", 362 ep->id, e); 363 } 364 return (0); 365 } 366 367 uint32_t 368 queue_generate_msgid(void) 369 { 370 uint32_t msgid; 371 372 while ((msgid = arc4random_uniform(0xffffffff)) == 0) 373 ; 374 375 return msgid; 376 } 377 378 uint64_t 379 queue_generate_evpid(uint32_t msgid) 380 { 381 uint32_t rnd; 382 uint64_t evpid; 383 384 while ((rnd = arc4random_uniform(0xffffffff)) == 0) 385 ; 386 387 evpid = msgid; 388 evpid <<= 32; 389 evpid |= rnd; 390 391 return evpid; 392 } 393 394 static const char* 395 envelope_validate(struct envelope *ep) 396 { 397 if (ep->version != SMTPD_ENVELOPE_VERSION) 398 return "version mismatch"; 399 400 if (memchr(ep->helo, '\0', sizeof(ep->helo)) == NULL) 401 return "invalid helo"; 402 if (ep->helo[0] == '\0') 403 return "empty helo"; 404 405 if (memchr(ep->hostname, '\0', sizeof(ep->hostname)) == NULL) 406 return "invalid hostname"; 407 if (ep->hostname[0] == '\0') 408 return "empty hostname"; 409 410 if (memchr(ep->errorline, '\0', sizeof(ep->errorline)) == NULL) 411 return "invalid error line"; 412 413 return NULL; 414 } 415