1 /* $OpenBSD: queue.c,v 1.106 2011/09/01 19:56:49 eric Exp $ */ 2 3 /* 4 * Copyright (c) 2008 Gilles Chehade <gilles@openbsd.org> 5 * Copyright (c) 2008 Pierre-Yves Ritschard <pyr@openbsd.org> 6 * 7 * Permission to use, copy, modify, and distribute this software for any 8 * purpose with or without fee is hereby granted, provided that the above 9 * copyright notice and this permission notice appear in all copies. 10 * 11 * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES 12 * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF 13 * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR 14 * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES 15 * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN 16 * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF 17 * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. 18 */ 19 20 #include <sys/types.h> 21 #include <sys/queue.h> 22 #include <sys/tree.h> 23 #include <sys/param.h> 24 #include <sys/socket.h> 25 #include <sys/stat.h> 26 27 #include <event.h> 28 #include <imsg.h> 29 #include <libgen.h> 30 #include <pwd.h> 31 #include <signal.h> 32 #include <stdio.h> 33 #include <stdlib.h> 34 #include <string.h> 35 #include <unistd.h> 36 37 #include "smtpd.h" 38 #include "log.h" 39 40 static void queue_imsg(struct imsgev *, struct imsg *); 41 static void queue_pass_to_runner(struct imsgev *, struct imsg *); 42 static void queue_shutdown(void); 43 static void queue_sig_handler(int, short, void *); 44 static void queue_purge(enum queue_kind, char *); 45 46 static void 47 queue_imsg(struct imsgev *iev, struct imsg *imsg) 48 { 49 struct submit_status ss; 50 struct envelope *e; 51 struct ramqueue_batch *rq_batch; 52 int fd, ret; 53 54 if (iev->proc == PROC_SMTP) { 55 e = imsg->data; 56 57 switch (imsg->hdr.type) { 58 case IMSG_QUEUE_CREATE_MESSAGE: 59 ss.id = e->session_id; 60 ss.code = 250; 61 ss.u.msgid = 0; 62 if (e->delivery.flags & DF_ENQUEUED) 63 ret = queue_message_create(Q_ENQUEUE, &ss.u.msgid); 64 else 65 ret = queue_message_create(Q_INCOMING, &ss.u.msgid); 66 if (ret == 0) 67 ss.code = 421; 68 imsg_compose_event(iev, IMSG_QUEUE_CREATE_MESSAGE, 0, 0, -1, 69 &ss, sizeof ss); 70 return; 71 72 case IMSG_QUEUE_REMOVE_MESSAGE: 73 if (e->delivery.flags & DF_ENQUEUED) 74 queue_message_purge(Q_ENQUEUE, evpid_to_msgid(e->delivery.id)); 75 else 76 queue_message_purge(Q_INCOMING, evpid_to_msgid(e->delivery.id)); 77 return; 78 79 case IMSG_QUEUE_COMMIT_MESSAGE: 80 ss.id = e->session_id; 81 if (e->delivery.flags & DF_ENQUEUED) { 82 if (queue_message_commit(Q_ENQUEUE, evpid_to_msgid(e->delivery.id))) 83 stat_increment(STATS_QUEUE_LOCAL); 84 else 85 ss.code = 421; 86 } else { 87 if (queue_message_commit(Q_INCOMING, evpid_to_msgid(e->delivery.id))) 88 stat_increment(STATS_QUEUE_REMOTE); 89 else 90 ss.code = 421; 91 } 92 imsg_compose_event(iev, IMSG_QUEUE_COMMIT_MESSAGE, 0, 0, -1, 93 &ss, sizeof ss); 94 95 if (ss.code != 421) 96 queue_pass_to_runner(iev, imsg); 97 98 return; 99 100 case IMSG_QUEUE_MESSAGE_FILE: 101 ss.id = e->session_id; 102 if (e->delivery.flags & DF_ENQUEUED) 103 fd = queue_message_fd_rw(Q_ENQUEUE, evpid_to_msgid(e->delivery.id)); 104 else 105 fd = queue_message_fd_rw(Q_INCOMING, evpid_to_msgid(e->delivery.id)); 106 if (fd == -1) 107 ss.code = 421; 108 imsg_compose_event(iev, IMSG_QUEUE_MESSAGE_FILE, 0, 0, fd, 109 &ss, sizeof ss); 110 return; 111 112 case IMSG_SMTP_ENQUEUE: 113 queue_pass_to_runner(iev, imsg); 114 return; 115 } 116 } 117 118 if (iev->proc == PROC_LKA) { 119 e = imsg->data; 120 121 switch (imsg->hdr.type) { 122 case IMSG_QUEUE_SUBMIT_ENVELOPE: 123 ss.id = e->session_id; 124 125 /* Write to disk */ 126 if (e->delivery.flags & DF_ENQUEUED) 127 ret = queue_envelope_create(Q_ENQUEUE, e); 128 else 129 ret = queue_envelope_create(Q_INCOMING, e); 130 131 if (ret == 0) { 132 ss.code = 421; 133 imsg_compose_event(env->sc_ievs[PROC_SMTP], 134 IMSG_QUEUE_TEMPFAIL, 0, 0, -1, &ss, 135 sizeof ss); 136 } 137 return; 138 139 case IMSG_QUEUE_COMMIT_ENVELOPES: 140 ss.id = e->session_id; 141 ss.code = 250; 142 imsg_compose_event(env->sc_ievs[PROC_SMTP], 143 IMSG_QUEUE_COMMIT_ENVELOPES, 0, 0, -1, &ss, 144 sizeof ss); 145 return; 146 } 147 } 148 149 if (iev->proc == PROC_RUNNER) { 150 /* forward imsgs from runner on its behalf */ 151 imsg_compose_event(env->sc_ievs[imsg->hdr.peerid], imsg->hdr.type, 152 0, imsg->hdr.pid, imsg->fd, (char *)imsg->data, 153 imsg->hdr.len - sizeof imsg->hdr); 154 return; 155 } 156 157 if (iev->proc == PROC_MTA) { 158 switch (imsg->hdr.type) { 159 case IMSG_QUEUE_MESSAGE_FD: 160 rq_batch = imsg->data; 161 fd = queue_message_fd_r(Q_QUEUE, rq_batch->msgid); 162 imsg_compose_event(iev, IMSG_QUEUE_MESSAGE_FD, 0, 0, 163 fd, rq_batch, sizeof *rq_batch); 164 return; 165 166 case IMSG_QUEUE_MESSAGE_UPDATE: 167 case IMSG_BATCH_DONE: 168 queue_pass_to_runner(iev, imsg); 169 return; 170 } 171 } 172 173 if (iev->proc == PROC_MDA) { 174 switch (imsg->hdr.type) { 175 case IMSG_QUEUE_MESSAGE_UPDATE: 176 case IMSG_MDA_SESS_NEW: 177 queue_pass_to_runner(iev, imsg); 178 return; 179 } 180 } 181 182 if (iev->proc == PROC_CONTROL) { 183 switch (imsg->hdr.type) { 184 case IMSG_QUEUE_PAUSE_LOCAL: 185 case IMSG_QUEUE_PAUSE_OUTGOING: 186 case IMSG_QUEUE_RESUME_LOCAL: 187 case IMSG_QUEUE_RESUME_OUTGOING: 188 case IMSG_QUEUE_SCHEDULE: 189 case IMSG_QUEUE_REMOVE: 190 queue_pass_to_runner(iev, imsg); 191 return; 192 } 193 } 194 195 if (iev->proc == PROC_PARENT) { 196 switch (imsg->hdr.type) { 197 case IMSG_PARENT_ENQUEUE_OFFLINE: 198 queue_pass_to_runner(iev, imsg); 199 return; 200 201 case IMSG_CTL_VERBOSE: 202 log_verbose(*(int *)imsg->data); 203 queue_pass_to_runner(iev, imsg); 204 return; 205 } 206 } 207 208 fatalx("queue_imsg: unexpected imsg"); 209 } 210 211 static void 212 queue_pass_to_runner(struct imsgev *iev, struct imsg *imsg) 213 { 214 imsg_compose_event(env->sc_ievs[PROC_RUNNER], imsg->hdr.type, 215 iev->proc, imsg->hdr.pid, imsg->fd, imsg->data, 216 imsg->hdr.len - sizeof imsg->hdr); 217 } 218 219 static void 220 queue_sig_handler(int sig, short event, void *p) 221 { 222 switch (sig) { 223 case SIGINT: 224 case SIGTERM: 225 queue_shutdown(); 226 break; 227 default: 228 fatalx("queue_sig_handler: unexpected signal"); 229 } 230 } 231 232 static void 233 queue_shutdown(void) 234 { 235 log_info("queue handler exiting"); 236 _exit(0); 237 } 238 239 pid_t 240 queue(void) 241 { 242 pid_t pid; 243 struct passwd *pw; 244 245 struct event ev_sigint; 246 struct event ev_sigterm; 247 248 struct peer peers[] = { 249 { PROC_PARENT, imsg_dispatch }, 250 { PROC_CONTROL, imsg_dispatch }, 251 { PROC_SMTP, imsg_dispatch }, 252 { PROC_MDA, imsg_dispatch }, 253 { PROC_MTA, imsg_dispatch }, 254 { PROC_LKA, imsg_dispatch }, 255 { PROC_RUNNER, imsg_dispatch } 256 }; 257 258 switch (pid = fork()) { 259 case -1: 260 fatal("queue: cannot fork"); 261 case 0: 262 break; 263 default: 264 return (pid); 265 } 266 267 purge_config(PURGE_EVERYTHING); 268 269 pw = env->sc_pw; 270 271 if (chroot(PATH_SPOOL) == -1) 272 fatal("queue: chroot"); 273 if (chdir("/") == -1) 274 fatal("queue: chdir(\"/\")"); 275 276 smtpd_process = PROC_QUEUE; 277 setproctitle("%s", env->sc_title[smtpd_process]); 278 279 if (setgroups(1, &pw->pw_gid) || 280 setresgid(pw->pw_gid, pw->pw_gid, pw->pw_gid) || 281 setresuid(pw->pw_uid, pw->pw_uid, pw->pw_uid)) 282 fatal("queue: cannot drop privileges"); 283 284 imsg_callback = queue_imsg; 285 event_init(); 286 287 signal_set(&ev_sigint, SIGINT, queue_sig_handler, NULL); 288 signal_set(&ev_sigterm, SIGTERM, queue_sig_handler, NULL); 289 signal_add(&ev_sigint, NULL); 290 signal_add(&ev_sigterm, NULL); 291 signal(SIGPIPE, SIG_IGN); 292 signal(SIGHUP, SIG_IGN); 293 294 /* 295 * queue opens fds for four purposes: smtp, mta, mda, and bounces. 296 * Therefore, use all available fd space and set the maxconn (=max 297 * session count for mta and mda) to a quarter of this value. 298 */ 299 fdlimit(1.0); 300 if ((env->sc_maxconn = availdesc() / 4) < 1) 301 fatalx("runner: fd starvation"); 302 303 config_pipes(peers, nitems(peers)); 304 config_peers(peers, nitems(peers)); 305 306 queue_purge(Q_INCOMING, PATH_INCOMING); 307 queue_purge(Q_ENQUEUE, PATH_ENQUEUE); 308 309 if (event_dispatch() < 0) 310 fatal("event_dispatch"); 311 queue_shutdown(); 312 313 return (0); 314 } 315 316 static void 317 queue_purge(enum queue_kind qkind, char *queuepath) 318 { 319 char path[MAXPATHLEN]; 320 struct qwalk *q; 321 322 q = qwalk_new(queuepath); 323 324 while (qwalk(q, path)) { 325 u_int32_t msgid; 326 327 if ((msgid = filename_to_msgid(basename(path))) == 0) { 328 log_warnx("queue_purge: invalid evpid"); 329 continue; 330 } 331 queue_message_purge(qkind, msgid); 332 } 333 334 qwalk_close(q); 335 } 336 337 void 338 queue_submit_envelope(struct envelope *ep) 339 { 340 imsg_compose_event(env->sc_ievs[PROC_QUEUE], 341 IMSG_QUEUE_SUBMIT_ENVELOPE, 0, 0, -1, 342 ep, sizeof(*ep)); 343 } 344 345 void 346 queue_commit_envelopes(struct envelope *ep) 347 { 348 imsg_compose_event(env->sc_ievs[PROC_QUEUE], 349 IMSG_QUEUE_COMMIT_ENVELOPES, 0, 0, -1, 350 ep, sizeof(*ep)); 351 } 352