/*	$NetBSD: bufferevent_ratelim.c,v 1.1.1.1 2013/12/27 23:31:16 christos Exp $	*/

/*
 * Copyright (c) 2007-2012 Niels Provos and Nick Mathewson
 * Copyright (c) 2002-2006 Niels Provos <provos@citi.umich.edu>
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 * 1. Redistributions of source code must retain the above copyright
 *    notice, this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 * 3. The name of the author may not be used to endorse or promote products
 *    derived from this software without specific prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
 * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
 * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
 * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,
 * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
 * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
 * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */
#include "evconfig-private.h"

#include <sys/types.h>
#include <limits.h>
#include <string.h>
#include <stdlib.h>

#include "event2/event.h"
#include "event2/event_struct.h"
#include "event2/util.h"
#include "event2/bufferevent.h"
#include "event2/bufferevent_struct.h"
#include "event2/buffer.h"

#include "ratelim-internal.h"

#include "bufferevent-internal.h"
#include "mm-internal.h"
#include "util-internal.h"
#include "event-internal.h"

int
ev_token_bucket_init_(struct ev_token_bucket *bucket,
    const struct ev_token_bucket_cfg *cfg,
    ev_uint32_t current_tick,
    int reinitialize)
{
	if (reinitialize) {
		/* on reinitialization, we only clip downwards, since we've
		   already used who-knows-how-much bandwidth this tick.  We
		   leave "last_updated" as it is; the next update will add the
		   appropriate amount of bandwidth to the bucket.
		*/
		if (bucket->read_limit > (ev_int64_t) cfg->read_maximum)
			bucket->read_limit = cfg->read_maximum;
		if (bucket->write_limit > (ev_int64_t) cfg->write_maximum)
			bucket->write_limit = cfg->write_maximum;
	} else {
		bucket->read_limit = cfg->read_rate;
		bucket->write_limit = cfg->write_rate;
		bucket->last_updated = current_tick;
	}
	return 0;
}

int
ev_token_bucket_update_(struct ev_token_bucket *bucket,
    const struct ev_token_bucket_cfg *cfg,
    ev_uint32_t current_tick)
{
	/* It's okay if the tick number overflows, since we'll just
	 * wrap around when we do the unsigned subtraction. */
	unsigned n_ticks = current_tick - bucket->last_updated;

	/* Make sure some ticks actually happened, and that time didn't
	 * roll back. */
	if (n_ticks == 0 || n_ticks > INT_MAX)
		return 0;

	/* Naively, we would say
		bucket->limit += n_ticks * cfg->rate;

		if (bucket->limit > cfg->maximum)
			bucket->limit = cfg->maximum;

	   But we're worried about overflow, so we do it like this:
	*/

	if ((cfg->read_maximum - bucket->read_limit) / n_ticks < cfg->read_rate)
		bucket->read_limit = cfg->read_maximum;
	else
		bucket->read_limit += n_ticks * cfg->read_rate;

	if ((cfg->write_maximum - bucket->write_limit) / n_ticks < cfg->write_rate)
		bucket->write_limit = cfg->write_maximum;
	else
		bucket->write_limit += n_ticks * cfg->write_rate;

	bucket->last_updated = current_tick;

	return 1;
}

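/*
 * Illustrative worked example (not part of the original source): the
 * guards above rearrange "limit + n_ticks * rate > maximum" so that the
 * addition can never overflow.  With read_maximum = 100, read_limit = 40,
 * read_rate = 25, and n_ticks = 3: (100 - 40) / 3 == 20 < 25, so the
 * bucket is clamped straight to 100 instead of first computing
 * 40 + 75 = 115.
 */
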
static inline void
bufferevent_update_buckets(struct bufferevent_private *bev)
{
	/* Must hold lock on bev. */
	struct timeval now;
	unsigned tick;
	event_base_gettimeofday_cached(bev->bev.ev_base, &now);
	tick = ev_token_bucket_get_tick_(&now, bev->rate_limiting->cfg);
	if (tick != bev->rate_limiting->limit.last_updated)
		ev_token_bucket_update_(&bev->rate_limiting->limit,
		    bev->rate_limiting->cfg, tick);
}

ev_uint32_t
ev_token_bucket_get_tick_(const struct timeval *tv,
    const struct ev_token_bucket_cfg *cfg)
{
	/* This computation uses two multiplies and a divide.  We could do
	 * fewer if we knew that the tick length was an integer number of
	 * seconds, or if we knew it divided evenly into a second.  We should
	 * investigate that more.
	 */

	/* We cast to an ev_uint64_t first, since we don't want to overflow
	 * before we do the final divide. */
	ev_uint64_t msec = (ev_uint64_t)tv->tv_sec * 1000 + tv->tv_usec / 1000;
	return (unsigned)(msec / cfg->msec_per_tick);
}

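/*
 * Illustrative worked example (not part of the original source): with a
 * 250-msec tick (msec_per_tick == 250), a timeval of 2.6 seconds yields
 * msec == 2600 and a tick number of 10; the tick counter next advances
 * at 2750 msec.
 */
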
struct ev_token_bucket_cfg *
ev_token_bucket_cfg_new(size_t read_rate, size_t read_burst,
    size_t write_rate, size_t write_burst,
    const struct timeval *tick_len)
{
	struct ev_token_bucket_cfg *r;
	struct timeval g;
	if (! tick_len) {
		g.tv_sec = 1;
		g.tv_usec = 0;
		tick_len = &g;
	}
	if (read_rate > read_burst || write_rate > write_burst ||
	    read_rate < 1 || write_rate < 1)
		return NULL;
	if (read_rate > EV_RATE_LIMIT_MAX ||
	    write_rate > EV_RATE_LIMIT_MAX ||
	    read_burst > EV_RATE_LIMIT_MAX ||
	    write_burst > EV_RATE_LIMIT_MAX)
		return NULL;
	r = mm_calloc(1, sizeof(struct ev_token_bucket_cfg));
	if (!r)
		return NULL;
	r->read_rate = read_rate;
	r->write_rate = write_rate;
	r->read_maximum = read_burst;
	r->write_maximum = write_burst;
	memcpy(&r->tick_timeout, tick_len, sizeof(struct timeval));
	r->msec_per_tick = (tick_len->tv_sec * 1000) +
	    (tick_len->tv_usec & COMMON_TIMEOUT_MICROSECONDS_MASK)/1000;
	return r;
}

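/*
 * Usage sketch (illustrative; assumes an existing bufferevent "bev" and
 * elides error handling): limit "bev" to 1024 bytes per 250-msec tick in
 * each direction (roughly 4 KiB/s), with at most 2048 bytes transferred
 * in any single tick:
 *
 *	struct timeval tick = { 0, 250 * 1000 };
 *	struct ev_token_bucket_cfg *cfg =
 *	    ev_token_bucket_cfg_new(1024, 2048, 1024, 2048, &tick);
 *	bufferevent_set_rate_limit(bev, cfg);
 */
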
void
ev_token_bucket_cfg_free(struct ev_token_bucket_cfg *cfg)
{
	mm_free(cfg);
}

/* Default values for max_single_read & max_single_write variables. */
#define MAX_SINGLE_READ_DEFAULT 16384
#define MAX_SINGLE_WRITE_DEFAULT 16384

#define LOCK_GROUP(g) EVLOCK_LOCK((g)->lock, 0)
#define UNLOCK_GROUP(g) EVLOCK_UNLOCK((g)->lock, 0)

static int bev_group_suspend_reading_(struct bufferevent_rate_limit_group *g);
static int bev_group_suspend_writing_(struct bufferevent_rate_limit_group *g);
static void bev_group_unsuspend_reading_(struct bufferevent_rate_limit_group *g);
static void bev_group_unsuspend_writing_(struct bufferevent_rate_limit_group *g);

/** Helper: figure out the maximum amount we should write if is_write, or
    the maximum amount we should read if !is_write.  Return that maximum, or
    0 if our bucket is wholly exhausted.
 */
static inline ev_ssize_t
bufferevent_get_rlim_max_(struct bufferevent_private *bev, int is_write)
{
	/* needs lock on bev. */
	ev_ssize_t max_so_far =
	    is_write ? bev->max_single_write : bev->max_single_read;

#define LIM(x)						\
	(is_write ? (x).write_limit : (x).read_limit)

#define GROUP_SUSPENDED(g)			\
	(is_write ? (g)->write_suspended : (g)->read_suspended)

	/* Sets max_so_far to MIN(x, max_so_far) */
#define CLAMPTO(x)				\
	do {					\
		if (max_so_far > (x))		\
			max_so_far = (x);	\
	} while (0)

	if (!bev->rate_limiting)
		return max_so_far;

	/* If rate-limiting is enabled at all, update the appropriate
	   bucket, and take the smaller of our rate limit and the group
	   rate limit.
	 */

	if (bev->rate_limiting->cfg) {
		bufferevent_update_buckets(bev);
		max_so_far = LIM(bev->rate_limiting->limit);
	}
	if (bev->rate_limiting->group) {
		struct bufferevent_rate_limit_group *g =
		    bev->rate_limiting->group;
		ev_ssize_t share;
		LOCK_GROUP(g);
		if (GROUP_SUSPENDED(g)) {
			/* We can get here if we failed to lock this
			 * particular bufferevent while suspending the whole
			 * group. */
			if (is_write)
				bufferevent_suspend_write_(&bev->bev,
				    BEV_SUSPEND_BW_GROUP);
			else
				bufferevent_suspend_read_(&bev->bev,
				    BEV_SUSPEND_BW_GROUP);
			share = 0;
		} else {
			/* XXXX probably we should divide among the active
			 * members, not the total members. */
			share = LIM(g->rate_limit) / g->n_members;
			if (share < g->min_share)
				share = g->min_share;
		}
		UNLOCK_GROUP(g);
		CLAMPTO(share);
	}

	if (max_so_far < 0)
		max_so_far = 0;
	return max_so_far;
}

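/*
 * Illustrative worked example (not part of the original source): with
 * max_single_read == 16384, a per-bufferevent read bucket holding 1000
 * bytes, and a group share of 300 bytes, the helper above starts from
 * 16384, takes the bucket's 1000, then clamps to the group share and
 * returns 300.
 */
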
ev_ssize_t
bufferevent_get_read_max_(struct bufferevent_private *bev)
{
	return bufferevent_get_rlim_max_(bev, 0);
}

ev_ssize_t
bufferevent_get_write_max_(struct bufferevent_private *bev)
{
	return bufferevent_get_rlim_max_(bev, 1);
}

int
bufferevent_decrement_read_buckets_(struct bufferevent_private *bev, ev_ssize_t bytes)
{
	/* XXXXX Make sure all users of this function check its return value */
	int r = 0;
	/* need to hold lock on bev */
	if (!bev->rate_limiting)
		return 0;

	if (bev->rate_limiting->cfg) {
		bev->rate_limiting->limit.read_limit -= bytes;
		if (bev->rate_limiting->limit.read_limit <= 0) {
			bufferevent_suspend_read_(&bev->bev, BEV_SUSPEND_BW);
			if (event_add(&bev->rate_limiting->refill_bucket_event,
				&bev->rate_limiting->cfg->tick_timeout) < 0)
				r = -1;
		} else if (bev->read_suspended & BEV_SUSPEND_BW) {
			/* The refill event is shared with the write bucket;
			 * only remove it if writing isn't also waiting on a
			 * refill. */
			if (!(bev->write_suspended & BEV_SUSPEND_BW))
				event_del(&bev->rate_limiting->refill_bucket_event);
			bufferevent_unsuspend_read_(&bev->bev, BEV_SUSPEND_BW);
		}
	}

	if (bev->rate_limiting->group) {
		LOCK_GROUP(bev->rate_limiting->group);
		bev->rate_limiting->group->rate_limit.read_limit -= bytes;
		bev->rate_limiting->group->total_read += bytes;
		if (bev->rate_limiting->group->rate_limit.read_limit <= 0) {
			bev_group_suspend_reading_(bev->rate_limiting->group);
		} else if (bev->rate_limiting->group->read_suspended) {
			bev_group_unsuspend_reading_(bev->rate_limiting->group);
		}
		UNLOCK_GROUP(bev->rate_limiting->group);
	}

	return r;
}

int
bufferevent_decrement_write_buckets_(struct bufferevent_private *bev, ev_ssize_t bytes)
{
	/* XXXXX Make sure all users of this function check its return value */
	int r = 0;
	/* need to hold lock */
	if (!bev->rate_limiting)
		return 0;

	if (bev->rate_limiting->cfg) {
		bev->rate_limiting->limit.write_limit -= bytes;
		if (bev->rate_limiting->limit.write_limit <= 0) {
			bufferevent_suspend_write_(&bev->bev, BEV_SUSPEND_BW);
			if (event_add(&bev->rate_limiting->refill_bucket_event,
				&bev->rate_limiting->cfg->tick_timeout) < 0)
				r = -1;
		} else if (bev->write_suspended & BEV_SUSPEND_BW) {
			if (!(bev->read_suspended & BEV_SUSPEND_BW))
				event_del(&bev->rate_limiting->refill_bucket_event);
			bufferevent_unsuspend_write_(&bev->bev, BEV_SUSPEND_BW);
		}
	}

	if (bev->rate_limiting->group) {
		LOCK_GROUP(bev->rate_limiting->group);
		bev->rate_limiting->group->rate_limit.write_limit -= bytes;
		bev->rate_limiting->group->total_written += bytes;
		if (bev->rate_limiting->group->rate_limit.write_limit <= 0) {
			bev_group_suspend_writing_(bev->rate_limiting->group);
		} else if (bev->rate_limiting->group->write_suspended) {
			bev_group_unsuspend_writing_(bev->rate_limiting->group);
		}
		UNLOCK_GROUP(bev->rate_limiting->group);
	}

	return r;
}

/** Stop reading on every bufferevent in <b>g</b> */
static int
bev_group_suspend_reading_(struct bufferevent_rate_limit_group *g)
{
	/* Needs group lock */
	struct bufferevent_private *bev;
	g->read_suspended = 1;
	g->pending_unsuspend_read = 0;

	/* Note that in this loop we call EVLOCK_TRY_LOCK_ instead of BEV_LOCK,
	   to prevent a deadlock.  (Ordinarily, the group lock nests inside
	   the bufferevent locks.  If we are unable to lock any individual
	   bufferevent, it will find out later when it looks at its limit
	   and sees that its group is suspended.)
	*/
	LIST_FOREACH(bev, &g->members, rate_limiting->next_in_group) {
		if (EVLOCK_TRY_LOCK_(bev->lock)) {
			bufferevent_suspend_read_(&bev->bev,
			    BEV_SUSPEND_BW_GROUP);
			EVLOCK_UNLOCK(bev->lock, 0);
		}
	}
	return 0;
}

/** Stop writing on every bufferevent in <b>g</b> */
static int
bev_group_suspend_writing_(struct bufferevent_rate_limit_group *g)
{
	/* Needs group lock */
	struct bufferevent_private *bev;
	g->write_suspended = 1;
	g->pending_unsuspend_write = 0;
	LIST_FOREACH(bev, &g->members, rate_limiting->next_in_group) {
		if (EVLOCK_TRY_LOCK_(bev->lock)) {
			bufferevent_suspend_write_(&bev->bev,
			    BEV_SUSPEND_BW_GROUP);
			EVLOCK_UNLOCK(bev->lock, 0);
		}
	}
	return 0;
}

/** Timer callback invoked on a single bufferevent with one or more exhausted
    buckets when they are ready to refill. */
static void
bev_refill_callback_(evutil_socket_t fd, short what, void *arg)
{
	unsigned tick;
	struct timeval now;
	struct bufferevent_private *bev = arg;
	int again = 0;
	BEV_LOCK(&bev->bev);
	if (!bev->rate_limiting || !bev->rate_limiting->cfg) {
		BEV_UNLOCK(&bev->bev);
		return;
	}

	/* First, update the bucket */
	event_base_gettimeofday_cached(bev->bev.ev_base, &now);
	tick = ev_token_bucket_get_tick_(&now,
	    bev->rate_limiting->cfg);
	ev_token_bucket_update_(&bev->rate_limiting->limit,
	    bev->rate_limiting->cfg,
	    tick);

	/* Now unsuspend any read/write operations as appropriate. */
	if ((bev->read_suspended & BEV_SUSPEND_BW)) {
		if (bev->rate_limiting->limit.read_limit > 0)
			bufferevent_unsuspend_read_(&bev->bev, BEV_SUSPEND_BW);
		else
			again = 1;
	}
	if ((bev->write_suspended & BEV_SUSPEND_BW)) {
		if (bev->rate_limiting->limit.write_limit > 0)
			bufferevent_unsuspend_write_(&bev->bev, BEV_SUSPEND_BW);
		else
			again = 1;
	}
	if (again) {
		/* One or more of the buckets may need another refill if they
		   started negative.

		   XXXX if we need to be quiet for more ticks, we should
		   maybe figure out what timeout we really want.
		*/
		/* XXXX Handle event_add failure somehow */
		event_add(&bev->rate_limiting->refill_bucket_event,
		    &bev->rate_limiting->cfg->tick_timeout);
	}
	BEV_UNLOCK(&bev->bev);
}

/** Helper: grab a random element from a bufferevent group.
 *
 * Requires that we hold the lock on the group.
 */
static struct bufferevent_private *
bev_group_random_element_(struct bufferevent_rate_limit_group *group)
{
	int which;
	struct bufferevent_private *bev;

	/* requires group lock */

	if (!group->n_members)
		return NULL;

	EVUTIL_ASSERT(! LIST_EMPTY(&group->members));

	which = evutil_weakrand_range_(&group->weakrand_seed, group->n_members);

	bev = LIST_FIRST(&group->members);
	while (which--)
		bev = LIST_NEXT(bev, rate_limiting->next_in_group);

	return bev;
}

/** Iterate over the elements of a rate-limiting group 'g' with a random
    starting point, assigning each to the variable 'bev', and executing the
    block 'block'.

    We do this in a half-baked effort to get fairness among group members.
    XXX Round-robin or some kind of priority queue would be even more fair.
 */
#define FOREACH_RANDOM_ORDER(block)			\
	do {						\
		first = bev_group_random_element_(g);	\
		for (bev = first; bev != LIST_END(&g->members); \
		    bev = LIST_NEXT(bev, rate_limiting->next_in_group)) { \
			block ;					 \
		}						 \
		for (bev = LIST_FIRST(&g->members); bev && bev != first; \
		    bev = LIST_NEXT(bev, rate_limiting->next_in_group)) { \
			block ;						\
		}							\
	} while (0)

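/*
 * Illustrative example (not part of the original source): if the member
 * list is A -> B -> C -> D and the random starting point is C, the first
 * loop runs the block for C and D, and the second loop wraps around from
 * the list head, running it for A and B and stopping before C.
 */
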
static void
bev_group_unsuspend_reading_(struct bufferevent_rate_limit_group *g)
{
	int again = 0;
	struct bufferevent_private *bev, *first;

	g->read_suspended = 0;
	FOREACH_RANDOM_ORDER({
		if (EVLOCK_TRY_LOCK_(bev->lock)) {
			bufferevent_unsuspend_read_(&bev->bev,
			    BEV_SUSPEND_BW_GROUP);
			EVLOCK_UNLOCK(bev->lock, 0);
		} else {
			again = 1;
		}
	});
	g->pending_unsuspend_read = again;
}

static void
bev_group_unsuspend_writing_(struct bufferevent_rate_limit_group *g)
{
	int again = 0;
	struct bufferevent_private *bev, *first;
	g->write_suspended = 0;

	FOREACH_RANDOM_ORDER({
		if (EVLOCK_TRY_LOCK_(bev->lock)) {
			bufferevent_unsuspend_write_(&bev->bev,
			    BEV_SUSPEND_BW_GROUP);
			EVLOCK_UNLOCK(bev->lock, 0);
		} else {
			again = 1;
		}
	});
	g->pending_unsuspend_write = again;
}

/** Callback invoked every tick to add more elements to the group bucket
    and unsuspend group members as needed.
 */
static void
bev_group_refill_callback_(evutil_socket_t fd, short what, void *arg)
{
	struct bufferevent_rate_limit_group *g = arg;
	unsigned tick;
	struct timeval now;

	event_base_gettimeofday_cached(event_get_base(&g->master_refill_event), &now);

	LOCK_GROUP(g);

	tick = ev_token_bucket_get_tick_(&now, &g->rate_limit_cfg);
	ev_token_bucket_update_(&g->rate_limit, &g->rate_limit_cfg, tick);

	if (g->pending_unsuspend_read ||
	    (g->read_suspended && (g->rate_limit.read_limit >= g->min_share))) {
		bev_group_unsuspend_reading_(g);
	}
	if (g->pending_unsuspend_write ||
	    (g->write_suspended && (g->rate_limit.write_limit >= g->min_share))) {
		bev_group_unsuspend_writing_(g);
	}

	/* XXXX Rather than waiting until the next tick to unsuspend stuff
	 * with pending_unsuspend_write/read, we should do it on the
	 * next iteration of the mainloop.
	 */

	UNLOCK_GROUP(g);
}

int
bufferevent_set_rate_limit(struct bufferevent *bev,
    struct ev_token_bucket_cfg *cfg)
{
	struct bufferevent_private *bevp =
	    EVUTIL_UPCAST(bev, struct bufferevent_private, bev);
	int r = -1;
	struct bufferevent_rate_limit *rlim;
	struct timeval now;
	ev_uint32_t tick;
	int reinit = 0, suspended = 0;
	/* XXX reference-count cfg */

	BEV_LOCK(bev);

	if (cfg == NULL) {
		if (bevp->rate_limiting) {
			rlim = bevp->rate_limiting;
			rlim->cfg = NULL;
			bufferevent_unsuspend_read_(bev, BEV_SUSPEND_BW);
			bufferevent_unsuspend_write_(bev, BEV_SUSPEND_BW);
			if (event_initialized(&rlim->refill_bucket_event))
				event_del(&rlim->refill_bucket_event);
		}
		r = 0;
		goto done;
	}

	event_base_gettimeofday_cached(bev->ev_base, &now);
	tick = ev_token_bucket_get_tick_(&now, cfg);

	if (bevp->rate_limiting && bevp->rate_limiting->cfg == cfg) {
		/* no-op */
		r = 0;
		goto done;
	}
	if (bevp->rate_limiting == NULL) {
		rlim = mm_calloc(1, sizeof(struct bufferevent_rate_limit));
		if (!rlim)
			goto done;
		bevp->rate_limiting = rlim;
	} else {
		rlim = bevp->rate_limiting;
	}
	reinit = rlim->cfg != NULL;

	rlim->cfg = cfg;
	ev_token_bucket_init_(&rlim->limit, cfg, tick, reinit);

	if (reinit) {
		EVUTIL_ASSERT(event_initialized(&rlim->refill_bucket_event));
		event_del(&rlim->refill_bucket_event);
	}
	evtimer_assign(&rlim->refill_bucket_event, bev->ev_base,
	    bev_refill_callback_, bevp);

	if (rlim->limit.read_limit > 0) {
		bufferevent_unsuspend_read_(bev, BEV_SUSPEND_BW);
	} else {
		bufferevent_suspend_read_(bev, BEV_SUSPEND_BW);
		suspended = 1;
	}
	if (rlim->limit.write_limit > 0) {
		bufferevent_unsuspend_write_(bev, BEV_SUSPEND_BW);
	} else {
		bufferevent_suspend_write_(bev, BEV_SUSPEND_BW);
		suspended = 1;
	}

	if (suspended)
		event_add(&rlim->refill_bucket_event, &cfg->tick_timeout);

	r = 0;

done:
	BEV_UNLOCK(bev);
	return r;
}

struct bufferevent_rate_limit_group *
bufferevent_rate_limit_group_new(struct event_base *base,
    const struct ev_token_bucket_cfg *cfg)
{
	struct bufferevent_rate_limit_group *g;
	struct timeval now;
	ev_uint32_t tick;

	event_base_gettimeofday_cached(base, &now);
	tick = ev_token_bucket_get_tick_(&now, cfg);

	g = mm_calloc(1, sizeof(struct bufferevent_rate_limit_group));
	if (!g)
		return NULL;
	memcpy(&g->rate_limit_cfg, cfg, sizeof(g->rate_limit_cfg));
	LIST_INIT(&g->members);

	ev_token_bucket_init_(&g->rate_limit, cfg, tick, 0);

	event_assign(&g->master_refill_event, base, -1, EV_PERSIST,
	    bev_group_refill_callback_, g);
	/* XXXX handle event_add failure */
	event_add(&g->master_refill_event, &cfg->tick_timeout);

	EVTHREAD_ALLOC_LOCK(g->lock, EVTHREAD_LOCKTYPE_RECURSIVE);

	bufferevent_rate_limit_group_set_min_share(g, 64);

	evutil_weakrand_seed_(&g->weakrand_seed,
	    (ev_uint32_t) ((now.tv_sec + now.tv_usec) + (ev_intptr_t)g));

	return g;
}

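/*
 * Usage sketch (illustrative; assumes an existing event_base "base" and
 * bufferevents "bev1" and "bev2", and elides error handling): cap a set
 * of connections to roughly 1 MiB/s in aggregate, using the default
 * one-second tick:
 *
 *	struct ev_token_bucket_cfg *cfg = ev_token_bucket_cfg_new(
 *	    1048576, 1048576, 1048576, 1048576, NULL);
 *	struct bufferevent_rate_limit_group *grp =
 *	    bufferevent_rate_limit_group_new(base, cfg);
 *	bufferevent_add_to_rate_limit_group(bev1, grp);
 *	bufferevent_add_to_rate_limit_group(bev2, grp);
 */
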
int
bufferevent_rate_limit_group_set_cfg(
	struct bufferevent_rate_limit_group *g,
	const struct ev_token_bucket_cfg *cfg)
{
	int same_tick;
	if (!g || !cfg)
		return -1;

	LOCK_GROUP(g);
	same_tick = evutil_timercmp(
		&g->rate_limit_cfg.tick_timeout, &cfg->tick_timeout, ==);
	memcpy(&g->rate_limit_cfg, cfg, sizeof(g->rate_limit_cfg));

	if (g->rate_limit.read_limit > (ev_ssize_t)cfg->read_maximum)
		g->rate_limit.read_limit = cfg->read_maximum;
	if (g->rate_limit.write_limit > (ev_ssize_t)cfg->write_maximum)
		g->rate_limit.write_limit = cfg->write_maximum;

	if (!same_tick) {
		/* This can cause a hiccup in the schedule */
		event_add(&g->master_refill_event, &cfg->tick_timeout);
	}

	/* The new limits might force us to adjust min_share differently. */
	bufferevent_rate_limit_group_set_min_share(g, g->configured_min_share);

	UNLOCK_GROUP(g);
	return 0;
}

int
bufferevent_rate_limit_group_set_min_share(
	struct bufferevent_rate_limit_group *g,
	size_t share)
{
	if (share > EV_SSIZE_MAX)
		return -1;

	g->configured_min_share = share;

	/* Don't let the effective share exceed the one-tick refill (the
	 * rate); that way, at steady state, at least one connection can
	 * make progress every tick. */
	if (share > g->rate_limit_cfg.read_rate)
		share = g->rate_limit_cfg.read_rate;
	if (share > g->rate_limit_cfg.write_rate)
		share = g->rate_limit_cfg.write_rate;

	g->min_share = share;
	return 0;
}

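/*
 * Illustrative worked example (not part of the original source): with
 * 10000 group members and a group bucket holding 100000 bytes, the naive
 * per-member share computed in bufferevent_get_rlim_max_() would be 10
 * bytes, too little for any member to make useful progress; a min_share
 * of 64 lets members proceed in larger units instead.
 */
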
void
bufferevent_rate_limit_group_free(struct bufferevent_rate_limit_group *g)
{
	LOCK_GROUP(g);
	EVUTIL_ASSERT(0 == g->n_members);
	event_del(&g->master_refill_event);
	UNLOCK_GROUP(g);
	EVTHREAD_FREE_LOCK(g->lock, EVTHREAD_LOCKTYPE_RECURSIVE);
	mm_free(g);
}

int
bufferevent_add_to_rate_limit_group(struct bufferevent *bev,
    struct bufferevent_rate_limit_group *g)
{
	int wsuspend, rsuspend;
	struct bufferevent_private *bevp =
	    EVUTIL_UPCAST(bev, struct bufferevent_private, bev);
	BEV_LOCK(bev);

	if (!bevp->rate_limiting) {
		struct bufferevent_rate_limit *rlim;
		rlim = mm_calloc(1, sizeof(struct bufferevent_rate_limit));
		if (!rlim) {
			BEV_UNLOCK(bev);
			return -1;
		}
		evtimer_assign(&rlim->refill_bucket_event, bev->ev_base,
		    bev_refill_callback_, bevp);
		bevp->rate_limiting = rlim;
	}

	if (bevp->rate_limiting->group == g) {
		BEV_UNLOCK(bev);
		return 0;
	}
	if (bevp->rate_limiting->group)
		bufferevent_remove_from_rate_limit_group(bev);

	LOCK_GROUP(g);
	bevp->rate_limiting->group = g;
	++g->n_members;
	LIST_INSERT_HEAD(&g->members, bevp, rate_limiting->next_in_group);

	rsuspend = g->read_suspended;
	wsuspend = g->write_suspended;

	UNLOCK_GROUP(g);

	if (rsuspend)
		bufferevent_suspend_read_(bev, BEV_SUSPEND_BW_GROUP);
	if (wsuspend)
		bufferevent_suspend_write_(bev, BEV_SUSPEND_BW_GROUP);

	BEV_UNLOCK(bev);
	return 0;
}

int
bufferevent_remove_from_rate_limit_group(struct bufferevent *bev)
{
	return bufferevent_remove_from_rate_limit_group_internal_(bev, 1);
}

int
bufferevent_remove_from_rate_limit_group_internal_(struct bufferevent *bev,
    int unsuspend)
{
	struct bufferevent_private *bevp =
	    EVUTIL_UPCAST(bev, struct bufferevent_private, bev);
	BEV_LOCK(bev);
	if (bevp->rate_limiting && bevp->rate_limiting->group) {
		struct bufferevent_rate_limit_group *g =
		    bevp->rate_limiting->group;
		LOCK_GROUP(g);
		bevp->rate_limiting->group = NULL;
		--g->n_members;
		LIST_REMOVE(bevp, rate_limiting->next_in_group);
		UNLOCK_GROUP(g);
	}
	if (unsuspend) {
		bufferevent_unsuspend_read_(bev, BEV_SUSPEND_BW_GROUP);
		bufferevent_unsuspend_write_(bev, BEV_SUSPEND_BW_GROUP);
	}
	BEV_UNLOCK(bev);
	return 0;
}

/* ===
 * API functions to expose rate limits.
 *
 * Don't use these from inside Libevent; they're meant to be for use by
 * the program.
 * === */

/* Mostly you don't want to use this function from inside libevent;
 * bufferevent_get_read_max_() is more likely what you want. */
ev_ssize_t
bufferevent_get_read_limit(struct bufferevent *bev)
{
	ev_ssize_t r;
	struct bufferevent_private *bevp;
	BEV_LOCK(bev);
	bevp = BEV_UPCAST(bev);
	if (bevp->rate_limiting && bevp->rate_limiting->cfg) {
		bufferevent_update_buckets(bevp);
		r = bevp->rate_limiting->limit.read_limit;
	} else {
		r = EV_SSIZE_MAX;
	}
	BEV_UNLOCK(bev);
	return r;
}

/* Mostly you don't want to use this function from inside libevent;
 * bufferevent_get_write_max_() is more likely what you want. */
ev_ssize_t
bufferevent_get_write_limit(struct bufferevent *bev)
{
	ev_ssize_t r;
	struct bufferevent_private *bevp;
	BEV_LOCK(bev);
	bevp = BEV_UPCAST(bev);
	if (bevp->rate_limiting && bevp->rate_limiting->cfg) {
		bufferevent_update_buckets(bevp);
		r = bevp->rate_limiting->limit.write_limit;
	} else {
		r = EV_SSIZE_MAX;
	}
	BEV_UNLOCK(bev);
	return r;
}

int
bufferevent_set_max_single_read(struct bufferevent *bev, size_t size)
{
	struct bufferevent_private *bevp;
	BEV_LOCK(bev);
	bevp = BEV_UPCAST(bev);
	if (size == 0 || size > EV_SSIZE_MAX)
		bevp->max_single_read = MAX_SINGLE_READ_DEFAULT;
	else
		bevp->max_single_read = size;
	BEV_UNLOCK(bev);
	return 0;
}

int
bufferevent_set_max_single_write(struct bufferevent *bev, size_t size)
{
	struct bufferevent_private *bevp;
	BEV_LOCK(bev);
	bevp = BEV_UPCAST(bev);
	if (size == 0 || size > EV_SSIZE_MAX)
		bevp->max_single_write = MAX_SINGLE_WRITE_DEFAULT;
	else
		bevp->max_single_write = size;
	BEV_UNLOCK(bev);
	return 0;
}

ev_ssize_t
bufferevent_get_max_single_read(struct bufferevent *bev)
{
	ev_ssize_t r;

	BEV_LOCK(bev);
	r = BEV_UPCAST(bev)->max_single_read;
	BEV_UNLOCK(bev);
	return r;
}

ev_ssize_t
bufferevent_get_max_single_write(struct bufferevent *bev)
{
	ev_ssize_t r;

	BEV_LOCK(bev);
	r = BEV_UPCAST(bev)->max_single_write;
	BEV_UNLOCK(bev);
	return r;
}

ev_ssize_t
bufferevent_get_max_to_read(struct bufferevent *bev)
{
	ev_ssize_t r;
	BEV_LOCK(bev);
	r = bufferevent_get_read_max_(BEV_UPCAST(bev));
	BEV_UNLOCK(bev);
	return r;
}

ev_ssize_t
bufferevent_get_max_to_write(struct bufferevent *bev)
{
	ev_ssize_t r;
	BEV_LOCK(bev);
	r = bufferevent_get_write_max_(BEV_UPCAST(bev));
	BEV_UNLOCK(bev);
	return r;
}

/* Mostly you don't want to use this function from inside libevent;
 * bufferevent_get_read_max_() is more likely what you want. */
ev_ssize_t
bufferevent_rate_limit_group_get_read_limit(
	struct bufferevent_rate_limit_group *grp)
{
	ev_ssize_t r;
	LOCK_GROUP(grp);
	r = grp->rate_limit.read_limit;
	UNLOCK_GROUP(grp);
	return r;
}

/* Mostly you don't want to use this function from inside libevent;
 * bufferevent_get_write_max_() is more likely what you want. */
ev_ssize_t
bufferevent_rate_limit_group_get_write_limit(
	struct bufferevent_rate_limit_group *grp)
{
	ev_ssize_t r;
	LOCK_GROUP(grp);
	r = grp->rate_limit.write_limit;
	UNLOCK_GROUP(grp);
	return r;
}

int
bufferevent_decrement_read_limit(struct bufferevent *bev, ev_ssize_t decr)
{
	int r = 0;
	ev_ssize_t old_limit, new_limit;
	struct bufferevent_private *bevp;
	BEV_LOCK(bev);
	bevp = BEV_UPCAST(bev);
	EVUTIL_ASSERT(bevp->rate_limiting && bevp->rate_limiting->cfg);
	old_limit = bevp->rate_limiting->limit.read_limit;

	new_limit = (bevp->rate_limiting->limit.read_limit -= decr);
	if (old_limit > 0 && new_limit <= 0) {
		bufferevent_suspend_read_(bev, BEV_SUSPEND_BW);
		if (event_add(&bevp->rate_limiting->refill_bucket_event,
			&bevp->rate_limiting->cfg->tick_timeout) < 0)
			r = -1;
	} else if (old_limit <= 0 && new_limit > 0) {
		if (!(bevp->write_suspended & BEV_SUSPEND_BW))
			event_del(&bevp->rate_limiting->refill_bucket_event);
		bufferevent_unsuspend_read_(bev, BEV_SUSPEND_BW);
	}

	BEV_UNLOCK(bev);
	return r;
}

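/*
 * Usage sketch (illustrative; assumes "bev" already has a rate limit
 * configured, which the function above asserts): an application that
 * moves bytes through a side channel can charge them against the read
 * bucket by hand; a negative value refills the bucket instead:
 *
 *	bufferevent_decrement_read_limit(bev, 512);
 */
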
int
bufferevent_decrement_write_limit(struct bufferevent *bev, ev_ssize_t decr)
{
	/* XXXX this is mostly copy-and-paste from
	 * bufferevent_decrement_read_limit */
	int r = 0;
	ev_ssize_t old_limit, new_limit;
	struct bufferevent_private *bevp;
	BEV_LOCK(bev);
	bevp = BEV_UPCAST(bev);
	EVUTIL_ASSERT(bevp->rate_limiting && bevp->rate_limiting->cfg);
	old_limit = bevp->rate_limiting->limit.write_limit;

	new_limit = (bevp->rate_limiting->limit.write_limit -= decr);
	if (old_limit > 0 && new_limit <= 0) {
		bufferevent_suspend_write_(bev, BEV_SUSPEND_BW);
		if (event_add(&bevp->rate_limiting->refill_bucket_event,
			&bevp->rate_limiting->cfg->tick_timeout) < 0)
			r = -1;
	} else if (old_limit <= 0 && new_limit > 0) {
		if (!(bevp->read_suspended & BEV_SUSPEND_BW))
			event_del(&bevp->rate_limiting->refill_bucket_event);
		bufferevent_unsuspend_write_(bev, BEV_SUSPEND_BW);
	}

	BEV_UNLOCK(bev);
	return r;
}

int
bufferevent_rate_limit_group_decrement_read(
	struct bufferevent_rate_limit_group *grp, ev_ssize_t decr)
{
	int r = 0;
	ev_ssize_t old_limit, new_limit;
	LOCK_GROUP(grp);
	old_limit = grp->rate_limit.read_limit;
	new_limit = (grp->rate_limit.read_limit -= decr);

	if (old_limit > 0 && new_limit <= 0) {
		bev_group_suspend_reading_(grp);
	} else if (old_limit <= 0 && new_limit > 0) {
		bev_group_unsuspend_reading_(grp);
	}

	UNLOCK_GROUP(grp);
	return r;
}

int
bufferevent_rate_limit_group_decrement_write(
	struct bufferevent_rate_limit_group *grp, ev_ssize_t decr)
{
	int r = 0;
	ev_ssize_t old_limit, new_limit;
	LOCK_GROUP(grp);
	old_limit = grp->rate_limit.write_limit;
	new_limit = (grp->rate_limit.write_limit -= decr);

	if (old_limit > 0 && new_limit <= 0) {
		bev_group_suspend_writing_(grp);
	} else if (old_limit <= 0 && new_limit > 0) {
		bev_group_unsuspend_writing_(grp);
	}

	UNLOCK_GROUP(grp);
	return r;
}

void
bufferevent_rate_limit_group_get_totals(struct bufferevent_rate_limit_group *grp,
    ev_uint64_t *total_read_out, ev_uint64_t *total_written_out)
{
	EVUTIL_ASSERT(grp != NULL);
	if (total_read_out)
		*total_read_out = grp->total_read;
	if (total_written_out)
		*total_written_out = grp->total_written;
}

void
bufferevent_rate_limit_group_reset_totals(struct bufferevent_rate_limit_group *grp)
{
	grp->total_read = grp->total_written = 0;
}

int
bufferevent_ratelim_init_(struct bufferevent_private *bev)
{
	bev->rate_limiting = NULL;
	bev->max_single_read = MAX_SINGLE_READ_DEFAULT;
	bev->max_single_write = MAX_SINGLE_WRITE_DEFAULT;

	return 0;
}
1078