/*	$OpenBSD: scheduler.c,v 1.55 2016/09/08 12:06:43 eric Exp $	*/

/*
 * Copyright (c) 2008 Gilles Chehade <gilles@poolp.org>
 * Copyright (c) 2008 Pierre-Yves Ritschard <pyr@openbsd.org>
 * Copyright (c) 2008-2009 Jacek Masiulaniec <jacekm@dobremiasto.net>
 * Copyright (c) 2012 Eric Faurot <eric@openbsd.org>
 *
 * Permission to use, copy, modify, and distribute this software for any
 * purpose with or without fee is hereby granted, provided that the above
 * copyright notice and this permission notice appear in all copies.
 *
 * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
 * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
 * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
 * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
 * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
 * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
 * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
 */

#include <sys/types.h>
#include <sys/queue.h>
#include <sys/tree.h>
#include <sys/socket.h>
#include <sys/stat.h>

#include <ctype.h>
#include <dirent.h>
#include <err.h>
#include <errno.h>
#include <event.h>
#include <imsg.h>
#include <inttypes.h>
#include <libgen.h>
#include <pwd.h>
#include <signal.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>
#include <unistd.h>
#include <limits.h>

#include "smtpd.h"
#include "log.h"

static void scheduler_imsg(struct mproc *, struct imsg *);
static void scheduler_shutdown(void);
static void scheduler_reset_events(void);
static void scheduler_timeout(int, short, void *);

static struct scheduler_backend *backend = NULL;
static struct event		 ev;
static size_t			 ninflight = 0;

/* Result buffers for backend batches and listings, allocated at startup. */
static int			*types;
static uint64_t			*evpids;
static uint32_t			*msgids;
static struct evpstate		*state;

extern const char *backend_scheduler;

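/*
 * Dispatch an imsg received from the queue or control process.
 * A NULL imsg means the channel to the peer was closed: exit.
 */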
void
scheduler_imsg(struct mproc *p, struct imsg *imsg)
{
	struct bounce_req_msg	 req;
	struct envelope		 evp;
	struct scheduler_info	 si;
	struct msg		 m;
	uint64_t		 evpid, id, holdq;
	uint32_t		 msgid;
	uint32_t		 inflight;
	size_t			 n, i;
	time_t			 timestamp;
	int			 v, r, type;

	if (imsg == NULL)
		scheduler_shutdown();

	switch (imsg->hdr.type) {

	case IMSG_QUEUE_ENVELOPE_SUBMIT:
		m_msg(&m, imsg);
		m_get_envelope(&m, &evp);
		m_end(&m);
		log_trace(TRACE_SCHEDULER,
		    "scheduler: inserting evp:%016" PRIx64, evp.id);
		scheduler_info(&si, &evp);
		stat_increment("scheduler.envelope.incoming", 1);
		backend->insert(&si);
		return;

	case IMSG_QUEUE_MESSAGE_COMMIT:
		m_msg(&m, imsg);
		m_get_msgid(&m, &msgid);
		m_end(&m);
		log_trace(TRACE_SCHEDULER,
		    "scheduler: committing msg:%08" PRIx32, msgid);
		n = backend->commit(msgid);
		stat_decrement("scheduler.envelope.incoming", n);
		stat_increment("scheduler.envelope", n);
		scheduler_reset_events();
		return;

	case IMSG_QUEUE_DISCOVER_EVPID:
		m_msg(&m, imsg);
		m_get_envelope(&m, &evp);
		m_end(&m);
		r = backend->query(evp.id);
		if (r) {
			log_debug("debug: scheduler: evp:%016" PRIx64
			    " already scheduled", evp.id);
			return;
		}
		log_trace(TRACE_SCHEDULER,
		    "scheduler: discovering evp:%016" PRIx64, evp.id);
		scheduler_info(&si, &evp);
		stat_increment("scheduler.envelope.incoming", 1);
		backend->insert(&si);
		return;

	case IMSG_QUEUE_DISCOVER_MSGID:
		m_msg(&m, imsg);
		m_get_msgid(&m, &msgid);
		m_end(&m);
		r = backend->query(msgid);
		if (r) {
			log_debug("debug: scheduler: msgid:%08" PRIx32
			    " already scheduled", msgid);
			return;
		}
		log_trace(TRACE_SCHEDULER,
		    "scheduler: committing msg:%08" PRIx32, msgid);
		n = backend->commit(msgid);
		stat_decrement("scheduler.envelope.incoming", n);
		stat_increment("scheduler.envelope", n);
		scheduler_reset_events();
		return;

	case IMSG_QUEUE_MESSAGE_ROLLBACK:
		m_msg(&m, imsg);
		m_get_msgid(&m, &msgid);
		m_end(&m);
		log_trace(TRACE_SCHEDULER, "scheduler: aborting msg:%08" PRIx32,
		    msgid);
		n = backend->rollback(msgid);
		stat_decrement("scheduler.envelope.incoming", n);
		scheduler_reset_events();
		return;

	case IMSG_QUEUE_ENVELOPE_REMOVE:
		m_msg(&m, imsg);
		m_get_evpid(&m, &evpid);
		m_get_u32(&m, &inflight);
		m_end(&m);
		log_trace(TRACE_SCHEDULER,
		    "scheduler: queue requested removal of evp:%016" PRIx64,
		    evpid);
		stat_decrement("scheduler.envelope", 1);
		if (!inflight)
			backend->remove(evpid);
		else {
			backend->delete(evpid);
			ninflight -= 1;
			stat_decrement("scheduler.envelope.inflight", 1);
		}

		scheduler_reset_events();
		return;

	case IMSG_QUEUE_ENVELOPE_ACK:
		m_msg(&m, imsg);
		m_get_evpid(&m, &evpid);
		m_end(&m);
		log_trace(TRACE_SCHEDULER,
		    "scheduler: queue ack removal of evp:%016" PRIx64,
		    evpid);
		ninflight -= 1;
		stat_decrement("scheduler.envelope.inflight", 1);
		scheduler_reset_events();
		return;

	case IMSG_QUEUE_DELIVERY_OK:
		m_msg(&m, imsg);
		m_get_evpid(&m, &evpid);
		m_end(&m);
		log_trace(TRACE_SCHEDULER,
		    "scheduler: deleting evp:%016" PRIx64 " (ok)", evpid);
		backend->delete(evpid);
		ninflight -= 1;
		stat_increment("scheduler.delivery.ok", 1);
		stat_decrement("scheduler.envelope.inflight", 1);
		stat_decrement("scheduler.envelope", 1);
		scheduler_reset_events();
		return;

	case IMSG_QUEUE_DELIVERY_TEMPFAIL:
		m_msg(&m, imsg);
		m_get_envelope(&m, &evp);
		m_end(&m);
		log_trace(TRACE_SCHEDULER,
		    "scheduler: updating evp:%016" PRIx64, evp.id);
		scheduler_info(&si, &evp);
		backend->update(&si);
		ninflight -= 1;
		stat_increment("scheduler.delivery.tempfail", 1);
		stat_decrement("scheduler.envelope.inflight", 1);

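		/*
		 * If the envelope has crossed a configured bounce-warn
		 * interval since its last bounce, ask the queue to send
		 * a delay warning.
		 */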
		for (i = 0; i < MAX_BOUNCE_WARN; i++) {
			if (env->sc_bounce_warn[i] == 0)
				break;
			timestamp = si.creation + env->sc_bounce_warn[i];
			if (si.nexttry >= timestamp &&
			    si.lastbounce < timestamp) {
				req.evpid = evp.id;
				req.timestamp = timestamp;
				req.bounce.type = B_WARNING;
				req.bounce.delay = env->sc_bounce_warn[i];
				req.bounce.expire = si.expire;
				m_compose(p, IMSG_SCHED_ENVELOPE_BOUNCE, 0, 0, -1,
				    &req, sizeof req);
				break;
			}
		}
		scheduler_reset_events();
		return;

	case IMSG_QUEUE_DELIVERY_PERMFAIL:
		m_msg(&m, imsg);
		m_get_evpid(&m, &evpid);
		m_end(&m);
		log_trace(TRACE_SCHEDULER,
		    "scheduler: deleting evp:%016" PRIx64 " (fail)", evpid);
		backend->delete(evpid);
		ninflight -= 1;
		stat_increment("scheduler.delivery.permfail", 1);
		stat_decrement("scheduler.envelope.inflight", 1);
		stat_decrement("scheduler.envelope", 1);
		scheduler_reset_events();
		return;

	case IMSG_QUEUE_DELIVERY_LOOP:
		m_msg(&m, imsg);
		m_get_evpid(&m, &evpid);
		m_end(&m);
		log_trace(TRACE_SCHEDULER,
		    "scheduler: deleting evp:%016" PRIx64 " (loop)", evpid);
		backend->delete(evpid);
		ninflight -= 1;
		stat_increment("scheduler.delivery.loop", 1);
		stat_decrement("scheduler.envelope.inflight", 1);
		stat_decrement("scheduler.envelope", 1);
		scheduler_reset_events();
		return;

	case IMSG_QUEUE_HOLDQ_HOLD:
		m_msg(&m, imsg);
		m_get_evpid(&m, &evpid);
		m_get_id(&m, &holdq);
		m_end(&m);
		log_trace(TRACE_SCHEDULER,
		    "scheduler: holding evp:%016" PRIx64 " on %016" PRIx64,
		    evpid, holdq);
		backend->hold(evpid, holdq);
		ninflight -= 1;
		stat_decrement("scheduler.envelope.inflight", 1);
		scheduler_reset_events();
		return;

	case IMSG_QUEUE_HOLDQ_RELEASE:
		m_msg(&m, imsg);
		m_get_int(&m, &type);
		m_get_id(&m, &holdq);
		m_get_int(&m, &r);
		m_end(&m);
		log_trace(TRACE_SCHEDULER,
		    "scheduler: releasing %d on holdq (%d, %016" PRIx64 ")",
		    r, type, holdq);
		backend->release(type, holdq, r);
		scheduler_reset_events();
		return;

	case IMSG_CTL_PAUSE_MDA:
		log_trace(TRACE_SCHEDULER, "scheduler: pausing mda");
		env->sc_flags |= SMTPD_MDA_PAUSED;
		return;

	case IMSG_CTL_RESUME_MDA:
		log_trace(TRACE_SCHEDULER, "scheduler: resuming mda");
		env->sc_flags &= ~SMTPD_MDA_PAUSED;
		scheduler_reset_events();
		return;

	case IMSG_CTL_PAUSE_MTA:
		log_trace(TRACE_SCHEDULER, "scheduler: pausing mta");
		env->sc_flags |= SMTPD_MTA_PAUSED;
		return;

	case IMSG_CTL_RESUME_MTA:
		log_trace(TRACE_SCHEDULER, "scheduler: resuming mta");
		env->sc_flags &= ~SMTPD_MTA_PAUSED;
		scheduler_reset_events();
		return;

	case IMSG_CTL_VERBOSE:
		m_msg(&m, imsg);
		m_get_int(&m, &v);
		m_end(&m);
		log_verbose(v);
		return;

	case IMSG_CTL_PROFILE:
		m_msg(&m, imsg);
		m_get_int(&m, &v);
		m_end(&m);
		profiling = v;
		return;

	case IMSG_CTL_LIST_MESSAGES:
		msgid = *(uint32_t *)(imsg->data);
		n = backend->messages(msgid, msgids,
		    env->sc_scheduler_max_msg_batch_size);
		m_compose(p, IMSG_CTL_LIST_MESSAGES, imsg->hdr.peerid, 0, -1,
		    msgids, n * sizeof (*msgids));
		return;

	case IMSG_CTL_LIST_ENVELOPES:
		id = *(uint64_t *)(imsg->data);
		n = backend->envelopes(id, state,
		    env->sc_scheduler_max_evp_batch_size);
		for (i = 0; i < n; i++) {
			m_create(p_queue, IMSG_CTL_LIST_ENVELOPES,
			    imsg->hdr.peerid, 0, -1);
			m_add_evpid(p_queue, state[i].evpid);
			m_add_int(p_queue, state[i].flags);
			m_add_time(p_queue, state[i].time);
			m_close(p_queue);
		}
		m_compose(p_queue, IMSG_CTL_LIST_ENVELOPES,
		    imsg->hdr.peerid, 0, -1, NULL, 0);
		return;

	case IMSG_CTL_SCHEDULE:
		id = *(uint64_t *)(imsg->data);
		if (id <= 0xffffffffL)
			log_debug("debug: scheduler: "
			    "scheduling msg:%08" PRIx64, id);
		else
			log_debug("debug: scheduler: "
			    "scheduling evp:%016" PRIx64, id);
		r = backend->schedule(id);
		scheduler_reset_events();
		m_compose(p, r ? IMSG_CTL_OK : IMSG_CTL_FAIL, imsg->hdr.peerid,
		    0, -1, NULL, 0);
		return;

	case IMSG_QUEUE_ENVELOPE_SCHEDULE:
		id = *(uint64_t *)(imsg->data);
		backend->schedule(id);
		scheduler_reset_events();
		return;

	case IMSG_CTL_REMOVE:
		id = *(uint64_t *)(imsg->data);
		if (id <= 0xffffffffL)
			log_debug("debug: scheduler: "
			    "removing msg:%08" PRIx64, id);
		else
			log_debug("debug: scheduler: "
			    "removing evp:%016" PRIx64, id);
		r = backend->remove(id);
		scheduler_reset_events();
		m_compose(p, r ? IMSG_CTL_OK : IMSG_CTL_FAIL, imsg->hdr.peerid,
		    0, -1, NULL, 0);
		return;

	case IMSG_CTL_PAUSE_EVP:
		id = *(uint64_t *)(imsg->data);
		if (id <= 0xffffffffL)
			log_debug("debug: scheduler: "
			    "suspending msg:%08" PRIx64, id);
		else
			log_debug("debug: scheduler: "
			    "suspending evp:%016" PRIx64, id);
		r = backend->suspend(id);
		scheduler_reset_events();
		m_compose(p, r ? IMSG_CTL_OK : IMSG_CTL_FAIL, imsg->hdr.peerid,
		    0, -1, NULL, 0);
		return;

	case IMSG_CTL_RESUME_EVP:
		id = *(uint64_t *)(imsg->data);
		if (id <= 0xffffffffL)
			log_debug("debug: scheduler: "
			    "resuming msg:%08" PRIx64, id);
		else
			log_debug("debug: scheduler: "
			    "resuming evp:%016" PRIx64, id);
		r = backend->resume(id);
		scheduler_reset_events();
		m_compose(p, r ? IMSG_CTL_OK : IMSG_CTL_FAIL, imsg->hdr.peerid,
		    0, -1, NULL, 0);
		return;
	}

	errx(1, "scheduler_imsg: unexpected %s imsg",
	    imsg_to_str(imsg->hdr.type));
}

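/* Exit the scheduler process. */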
static void
scheduler_shutdown(void)
{
	log_debug("debug: scheduler agent exiting");
	_exit(0);
}

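/* Re-arm the scheduling timer to fire immediately. */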
static void
scheduler_reset_events(void)
{
	struct timeval	 tv;

	evtimer_del(&ev);
	tv.tv_sec = 0;
	tv.tv_usec = 0;
	evtimer_add(&ev, &tv);
}

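/*
 * Scheduler process entry point: set up the backend, chroot and drop
 * privileges, connect the imsg peers, then run the event loop.
 */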
int
scheduler(void)
{
	struct passwd	*pw;

	backend = scheduler_backend_lookup(backend_scheduler);
	if (backend == NULL)
		errx(1, "cannot find scheduler backend \"%s\"",
		    backend_scheduler);

	purge_config(PURGE_EVERYTHING);

	if ((pw = getpwnam(SMTPD_USER)) == NULL)
		fatalx("unknown user " SMTPD_USER);

	config_process(PROC_SCHEDULER);

	backend->init(backend_scheduler);

	if (chroot(PATH_CHROOT) == -1)
		fatal("scheduler: chroot");
	if (chdir("/") == -1)
		fatal("scheduler: chdir(\"/\")");

	if (setgroups(1, &pw->pw_gid) ||
	    setresgid(pw->pw_gid, pw->pw_gid, pw->pw_gid) ||
	    setresuid(pw->pw_uid, pw->pw_uid, pw->pw_uid))
		fatal("scheduler: cannot drop privileges");

	evpids = xcalloc(env->sc_scheduler_max_schedule, sizeof *evpids,
	    "scheduler: init evpids");
	types = xcalloc(env->sc_scheduler_max_schedule, sizeof *types,
	    "scheduler: init types");
	msgids = xcalloc(env->sc_scheduler_max_msg_batch_size, sizeof *msgids,
	    "scheduler: list msg");
	state = xcalloc(env->sc_scheduler_max_evp_batch_size, sizeof *state,
	    "scheduler: list evp");

	imsg_callback = scheduler_imsg;
	event_init();

	signal(SIGINT, SIG_IGN);
	signal(SIGTERM, SIG_IGN);
	signal(SIGPIPE, SIG_IGN);
	signal(SIGHUP, SIG_IGN);

	config_peer(PROC_CONTROL);
	config_peer(PROC_QUEUE);

	evtimer_set(&ev, scheduler_timeout, NULL);
	scheduler_reset_events();

	if (pledge("stdio", NULL) == -1)
		err(1, "pledge");

	event_dispatch();
	fatalx("exited event loop");

	return (0);
}

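/*
 * Timer callback: fetch a batch of schedulable envelopes from the
 * backend, hand them to the queue process, and re-arm the timer
 * according to the delay reported by the backend.
 */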
static void
scheduler_timeout(int fd, short event, void *p)
{
	struct timeval		tv;
	size_t			i;
	size_t			d_inflight;
	size_t			d_envelope;
	size_t			d_removed;
	size_t			d_expired;
	size_t			d_updated;
	size_t			count;
	int			mask, r, delay;

	tv.tv_sec = 0;
	tv.tv_usec = 0;

	mask = SCHED_UPDATE;

	if (ninflight < env->sc_scheduler_max_inflight) {
		mask |= SCHED_EXPIRE | SCHED_REMOVE | SCHED_BOUNCE;
		if (!(env->sc_flags & SMTPD_MDA_PAUSED))
			mask |= SCHED_MDA;
		if (!(env->sc_flags & SMTPD_MTA_PAUSED))
			mask |= SCHED_MTA;
	}

	count = env->sc_scheduler_max_schedule;

	log_trace(TRACE_SCHEDULER,
	    "scheduler: getting batch: mask=0x%x, count=%zu", mask, count);

	r = backend->batch(mask, &delay, &count, evpids, types);

	log_trace(TRACE_SCHEDULER,
	    "scheduler: got r=%i, delay=%i, count=%zu", r, delay, count);

	if (r < 0)
		fatalx("scheduler: error in batch handler");

	if (r == 0) {

		if (delay < -1)
			fatalx("scheduler: invalid delay %d", delay);

		if (delay == -1) {
			log_trace(TRACE_SCHEDULER, "scheduler: sleeping");
			return;
		}

		tv.tv_sec = delay;
		tv.tv_usec = 0;
		log_trace(TRACE_SCHEDULER,
		    "scheduler: waiting for %s", duration_to_text(tv.tv_sec));
		evtimer_add(&ev, &tv);
		return;
	}

	d_inflight = 0;
	d_envelope = 0;
	d_removed = 0;
	d_expired = 0;
	d_updated = 0;

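	/*
	 * Relay each envelope of the batch to the queue process
	 * according to its type; updates are only accounted for.
	 */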
	for (i = 0; i < count; i++) {
		switch (types[i]) {
		case SCHED_REMOVE:
			log_debug("debug: scheduler: evp:%016" PRIx64
			    " removed", evpids[i]);
			m_create(p_queue, IMSG_SCHED_ENVELOPE_REMOVE, 0, 0, -1);
			m_add_evpid(p_queue, evpids[i]);
			m_close(p_queue);
			d_envelope += 1;
			d_removed += 1;
			d_inflight += 1;
			break;

		case SCHED_EXPIRE:
			log_debug("debug: scheduler: evp:%016" PRIx64
			    " expired", evpids[i]);
			m_create(p_queue, IMSG_SCHED_ENVELOPE_EXPIRE, 0, 0, -1);
			m_add_evpid(p_queue, evpids[i]);
			m_close(p_queue);
			d_envelope += 1;
			d_expired += 1;
			d_inflight += 1;
			break;

		case SCHED_UPDATE:
			log_debug("debug: scheduler: evp:%016" PRIx64
			    " scheduled (update)", evpids[i]);
			d_updated += 1;
			break;

		case SCHED_BOUNCE:
			log_debug("debug: scheduler: evp:%016" PRIx64
			    " scheduled (bounce)", evpids[i]);
			m_create(p_queue, IMSG_SCHED_ENVELOPE_INJECT, 0, 0, -1);
			m_add_evpid(p_queue, evpids[i]);
			m_close(p_queue);
			d_inflight += 1;
			break;

		case SCHED_MDA:
			log_debug("debug: scheduler: evp:%016" PRIx64
			    " scheduled (mda)", evpids[i]);
			m_create(p_queue, IMSG_SCHED_ENVELOPE_DELIVER, 0, 0, -1);
			m_add_evpid(p_queue, evpids[i]);
			m_close(p_queue);
			d_inflight += 1;
			break;

		case SCHED_MTA:
			log_debug("debug: scheduler: evp:%016" PRIx64
			    " scheduled (mta)", evpids[i]);
			m_create(p_queue, IMSG_SCHED_ENVELOPE_TRANSFER, 0, 0, -1);
			m_add_evpid(p_queue, evpids[i]);
			m_close(p_queue);
			d_inflight += 1;
			break;
		}
	}

	stat_decrement("scheduler.envelope", d_envelope);
	stat_increment("scheduler.envelope.inflight", d_inflight);
	stat_increment("scheduler.envelope.expired", d_expired);
	stat_increment("scheduler.envelope.removed", d_removed);
	stat_increment("scheduler.envelope.updated", d_updated);

	ninflight += d_inflight;

	tv.tv_sec = 0;
	tv.tv_usec = 0;
	evtimer_add(&ev, &tv);
}
615