xref: /netbsd-src/external/bsd/ntp/dist/sntp/libevent/bufferevent_ratelim.c (revision eabc0478de71e4e011a5b4e0392741e01d491794)
1*eabc0478Schristos /*	$NetBSD: bufferevent_ratelim.c,v 1.6 2024/08/18 20:47:21 christos Exp $	*/
28585484eSchristos 
38585484eSchristos /*
48585484eSchristos  * Copyright (c) 2007-2012 Niels Provos and Nick Mathewson
58585484eSchristos  * Copyright (c) 2002-2006 Niels Provos <provos@citi.umich.edu>
68585484eSchristos  * All rights reserved.
78585484eSchristos  *
88585484eSchristos  * Redistribution and use in source and binary forms, with or without
98585484eSchristos  * modification, are permitted provided that the following conditions
108585484eSchristos  * are met:
118585484eSchristos  * 1. Redistributions of source code must retain the above copyright
128585484eSchristos  *    notice, this list of conditions and the following disclaimer.
138585484eSchristos  * 2. Redistributions in binary form must reproduce the above copyright
148585484eSchristos  *    notice, this list of conditions and the following disclaimer in the
158585484eSchristos  *    documentation and/or other materials provided with the distribution.
168585484eSchristos  * 3. The name of the author may not be used to endorse or promote products
178585484eSchristos  *    derived from this software without specific prior written permission.
188585484eSchristos  *
198585484eSchristos  * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
208585484eSchristos  * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
218585484eSchristos  * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
228585484eSchristos  * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,
238585484eSchristos  * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
248585484eSchristos  * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
258585484eSchristos  * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
268585484eSchristos  * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
278585484eSchristos  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
288585484eSchristos  * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
298585484eSchristos  */
308585484eSchristos #include "evconfig-private.h"
318585484eSchristos 
328585484eSchristos #include <sys/types.h>
338585484eSchristos #include <limits.h>
348585484eSchristos #include <string.h>
358585484eSchristos #include <stdlib.h>
368585484eSchristos 
378585484eSchristos #include "event2/event.h"
388585484eSchristos #include "event2/event_struct.h"
398585484eSchristos #include "event2/util.h"
408585484eSchristos #include "event2/bufferevent.h"
418585484eSchristos #include "event2/bufferevent_struct.h"
428585484eSchristos #include "event2/buffer.h"
438585484eSchristos 
448585484eSchristos #include "ratelim-internal.h"
458585484eSchristos 
468585484eSchristos #include "bufferevent-internal.h"
478585484eSchristos #include "mm-internal.h"
488585484eSchristos #include "util-internal.h"
498585484eSchristos #include "event-internal.h"
508585484eSchristos 
518585484eSchristos int
528585484eSchristos ev_token_bucket_init_(struct ev_token_bucket *bucket,
538585484eSchristos     const struct ev_token_bucket_cfg *cfg,
548585484eSchristos     ev_uint32_t current_tick,
558585484eSchristos     int reinitialize)
568585484eSchristos {
578585484eSchristos 	if (reinitialize) {
588585484eSchristos 		/* on reinitialization, we only clip downwards, since we've
598585484eSchristos 		   already used who-knows-how-much bandwidth this tick.  We
608585484eSchristos 		   leave "last_updated" as it is; the next update will add the
618585484eSchristos 		   appropriate amount of bandwidth to the bucket.
628585484eSchristos 		*/
638585484eSchristos 		if (bucket->read_limit > (ev_int64_t) cfg->read_maximum)
648585484eSchristos 			bucket->read_limit = cfg->read_maximum;
658585484eSchristos 		if (bucket->write_limit > (ev_int64_t) cfg->write_maximum)
668585484eSchristos 			bucket->write_limit = cfg->write_maximum;
678585484eSchristos 	} else {
688585484eSchristos 		bucket->read_limit = cfg->read_rate;
698585484eSchristos 		bucket->write_limit = cfg->write_rate;
708585484eSchristos 		bucket->last_updated = current_tick;
718585484eSchristos 	}
728585484eSchristos 	return 0;
738585484eSchristos }
748585484eSchristos 
758585484eSchristos int
768585484eSchristos ev_token_bucket_update_(struct ev_token_bucket *bucket,
778585484eSchristos     const struct ev_token_bucket_cfg *cfg,
788585484eSchristos     ev_uint32_t current_tick)
798585484eSchristos {
808585484eSchristos 	/* It's okay if the tick number overflows, since we'll just
818585484eSchristos 	 * wrap around when we do the unsigned substraction. */
828585484eSchristos 	unsigned n_ticks = current_tick - bucket->last_updated;
838585484eSchristos 
848585484eSchristos 	/* Make sure some ticks actually happened, and that time didn't
858585484eSchristos 	 * roll back. */
868585484eSchristos 	if (n_ticks == 0 || n_ticks > INT_MAX)
878585484eSchristos 		return 0;
888585484eSchristos 
898585484eSchristos 	/* Naively, we would say
908585484eSchristos 		bucket->limit += n_ticks * cfg->rate;
918585484eSchristos 
928585484eSchristos 		if (bucket->limit > cfg->maximum)
938585484eSchristos 			bucket->limit = cfg->maximum;
948585484eSchristos 
958585484eSchristos 	   But we're worried about overflow, so we do it like this:
968585484eSchristos 	*/
978585484eSchristos 
988585484eSchristos 	if ((cfg->read_maximum - bucket->read_limit) / n_ticks < cfg->read_rate)
998585484eSchristos 		bucket->read_limit = cfg->read_maximum;
1008585484eSchristos 	else
1018585484eSchristos 		bucket->read_limit += n_ticks * cfg->read_rate;
1028585484eSchristos 
1038585484eSchristos 
1048585484eSchristos 	if ((cfg->write_maximum - bucket->write_limit) / n_ticks < cfg->write_rate)
1058585484eSchristos 		bucket->write_limit = cfg->write_maximum;
1068585484eSchristos 	else
1078585484eSchristos 		bucket->write_limit += n_ticks * cfg->write_rate;
1088585484eSchristos 
1098585484eSchristos 
1108585484eSchristos 	bucket->last_updated = current_tick;
1118585484eSchristos 
1128585484eSchristos 	return 1;
1138585484eSchristos }
1148585484eSchristos 
1158585484eSchristos static inline void
1168585484eSchristos bufferevent_update_buckets(struct bufferevent_private *bev)
1178585484eSchristos {
1188585484eSchristos 	/* Must hold lock on bev. */
1198585484eSchristos 	struct timeval now;
1208585484eSchristos 	unsigned tick;
1218585484eSchristos 	event_base_gettimeofday_cached(bev->bev.ev_base, &now);
1228585484eSchristos 	tick = ev_token_bucket_get_tick_(&now, bev->rate_limiting->cfg);
1238585484eSchristos 	if (tick != bev->rate_limiting->limit.last_updated)
1248585484eSchristos 		ev_token_bucket_update_(&bev->rate_limiting->limit,
1258585484eSchristos 		    bev->rate_limiting->cfg, tick);
1268585484eSchristos }
1278585484eSchristos 
1288585484eSchristos ev_uint32_t
1298585484eSchristos ev_token_bucket_get_tick_(const struct timeval *tv,
1308585484eSchristos     const struct ev_token_bucket_cfg *cfg)
1318585484eSchristos {
1328585484eSchristos 	/* This computation uses two multiplies and a divide.  We could do
1338585484eSchristos 	 * fewer if we knew that the tick length was an integer number of
1348585484eSchristos 	 * seconds, or if we knew it divided evenly into a second.  We should
1358585484eSchristos 	 * investigate that more.
1368585484eSchristos 	 */
1378585484eSchristos 
1388585484eSchristos 	/* We cast to an ev_uint64_t first, since we don't want to overflow
1398585484eSchristos 	 * before we do the final divide. */
1408585484eSchristos 	ev_uint64_t msec = (ev_uint64_t)tv->tv_sec * 1000 + tv->tv_usec / 1000;
1418585484eSchristos 	return (unsigned)(msec / cfg->msec_per_tick);
1428585484eSchristos }
1438585484eSchristos 
1448585484eSchristos struct ev_token_bucket_cfg *
1458585484eSchristos ev_token_bucket_cfg_new(size_t read_rate, size_t read_burst,
1468585484eSchristos     size_t write_rate, size_t write_burst,
1478585484eSchristos     const struct timeval *tick_len)
1488585484eSchristos {
1498585484eSchristos 	struct ev_token_bucket_cfg *r;
1508585484eSchristos 	struct timeval g;
1518585484eSchristos 	if (! tick_len) {
1528585484eSchristos 		g.tv_sec = 1;
1538585484eSchristos 		g.tv_usec = 0;
1548585484eSchristos 		tick_len = &g;
1558585484eSchristos 	}
1568585484eSchristos 	if (read_rate > read_burst || write_rate > write_burst ||
1578585484eSchristos 	    read_rate < 1 || write_rate < 1)
1588585484eSchristos 		return NULL;
1598585484eSchristos 	if (read_rate > EV_RATE_LIMIT_MAX ||
1608585484eSchristos 	    write_rate > EV_RATE_LIMIT_MAX ||
1618585484eSchristos 	    read_burst > EV_RATE_LIMIT_MAX ||
1628585484eSchristos 	    write_burst > EV_RATE_LIMIT_MAX)
1638585484eSchristos 		return NULL;
1648585484eSchristos 	r = mm_calloc(1, sizeof(struct ev_token_bucket_cfg));
1658585484eSchristos 	if (!r)
1668585484eSchristos 		return NULL;
1678585484eSchristos 	r->read_rate = read_rate;
1688585484eSchristos 	r->write_rate = write_rate;
1698585484eSchristos 	r->read_maximum = read_burst;
1708585484eSchristos 	r->write_maximum = write_burst;
1718585484eSchristos 	memcpy(&r->tick_timeout, tick_len, sizeof(struct timeval));
1728585484eSchristos 	r->msec_per_tick = (tick_len->tv_sec * 1000) +
1738585484eSchristos 	    (tick_len->tv_usec & COMMON_TIMEOUT_MICROSECONDS_MASK)/1000;
1748585484eSchristos 	return r;
1758585484eSchristos }
1768585484eSchristos 
1778585484eSchristos void
1788585484eSchristos ev_token_bucket_cfg_free(struct ev_token_bucket_cfg *cfg)
1798585484eSchristos {
1808585484eSchristos 	mm_free(cfg);
1818585484eSchristos }
1828585484eSchristos 
1838585484eSchristos /* Default values for max_single_read & max_single_write variables. */
1848585484eSchristos #define MAX_SINGLE_READ_DEFAULT 16384
1858585484eSchristos #define MAX_SINGLE_WRITE_DEFAULT 16384
1868585484eSchristos 
1878585484eSchristos #define LOCK_GROUP(g) EVLOCK_LOCK((g)->lock, 0)
1888585484eSchristos #define UNLOCK_GROUP(g) EVLOCK_UNLOCK((g)->lock, 0)
1898585484eSchristos 
1908585484eSchristos static int bev_group_suspend_reading_(struct bufferevent_rate_limit_group *g);
1918585484eSchristos static int bev_group_suspend_writing_(struct bufferevent_rate_limit_group *g);
1928585484eSchristos static void bev_group_unsuspend_reading_(struct bufferevent_rate_limit_group *g);
1938585484eSchristos static void bev_group_unsuspend_writing_(struct bufferevent_rate_limit_group *g);
1948585484eSchristos 
1958585484eSchristos /** Helper: figure out the maximum amount we should write if is_write, or
1968585484eSchristos     the maximum amount we should read if is_read.  Return that maximum, or
1978585484eSchristos     0 if our bucket is wholly exhausted.
1988585484eSchristos  */
1998585484eSchristos static inline ev_ssize_t
2008585484eSchristos bufferevent_get_rlim_max_(struct bufferevent_private *bev, int is_write)
2018585484eSchristos {
2028585484eSchristos 	/* needs lock on bev. */
2038585484eSchristos 	ev_ssize_t max_so_far = is_write?bev->max_single_write:bev->max_single_read;
2048585484eSchristos 
2058585484eSchristos #define LIM(x)						\
2068585484eSchristos 	(is_write ? (x).write_limit : (x).read_limit)
2078585484eSchristos 
2088585484eSchristos #define GROUP_SUSPENDED(g)			\
2098585484eSchristos 	(is_write ? (g)->write_suspended : (g)->read_suspended)
2108585484eSchristos 
2118585484eSchristos 	/* Sets max_so_far to MIN(x, max_so_far) */
2128585484eSchristos #define CLAMPTO(x)				\
2138585484eSchristos 	do {					\
2148585484eSchristos 		if (max_so_far > (x))		\
2158585484eSchristos 			max_so_far = (x);	\
2168585484eSchristos 	} while (0);
2178585484eSchristos 
2188585484eSchristos 	if (!bev->rate_limiting)
2198585484eSchristos 		return max_so_far;
2208585484eSchristos 
2218585484eSchristos 	/* If rate-limiting is enabled at all, update the appropriate
2228585484eSchristos 	   bucket, and take the smaller of our rate limit and the group
2238585484eSchristos 	   rate limit.
2248585484eSchristos 	 */
2258585484eSchristos 
2268585484eSchristos 	if (bev->rate_limiting->cfg) {
2278585484eSchristos 		bufferevent_update_buckets(bev);
2288585484eSchristos 		max_so_far = LIM(bev->rate_limiting->limit);
2298585484eSchristos 	}
2308585484eSchristos 	if (bev->rate_limiting->group) {
2318585484eSchristos 		struct bufferevent_rate_limit_group *g =
2328585484eSchristos 		    bev->rate_limiting->group;
2338585484eSchristos 		ev_ssize_t share;
2348585484eSchristos 		LOCK_GROUP(g);
2358585484eSchristos 		if (GROUP_SUSPENDED(g)) {
2368585484eSchristos 			/* We can get here if we failed to lock this
2378585484eSchristos 			 * particular bufferevent while suspending the whole
2388585484eSchristos 			 * group. */
2398585484eSchristos 			if (is_write)
2408585484eSchristos 				bufferevent_suspend_write_(&bev->bev,
2418585484eSchristos 				    BEV_SUSPEND_BW_GROUP);
2428585484eSchristos 			else
2438585484eSchristos 				bufferevent_suspend_read_(&bev->bev,
2448585484eSchristos 				    BEV_SUSPEND_BW_GROUP);
2458585484eSchristos 			share = 0;
2468585484eSchristos 		} else {
2478585484eSchristos 			/* XXXX probably we should divide among the active
2488585484eSchristos 			 * members, not the total members. */
2498585484eSchristos 			share = LIM(g->rate_limit) / g->n_members;
2508585484eSchristos 			if (share < g->min_share)
2518585484eSchristos 				share = g->min_share;
2528585484eSchristos 		}
2538585484eSchristos 		UNLOCK_GROUP(g);
2548585484eSchristos 		CLAMPTO(share);
2558585484eSchristos 	}
2568585484eSchristos 
2578585484eSchristos 	if (max_so_far < 0)
2588585484eSchristos 		max_so_far = 0;
2598585484eSchristos 	return max_so_far;
2608585484eSchristos }
2618585484eSchristos 
2628585484eSchristos ev_ssize_t
2638585484eSchristos bufferevent_get_read_max_(struct bufferevent_private *bev)
2648585484eSchristos {
2658585484eSchristos 	return bufferevent_get_rlim_max_(bev, 0);
2668585484eSchristos }
2678585484eSchristos 
2688585484eSchristos ev_ssize_t
2698585484eSchristos bufferevent_get_write_max_(struct bufferevent_private *bev)
2708585484eSchristos {
2718585484eSchristos 	return bufferevent_get_rlim_max_(bev, 1);
2728585484eSchristos }
2738585484eSchristos 
2748585484eSchristos int
2758585484eSchristos bufferevent_decrement_read_buckets_(struct bufferevent_private *bev, ev_ssize_t bytes)
2768585484eSchristos {
2778585484eSchristos 	/* XXXXX Make sure all users of this function check its return value */
2788585484eSchristos 	int r = 0;
2798585484eSchristos 	/* need to hold lock on bev */
2808585484eSchristos 	if (!bev->rate_limiting)
2818585484eSchristos 		return 0;
2828585484eSchristos 
2838585484eSchristos 	if (bev->rate_limiting->cfg) {
2848585484eSchristos 		bev->rate_limiting->limit.read_limit -= bytes;
2858585484eSchristos 		if (bev->rate_limiting->limit.read_limit <= 0) {
2868585484eSchristos 			bufferevent_suspend_read_(&bev->bev, BEV_SUSPEND_BW);
2878585484eSchristos 			if (event_add(&bev->rate_limiting->refill_bucket_event,
2888585484eSchristos 				&bev->rate_limiting->cfg->tick_timeout) < 0)
2898585484eSchristos 				r = -1;
2908585484eSchristos 		} else if (bev->read_suspended & BEV_SUSPEND_BW) {
2918585484eSchristos 			if (!(bev->write_suspended & BEV_SUSPEND_BW))
2928585484eSchristos 				event_del(&bev->rate_limiting->refill_bucket_event);
2938585484eSchristos 			bufferevent_unsuspend_read_(&bev->bev, BEV_SUSPEND_BW);
2948585484eSchristos 		}
2958585484eSchristos 	}
2968585484eSchristos 
2978585484eSchristos 	if (bev->rate_limiting->group) {
2988585484eSchristos 		LOCK_GROUP(bev->rate_limiting->group);
2998585484eSchristos 		bev->rate_limiting->group->rate_limit.read_limit -= bytes;
3008585484eSchristos 		bev->rate_limiting->group->total_read += bytes;
3018585484eSchristos 		if (bev->rate_limiting->group->rate_limit.read_limit <= 0) {
3028585484eSchristos 			bev_group_suspend_reading_(bev->rate_limiting->group);
3038585484eSchristos 		} else if (bev->rate_limiting->group->read_suspended) {
3048585484eSchristos 			bev_group_unsuspend_reading_(bev->rate_limiting->group);
3058585484eSchristos 		}
3068585484eSchristos 		UNLOCK_GROUP(bev->rate_limiting->group);
3078585484eSchristos 	}
3088585484eSchristos 
3098585484eSchristos 	return r;
3108585484eSchristos }
3118585484eSchristos 
3128585484eSchristos int
3138585484eSchristos bufferevent_decrement_write_buckets_(struct bufferevent_private *bev, ev_ssize_t bytes)
3148585484eSchristos {
3158585484eSchristos 	/* XXXXX Make sure all users of this function check its return value */
3168585484eSchristos 	int r = 0;
3178585484eSchristos 	/* need to hold lock */
3188585484eSchristos 	if (!bev->rate_limiting)
3198585484eSchristos 		return 0;
3208585484eSchristos 
3218585484eSchristos 	if (bev->rate_limiting->cfg) {
3228585484eSchristos 		bev->rate_limiting->limit.write_limit -= bytes;
3238585484eSchristos 		if (bev->rate_limiting->limit.write_limit <= 0) {
3248585484eSchristos 			bufferevent_suspend_write_(&bev->bev, BEV_SUSPEND_BW);
3258585484eSchristos 			if (event_add(&bev->rate_limiting->refill_bucket_event,
3268585484eSchristos 				&bev->rate_limiting->cfg->tick_timeout) < 0)
3278585484eSchristos 				r = -1;
3288585484eSchristos 		} else if (bev->write_suspended & BEV_SUSPEND_BW) {
3298585484eSchristos 			if (!(bev->read_suspended & BEV_SUSPEND_BW))
3308585484eSchristos 				event_del(&bev->rate_limiting->refill_bucket_event);
3318585484eSchristos 			bufferevent_unsuspend_write_(&bev->bev, BEV_SUSPEND_BW);
3328585484eSchristos 		}
3338585484eSchristos 	}
3348585484eSchristos 
3358585484eSchristos 	if (bev->rate_limiting->group) {
3368585484eSchristos 		LOCK_GROUP(bev->rate_limiting->group);
3378585484eSchristos 		bev->rate_limiting->group->rate_limit.write_limit -= bytes;
3388585484eSchristos 		bev->rate_limiting->group->total_written += bytes;
3398585484eSchristos 		if (bev->rate_limiting->group->rate_limit.write_limit <= 0) {
3408585484eSchristos 			bev_group_suspend_writing_(bev->rate_limiting->group);
3418585484eSchristos 		} else if (bev->rate_limiting->group->write_suspended) {
3428585484eSchristos 			bev_group_unsuspend_writing_(bev->rate_limiting->group);
3438585484eSchristos 		}
3448585484eSchristos 		UNLOCK_GROUP(bev->rate_limiting->group);
3458585484eSchristos 	}
3468585484eSchristos 
3478585484eSchristos 	return r;
3488585484eSchristos }
3498585484eSchristos 
3508585484eSchristos /** Stop reading on every bufferevent in <b>g</b> */
3518585484eSchristos static int
3528585484eSchristos bev_group_suspend_reading_(struct bufferevent_rate_limit_group *g)
3538585484eSchristos {
3548585484eSchristos 	/* Needs group lock */
3558585484eSchristos 	struct bufferevent_private *bev;
3568585484eSchristos 	g->read_suspended = 1;
3578585484eSchristos 	g->pending_unsuspend_read = 0;
3588585484eSchristos 
3598585484eSchristos 	/* Note that in this loop we call EVLOCK_TRY_LOCK_ instead of BEV_LOCK,
3608585484eSchristos 	   to prevent a deadlock.  (Ordinarily, the group lock nests inside
3618585484eSchristos 	   the bufferevent locks.  If we are unable to lock any individual
3628585484eSchristos 	   bufferevent, it will find out later when it looks at its limit
3638585484eSchristos 	   and sees that its group is suspended.)
3648585484eSchristos 	*/
3658585484eSchristos 	LIST_FOREACH(bev, &g->members, rate_limiting->next_in_group) {
3668585484eSchristos 		if (EVLOCK_TRY_LOCK_(bev->lock)) {
3678585484eSchristos 			bufferevent_suspend_read_(&bev->bev,
3688585484eSchristos 			    BEV_SUSPEND_BW_GROUP);
3698585484eSchristos 			EVLOCK_UNLOCK(bev->lock, 0);
3708585484eSchristos 		}
3718585484eSchristos 	}
3728585484eSchristos 	return 0;
3738585484eSchristos }
3748585484eSchristos 
3758585484eSchristos /** Stop writing on every bufferevent in <b>g</b> */
3768585484eSchristos static int
3778585484eSchristos bev_group_suspend_writing_(struct bufferevent_rate_limit_group *g)
3788585484eSchristos {
3798585484eSchristos 	/* Needs group lock */
3808585484eSchristos 	struct bufferevent_private *bev;
3818585484eSchristos 	g->write_suspended = 1;
3828585484eSchristos 	g->pending_unsuspend_write = 0;
3838585484eSchristos 	LIST_FOREACH(bev, &g->members, rate_limiting->next_in_group) {
3848585484eSchristos 		if (EVLOCK_TRY_LOCK_(bev->lock)) {
3858585484eSchristos 			bufferevent_suspend_write_(&bev->bev,
3868585484eSchristos 			    BEV_SUSPEND_BW_GROUP);
3878585484eSchristos 			EVLOCK_UNLOCK(bev->lock, 0);
3888585484eSchristos 		}
3898585484eSchristos 	}
3908585484eSchristos 	return 0;
3918585484eSchristos }
3928585484eSchristos 
3938585484eSchristos /** Timer callback invoked on a single bufferevent with one or more exhausted
3948585484eSchristos     buckets when they are ready to refill. */
3958585484eSchristos static void
3968585484eSchristos bev_refill_callback_(evutil_socket_t fd, short what, void *arg)
3978585484eSchristos {
3988585484eSchristos 	unsigned tick;
3998585484eSchristos 	struct timeval now;
4008585484eSchristos 	struct bufferevent_private *bev = arg;
4018585484eSchristos 	int again = 0;
4028585484eSchristos 	BEV_LOCK(&bev->bev);
4038585484eSchristos 	if (!bev->rate_limiting || !bev->rate_limiting->cfg) {
4048585484eSchristos 		BEV_UNLOCK(&bev->bev);
4058585484eSchristos 		return;
4068585484eSchristos 	}
4078585484eSchristos 
4088585484eSchristos 	/* First, update the bucket */
4098585484eSchristos 	event_base_gettimeofday_cached(bev->bev.ev_base, &now);
4108585484eSchristos 	tick = ev_token_bucket_get_tick_(&now,
4118585484eSchristos 	    bev->rate_limiting->cfg);
4128585484eSchristos 	ev_token_bucket_update_(&bev->rate_limiting->limit,
4138585484eSchristos 	    bev->rate_limiting->cfg,
4148585484eSchristos 	    tick);
4158585484eSchristos 
4168585484eSchristos 	/* Now unsuspend any read/write operations as appropriate. */
4178585484eSchristos 	if ((bev->read_suspended & BEV_SUSPEND_BW)) {
4188585484eSchristos 		if (bev->rate_limiting->limit.read_limit > 0)
4198585484eSchristos 			bufferevent_unsuspend_read_(&bev->bev, BEV_SUSPEND_BW);
4208585484eSchristos 		else
4218585484eSchristos 			again = 1;
4228585484eSchristos 	}
4238585484eSchristos 	if ((bev->write_suspended & BEV_SUSPEND_BW)) {
4248585484eSchristos 		if (bev->rate_limiting->limit.write_limit > 0)
4258585484eSchristos 			bufferevent_unsuspend_write_(&bev->bev, BEV_SUSPEND_BW);
4268585484eSchristos 		else
4278585484eSchristos 			again = 1;
4288585484eSchristos 	}
4298585484eSchristos 	if (again) {
4308585484eSchristos 		/* One or more of the buckets may need another refill if they
4318585484eSchristos 		   started negative.
4328585484eSchristos 
4338585484eSchristos 		   XXXX if we need to be quiet for more ticks, we should
4348585484eSchristos 		   maybe figure out what timeout we really want.
4358585484eSchristos 		*/
4368585484eSchristos 		/* XXXX Handle event_add failure somehow */
4378585484eSchristos 		event_add(&bev->rate_limiting->refill_bucket_event,
4388585484eSchristos 		    &bev->rate_limiting->cfg->tick_timeout);
4398585484eSchristos 	}
4408585484eSchristos 	BEV_UNLOCK(&bev->bev);
4418585484eSchristos }
4428585484eSchristos 
4438585484eSchristos /** Helper: grab a random element from a bufferevent group.
4448585484eSchristos  *
4458585484eSchristos  * Requires that we hold the lock on the group.
4468585484eSchristos  */
4478585484eSchristos static struct bufferevent_private *
4488585484eSchristos bev_group_random_element_(struct bufferevent_rate_limit_group *group)
4498585484eSchristos {
4508585484eSchristos 	int which;
4518585484eSchristos 	struct bufferevent_private *bev;
4528585484eSchristos 
4538585484eSchristos 	/* requires group lock */
4548585484eSchristos 
4558585484eSchristos 	if (!group->n_members)
4568585484eSchristos 		return NULL;
4578585484eSchristos 
4588585484eSchristos 	EVUTIL_ASSERT(! LIST_EMPTY(&group->members));
4598585484eSchristos 
4608585484eSchristos 	which = evutil_weakrand_range_(&group->weakrand_seed, group->n_members);
4618585484eSchristos 
4628585484eSchristos 	bev = LIST_FIRST(&group->members);
4638585484eSchristos 	while (which--)
4648585484eSchristos 		bev = LIST_NEXT(bev, rate_limiting->next_in_group);
4658585484eSchristos 
4668585484eSchristos 	return bev;
4678585484eSchristos }
4688585484eSchristos 
4698585484eSchristos /** Iterate over the elements of a rate-limiting group 'g' with a random
4708585484eSchristos     starting point, assigning each to the variable 'bev', and executing the
4718585484eSchristos     block 'block'.
4728585484eSchristos 
4738585484eSchristos     We do this in a half-baked effort to get fairness among group members.
4748585484eSchristos     XXX Round-robin or some kind of priority queue would be even more fair.
4758585484eSchristos  */
4768585484eSchristos #define FOREACH_RANDOM_ORDER(block)			\
4778585484eSchristos 	do {						\
4788585484eSchristos 		first = bev_group_random_element_(g);	\
4798585484eSchristos 		for (bev = first; bev != LIST_END(&g->members); \
4808585484eSchristos 		    bev = LIST_NEXT(bev, rate_limiting->next_in_group)) { \
4818585484eSchristos 			block ;					 \
4828585484eSchristos 		}						 \
4838585484eSchristos 		for (bev = LIST_FIRST(&g->members); bev && bev != first; \
4848585484eSchristos 		    bev = LIST_NEXT(bev, rate_limiting->next_in_group)) { \
4858585484eSchristos 			block ;						\
4868585484eSchristos 		}							\
4878585484eSchristos 	} while (0)
4888585484eSchristos 
4898585484eSchristos static void
4908585484eSchristos bev_group_unsuspend_reading_(struct bufferevent_rate_limit_group *g)
4918585484eSchristos {
4928585484eSchristos 	int again = 0;
4938585484eSchristos 	struct bufferevent_private *bev, *first;
4948585484eSchristos 
4958585484eSchristos 	g->read_suspended = 0;
4968585484eSchristos 	FOREACH_RANDOM_ORDER({
4978585484eSchristos 		if (EVLOCK_TRY_LOCK_(bev->lock)) {
4988585484eSchristos 			bufferevent_unsuspend_read_(&bev->bev,
4998585484eSchristos 			    BEV_SUSPEND_BW_GROUP);
5008585484eSchristos 			EVLOCK_UNLOCK(bev->lock, 0);
5018585484eSchristos 		} else {
5028585484eSchristos 			again = 1;
5038585484eSchristos 		}
5048585484eSchristos 	});
5058585484eSchristos 	g->pending_unsuspend_read = again;
5068585484eSchristos }
5078585484eSchristos 
5088585484eSchristos static void
5098585484eSchristos bev_group_unsuspend_writing_(struct bufferevent_rate_limit_group *g)
5108585484eSchristos {
5118585484eSchristos 	int again = 0;
5128585484eSchristos 	struct bufferevent_private *bev, *first;
5138585484eSchristos 	g->write_suspended = 0;
5148585484eSchristos 
5158585484eSchristos 	FOREACH_RANDOM_ORDER({
5168585484eSchristos 		if (EVLOCK_TRY_LOCK_(bev->lock)) {
5178585484eSchristos 			bufferevent_unsuspend_write_(&bev->bev,
5188585484eSchristos 			    BEV_SUSPEND_BW_GROUP);
5198585484eSchristos 			EVLOCK_UNLOCK(bev->lock, 0);
5208585484eSchristos 		} else {
5218585484eSchristos 			again = 1;
5228585484eSchristos 		}
5238585484eSchristos 	});
5248585484eSchristos 	g->pending_unsuspend_write = again;
5258585484eSchristos }
5268585484eSchristos 
5278585484eSchristos /** Callback invoked every tick to add more elements to the group bucket
5288585484eSchristos     and unsuspend group members as needed.
5298585484eSchristos  */
5308585484eSchristos static void
5318585484eSchristos bev_group_refill_callback_(evutil_socket_t fd, short what, void *arg)
5328585484eSchristos {
5338585484eSchristos 	struct bufferevent_rate_limit_group *g = arg;
5348585484eSchristos 	unsigned tick;
5358585484eSchristos 	struct timeval now;
5368585484eSchristos 
5378585484eSchristos 	event_base_gettimeofday_cached(event_get_base(&g->master_refill_event), &now);
5388585484eSchristos 
5398585484eSchristos 	LOCK_GROUP(g);
5408585484eSchristos 
5418585484eSchristos 	tick = ev_token_bucket_get_tick_(&now, &g->rate_limit_cfg);
5428585484eSchristos 	ev_token_bucket_update_(&g->rate_limit, &g->rate_limit_cfg, tick);
5438585484eSchristos 
5448585484eSchristos 	if (g->pending_unsuspend_read ||
5458585484eSchristos 	    (g->read_suspended && (g->rate_limit.read_limit >= g->min_share))) {
5468585484eSchristos 		bev_group_unsuspend_reading_(g);
5478585484eSchristos 	}
5488585484eSchristos 	if (g->pending_unsuspend_write ||
5498585484eSchristos 	    (g->write_suspended && (g->rate_limit.write_limit >= g->min_share))){
5508585484eSchristos 		bev_group_unsuspend_writing_(g);
5518585484eSchristos 	}
5528585484eSchristos 
5538585484eSchristos 	/* XXXX Rather than waiting to the next tick to unsuspend stuff
5548585484eSchristos 	 * with pending_unsuspend_write/read, we should do it on the
5558585484eSchristos 	 * next iteration of the mainloop.
5568585484eSchristos 	 */
5578585484eSchristos 
5588585484eSchristos 	UNLOCK_GROUP(g);
5598585484eSchristos }
5608585484eSchristos 
5618585484eSchristos int
5628585484eSchristos bufferevent_set_rate_limit(struct bufferevent *bev,
5638585484eSchristos     struct ev_token_bucket_cfg *cfg)
5648585484eSchristos {
565*eabc0478Schristos 	struct bufferevent_private *bevp = BEV_UPCAST(bev);
5668585484eSchristos 	int r = -1;
5678585484eSchristos 	struct bufferevent_rate_limit *rlim;
5688585484eSchristos 	struct timeval now;
5698585484eSchristos 	ev_uint32_t tick;
5708585484eSchristos 	int reinit = 0, suspended = 0;
5718585484eSchristos 	/* XXX reference-count cfg */
5728585484eSchristos 
5738585484eSchristos 	BEV_LOCK(bev);
5748585484eSchristos 
5758585484eSchristos 	if (cfg == NULL) {
5768585484eSchristos 		if (bevp->rate_limiting) {
5778585484eSchristos 			rlim = bevp->rate_limiting;
5788585484eSchristos 			rlim->cfg = NULL;
5798585484eSchristos 			bufferevent_unsuspend_read_(bev, BEV_SUSPEND_BW);
5808585484eSchristos 			bufferevent_unsuspend_write_(bev, BEV_SUSPEND_BW);
5818585484eSchristos 			if (event_initialized(&rlim->refill_bucket_event))
5828585484eSchristos 				event_del(&rlim->refill_bucket_event);
5838585484eSchristos 		}
5848585484eSchristos 		r = 0;
5858585484eSchristos 		goto done;
5868585484eSchristos 	}
5878585484eSchristos 
5888585484eSchristos 	event_base_gettimeofday_cached(bev->ev_base, &now);
5898585484eSchristos 	tick = ev_token_bucket_get_tick_(&now, cfg);
5908585484eSchristos 
5918585484eSchristos 	if (bevp->rate_limiting && bevp->rate_limiting->cfg == cfg) {
5928585484eSchristos 		/* no-op */
5938585484eSchristos 		r = 0;
5948585484eSchristos 		goto done;
5958585484eSchristos 	}
5968585484eSchristos 	if (bevp->rate_limiting == NULL) {
5978585484eSchristos 		rlim = mm_calloc(1, sizeof(struct bufferevent_rate_limit));
5988585484eSchristos 		if (!rlim)
5998585484eSchristos 			goto done;
6008585484eSchristos 		bevp->rate_limiting = rlim;
6018585484eSchristos 	} else {
6028585484eSchristos 		rlim = bevp->rate_limiting;
6038585484eSchristos 	}
6048585484eSchristos 	reinit = rlim->cfg != NULL;
6058585484eSchristos 
6068585484eSchristos 	rlim->cfg = cfg;
6078585484eSchristos 	ev_token_bucket_init_(&rlim->limit, cfg, tick, reinit);
6088585484eSchristos 
6098585484eSchristos 	if (reinit) {
6108585484eSchristos 		EVUTIL_ASSERT(event_initialized(&rlim->refill_bucket_event));
6118585484eSchristos 		event_del(&rlim->refill_bucket_event);
6128585484eSchristos 	}
613b8ecfcfeSchristos 	event_assign(&rlim->refill_bucket_event, bev->ev_base,
614b8ecfcfeSchristos 	    -1, EV_FINALIZE, bev_refill_callback_, bevp);
6158585484eSchristos 
6168585484eSchristos 	if (rlim->limit.read_limit > 0) {
6178585484eSchristos 		bufferevent_unsuspend_read_(bev, BEV_SUSPEND_BW);
6188585484eSchristos 	} else {
6198585484eSchristos 		bufferevent_suspend_read_(bev, BEV_SUSPEND_BW);
6208585484eSchristos 		suspended=1;
6218585484eSchristos 	}
6228585484eSchristos 	if (rlim->limit.write_limit > 0) {
6238585484eSchristos 		bufferevent_unsuspend_write_(bev, BEV_SUSPEND_BW);
6248585484eSchristos 	} else {
6258585484eSchristos 		bufferevent_suspend_write_(bev, BEV_SUSPEND_BW);
6268585484eSchristos 		suspended = 1;
6278585484eSchristos 	}
6288585484eSchristos 
6298585484eSchristos 	if (suspended)
6308585484eSchristos 		event_add(&rlim->refill_bucket_event, &cfg->tick_timeout);
6318585484eSchristos 
6328585484eSchristos 	r = 0;
6338585484eSchristos 
6348585484eSchristos done:
6358585484eSchristos 	BEV_UNLOCK(bev);
6368585484eSchristos 	return r;
6378585484eSchristos }
6388585484eSchristos 
6398585484eSchristos struct bufferevent_rate_limit_group *
6408585484eSchristos bufferevent_rate_limit_group_new(struct event_base *base,
6418585484eSchristos     const struct ev_token_bucket_cfg *cfg)
6428585484eSchristos {
6438585484eSchristos 	struct bufferevent_rate_limit_group *g;
6448585484eSchristos 	struct timeval now;
6458585484eSchristos 	ev_uint32_t tick;
6468585484eSchristos 
6478585484eSchristos 	event_base_gettimeofday_cached(base, &now);
6488585484eSchristos 	tick = ev_token_bucket_get_tick_(&now, cfg);
6498585484eSchristos 
6508585484eSchristos 	g = mm_calloc(1, sizeof(struct bufferevent_rate_limit_group));
6518585484eSchristos 	if (!g)
6528585484eSchristos 		return NULL;
6538585484eSchristos 	memcpy(&g->rate_limit_cfg, cfg, sizeof(g->rate_limit_cfg));
6548585484eSchristos 	LIST_INIT(&g->members);
6558585484eSchristos 
6568585484eSchristos 	ev_token_bucket_init_(&g->rate_limit, cfg, tick, 0);
6578585484eSchristos 
658b8ecfcfeSchristos 	event_assign(&g->master_refill_event, base, -1, EV_PERSIST|EV_FINALIZE,
6598585484eSchristos 	    bev_group_refill_callback_, g);
6608585484eSchristos 	/*XXXX handle event_add failure */
6618585484eSchristos 	event_add(&g->master_refill_event, &cfg->tick_timeout);
6628585484eSchristos 
6638585484eSchristos 	EVTHREAD_ALLOC_LOCK(g->lock, EVTHREAD_LOCKTYPE_RECURSIVE);
6648585484eSchristos 
6658585484eSchristos 	bufferevent_rate_limit_group_set_min_share(g, 64);
6668585484eSchristos 
6678585484eSchristos 	evutil_weakrand_seed_(&g->weakrand_seed,
6688585484eSchristos 	    (ev_uint32_t) ((now.tv_sec + now.tv_usec) + (ev_intptr_t)g));
6698585484eSchristos 
6708585484eSchristos 	return g;
6718585484eSchristos }
6728585484eSchristos 
6738585484eSchristos int
6748585484eSchristos bufferevent_rate_limit_group_set_cfg(
6758585484eSchristos 	struct bufferevent_rate_limit_group *g,
6768585484eSchristos 	const struct ev_token_bucket_cfg *cfg)
6778585484eSchristos {
6788585484eSchristos 	int same_tick;
6798585484eSchristos 	if (!g || !cfg)
6808585484eSchristos 		return -1;
6818585484eSchristos 
6828585484eSchristos 	LOCK_GROUP(g);
6838585484eSchristos 	same_tick = evutil_timercmp(
6848585484eSchristos 		&g->rate_limit_cfg.tick_timeout, &cfg->tick_timeout, ==);
6858585484eSchristos 	memcpy(&g->rate_limit_cfg, cfg, sizeof(g->rate_limit_cfg));
6868585484eSchristos 
6878585484eSchristos 	if (g->rate_limit.read_limit > (ev_ssize_t)cfg->read_maximum)
6888585484eSchristos 		g->rate_limit.read_limit = cfg->read_maximum;
6898585484eSchristos 	if (g->rate_limit.write_limit > (ev_ssize_t)cfg->write_maximum)
6908585484eSchristos 		g->rate_limit.write_limit = cfg->write_maximum;
6918585484eSchristos 
6928585484eSchristos 	if (!same_tick) {
6938585484eSchristos 		/* This can cause a hiccup in the schedule */
6948585484eSchristos 		event_add(&g->master_refill_event, &cfg->tick_timeout);
6958585484eSchristos 	}
6968585484eSchristos 
6978585484eSchristos 	/* The new limits might force us to adjust min_share differently. */
6988585484eSchristos 	bufferevent_rate_limit_group_set_min_share(g, g->configured_min_share);
6998585484eSchristos 
7008585484eSchristos 	UNLOCK_GROUP(g);
7018585484eSchristos 	return 0;
7028585484eSchristos }
7038585484eSchristos 
7048585484eSchristos int
7058585484eSchristos bufferevent_rate_limit_group_set_min_share(
7068585484eSchristos 	struct bufferevent_rate_limit_group *g,
7078585484eSchristos 	size_t share)
7088585484eSchristos {
7098585484eSchristos 	if (share > EV_SSIZE_MAX)
7108585484eSchristos 		return -1;
7118585484eSchristos 
7128585484eSchristos 	g->configured_min_share = share;
7138585484eSchristos 
7148585484eSchristos 	/* Can't set share to less than the one-tick maximum.  IOW, at steady
7158585484eSchristos 	 * state, at least one connection can go per tick. */
7168585484eSchristos 	if (share > g->rate_limit_cfg.read_rate)
7178585484eSchristos 		share = g->rate_limit_cfg.read_rate;
7188585484eSchristos 	if (share > g->rate_limit_cfg.write_rate)
7198585484eSchristos 		share = g->rate_limit_cfg.write_rate;
7208585484eSchristos 
7218585484eSchristos 	g->min_share = share;
7228585484eSchristos 	return 0;
7238585484eSchristos }
7248585484eSchristos 
7258585484eSchristos void
7268585484eSchristos bufferevent_rate_limit_group_free(struct bufferevent_rate_limit_group *g)
7278585484eSchristos {
7288585484eSchristos 	LOCK_GROUP(g);
7298585484eSchristos 	EVUTIL_ASSERT(0 == g->n_members);
7308585484eSchristos 	event_del(&g->master_refill_event);
7318585484eSchristos 	UNLOCK_GROUP(g);
7328585484eSchristos 	EVTHREAD_FREE_LOCK(g->lock, EVTHREAD_LOCKTYPE_RECURSIVE);
7338585484eSchristos 	mm_free(g);
7348585484eSchristos }
7358585484eSchristos 
7368585484eSchristos int
7378585484eSchristos bufferevent_add_to_rate_limit_group(struct bufferevent *bev,
7388585484eSchristos     struct bufferevent_rate_limit_group *g)
7398585484eSchristos {
7408585484eSchristos 	int wsuspend, rsuspend;
741*eabc0478Schristos 	struct bufferevent_private *bevp = BEV_UPCAST(bev);
7428585484eSchristos 	BEV_LOCK(bev);
7438585484eSchristos 
7448585484eSchristos 	if (!bevp->rate_limiting) {
7458585484eSchristos 		struct bufferevent_rate_limit *rlim;
7468585484eSchristos 		rlim = mm_calloc(1, sizeof(struct bufferevent_rate_limit));
7478585484eSchristos 		if (!rlim) {
7488585484eSchristos 			BEV_UNLOCK(bev);
7498585484eSchristos 			return -1;
7508585484eSchristos 		}
751b8ecfcfeSchristos 		event_assign(&rlim->refill_bucket_event, bev->ev_base,
752b8ecfcfeSchristos 		    -1, EV_FINALIZE, bev_refill_callback_, bevp);
7538585484eSchristos 		bevp->rate_limiting = rlim;
7548585484eSchristos 	}
7558585484eSchristos 
7568585484eSchristos 	if (bevp->rate_limiting->group == g) {
7578585484eSchristos 		BEV_UNLOCK(bev);
7588585484eSchristos 		return 0;
7598585484eSchristos 	}
7608585484eSchristos 	if (bevp->rate_limiting->group)
7618585484eSchristos 		bufferevent_remove_from_rate_limit_group(bev);
7628585484eSchristos 
7638585484eSchristos 	LOCK_GROUP(g);
7648585484eSchristos 	bevp->rate_limiting->group = g;
7658585484eSchristos 	++g->n_members;
7668585484eSchristos 	LIST_INSERT_HEAD(&g->members, bevp, rate_limiting->next_in_group);
7678585484eSchristos 
7688585484eSchristos 	rsuspend = g->read_suspended;
7698585484eSchristos 	wsuspend = g->write_suspended;
7708585484eSchristos 
7718585484eSchristos 	UNLOCK_GROUP(g);
7728585484eSchristos 
7738585484eSchristos 	if (rsuspend)
7748585484eSchristos 		bufferevent_suspend_read_(bev, BEV_SUSPEND_BW_GROUP);
7758585484eSchristos 	if (wsuspend)
7768585484eSchristos 		bufferevent_suspend_write_(bev, BEV_SUSPEND_BW_GROUP);
7778585484eSchristos 
7788585484eSchristos 	BEV_UNLOCK(bev);
7798585484eSchristos 	return 0;
7808585484eSchristos }
7818585484eSchristos 
7828585484eSchristos int
7838585484eSchristos bufferevent_remove_from_rate_limit_group(struct bufferevent *bev)
7848585484eSchristos {
7858585484eSchristos 	return bufferevent_remove_from_rate_limit_group_internal_(bev, 1);
7868585484eSchristos }
7878585484eSchristos 
7888585484eSchristos int
7898585484eSchristos bufferevent_remove_from_rate_limit_group_internal_(struct bufferevent *bev,
7908585484eSchristos     int unsuspend)
7918585484eSchristos {
792*eabc0478Schristos 	struct bufferevent_private *bevp = BEV_UPCAST(bev);
7938585484eSchristos 	BEV_LOCK(bev);
7948585484eSchristos 	if (bevp->rate_limiting && bevp->rate_limiting->group) {
7958585484eSchristos 		struct bufferevent_rate_limit_group *g =
7968585484eSchristos 		    bevp->rate_limiting->group;
7978585484eSchristos 		LOCK_GROUP(g);
7988585484eSchristos 		bevp->rate_limiting->group = NULL;
7998585484eSchristos 		--g->n_members;
8008585484eSchristos 		LIST_REMOVE(bevp, rate_limiting->next_in_group);
8018585484eSchristos 		UNLOCK_GROUP(g);
8028585484eSchristos 	}
8038585484eSchristos 	if (unsuspend) {
8048585484eSchristos 		bufferevent_unsuspend_read_(bev, BEV_SUSPEND_BW_GROUP);
8058585484eSchristos 		bufferevent_unsuspend_write_(bev, BEV_SUSPEND_BW_GROUP);
8068585484eSchristos 	}
8078585484eSchristos 	BEV_UNLOCK(bev);
8088585484eSchristos 	return 0;
8098585484eSchristos }
8108585484eSchristos 
8118585484eSchristos /* ===
8128585484eSchristos  * API functions to expose rate limits.
8138585484eSchristos  *
8148585484eSchristos  * Don't use these from inside Libevent; they're meant to be for use by
8158585484eSchristos  * the program.
8168585484eSchristos  * === */
8178585484eSchristos 
8188585484eSchristos /* Mostly you don't want to use this function from inside libevent;
8198585484eSchristos  * bufferevent_get_read_max_() is more likely what you want*/
8208585484eSchristos ev_ssize_t
8218585484eSchristos bufferevent_get_read_limit(struct bufferevent *bev)
8228585484eSchristos {
8238585484eSchristos 	ev_ssize_t r;
8248585484eSchristos 	struct bufferevent_private *bevp;
8258585484eSchristos 	BEV_LOCK(bev);
8268585484eSchristos 	bevp = BEV_UPCAST(bev);
8278585484eSchristos 	if (bevp->rate_limiting && bevp->rate_limiting->cfg) {
8288585484eSchristos 		bufferevent_update_buckets(bevp);
8298585484eSchristos 		r = bevp->rate_limiting->limit.read_limit;
8308585484eSchristos 	} else {
8318585484eSchristos 		r = EV_SSIZE_MAX;
8328585484eSchristos 	}
8338585484eSchristos 	BEV_UNLOCK(bev);
8348585484eSchristos 	return r;
8358585484eSchristos }
8368585484eSchristos 
8378585484eSchristos /* Mostly you don't want to use this function from inside libevent;
8388585484eSchristos  * bufferevent_get_write_max_() is more likely what you want*/
8398585484eSchristos ev_ssize_t
8408585484eSchristos bufferevent_get_write_limit(struct bufferevent *bev)
8418585484eSchristos {
8428585484eSchristos 	ev_ssize_t r;
8438585484eSchristos 	struct bufferevent_private *bevp;
8448585484eSchristos 	BEV_LOCK(bev);
8458585484eSchristos 	bevp = BEV_UPCAST(bev);
8468585484eSchristos 	if (bevp->rate_limiting && bevp->rate_limiting->cfg) {
8478585484eSchristos 		bufferevent_update_buckets(bevp);
8488585484eSchristos 		r = bevp->rate_limiting->limit.write_limit;
8498585484eSchristos 	} else {
8508585484eSchristos 		r = EV_SSIZE_MAX;
8518585484eSchristos 	}
8528585484eSchristos 	BEV_UNLOCK(bev);
8538585484eSchristos 	return r;
8548585484eSchristos }
8558585484eSchristos 
8568585484eSchristos int
8578585484eSchristos bufferevent_set_max_single_read(struct bufferevent *bev, size_t size)
8588585484eSchristos {
8598585484eSchristos 	struct bufferevent_private *bevp;
8608585484eSchristos 	BEV_LOCK(bev);
8618585484eSchristos 	bevp = BEV_UPCAST(bev);
8628585484eSchristos 	if (size == 0 || size > EV_SSIZE_MAX)
8638585484eSchristos 		bevp->max_single_read = MAX_SINGLE_READ_DEFAULT;
8648585484eSchristos 	else
8658585484eSchristos 		bevp->max_single_read = size;
8668585484eSchristos 	BEV_UNLOCK(bev);
8678585484eSchristos 	return 0;
8688585484eSchristos }
8698585484eSchristos 
8708585484eSchristos int
8718585484eSchristos bufferevent_set_max_single_write(struct bufferevent *bev, size_t size)
8728585484eSchristos {
8738585484eSchristos 	struct bufferevent_private *bevp;
8748585484eSchristos 	BEV_LOCK(bev);
8758585484eSchristos 	bevp = BEV_UPCAST(bev);
8768585484eSchristos 	if (size == 0 || size > EV_SSIZE_MAX)
8778585484eSchristos 		bevp->max_single_write = MAX_SINGLE_WRITE_DEFAULT;
8788585484eSchristos 	else
8798585484eSchristos 		bevp->max_single_write = size;
8808585484eSchristos 	BEV_UNLOCK(bev);
8818585484eSchristos 	return 0;
8828585484eSchristos }
8838585484eSchristos 
8848585484eSchristos ev_ssize_t
8858585484eSchristos bufferevent_get_max_single_read(struct bufferevent *bev)
8868585484eSchristos {
8878585484eSchristos 	ev_ssize_t r;
8888585484eSchristos 
8898585484eSchristos 	BEV_LOCK(bev);
8908585484eSchristos 	r = BEV_UPCAST(bev)->max_single_read;
8918585484eSchristos 	BEV_UNLOCK(bev);
8928585484eSchristos 	return r;
8938585484eSchristos }
8948585484eSchristos 
8958585484eSchristos ev_ssize_t
8968585484eSchristos bufferevent_get_max_single_write(struct bufferevent *bev)
8978585484eSchristos {
8988585484eSchristos 	ev_ssize_t r;
8998585484eSchristos 
9008585484eSchristos 	BEV_LOCK(bev);
9018585484eSchristos 	r = BEV_UPCAST(bev)->max_single_write;
9028585484eSchristos 	BEV_UNLOCK(bev);
9038585484eSchristos 	return r;
9048585484eSchristos }
9058585484eSchristos 
9068585484eSchristos ev_ssize_t
9078585484eSchristos bufferevent_get_max_to_read(struct bufferevent *bev)
9088585484eSchristos {
9098585484eSchristos 	ev_ssize_t r;
9108585484eSchristos 	BEV_LOCK(bev);
9118585484eSchristos 	r = bufferevent_get_read_max_(BEV_UPCAST(bev));
9128585484eSchristos 	BEV_UNLOCK(bev);
9138585484eSchristos 	return r;
9148585484eSchristos }
9158585484eSchristos 
9168585484eSchristos ev_ssize_t
9178585484eSchristos bufferevent_get_max_to_write(struct bufferevent *bev)
9188585484eSchristos {
9198585484eSchristos 	ev_ssize_t r;
9208585484eSchristos 	BEV_LOCK(bev);
9218585484eSchristos 	r = bufferevent_get_write_max_(BEV_UPCAST(bev));
9228585484eSchristos 	BEV_UNLOCK(bev);
9238585484eSchristos 	return r;
9248585484eSchristos }
9258585484eSchristos 
926b8ecfcfeSchristos const struct ev_token_bucket_cfg *
927b8ecfcfeSchristos bufferevent_get_token_bucket_cfg(const struct bufferevent *bev) {
928b8ecfcfeSchristos 	struct bufferevent_private *bufev_private = BEV_UPCAST(bev);
929b8ecfcfeSchristos 	struct ev_token_bucket_cfg *cfg;
930b8ecfcfeSchristos 
931b8ecfcfeSchristos 	BEV_LOCK(bev);
932b8ecfcfeSchristos 
933b8ecfcfeSchristos 	if (bufev_private->rate_limiting) {
934b8ecfcfeSchristos 		cfg = bufev_private->rate_limiting->cfg;
935b8ecfcfeSchristos 	} else {
936b8ecfcfeSchristos 		cfg = NULL;
937b8ecfcfeSchristos 	}
938b8ecfcfeSchristos 
939b8ecfcfeSchristos 	BEV_UNLOCK(bev);
940b8ecfcfeSchristos 
941b8ecfcfeSchristos 	return cfg;
942b8ecfcfeSchristos }
9438585484eSchristos 
9448585484eSchristos /* Mostly you don't want to use this function from inside libevent;
9458585484eSchristos  * bufferevent_get_read_max_() is more likely what you want*/
9468585484eSchristos ev_ssize_t
9478585484eSchristos bufferevent_rate_limit_group_get_read_limit(
9488585484eSchristos 	struct bufferevent_rate_limit_group *grp)
9498585484eSchristos {
9508585484eSchristos 	ev_ssize_t r;
9518585484eSchristos 	LOCK_GROUP(grp);
9528585484eSchristos 	r = grp->rate_limit.read_limit;
9538585484eSchristos 	UNLOCK_GROUP(grp);
9548585484eSchristos 	return r;
9558585484eSchristos }
9568585484eSchristos 
9578585484eSchristos /* Mostly you don't want to use this function from inside libevent;
9588585484eSchristos  * bufferevent_get_write_max_() is more likely what you want. */
9598585484eSchristos ev_ssize_t
9608585484eSchristos bufferevent_rate_limit_group_get_write_limit(
9618585484eSchristos 	struct bufferevent_rate_limit_group *grp)
9628585484eSchristos {
9638585484eSchristos 	ev_ssize_t r;
9648585484eSchristos 	LOCK_GROUP(grp);
9658585484eSchristos 	r = grp->rate_limit.write_limit;
9668585484eSchristos 	UNLOCK_GROUP(grp);
9678585484eSchristos 	return r;
9688585484eSchristos }
9698585484eSchristos 
9708585484eSchristos int
9718585484eSchristos bufferevent_decrement_read_limit(struct bufferevent *bev, ev_ssize_t decr)
9728585484eSchristos {
9738585484eSchristos 	int r = 0;
9748585484eSchristos 	ev_ssize_t old_limit, new_limit;
9758585484eSchristos 	struct bufferevent_private *bevp;
9768585484eSchristos 	BEV_LOCK(bev);
9778585484eSchristos 	bevp = BEV_UPCAST(bev);
9788585484eSchristos 	EVUTIL_ASSERT(bevp->rate_limiting && bevp->rate_limiting->cfg);
9798585484eSchristos 	old_limit = bevp->rate_limiting->limit.read_limit;
9808585484eSchristos 
9818585484eSchristos 	new_limit = (bevp->rate_limiting->limit.read_limit -= decr);
9828585484eSchristos 	if (old_limit > 0 && new_limit <= 0) {
9838585484eSchristos 		bufferevent_suspend_read_(bev, BEV_SUSPEND_BW);
9848585484eSchristos 		if (event_add(&bevp->rate_limiting->refill_bucket_event,
9858585484eSchristos 			&bevp->rate_limiting->cfg->tick_timeout) < 0)
9868585484eSchristos 			r = -1;
9878585484eSchristos 	} else if (old_limit <= 0 && new_limit > 0) {
9888585484eSchristos 		if (!(bevp->write_suspended & BEV_SUSPEND_BW))
9898585484eSchristos 			event_del(&bevp->rate_limiting->refill_bucket_event);
9908585484eSchristos 		bufferevent_unsuspend_read_(bev, BEV_SUSPEND_BW);
9918585484eSchristos 	}
9928585484eSchristos 
9938585484eSchristos 	BEV_UNLOCK(bev);
9948585484eSchristos 	return r;
9958585484eSchristos }
9968585484eSchristos 
9978585484eSchristos int
9988585484eSchristos bufferevent_decrement_write_limit(struct bufferevent *bev, ev_ssize_t decr)
9998585484eSchristos {
10008585484eSchristos 	/* XXXX this is mostly copy-and-paste from
10018585484eSchristos 	 * bufferevent_decrement_read_limit */
10028585484eSchristos 	int r = 0;
10038585484eSchristos 	ev_ssize_t old_limit, new_limit;
10048585484eSchristos 	struct bufferevent_private *bevp;
10058585484eSchristos 	BEV_LOCK(bev);
10068585484eSchristos 	bevp = BEV_UPCAST(bev);
10078585484eSchristos 	EVUTIL_ASSERT(bevp->rate_limiting && bevp->rate_limiting->cfg);
10088585484eSchristos 	old_limit = bevp->rate_limiting->limit.write_limit;
10098585484eSchristos 
10108585484eSchristos 	new_limit = (bevp->rate_limiting->limit.write_limit -= decr);
10118585484eSchristos 	if (old_limit > 0 && new_limit <= 0) {
10128585484eSchristos 		bufferevent_suspend_write_(bev, BEV_SUSPEND_BW);
10138585484eSchristos 		if (event_add(&bevp->rate_limiting->refill_bucket_event,
10148585484eSchristos 			&bevp->rate_limiting->cfg->tick_timeout) < 0)
10158585484eSchristos 			r = -1;
10168585484eSchristos 	} else if (old_limit <= 0 && new_limit > 0) {
10178585484eSchristos 		if (!(bevp->read_suspended & BEV_SUSPEND_BW))
10188585484eSchristos 			event_del(&bevp->rate_limiting->refill_bucket_event);
10198585484eSchristos 		bufferevent_unsuspend_write_(bev, BEV_SUSPEND_BW);
10208585484eSchristos 	}
10218585484eSchristos 
10228585484eSchristos 	BEV_UNLOCK(bev);
10238585484eSchristos 	return r;
10248585484eSchristos }
10258585484eSchristos 
10268585484eSchristos int
10278585484eSchristos bufferevent_rate_limit_group_decrement_read(
10288585484eSchristos 	struct bufferevent_rate_limit_group *grp, ev_ssize_t decr)
10298585484eSchristos {
10308585484eSchristos 	int r = 0;
10318585484eSchristos 	ev_ssize_t old_limit, new_limit;
10328585484eSchristos 	LOCK_GROUP(grp);
10338585484eSchristos 	old_limit = grp->rate_limit.read_limit;
10348585484eSchristos 	new_limit = (grp->rate_limit.read_limit -= decr);
10358585484eSchristos 
10368585484eSchristos 	if (old_limit > 0 && new_limit <= 0) {
10378585484eSchristos 		bev_group_suspend_reading_(grp);
10388585484eSchristos 	} else if (old_limit <= 0 && new_limit > 0) {
10398585484eSchristos 		bev_group_unsuspend_reading_(grp);
10408585484eSchristos 	}
10418585484eSchristos 
10428585484eSchristos 	UNLOCK_GROUP(grp);
10438585484eSchristos 	return r;
10448585484eSchristos }
10458585484eSchristos 
10468585484eSchristos int
10478585484eSchristos bufferevent_rate_limit_group_decrement_write(
10488585484eSchristos 	struct bufferevent_rate_limit_group *grp, ev_ssize_t decr)
10498585484eSchristos {
10508585484eSchristos 	int r = 0;
10518585484eSchristos 	ev_ssize_t old_limit, new_limit;
10528585484eSchristos 	LOCK_GROUP(grp);
10538585484eSchristos 	old_limit = grp->rate_limit.write_limit;
10548585484eSchristos 	new_limit = (grp->rate_limit.write_limit -= decr);
10558585484eSchristos 
10568585484eSchristos 	if (old_limit > 0 && new_limit <= 0) {
10578585484eSchristos 		bev_group_suspend_writing_(grp);
10588585484eSchristos 	} else if (old_limit <= 0 && new_limit > 0) {
10598585484eSchristos 		bev_group_unsuspend_writing_(grp);
10608585484eSchristos 	}
10618585484eSchristos 
10628585484eSchristos 	UNLOCK_GROUP(grp);
10638585484eSchristos 	return r;
10648585484eSchristos }
10658585484eSchristos 
10668585484eSchristos void
10678585484eSchristos bufferevent_rate_limit_group_get_totals(struct bufferevent_rate_limit_group *grp,
10688585484eSchristos     ev_uint64_t *total_read_out, ev_uint64_t *total_written_out)
10698585484eSchristos {
10708585484eSchristos 	EVUTIL_ASSERT(grp != NULL);
10718585484eSchristos 	if (total_read_out)
10728585484eSchristos 		*total_read_out = grp->total_read;
10738585484eSchristos 	if (total_written_out)
10748585484eSchristos 		*total_written_out = grp->total_written;
10758585484eSchristos }
10768585484eSchristos 
10778585484eSchristos void
10788585484eSchristos bufferevent_rate_limit_group_reset_totals(struct bufferevent_rate_limit_group *grp)
10798585484eSchristos {
10808585484eSchristos 	grp->total_read = grp->total_written = 0;
10818585484eSchristos }
10828585484eSchristos 
10838585484eSchristos int
10848585484eSchristos bufferevent_ratelim_init_(struct bufferevent_private *bev)
10858585484eSchristos {
10868585484eSchristos 	bev->rate_limiting = NULL;
10878585484eSchristos 	bev->max_single_read = MAX_SINGLE_READ_DEFAULT;
10888585484eSchristos 	bev->max_single_write = MAX_SINGLE_WRITE_DEFAULT;
10898585484eSchristos 
10908585484eSchristos 	return 0;
10918585484eSchristos }
1092