xref: /openbsd-src/usr.sbin/smtpd/scheduler.c (revision 4c1e55dc91edd6e69ccc60ce855900fbc12cf34f)
1 /*	$OpenBSD: scheduler.c,v 1.6 2012/07/10 11:13:40 gilles Exp $	*/
2 
3 /*
4  * Copyright (c) 2008 Gilles Chehade <gilles@openbsd.org>
5  * Copyright (c) 2008 Pierre-Yves Ritschard <pyr@openbsd.org>
6  * Copyright (c) 2008-2009 Jacek Masiulaniec <jacekm@dobremiasto.net>
7  *
8  * Permission to use, copy, modify, and distribute this software for any
9  * purpose with or without fee is hereby granted, provided that the above
10  * copyright notice and this permission notice appear in all copies.
11  *
12  * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
13  * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
14  * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
15  * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
16  * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
17  * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
18  * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
19  */
20 
21 #include <sys/types.h>
22 #include <sys/queue.h>
23 #include <sys/tree.h>
24 #include <sys/param.h>
25 #include <sys/socket.h>
26 #include <sys/stat.h>
27 
28 #include <ctype.h>
29 #include <dirent.h>
30 #include <err.h>
31 #include <errno.h>
32 #include <event.h>
33 #include <imsg.h>
34 #include <inttypes.h>
35 #include <libgen.h>
36 #include <pwd.h>
37 #include <signal.h>
38 #include <stdio.h>
39 #include <stdlib.h>
40 #include <string.h>
41 #include <time.h>
42 #include <unistd.h>
43 
44 #include "smtpd.h"
45 #include "log.h"
46 
47 static void scheduler_imsg(struct imsgev *, struct imsg *);
48 static void scheduler_shutdown(void);
49 static void scheduler_sig_handler(int, short, void *);
50 static void scheduler_setup_events(void);
51 static void scheduler_reset_events(void);
52 static void scheduler_disable_events(void);
53 static void scheduler_timeout(int, short, void *);
54 static void scheduler_remove(u_int64_t);
55 static void scheduler_remove_envelope(u_int64_t);
56 static int scheduler_process_envelope(u_int64_t);
57 static int scheduler_process_batch(enum delivery_type, u_int64_t);
58 static int scheduler_check_loop(struct envelope *);
59 static int scheduler_load_message(u_int32_t);
60 
61 static struct scheduler_backend *backend = NULL;
62 
63 extern const char *backend_scheduler;
64 
65 void
66 scheduler_imsg(struct imsgev *iev, struct imsg *imsg)
67 {
68 	struct envelope	*e, bounce;
69 	struct scheduler_info	si;
70 
71 	log_imsg(PROC_SCHEDULER, iev->proc, imsg);
72 
73 	switch (imsg->hdr.type) {
74 	case IMSG_QUEUE_COMMIT_MESSAGE:
75 		e = imsg->data;
76 		log_trace(TRACE_SCHEDULER,
77 		    "scheduler: IMSG_QUEUE_COMMIT_MESSAGE: %016"PRIx64, e->id);
78 		scheduler_load_message(evpid_to_msgid(e->id));
79 		scheduler_reset_events();
80 		return;
81 
82 	case IMSG_QUEUE_DELIVERY_OK:
83 		stat_decrement(STATS_SCHEDULER);
84 		e = imsg->data;
85 		log_trace(TRACE_SCHEDULER,
86 		    "scheduler: IMSG_QUEUE_DELIVERY_OK: %016"PRIx64, e->id);
87 		backend->remove(e->id);
88 		queue_envelope_delete(e);
89 		return;
90 
91 	case IMSG_QUEUE_DELIVERY_TEMPFAIL:
92 		stat_decrement(STATS_SCHEDULER);
93 		e = imsg->data;
94 		log_trace(TRACE_SCHEDULER,
95 		    "scheduler: IMSG_QUEUE_DELIVERY_TEMPFAIL: %016"PRIx64, e->id);
96 		e->retry++;
97 		queue_envelope_update(e);
98 		scheduler_info(&si, e);
99 		backend->insert(&si);
100 		scheduler_reset_events();
101 		return;
102 
103 	case IMSG_QUEUE_DELIVERY_PERMFAIL:
104 		stat_decrement(STATS_SCHEDULER);
105 		e = imsg->data;
106 		log_trace(TRACE_SCHEDULER,
107 		    "scheduler: IMSG_QUEUE_DELIVERY_PERMFAIL: %016"PRIx64, e->id);
108 		if (e->type != D_BOUNCE && e->sender.user[0] != '\0') {
109 			bounce_record_message(e, &bounce);
110 			scheduler_info(&si, &bounce);
111 			backend->insert(&si);
112 			scheduler_reset_events();
113 		}
114 		backend->remove(e->id);
115 		queue_envelope_delete(e);
116 		return;
117 
118 	case IMSG_MDA_SESS_NEW:
119 		log_trace(TRACE_SCHEDULER, "scheduler: IMSG_MDA_SESS_NEW");
120 		stat_decrement(STATS_MDA_SESSION);
121 		if (env->sc_maxconn - stat_get(STATS_MDA_SESSION, STAT_ACTIVE))
122 			env->sc_flags &= ~SMTPD_MDA_BUSY;
123 		scheduler_reset_events();
124 		return;
125 
126 	case IMSG_BATCH_DONE:
127 		log_trace(TRACE_SCHEDULER, "scheduler: IMSG_BATCH_DONE");
128 		stat_decrement(STATS_MTA_SESSION);
129 		if (env->sc_maxconn - stat_get(STATS_MTA_SESSION, STAT_ACTIVE))
130 			env->sc_flags &= ~SMTPD_MTA_BUSY;
131 		scheduler_reset_events();
132 		return;
133 
134 	case IMSG_SMTP_ENQUEUE:
135 		e = imsg->data;
136 		log_trace(TRACE_SCHEDULER,
137 		    "scheduler: IMSG_SMTP_ENQUEUE: %016"PRIx64, e->id);
138 		if (imsg->fd < 0 || !bounce_session(imsg->fd, e)) {
139 			queue_envelope_update(e);
140 			scheduler_info(&si, e);
141 			backend->insert(&si);
142 			scheduler_reset_events();
143 			return;
144 		}
145 		return;
146 
147 	case IMSG_QUEUE_PAUSE_MDA:
148 		log_trace(TRACE_SCHEDULER, "scheduler: IMSG_QUEUE_PAUSE_MDA");
149 		env->sc_flags |= SMTPD_MDA_PAUSED;
150 		return;
151 
152 	case IMSG_QUEUE_RESUME_MDA:
153 		log_trace(TRACE_SCHEDULER, "scheduler: IMSG_QUEUE_RESUME_MDA");
154 		env->sc_flags &= ~SMTPD_MDA_PAUSED;
155 		scheduler_reset_events();
156 		return;
157 
158 	case IMSG_QUEUE_PAUSE_MTA:
159 		log_trace(TRACE_SCHEDULER, "scheduler: IMSG_QUEUE_PAUSE_MTA");
160 		env->sc_flags |= SMTPD_MTA_PAUSED;
161 		return;
162 
163 	case IMSG_QUEUE_RESUME_MTA:
164 		log_trace(TRACE_SCHEDULER, "scheduler: IMSG_QUEUE_RESUME_MTA");
165 		env->sc_flags &= ~SMTPD_MTA_PAUSED;
166 		scheduler_reset_events();
167 		return;
168 
169 	case IMSG_CTL_VERBOSE:
170 		log_trace(TRACE_SCHEDULER, "scheduler: IMSG_CTL_VERBOSE");
171 		log_verbose(*(int *)imsg->data);
172 		return;
173 
174 	case IMSG_SCHEDULER_SCHEDULE:
175 		log_trace(TRACE_SCHEDULER,
176 		    "scheduler: IMSG_SCHEDULER_SCHEDULE: %016"PRIx64,
177 		    *(u_int64_t *)imsg->data);
178 		backend->force(*(u_int64_t *)imsg->data);
179 		scheduler_reset_events();
180 		return;
181 
182 	case IMSG_SCHEDULER_REMOVE:
183 		log_trace(TRACE_SCHEDULER,
184 		    "scheduler: IMSG_SCHEDULER_REMOVE: %016"PRIx64,
185 		    *(u_int64_t *)imsg->data);
186 		scheduler_remove(*(u_int64_t *)imsg->data);
187 		scheduler_reset_events();
188 		return;
189 
190 	}
191 
192 	errx(1, "scheduler_imsg: unexpected %s imsg", imsg_to_str(imsg->hdr.type));
193 }
194 
/*
 * Signal event callback: SIGINT and SIGTERM shut the scheduler down,
 * any other signal number is a fatal error.  `event' and `p' are unused.
 */
void
scheduler_sig_handler(int sig, short event, void *p)
{
	if (sig == SIGINT || sig == SIGTERM)
		scheduler_shutdown();
	else
		fatalx("scheduler_sig_handler: unexpected signal");
}
207 
/*
 * Log that the scheduler is going away and terminate the process
 * immediately with status 0.
 */
void
scheduler_shutdown(void)
{
	log_info("scheduler handler exiting");
	_exit(0);
}
214 
215 void
216 scheduler_setup_events(void)
217 {
218 	struct timeval	 tv;
219 
220 	evtimer_set(&env->sc_ev, scheduler_timeout, NULL);
221 	tv.tv_sec = 0;
222 	tv.tv_usec = 10;
223 	evtimer_add(&env->sc_ev, &tv);
224 }
225 
226 void
227 scheduler_reset_events(void)
228 {
229 	struct timeval	 tv;
230 
231 	tv.tv_sec = 0;
232 	tv.tv_usec = 10;
233 	evtimer_add(&env->sc_ev, &tv);
234 }
235 
236 void
237 scheduler_disable_events(void)
238 {
239 	evtimer_del(&env->sc_ev);
240 }
241 
242 pid_t
243 scheduler(void)
244 {
245 	pid_t		 pid;
246 	struct passwd	*pw;
247 
248 	struct event	 ev_sigint;
249 	struct event	 ev_sigterm;
250 
251 	struct peer peers[] = {
252 		{ PROC_CONTROL,	imsg_dispatch },
253 		{ PROC_QUEUE,	imsg_dispatch }
254 	};
255 
256 	switch (pid = fork()) {
257 	case -1:
258 		fatal("scheduler: cannot fork");
259 	case 0:
260 		break;
261 	default:
262 		return (pid);
263 	}
264 
265 	purge_config(PURGE_EVERYTHING);
266 
267 	pw = env->sc_pw;
268 
269 	if (chroot(PATH_SPOOL) == -1)
270 		fatal("scheduler: chroot");
271 	if (chdir("/") == -1)
272 		fatal("scheduler: chdir(\"/\")");
273 
274 	smtpd_process = PROC_SCHEDULER;
275 	setproctitle("%s", env->sc_title[smtpd_process]);
276 
277 	if (setgroups(1, &pw->pw_gid) ||
278 	    setresgid(pw->pw_gid, pw->pw_gid, pw->pw_gid) ||
279 	    setresuid(pw->pw_uid, pw->pw_uid, pw->pw_uid))
280 		fatal("scheduler: cannot drop privileges");
281 
282 	/* see fdlimit()-related comment in queue.c */
283 	fdlimit(1.0);
284 	if ((env->sc_maxconn = availdesc() / 4) < 1)
285 		fatalx("scheduler: fd starvation");
286 
287 	env->sc_scheduler = scheduler_backend_lookup(backend_scheduler);
288 	if (env->sc_scheduler == NULL)
289 		errx(1, "cannot find scheduler backend \"%s\"", backend_scheduler);
290 	backend = env->sc_scheduler;
291 
292 	backend->init();
293 
294 	imsg_callback = scheduler_imsg;
295 	event_init();
296 
297 	signal_set(&ev_sigint, SIGINT, scheduler_sig_handler, NULL);
298 	signal_set(&ev_sigterm, SIGTERM, scheduler_sig_handler, NULL);
299 	signal_add(&ev_sigint, NULL);
300 	signal_add(&ev_sigterm, NULL);
301 	signal(SIGPIPE, SIG_IGN);
302 	signal(SIGHUP, SIG_IGN);
303 
304 	config_pipes(peers, nitems(peers));
305 	config_peers(peers, nitems(peers));
306 
307 	scheduler_setup_events();
308 	event_dispatch();
309 	scheduler_disable_events();
310 	scheduler_shutdown();
311 
312 	return (0);
313 }
314 
315 void
316 scheduler_timeout(int fd, short event, void *p)
317 {
318 	time_t		nsched;
319 	time_t		curtm;
320 	u_int64_t	evpid;
321 	static int	setup = 0;
322 	int		delay = 0;
323 	struct timeval	tv;
324 
325 	log_trace(TRACE_SCHEDULER, "scheduler: entering scheduler_timeout");
326 
327 	/* if we're not done setting up the scheduler, do it some more */
328 	if (! setup)
329 		setup = backend->setup();
330 
331 	/* we don't have a schedulable envelope ... sleep */
332 	if (! backend->next(&evpid, &nsched))
333 		goto scheduler_sleep;
334 
335 	/* is the envelope schedulable right away ? */
336 	curtm = time(NULL);
337 	if (nsched <= curtm) {
338 		/* yup */
339 		scheduler_process_envelope(evpid);
340 	}
341 	else {
342 		/* nope, so we can either keep the timeout delay to 0 if we
343 		 * are not done setting up the scheduler, or sleep until it
344 		 * is time to schedule that envelope otherwise.
345 		 */
346 		if (setup)
347 			delay = nsched - curtm;
348 	}
349 
350 	if (delay)
351 		log_info("scheduler: pausing for %d seconds", delay);
352 	tv.tv_sec = delay;
353 	tv.tv_usec = 0;
354 	evtimer_add(&env->sc_ev, &tv);
355 	return;
356 
357 scheduler_sleep:
358 	log_info("scheduler: sleeping");
359 	return;
360 }
361 
362 static int
363 scheduler_process_envelope(u_int64_t evpid)
364 {
365 	struct envelope	 envelope;
366 	size_t		 mta_av, mda_av, bnc_av;
367 	struct scheduler_info	si;
368 
369 	mta_av = env->sc_maxconn - stat_get(STATS_MTA_SESSION, STAT_ACTIVE);
370 	mda_av = env->sc_maxconn - stat_get(STATS_MDA_SESSION, STAT_ACTIVE);
371 	bnc_av = env->sc_maxconn - stat_get(STATS_SCHEDULER_BOUNCES, STAT_ACTIVE);
372 
373 	if (! queue_envelope_load(evpid, &envelope))
374 		return 0;
375 
376 	if (envelope.type == D_MDA)
377 		if (mda_av == 0) {
378 			env->sc_flags |= SMTPD_MDA_BUSY;
379 			return 0;
380 		}
381 
382 	if (envelope.type == D_MTA)
383 		if (mta_av == 0) {
384 			env->sc_flags |= SMTPD_MTA_BUSY;
385 			return 0;
386 		}
387 
388 	if (envelope.type == D_BOUNCE)
389 		if (bnc_av == 0) {
390 			env->sc_flags |= SMTPD_BOUNCE_BUSY;
391 			return 0;
392 		}
393 
394 	if (scheduler_check_loop(&envelope)) {
395 		struct envelope bounce;
396 
397 		envelope_set_errormsg(&envelope, "loop has been detected");
398 		if (bounce_record_message(&envelope, &bounce)) {
399 			scheduler_info(&si, &bounce);
400 			backend->insert(&si);
401 		}
402 		backend->remove(evpid);
403 		queue_envelope_delete(&envelope);
404 
405 		scheduler_reset_events();
406 
407 		return 0;
408 	}
409 
410 
411 	return scheduler_process_batch(envelope.type, evpid);
412 }
413 
414 static int
415 scheduler_process_batch(enum delivery_type type, u_int64_t evpid)
416 {
417 	struct envelope evp;
418 	void *batch;
419 	int fd;
420 
421 	batch = backend->batch(evpid);
422 	switch (type) {
423 	case D_BOUNCE:
424 		while (backend->fetch(batch, &evpid)) {
425 			if (! queue_envelope_load(evpid, &evp))
426 				goto end;
427 
428 			evp.lasttry = time(NULL);
429 			imsg_compose_event(env->sc_ievs[PROC_QUEUE],
430 			    IMSG_SMTP_ENQUEUE, PROC_SMTP, 0, -1, &evp,
431 			    sizeof evp);
432 			backend->schedule(evpid);
433 		}
434 		stat_increment(STATS_SCHEDULER);
435 		stat_increment(STATS_SCHEDULER_BOUNCES);
436 		break;
437 
438 	case D_MDA:
439 		backend->fetch(batch, &evpid);
440 		if (! queue_envelope_load(evpid, &evp))
441 			goto end;
442 
443 		evp.lasttry = time(NULL);
444 		fd = queue_message_fd_r(evpid_to_msgid(evpid));
445 		imsg_compose_event(env->sc_ievs[PROC_QUEUE],
446 		    IMSG_MDA_SESS_NEW, PROC_MDA, 0, fd, &evp,
447 		    sizeof evp);
448 		backend->schedule(evpid);
449 
450 		stat_increment(STATS_SCHEDULER);
451 		stat_increment(STATS_MDA_SESSION);
452 		break;
453 
454 	case D_MTA: {
455 		struct mta_batch mta_batch;
456 
457 		/* FIXME */
458 		if (! backend->fetch(batch, &evpid))
459 			goto end;
460 		if (! queue_envelope_load(evpid, &evp))
461 			goto end;
462 
463 		bzero(&mta_batch, sizeof mta_batch);
464 		mta_batch.id    = arc4random();
465 		mta_batch.relay = evp.agent.mta.relay;
466 
467 		imsg_compose_event(env->sc_ievs[PROC_QUEUE],
468 		    IMSG_BATCH_CREATE, PROC_MTA, 0, -1, &mta_batch,
469 		    sizeof mta_batch);
470 
471 		while (backend->fetch(batch, &evpid)) {
472 			if (! queue_envelope_load(evpid, &evp))
473 				goto end;
474 			evp.lasttry = time(NULL); /* FIXME */
475 			evp.batch_id = mta_batch.id;
476 
477 			imsg_compose_event(env->sc_ievs[PROC_QUEUE],
478 			    IMSG_BATCH_APPEND, PROC_MTA, 0, -1, &evp,
479 			    sizeof evp);
480 
481 			backend->schedule(evpid);
482 			stat_increment(STATS_SCHEDULER);
483 		}
484 
485 		imsg_compose_event(env->sc_ievs[PROC_QUEUE],
486 		    IMSG_BATCH_CLOSE, PROC_MTA, 0, -1, &mta_batch,
487 		    sizeof mta_batch);
488 
489 		stat_increment(STATS_MTA_SESSION);
490 		break;
491 	}
492 
493 	default:
494 		fatalx("scheduler_process_batchqueue: unknown type");
495 	}
496 
497 end:
498 	backend->close(batch);
499 	return 1;
500 }
501 
502 static int
503 scheduler_load_message(u_int32_t msgid)
504 {
505 	struct qwalk	*q;
506 	u_int64_t	 evpid;
507 	struct envelope	 envelope;
508 	struct scheduler_info	si;
509 
510 	q = qwalk_new(msgid);
511 	while (qwalk(q, &evpid)) {
512 		if (! queue_envelope_load(evpid, &envelope))
513 			continue;
514 		scheduler_info(&si, &envelope);
515 		backend->insert(&si);
516 	}
517  	qwalk_close(q);
518 
519 	return 1;
520 }
521 
522 static int
523 scheduler_check_loop(struct envelope *ep)
524 {
525 	int fd;
526 	FILE *fp;
527 	char *buf, *lbuf;
528 	size_t len;
529 	struct mailaddr maddr;
530 	int ret = 0;
531 	int rcvcount = 0;
532 
533 	fd = queue_message_fd_r(evpid_to_msgid(ep->id));
534 	if ((fp = fdopen(fd, "r")) == NULL)
535 		fatal("fdopen");
536 
537 	lbuf = NULL;
538 	while ((buf = fgetln(fp, &len))) {
539 		if (buf[len - 1] == '\n')
540 			buf[len - 1] = '\0';
541 		else {
542 			/* EOF without EOL, copy and add the NUL */
543 			if ((lbuf = malloc(len + 1)) == NULL)
544 				err(1, NULL);
545 			memcpy(lbuf, buf, len);
546 			lbuf[len] = '\0';
547 			buf = lbuf;
548 		}
549 
550 		if (strchr(buf, ':') == NULL && !isspace((int)*buf))
551 			break;
552 
553 		if (strncasecmp("Received: ", buf, 10) == 0) {
554 			rcvcount++;
555 			if (rcvcount == MAX_HOPS_COUNT) {
556 				ret = 1;
557 				break;
558 			}
559 		}
560 
561 		else if (strncasecmp("Delivered-To: ", buf, 14) == 0) {
562 			struct mailaddr dest;
563 
564 			bzero(&maddr, sizeof (struct mailaddr));
565 			if (! email_to_mailaddr(&maddr, buf + 14))
566 				continue;
567 
568 			dest = ep->dest;
569 			if (ep->type == D_BOUNCE)
570 				dest = ep->sender;
571 
572 			if (strcasecmp(maddr.user, dest.user) == 0 &&
573 			    strcasecmp(maddr.domain, dest.domain) == 0) {
574 				ret = 1;
575 				break;
576 			}
577 		}
578 	}
579 	free(lbuf);
580 
581 	fclose(fp);
582 	return ret;
583 }
584 
585 static void
586 scheduler_remove(u_int64_t id)
587 {
588 	void	*msg;
589 
590 	/* removing by evpid */
591 	if (id > 0xffffffffL) {
592 		scheduler_remove_envelope(id);
593 		return;
594 	}
595 
596 	/* removing by msgid */
597 	msg = backend->message(id);
598 	while (backend->fetch(msg, &id))
599 		scheduler_remove_envelope(id);
600 	backend->close(msg);
601 }
602 
603 static void
604 scheduler_remove_envelope(u_int64_t evpid)
605 {
606 	struct envelope evp;
607 
608 	evp.id = evpid;
609 	queue_envelope_delete(&evp);
610 	backend->remove(evpid);
611 }
612