1*eabc0478Schristos /* $NetBSD: bufferevent_ratelim.c,v 1.6 2024/08/18 20:47:21 christos Exp $ */ 28585484eSchristos 38585484eSchristos /* 48585484eSchristos * Copyright (c) 2007-2012 Niels Provos and Nick Mathewson 58585484eSchristos * Copyright (c) 2002-2006 Niels Provos <provos@citi.umich.edu> 68585484eSchristos * All rights reserved. 78585484eSchristos * 88585484eSchristos * Redistribution and use in source and binary forms, with or without 98585484eSchristos * modification, are permitted provided that the following conditions 108585484eSchristos * are met: 118585484eSchristos * 1. Redistributions of source code must retain the above copyright 128585484eSchristos * notice, this list of conditions and the following disclaimer. 138585484eSchristos * 2. Redistributions in binary form must reproduce the above copyright 148585484eSchristos * notice, this list of conditions and the following disclaimer in the 158585484eSchristos * documentation and/or other materials provided with the distribution. 168585484eSchristos * 3. The name of the author may not be used to endorse or promote products 178585484eSchristos * derived from this software without specific prior written permission. 188585484eSchristos * 198585484eSchristos * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR 208585484eSchristos * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES 218585484eSchristos * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. 
 * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,
 * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
 * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
 * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */
#include "evconfig-private.h"

#include <sys/types.h>
#include <limits.h>
#include <string.h>
#include <stdlib.h>

#include "event2/event.h"
#include "event2/event_struct.h"
#include "event2/util.h"
#include "event2/bufferevent.h"
#include "event2/bufferevent_struct.h"
#include "event2/buffer.h"

#include "ratelim-internal.h"

#include "bufferevent-internal.h"
#include "mm-internal.h"
#include "util-internal.h"
#include "event-internal.h"

/** Initialize (or reinitialize) the token bucket 'bucket' from the limits in
 * 'cfg', as of tick 'current_tick'.  On a fresh initialization the bucket
 * starts with one tick's worth of tokens; on reinitialization the limits are
 * only clipped downward.  Always returns 0. */
int
ev_token_bucket_init_(struct ev_token_bucket *bucket,
    const struct ev_token_bucket_cfg *cfg,
    ev_uint32_t current_tick,
    int reinitialize)
{
	if (reinitialize) {
		/* on reinitialization, we only clip downwards, since we've
		   already used who-knows-how-much bandwidth this tick.  We
		   leave "last_updated" as it is; the next update will add the
		   appropriate amount of bandwidth to the bucket.
		*/
		if (bucket->read_limit > (ev_int64_t) cfg->read_maximum)
			bucket->read_limit = cfg->read_maximum;
		if (bucket->write_limit > (ev_int64_t) cfg->write_maximum)
			bucket->write_limit = cfg->write_maximum;
	} else {
		bucket->read_limit = cfg->read_rate;
		bucket->write_limit = cfg->write_rate;
		bucket->last_updated = current_tick;
	}
	return 0;
}

/** Add tokens to 'bucket' for every tick elapsed between its last update and
 * 'current_tick', capping each limit at the configured maximum.  Returns 1
 * if the bucket was updated, 0 if no whole tick has elapsed (or the tick
 * counter appears to have moved backwards). */
int
ev_token_bucket_update_(struct ev_token_bucket *bucket,
    const struct ev_token_bucket_cfg *cfg,
    ev_uint32_t current_tick)
{
	/* It's okay if the tick number overflows, since we'll just
	 * wrap around when we do the unsigned subtraction. */
	unsigned n_ticks = current_tick - bucket->last_updated;

	/* Make sure some ticks actually happened, and that time didn't
	 * roll back. */
	if (n_ticks == 0 || n_ticks > INT_MAX)
		return 0;

	/* Naively, we would say
		bucket->limit += n_ticks * cfg->rate;

		if (bucket->limit > cfg->maximum)
			bucket->limit = cfg->maximum;

	   But we're worried about overflow, so we do it like this:
	*/

	if ((cfg->read_maximum - bucket->read_limit) / n_ticks < cfg->read_rate)
		bucket->read_limit = cfg->read_maximum;
	else
		bucket->read_limit += n_ticks * cfg->read_rate;


	if ((cfg->write_maximum - bucket->write_limit) / n_ticks < cfg->write_rate)
		bucket->write_limit = cfg->write_maximum;
	else
		bucket->write_limit += n_ticks * cfg->write_rate;


	bucket->last_updated = current_tick;

	return 1;
}

/** Refill the per-bufferevent token bucket of 'bev' if the current cached
 * time has crossed into a new tick. */
static inline void
bufferevent_update_buckets(struct bufferevent_private *bev)
{
	/* Must hold lock on bev. */
	struct timeval now;
	unsigned tick;
	event_base_gettimeofday_cached(bev->bev.ev_base, &now);
	tick = ev_token_bucket_get_tick_(&now, bev->rate_limiting->cfg);
	if (tick != bev->rate_limiting->limit.last_updated)
		ev_token_bucket_update_(&bev->rate_limiting->limit,
		    bev->rate_limiting->cfg, tick);
}

/** Convert the absolute time 'tv' into a tick number for the tick length
 * configured in 'cfg' (total milliseconds divided by msec_per_tick). */
ev_uint32_t
ev_token_bucket_get_tick_(const struct timeval *tv,
    const struct ev_token_bucket_cfg *cfg)
{
	/* This computation uses two multiplies and a divide.  We could do
	 * fewer if we knew that the tick length was an integer number of
	 * seconds, or if we knew it divided evenly into a second.  We should
	 * investigate that more.
	 */

	/* We cast to an ev_uint64_t first, since we don't want to overflow
	 * before we do the final divide. */
	ev_uint64_t msec = (ev_uint64_t)tv->tv_sec * 1000 + tv->tv_usec / 1000;
	return (unsigned)(msec / cfg->msec_per_tick);
}

/** Allocate a new token-bucket configuration.  Rates must be nonzero, no
 * larger than their corresponding bursts, and all four values must be at
 * most EV_RATE_LIMIT_MAX.  A NULL 'tick_len' means a one-second tick.
 * Returns NULL on bad arguments or allocation failure; the caller owns the
 * result and frees it with ev_token_bucket_cfg_free(). */
struct ev_token_bucket_cfg *
ev_token_bucket_cfg_new(size_t read_rate, size_t read_burst,
    size_t write_rate, size_t write_burst,
    const struct timeval *tick_len)
{
	struct ev_token_bucket_cfg *r;
	struct timeval g;
	if (!tick_len) {
		g.tv_sec = 1;
		g.tv_usec = 0;
		tick_len = &g;
	}
	if (read_rate > read_burst || write_rate > write_burst ||
	    read_rate < 1 || write_rate < 1)
		return NULL;
	if (read_rate > EV_RATE_LIMIT_MAX ||
	    write_rate > EV_RATE_LIMIT_MAX ||
	    read_burst > EV_RATE_LIMIT_MAX ||
	    write_burst > EV_RATE_LIMIT_MAX)
		return NULL;
	r = mm_calloc(1, sizeof(struct ev_token_bucket_cfg));
	if (!r)
		return NULL;
	r->read_rate = read_rate;
	r->write_rate = write_rate;
	r->read_maximum = read_burst;
	r->write_maximum = write_burst;
	memcpy(&r->tick_timeout, tick_len, sizeof(struct timeval));
	/* The usec field is masked in case it carries common-timeout flag
	 * bits; NOTE(review): a tick shorter than 1 msec would make
	 * msec_per_tick zero, which ev_token_bucket_get_tick_ divides by --
	 * presumably callers never pass sub-millisecond ticks; confirm. */
	r->msec_per_tick = (tick_len->tv_sec * 1000) +
	    (tick_len->tv_usec & COMMON_TIMEOUT_MICROSECONDS_MASK)/1000;
	return r;
}

/** Release a configuration allocated with ev_token_bucket_cfg_new(). */
void
ev_token_bucket_cfg_free(struct ev_token_bucket_cfg *cfg)
{
	mm_free(cfg);
}

/* Default values for max_single_read & max_single_write variables. */
#define MAX_SINGLE_READ_DEFAULT 16384
#define MAX_SINGLE_WRITE_DEFAULT 16384

#define LOCK_GROUP(g) EVLOCK_LOCK((g)->lock, 0)
#define UNLOCK_GROUP(g) EVLOCK_UNLOCK((g)->lock, 0)

static int bev_group_suspend_reading_(struct bufferevent_rate_limit_group *g);
static int bev_group_suspend_writing_(struct bufferevent_rate_limit_group *g);
static void bev_group_unsuspend_reading_(struct bufferevent_rate_limit_group *g);
static void bev_group_unsuspend_writing_(struct bufferevent_rate_limit_group *g);

/** Helper: figure out the maximum amount we should write if is_write, or
    the maximum amount we should read if is_read.  Return that maximum, or
    0 if our bucket is wholly exhausted.
 */
static inline ev_ssize_t
bufferevent_get_rlim_max_(struct bufferevent_private *bev, int is_write)
{
	/* needs lock on bev. */
	ev_ssize_t max_so_far = is_write?bev->max_single_write:bev->max_single_read;

	/* Select the read or write half of a bucket, depending on is_write. */
#define LIM(x)						\
	(is_write ? (x).write_limit : (x).read_limit)

	/* Is the relevant direction of group 'g' currently suspended? */
#define GROUP_SUSPENDED(g)			\
	(is_write ? (g)->write_suspended : (g)->read_suspended)

	/* Sets max_so_far to MIN(x, max_so_far) */
#define CLAMPTO(x)				\
	do {					\
		if (max_so_far > (x))		\
			max_so_far = (x);	\
	} while (0);

	if (!bev->rate_limiting)
		return max_so_far;

	/* If rate-limiting is enabled at all, update the appropriate
	   bucket, and take the smaller of our rate limit and the group
	   rate limit.
	 */

	if (bev->rate_limiting->cfg) {
		bufferevent_update_buckets(bev);
		max_so_far = LIM(bev->rate_limiting->limit);
	}
	if (bev->rate_limiting->group) {
		struct bufferevent_rate_limit_group *g =
		    bev->rate_limiting->group;
		ev_ssize_t share;
		LOCK_GROUP(g);
		if (GROUP_SUSPENDED(g)) {
			/* We can get here if we failed to lock this
			 * particular bufferevent while suspending the whole
			 * group. */
			if (is_write)
				bufferevent_suspend_write_(&bev->bev,
				    BEV_SUSPEND_BW_GROUP);
			else
				bufferevent_suspend_read_(&bev->bev,
				    BEV_SUSPEND_BW_GROUP);
			share = 0;
		} else {
			/* XXXX probably we should divide among the active
			 * members, not the total members. */
			share = LIM(g->rate_limit) / g->n_members;
			if (share < g->min_share)
				share = g->min_share;
		}
		UNLOCK_GROUP(g);
		CLAMPTO(share);
	}

	if (max_so_far < 0)
		max_so_far = 0;
	return max_so_far;
}

/** Return the largest amount 'bev' may read right now (0 if exhausted). */
ev_ssize_t
bufferevent_get_read_max_(struct bufferevent_private *bev)
{
	return bufferevent_get_rlim_max_(bev, 0);
}

/** Return the largest amount 'bev' may write right now (0 if exhausted). */
ev_ssize_t
bufferevent_get_write_max_(struct bufferevent_private *bev)
{
	return bufferevent_get_rlim_max_(bev, 1);
}

/** Charge 'bytes' of reading against bev's bucket (and its group's bucket,
 * if any), suspending reads and scheduling the refill timer if a bucket is
 * exhausted, or unsuspending if the bucket has recovered (e.g. after a
 * negative 'bytes' refund).  Returns 0 on success, -1 if the refill event
 * could not be added.  Caller must hold the lock on bev. */
int
bufferevent_decrement_read_buckets_(struct bufferevent_private *bev, ev_ssize_t bytes)
{
	/* XXXXX Make sure all users of this function check its return value */
	int r = 0;
	/* need to hold lock on bev */
	if (!bev->rate_limiting)
		return 0;

	if (bev->rate_limiting->cfg) {
		bev->rate_limiting->limit.read_limit -= bytes;
		if (bev->rate_limiting->limit.read_limit <= 0) {
			bufferevent_suspend_read_(&bev->bev, BEV_SUSPEND_BW);
			if (event_add(&bev->rate_limiting->refill_bucket_event,
				&bev->rate_limiting->cfg->tick_timeout) < 0)
				r = -1;
		} else if (bev->read_suspended & BEV_SUSPEND_BW) {
			/* The refill event is shared with the write bucket:
			 * only delete it if writing isn't still waiting on
			 * a refill. */
			if (!(bev->write_suspended & BEV_SUSPEND_BW))
				event_del(&bev->rate_limiting->refill_bucket_event);
			bufferevent_unsuspend_read_(&bev->bev, BEV_SUSPEND_BW);
		}
	}

	if (bev->rate_limiting->group) {
		LOCK_GROUP(bev->rate_limiting->group);
		bev->rate_limiting->group->rate_limit.read_limit -= bytes;
		bev->rate_limiting->group->total_read += bytes;
		if (bev->rate_limiting->group->rate_limit.read_limit <= 0) {
			bev_group_suspend_reading_(bev->rate_limiting->group);
		} else if (bev->rate_limiting->group->read_suspended) {
			bev_group_unsuspend_reading_(bev->rate_limiting->group);
		}
		UNLOCK_GROUP(bev->rate_limiting->group);
	}

	return r;
}

/** Charge 'bytes' of writing against bev's bucket (and its group's bucket,
 * if any).  Mirror image of bufferevent_decrement_read_buckets_(); same
 * locking and return-value contract. */
int
bufferevent_decrement_write_buckets_(struct bufferevent_private *bev, ev_ssize_t bytes)
{
	/* XXXXX Make sure all users of this function check its return value */
	int r = 0;
	/* need to hold lock */
	if (!bev->rate_limiting)
		return 0;

	if (bev->rate_limiting->cfg) {
		bev->rate_limiting->limit.write_limit -= bytes;
		if (bev->rate_limiting->limit.write_limit <= 0) {
			bufferevent_suspend_write_(&bev->bev, BEV_SUSPEND_BW);
			if (event_add(&bev->rate_limiting->refill_bucket_event,
				&bev->rate_limiting->cfg->tick_timeout) < 0)
				r = -1;
		} else if (bev->write_suspended & BEV_SUSPEND_BW) {
			/* Shared refill event: keep it if reading still
			 * needs the refill. */
			if (!(bev->read_suspended & BEV_SUSPEND_BW))
				event_del(&bev->rate_limiting->refill_bucket_event);
			bufferevent_unsuspend_write_(&bev->bev, BEV_SUSPEND_BW);
		}
	}

	if (bev->rate_limiting->group) {
		LOCK_GROUP(bev->rate_limiting->group);
		bev->rate_limiting->group->rate_limit.write_limit -= bytes;
		bev->rate_limiting->group->total_written += bytes;
		if (bev->rate_limiting->group->rate_limit.write_limit <= 0) {
			bev_group_suspend_writing_(bev->rate_limiting->group);
		} else if (bev->rate_limiting->group->write_suspended) {
			bev_group_unsuspend_writing_(bev->rate_limiting->group);
		}
		UNLOCK_GROUP(bev->rate_limiting->group);
	}

	return r;
}

/** Stop reading on every bufferevent in <b>g</b> */
static int
bev_group_suspend_reading_(struct bufferevent_rate_limit_group *g)
{
	/* Needs group lock */
	struct bufferevent_private *bev;
	g->read_suspended = 1;
	g->pending_unsuspend_read = 0;

	/* Note that in this loop we call EVLOCK_TRY_LOCK_ instead of BEV_LOCK,
	   to prevent a deadlock.  (Ordinarily, the group lock nests inside
	   the bufferevent locks.  If we are unable to lock any individual
	   bufferevent, it will find out later when it looks at its limit
	   and sees that its group is suspended.)
	*/
	LIST_FOREACH(bev, &g->members, rate_limiting->next_in_group) {
		if (EVLOCK_TRY_LOCK_(bev->lock)) {
			bufferevent_suspend_read_(&bev->bev,
			    BEV_SUSPEND_BW_GROUP);
			EVLOCK_UNLOCK(bev->lock, 0);
		}
	}
	return 0;
}

/** Stop writing on every bufferevent in <b>g</b> */
static int
bev_group_suspend_writing_(struct bufferevent_rate_limit_group *g)
{
	/* Needs group lock.  Uses try-lock for the same deadlock-avoidance
	   reason as bev_group_suspend_reading_() above. */
	struct bufferevent_private *bev;
	g->write_suspended = 1;
	g->pending_unsuspend_write = 0;
	LIST_FOREACH(bev, &g->members, rate_limiting->next_in_group) {
		if (EVLOCK_TRY_LOCK_(bev->lock)) {
			bufferevent_suspend_write_(&bev->bev,
			    BEV_SUSPEND_BW_GROUP);
			EVLOCK_UNLOCK(bev->lock, 0);
		}
	}
	return 0;
}

/** Timer callback invoked on a single bufferevent with one or more exhausted
    buckets when they are ready to refill. */
static void
bev_refill_callback_(evutil_socket_t fd, short what, void *arg)
{
	unsigned tick;
	struct timeval now;
	struct bufferevent_private *bev = arg;
	int again = 0;
	BEV_LOCK(&bev->bev);
	if (!bev->rate_limiting || !bev->rate_limiting->cfg) {
		/* Rate limiting was torn down before the timer fired. */
		BEV_UNLOCK(&bev->bev);
		return;
	}

	/* First, update the bucket */
	event_base_gettimeofday_cached(bev->bev.ev_base, &now);
	tick = ev_token_bucket_get_tick_(&now,
	    bev->rate_limiting->cfg);
	ev_token_bucket_update_(&bev->rate_limiting->limit,
	    bev->rate_limiting->cfg,
	    tick);

	/* Now unsuspend any read/write operations as appropriate. */
	if ((bev->read_suspended & BEV_SUSPEND_BW)) {
		if (bev->rate_limiting->limit.read_limit > 0)
			bufferevent_unsuspend_read_(&bev->bev, BEV_SUSPEND_BW);
		else
			again = 1;
	}
	if ((bev->write_suspended & BEV_SUSPEND_BW)) {
		if (bev->rate_limiting->limit.write_limit > 0)
			bufferevent_unsuspend_write_(&bev->bev, BEV_SUSPEND_BW);
		else
			again = 1;
	}
	if (again) {
		/* One or more of the buckets may need another refill if they
		   started negative.

		   XXXX if we need to be quiet for more ticks, we should
		   maybe figure out what timeout we really want.
		*/
		/* XXXX Handle event_add failure somehow */
		event_add(&bev->rate_limiting->refill_bucket_event,
		    &bev->rate_limiting->cfg->tick_timeout);
	}
	BEV_UNLOCK(&bev->bev);
}

/** Helper: grab a random element from a bufferevent group.
 *
 * Requires that we hold the lock on the group.
 */
static struct bufferevent_private *
bev_group_random_element_(struct bufferevent_rate_limit_group *group)
{
	int which;
	struct bufferevent_private *bev;

	/* requires group lock */

	if (!group->n_members)
		return NULL;

	EVUTIL_ASSERT(! LIST_EMPTY(&group->members));

	which = evutil_weakrand_range_(&group->weakrand_seed, group->n_members);

	/* Walk 'which' links from the head of the member list. */
	bev = LIST_FIRST(&group->members);
	while (which--)
		bev = LIST_NEXT(bev, rate_limiting->next_in_group);

	return bev;
}

/** Iterate over the elements of a rate-limiting group 'g' with a random
    starting point, assigning each to the variable 'bev', and executing the
    block 'block'.

    We do this in a half-baked effort to get fairness among group members.
    XXX Round-robin or some kind of priority queue would be even more fair.
 */
#define FOREACH_RANDOM_ORDER(block)			\
	do {						\
		first = bev_group_random_element_(g);	\
		for (bev = first; bev != LIST_END(&g->members); \
		    bev = LIST_NEXT(bev, rate_limiting->next_in_group)) { \
			block ;				 \
		}					 \
		for (bev = LIST_FIRST(&g->members); bev && bev != first; \
		    bev = LIST_NEXT(bev, rate_limiting->next_in_group)) { \
			block ;				 \
		}					 \
	} while (0)

/** Resume reading on the members of 'g'.  Members whose lock could not be
 * taken are left for the next tick via pending_unsuspend_read.  Needs the
 * group lock. */
static void
bev_group_unsuspend_reading_(struct bufferevent_rate_limit_group *g)
{
	int again = 0;
	struct bufferevent_private *bev, *first;

	g->read_suspended = 0;
	FOREACH_RANDOM_ORDER({
		if (EVLOCK_TRY_LOCK_(bev->lock)) {
			bufferevent_unsuspend_read_(&bev->bev,
			    BEV_SUSPEND_BW_GROUP);
			EVLOCK_UNLOCK(bev->lock, 0);
		} else {
			again = 1;
		}
	});
	g->pending_unsuspend_read = again;
}

/** Resume writing on the members of 'g'; mirror of
 * bev_group_unsuspend_reading_().  Needs the group lock. */
static void
bev_group_unsuspend_writing_(struct bufferevent_rate_limit_group *g)
{
	int again = 0;
	struct bufferevent_private *bev, *first;
	g->write_suspended = 0;

	FOREACH_RANDOM_ORDER({
		if (EVLOCK_TRY_LOCK_(bev->lock)) {
			bufferevent_unsuspend_write_(&bev->bev,
			    BEV_SUSPEND_BW_GROUP);
			EVLOCK_UNLOCK(bev->lock, 0);
		} else {
			again = 1;
		}
	});
	g->pending_unsuspend_write = again;
}

/** Callback invoked every tick to add more elements to the group bucket
    and unsuspend group members as needed.
 */
static void
bev_group_refill_callback_(evutil_socket_t fd, short what, void *arg)
{
	struct bufferevent_rate_limit_group *g = arg;
	unsigned tick;
	struct timeval now;

	event_base_gettimeofday_cached(event_get_base(&g->master_refill_event), &now);

	LOCK_GROUP(g);

	tick = ev_token_bucket_get_tick_(&now, &g->rate_limit_cfg);
	ev_token_bucket_update_(&g->rate_limit, &g->rate_limit_cfg, tick);

	/* Only bother unsuspending once the bucket holds at least one
	 * min_share, or if a previous unsuspend pass was incomplete. */
	if (g->pending_unsuspend_read ||
	    (g->read_suspended && (g->rate_limit.read_limit >= g->min_share))) {
		bev_group_unsuspend_reading_(g);
	}
	if (g->pending_unsuspend_write ||
	    (g->write_suspended && (g->rate_limit.write_limit >= g->min_share))){
		bev_group_unsuspend_writing_(g);
	}

	/* XXXX Rather than waiting to the next tick to unsuspend stuff
	 * with pending_unsuspend_write/read, we should do it on the
	 * next iteration of the mainloop.
	 */

	UNLOCK_GROUP(g);
}

/** Public API: attach token-bucket configuration 'cfg' to 'bev', replacing
 * any previous one, or remove rate limiting entirely when 'cfg' is NULL.
 * 'cfg' is not copied and must outlive its use here (see the XXX below).
 * Returns 0 on success, -1 on allocation failure. */
int
bufferevent_set_rate_limit(struct bufferevent *bev,
    struct ev_token_bucket_cfg *cfg)
{
	struct bufferevent_private *bevp = BEV_UPCAST(bev);
	int r = -1;
	struct bufferevent_rate_limit *rlim;
	struct timeval now;
	ev_uint32_t tick;
	int reinit = 0, suspended = 0;
	/* XXX reference-count cfg */

	BEV_LOCK(bev);

	if (cfg == NULL) {
		/* Removing the limit: lift any bandwidth suspension and
		 * cancel the pending refill timer. */
		if (bevp->rate_limiting) {
			rlim = bevp->rate_limiting;
			rlim->cfg = NULL;
			bufferevent_unsuspend_read_(bev, BEV_SUSPEND_BW);
			bufferevent_unsuspend_write_(bev, BEV_SUSPEND_BW);
			if (event_initialized(&rlim->refill_bucket_event))
				event_del(&rlim->refill_bucket_event);
		}
		r = 0;
		goto done;
	}

	event_base_gettimeofday_cached(bev->ev_base, &now);
	tick = ev_token_bucket_get_tick_(&now, cfg);

	if (bevp->rate_limiting && bevp->rate_limiting->cfg == cfg) {
		/* no-op */
		r = 0;
		goto done;
	}
	if (bevp->rate_limiting == NULL) {
		rlim = mm_calloc(1, sizeof(struct bufferevent_rate_limit));
		if (!rlim)
			goto done;
		bevp->rate_limiting = rlim;
	} else {
		rlim = bevp->rate_limiting;
	}
	reinit = rlim->cfg != NULL;

	rlim->cfg = cfg;
	ev_token_bucket_init_(&rlim->limit, cfg, tick, reinit);

	if (reinit) {
		EVUTIL_ASSERT(event_initialized(&rlim->refill_bucket_event));
		event_del(&rlim->refill_bucket_event);
	}
	event_assign(&rlim->refill_bucket_event, bev->ev_base,
	    -1, EV_FINALIZE, bev_refill_callback_, bevp);

	/* Apply the initial bucket state: suspend whichever directions
	 * start out empty, and arm the refill timer if either did. */
	if (rlim->limit.read_limit > 0) {
		bufferevent_unsuspend_read_(bev, BEV_SUSPEND_BW);
	} else {
		bufferevent_suspend_read_(bev, BEV_SUSPEND_BW);
		suspended=1;
	}
	if (rlim->limit.write_limit > 0) {
		bufferevent_unsuspend_write_(bev, BEV_SUSPEND_BW);
	} else {
		bufferevent_suspend_write_(bev, BEV_SUSPEND_BW);
		suspended = 1;
	}

	if (suspended)
		event_add(&rlim->refill_bucket_event, &cfg->tick_timeout);

	r = 0;

done:
	BEV_UNLOCK(bev);
	return r;
}

/** Public API: create a rate-limit group on 'base' whose aggregate traffic
 * is bounded by 'cfg' (which is copied).  Returns NULL on allocation
 * failure; free with bufferevent_rate_limit_group_free(). */
struct bufferevent_rate_limit_group *
bufferevent_rate_limit_group_new(struct event_base *base,
    const struct ev_token_bucket_cfg *cfg)
{
	struct bufferevent_rate_limit_group *g;
	struct timeval now;
	ev_uint32_t tick;

	event_base_gettimeofday_cached(base, &now);
	tick = ev_token_bucket_get_tick_(&now, cfg);

	g = mm_calloc(1, sizeof(struct bufferevent_rate_limit_group));
	if (!g)
		return NULL;
	memcpy(&g->rate_limit_cfg, cfg, sizeof(g->rate_limit_cfg));
	LIST_INIT(&g->members);

	ev_token_bucket_init_(&g->rate_limit, cfg, tick, 0);

	/* Persistent timer: refills the group bucket once per tick. */
	event_assign(&g->master_refill_event, base, -1, EV_PERSIST|EV_FINALIZE,
	    bev_group_refill_callback_, g);
	/*XXXX handle event_add failure */
	event_add(&g->master_refill_event, &cfg->tick_timeout);

	EVTHREAD_ALLOC_LOCK(g->lock, EVTHREAD_LOCKTYPE_RECURSIVE);

	bufferevent_rate_limit_group_set_min_share(g, 64);

	evutil_weakrand_seed_(&g->weakrand_seed,
	    (ev_uint32_t) ((now.tv_sec + now.tv_usec) + (ev_intptr_t)g));

	return g;
}

/** Public API: replace the configuration of group 'g' with a copy of 'cfg',
 * clipping the current bucket to the new maxima and rescheduling the refill
 * timer if the tick length changed.  Returns 0 on success, -1 on NULL
 * arguments. */
int
bufferevent_rate_limit_group_set_cfg(
	struct bufferevent_rate_limit_group *g,
	const struct ev_token_bucket_cfg *cfg)
{
	int same_tick;
	if (!g || !cfg)
		return -1;

	LOCK_GROUP(g);
	same_tick = evutil_timercmp(
		&g->rate_limit_cfg.tick_timeout, &cfg->tick_timeout, ==);
	memcpy(&g->rate_limit_cfg, cfg, sizeof(g->rate_limit_cfg));

	if (g->rate_limit.read_limit > (ev_ssize_t)cfg->read_maximum)
		g->rate_limit.read_limit = cfg->read_maximum;
	if (g->rate_limit.write_limit > (ev_ssize_t)cfg->write_maximum)
		g->rate_limit.write_limit = cfg->write_maximum;

	if (!same_tick) {
		/* This can cause a hiccup in the schedule */
		event_add(&g->master_refill_event, &cfg->tick_timeout);
	}

	/* The new limits might force us to adjust min_share differently. */
	bufferevent_rate_limit_group_set_min_share(g, g->configured_min_share);

	UNLOCK_GROUP(g);
	return 0;
}

/** Public API: set the smallest number of bytes any single group member is
 * offered per tick.  The effective value is clamped to the per-tick rates.
 * Returns 0 on success, -1 if 'share' is out of range. */
int
bufferevent_rate_limit_group_set_min_share(
	struct bufferevent_rate_limit_group *g,
	size_t share)
{
	if (share > EV_SSIZE_MAX)
		return -1;

	g->configured_min_share = share;

	/* Can't set share to less than the one-tick maximum.  IOW, at steady
	 * state, at least one connection can go per tick. */
	if (share > g->rate_limit_cfg.read_rate)
		share = g->rate_limit_cfg.read_rate;
	if (share > g->rate_limit_cfg.write_rate)
		share = g->rate_limit_cfg.write_rate;

	g->min_share = share;
	return 0;
}

/** Public API: free an (empty) rate-limit group, cancelling its refill
 * timer and destroying its lock.  All members must have been removed first
 * (asserted). */
void
bufferevent_rate_limit_group_free(struct bufferevent_rate_limit_group *g)
{
	LOCK_GROUP(g);
	EVUTIL_ASSERT(0 == g->n_members);
	event_del(&g->master_refill_event);
	UNLOCK_GROUP(g);
	EVTHREAD_FREE_LOCK(g->lock, EVTHREAD_LOCKTYPE_RECURSIVE);
	mm_free(g);
}

int
bufferevent_add_to_rate_limit_group(struct bufferevent *bev,
    struct bufferevent_rate_limit_group *g)
{
	int wsuspend, rsuspend;
	struct bufferevent_private *bevp = BEV_UPCAST(bev);
	BEV_LOCK(bev);

	if (!bevp->rate_limiting) {
		struct bufferevent_rate_limit *rlim;
		rlim = mm_calloc(1, sizeof(struct bufferevent_rate_limit));
		if (!rlim) {
			BEV_UNLOCK(bev);
			return -1;
		}
		event_assign(&rlim->refill_bucket_event, bev->ev_base,
		    -1, EV_FINALIZE, bev_refill_callback_, bevp);
		bevp->rate_limiting = rlim;
	}

	if (bevp->rate_limiting->group == g) {
		BEV_UNLOCK(bev);
		return 0;
	}
	if (bevp->rate_limiting->group)
7618585484eSchristos bufferevent_remove_from_rate_limit_group(bev); 7628585484eSchristos 7638585484eSchristos LOCK_GROUP(g); 7648585484eSchristos bevp->rate_limiting->group = g; 7658585484eSchristos ++g->n_members; 7668585484eSchristos LIST_INSERT_HEAD(&g->members, bevp, rate_limiting->next_in_group); 7678585484eSchristos 7688585484eSchristos rsuspend = g->read_suspended; 7698585484eSchristos wsuspend = g->write_suspended; 7708585484eSchristos 7718585484eSchristos UNLOCK_GROUP(g); 7728585484eSchristos 7738585484eSchristos if (rsuspend) 7748585484eSchristos bufferevent_suspend_read_(bev, BEV_SUSPEND_BW_GROUP); 7758585484eSchristos if (wsuspend) 7768585484eSchristos bufferevent_suspend_write_(bev, BEV_SUSPEND_BW_GROUP); 7778585484eSchristos 7788585484eSchristos BEV_UNLOCK(bev); 7798585484eSchristos return 0; 7808585484eSchristos } 7818585484eSchristos 7828585484eSchristos int 7838585484eSchristos bufferevent_remove_from_rate_limit_group(struct bufferevent *bev) 7848585484eSchristos { 7858585484eSchristos return bufferevent_remove_from_rate_limit_group_internal_(bev, 1); 7868585484eSchristos } 7878585484eSchristos 7888585484eSchristos int 7898585484eSchristos bufferevent_remove_from_rate_limit_group_internal_(struct bufferevent *bev, 7908585484eSchristos int unsuspend) 7918585484eSchristos { 792*eabc0478Schristos struct bufferevent_private *bevp = BEV_UPCAST(bev); 7938585484eSchristos BEV_LOCK(bev); 7948585484eSchristos if (bevp->rate_limiting && bevp->rate_limiting->group) { 7958585484eSchristos struct bufferevent_rate_limit_group *g = 7968585484eSchristos bevp->rate_limiting->group; 7978585484eSchristos LOCK_GROUP(g); 7988585484eSchristos bevp->rate_limiting->group = NULL; 7998585484eSchristos --g->n_members; 8008585484eSchristos LIST_REMOVE(bevp, rate_limiting->next_in_group); 8018585484eSchristos UNLOCK_GROUP(g); 8028585484eSchristos } 8038585484eSchristos if (unsuspend) { 8048585484eSchristos bufferevent_unsuspend_read_(bev, BEV_SUSPEND_BW_GROUP); 
8058585484eSchristos bufferevent_unsuspend_write_(bev, BEV_SUSPEND_BW_GROUP); 8068585484eSchristos } 8078585484eSchristos BEV_UNLOCK(bev); 8088585484eSchristos return 0; 8098585484eSchristos } 8108585484eSchristos 8118585484eSchristos /* === 8128585484eSchristos * API functions to expose rate limits. 8138585484eSchristos * 8148585484eSchristos * Don't use these from inside Libevent; they're meant to be for use by 8158585484eSchristos * the program. 8168585484eSchristos * === */ 8178585484eSchristos 8188585484eSchristos /* Mostly you don't want to use this function from inside libevent; 8198585484eSchristos * bufferevent_get_read_max_() is more likely what you want*/ 8208585484eSchristos ev_ssize_t 8218585484eSchristos bufferevent_get_read_limit(struct bufferevent *bev) 8228585484eSchristos { 8238585484eSchristos ev_ssize_t r; 8248585484eSchristos struct bufferevent_private *bevp; 8258585484eSchristos BEV_LOCK(bev); 8268585484eSchristos bevp = BEV_UPCAST(bev); 8278585484eSchristos if (bevp->rate_limiting && bevp->rate_limiting->cfg) { 8288585484eSchristos bufferevent_update_buckets(bevp); 8298585484eSchristos r = bevp->rate_limiting->limit.read_limit; 8308585484eSchristos } else { 8318585484eSchristos r = EV_SSIZE_MAX; 8328585484eSchristos } 8338585484eSchristos BEV_UNLOCK(bev); 8348585484eSchristos return r; 8358585484eSchristos } 8368585484eSchristos 8378585484eSchristos /* Mostly you don't want to use this function from inside libevent; 8388585484eSchristos * bufferevent_get_write_max_() is more likely what you want*/ 8398585484eSchristos ev_ssize_t 8408585484eSchristos bufferevent_get_write_limit(struct bufferevent *bev) 8418585484eSchristos { 8428585484eSchristos ev_ssize_t r; 8438585484eSchristos struct bufferevent_private *bevp; 8448585484eSchristos BEV_LOCK(bev); 8458585484eSchristos bevp = BEV_UPCAST(bev); 8468585484eSchristos if (bevp->rate_limiting && bevp->rate_limiting->cfg) { 8478585484eSchristos bufferevent_update_buckets(bevp); 8488585484eSchristos r 
= bevp->rate_limiting->limit.write_limit; 8498585484eSchristos } else { 8508585484eSchristos r = EV_SSIZE_MAX; 8518585484eSchristos } 8528585484eSchristos BEV_UNLOCK(bev); 8538585484eSchristos return r; 8548585484eSchristos } 8558585484eSchristos 8568585484eSchristos int 8578585484eSchristos bufferevent_set_max_single_read(struct bufferevent *bev, size_t size) 8588585484eSchristos { 8598585484eSchristos struct bufferevent_private *bevp; 8608585484eSchristos BEV_LOCK(bev); 8618585484eSchristos bevp = BEV_UPCAST(bev); 8628585484eSchristos if (size == 0 || size > EV_SSIZE_MAX) 8638585484eSchristos bevp->max_single_read = MAX_SINGLE_READ_DEFAULT; 8648585484eSchristos else 8658585484eSchristos bevp->max_single_read = size; 8668585484eSchristos BEV_UNLOCK(bev); 8678585484eSchristos return 0; 8688585484eSchristos } 8698585484eSchristos 8708585484eSchristos int 8718585484eSchristos bufferevent_set_max_single_write(struct bufferevent *bev, size_t size) 8728585484eSchristos { 8738585484eSchristos struct bufferevent_private *bevp; 8748585484eSchristos BEV_LOCK(bev); 8758585484eSchristos bevp = BEV_UPCAST(bev); 8768585484eSchristos if (size == 0 || size > EV_SSIZE_MAX) 8778585484eSchristos bevp->max_single_write = MAX_SINGLE_WRITE_DEFAULT; 8788585484eSchristos else 8798585484eSchristos bevp->max_single_write = size; 8808585484eSchristos BEV_UNLOCK(bev); 8818585484eSchristos return 0; 8828585484eSchristos } 8838585484eSchristos 8848585484eSchristos ev_ssize_t 8858585484eSchristos bufferevent_get_max_single_read(struct bufferevent *bev) 8868585484eSchristos { 8878585484eSchristos ev_ssize_t r; 8888585484eSchristos 8898585484eSchristos BEV_LOCK(bev); 8908585484eSchristos r = BEV_UPCAST(bev)->max_single_read; 8918585484eSchristos BEV_UNLOCK(bev); 8928585484eSchristos return r; 8938585484eSchristos } 8948585484eSchristos 8958585484eSchristos ev_ssize_t 8968585484eSchristos bufferevent_get_max_single_write(struct bufferevent *bev) 8978585484eSchristos { 8988585484eSchristos 
ev_ssize_t r; 8998585484eSchristos 9008585484eSchristos BEV_LOCK(bev); 9018585484eSchristos r = BEV_UPCAST(bev)->max_single_write; 9028585484eSchristos BEV_UNLOCK(bev); 9038585484eSchristos return r; 9048585484eSchristos } 9058585484eSchristos 9068585484eSchristos ev_ssize_t 9078585484eSchristos bufferevent_get_max_to_read(struct bufferevent *bev) 9088585484eSchristos { 9098585484eSchristos ev_ssize_t r; 9108585484eSchristos BEV_LOCK(bev); 9118585484eSchristos r = bufferevent_get_read_max_(BEV_UPCAST(bev)); 9128585484eSchristos BEV_UNLOCK(bev); 9138585484eSchristos return r; 9148585484eSchristos } 9158585484eSchristos 9168585484eSchristos ev_ssize_t 9178585484eSchristos bufferevent_get_max_to_write(struct bufferevent *bev) 9188585484eSchristos { 9198585484eSchristos ev_ssize_t r; 9208585484eSchristos BEV_LOCK(bev); 9218585484eSchristos r = bufferevent_get_write_max_(BEV_UPCAST(bev)); 9228585484eSchristos BEV_UNLOCK(bev); 9238585484eSchristos return r; 9248585484eSchristos } 9258585484eSchristos 926b8ecfcfeSchristos const struct ev_token_bucket_cfg * 927b8ecfcfeSchristos bufferevent_get_token_bucket_cfg(const struct bufferevent *bev) { 928b8ecfcfeSchristos struct bufferevent_private *bufev_private = BEV_UPCAST(bev); 929b8ecfcfeSchristos struct ev_token_bucket_cfg *cfg; 930b8ecfcfeSchristos 931b8ecfcfeSchristos BEV_LOCK(bev); 932b8ecfcfeSchristos 933b8ecfcfeSchristos if (bufev_private->rate_limiting) { 934b8ecfcfeSchristos cfg = bufev_private->rate_limiting->cfg; 935b8ecfcfeSchristos } else { 936b8ecfcfeSchristos cfg = NULL; 937b8ecfcfeSchristos } 938b8ecfcfeSchristos 939b8ecfcfeSchristos BEV_UNLOCK(bev); 940b8ecfcfeSchristos 941b8ecfcfeSchristos return cfg; 942b8ecfcfeSchristos } 9438585484eSchristos 9448585484eSchristos /* Mostly you don't want to use this function from inside libevent; 9458585484eSchristos * bufferevent_get_read_max_() is more likely what you want*/ 9468585484eSchristos ev_ssize_t 9478585484eSchristos bufferevent_rate_limit_group_get_read_limit( 
9488585484eSchristos struct bufferevent_rate_limit_group *grp) 9498585484eSchristos { 9508585484eSchristos ev_ssize_t r; 9518585484eSchristos LOCK_GROUP(grp); 9528585484eSchristos r = grp->rate_limit.read_limit; 9538585484eSchristos UNLOCK_GROUP(grp); 9548585484eSchristos return r; 9558585484eSchristos } 9568585484eSchristos 9578585484eSchristos /* Mostly you don't want to use this function from inside libevent; 9588585484eSchristos * bufferevent_get_write_max_() is more likely what you want. */ 9598585484eSchristos ev_ssize_t 9608585484eSchristos bufferevent_rate_limit_group_get_write_limit( 9618585484eSchristos struct bufferevent_rate_limit_group *grp) 9628585484eSchristos { 9638585484eSchristos ev_ssize_t r; 9648585484eSchristos LOCK_GROUP(grp); 9658585484eSchristos r = grp->rate_limit.write_limit; 9668585484eSchristos UNLOCK_GROUP(grp); 9678585484eSchristos return r; 9688585484eSchristos } 9698585484eSchristos 9708585484eSchristos int 9718585484eSchristos bufferevent_decrement_read_limit(struct bufferevent *bev, ev_ssize_t decr) 9728585484eSchristos { 9738585484eSchristos int r = 0; 9748585484eSchristos ev_ssize_t old_limit, new_limit; 9758585484eSchristos struct bufferevent_private *bevp; 9768585484eSchristos BEV_LOCK(bev); 9778585484eSchristos bevp = BEV_UPCAST(bev); 9788585484eSchristos EVUTIL_ASSERT(bevp->rate_limiting && bevp->rate_limiting->cfg); 9798585484eSchristos old_limit = bevp->rate_limiting->limit.read_limit; 9808585484eSchristos 9818585484eSchristos new_limit = (bevp->rate_limiting->limit.read_limit -= decr); 9828585484eSchristos if (old_limit > 0 && new_limit <= 0) { 9838585484eSchristos bufferevent_suspend_read_(bev, BEV_SUSPEND_BW); 9848585484eSchristos if (event_add(&bevp->rate_limiting->refill_bucket_event, 9858585484eSchristos &bevp->rate_limiting->cfg->tick_timeout) < 0) 9868585484eSchristos r = -1; 9878585484eSchristos } else if (old_limit <= 0 && new_limit > 0) { 9888585484eSchristos if (!(bevp->write_suspended & BEV_SUSPEND_BW)) 
9898585484eSchristos event_del(&bevp->rate_limiting->refill_bucket_event); 9908585484eSchristos bufferevent_unsuspend_read_(bev, BEV_SUSPEND_BW); 9918585484eSchristos } 9928585484eSchristos 9938585484eSchristos BEV_UNLOCK(bev); 9948585484eSchristos return r; 9958585484eSchristos } 9968585484eSchristos 9978585484eSchristos int 9988585484eSchristos bufferevent_decrement_write_limit(struct bufferevent *bev, ev_ssize_t decr) 9998585484eSchristos { 10008585484eSchristos /* XXXX this is mostly copy-and-paste from 10018585484eSchristos * bufferevent_decrement_read_limit */ 10028585484eSchristos int r = 0; 10038585484eSchristos ev_ssize_t old_limit, new_limit; 10048585484eSchristos struct bufferevent_private *bevp; 10058585484eSchristos BEV_LOCK(bev); 10068585484eSchristos bevp = BEV_UPCAST(bev); 10078585484eSchristos EVUTIL_ASSERT(bevp->rate_limiting && bevp->rate_limiting->cfg); 10088585484eSchristos old_limit = bevp->rate_limiting->limit.write_limit; 10098585484eSchristos 10108585484eSchristos new_limit = (bevp->rate_limiting->limit.write_limit -= decr); 10118585484eSchristos if (old_limit > 0 && new_limit <= 0) { 10128585484eSchristos bufferevent_suspend_write_(bev, BEV_SUSPEND_BW); 10138585484eSchristos if (event_add(&bevp->rate_limiting->refill_bucket_event, 10148585484eSchristos &bevp->rate_limiting->cfg->tick_timeout) < 0) 10158585484eSchristos r = -1; 10168585484eSchristos } else if (old_limit <= 0 && new_limit > 0) { 10178585484eSchristos if (!(bevp->read_suspended & BEV_SUSPEND_BW)) 10188585484eSchristos event_del(&bevp->rate_limiting->refill_bucket_event); 10198585484eSchristos bufferevent_unsuspend_write_(bev, BEV_SUSPEND_BW); 10208585484eSchristos } 10218585484eSchristos 10228585484eSchristos BEV_UNLOCK(bev); 10238585484eSchristos return r; 10248585484eSchristos } 10258585484eSchristos 10268585484eSchristos int 10278585484eSchristos bufferevent_rate_limit_group_decrement_read( 10288585484eSchristos struct bufferevent_rate_limit_group *grp, ev_ssize_t decr) 
10298585484eSchristos { 10308585484eSchristos int r = 0; 10318585484eSchristos ev_ssize_t old_limit, new_limit; 10328585484eSchristos LOCK_GROUP(grp); 10338585484eSchristos old_limit = grp->rate_limit.read_limit; 10348585484eSchristos new_limit = (grp->rate_limit.read_limit -= decr); 10358585484eSchristos 10368585484eSchristos if (old_limit > 0 && new_limit <= 0) { 10378585484eSchristos bev_group_suspend_reading_(grp); 10388585484eSchristos } else if (old_limit <= 0 && new_limit > 0) { 10398585484eSchristos bev_group_unsuspend_reading_(grp); 10408585484eSchristos } 10418585484eSchristos 10428585484eSchristos UNLOCK_GROUP(grp); 10438585484eSchristos return r; 10448585484eSchristos } 10458585484eSchristos 10468585484eSchristos int 10478585484eSchristos bufferevent_rate_limit_group_decrement_write( 10488585484eSchristos struct bufferevent_rate_limit_group *grp, ev_ssize_t decr) 10498585484eSchristos { 10508585484eSchristos int r = 0; 10518585484eSchristos ev_ssize_t old_limit, new_limit; 10528585484eSchristos LOCK_GROUP(grp); 10538585484eSchristos old_limit = grp->rate_limit.write_limit; 10548585484eSchristos new_limit = (grp->rate_limit.write_limit -= decr); 10558585484eSchristos 10568585484eSchristos if (old_limit > 0 && new_limit <= 0) { 10578585484eSchristos bev_group_suspend_writing_(grp); 10588585484eSchristos } else if (old_limit <= 0 && new_limit > 0) { 10598585484eSchristos bev_group_unsuspend_writing_(grp); 10608585484eSchristos } 10618585484eSchristos 10628585484eSchristos UNLOCK_GROUP(grp); 10638585484eSchristos return r; 10648585484eSchristos } 10658585484eSchristos 10668585484eSchristos void 10678585484eSchristos bufferevent_rate_limit_group_get_totals(struct bufferevent_rate_limit_group *grp, 10688585484eSchristos ev_uint64_t *total_read_out, ev_uint64_t *total_written_out) 10698585484eSchristos { 10708585484eSchristos EVUTIL_ASSERT(grp != NULL); 10718585484eSchristos if (total_read_out) 10728585484eSchristos *total_read_out = grp->total_read; 
10738585484eSchristos if (total_written_out) 10748585484eSchristos *total_written_out = grp->total_written; 10758585484eSchristos } 10768585484eSchristos 10778585484eSchristos void 10788585484eSchristos bufferevent_rate_limit_group_reset_totals(struct bufferevent_rate_limit_group *grp) 10798585484eSchristos { 10808585484eSchristos grp->total_read = grp->total_written = 0; 10818585484eSchristos } 10828585484eSchristos 10838585484eSchristos int 10848585484eSchristos bufferevent_ratelim_init_(struct bufferevent_private *bev) 10858585484eSchristos { 10868585484eSchristos bev->rate_limiting = NULL; 10878585484eSchristos bev->max_single_read = MAX_SINGLE_READ_DEFAULT; 10888585484eSchristos bev->max_single_write = MAX_SINGLE_WRITE_DEFAULT; 10898585484eSchristos 10908585484eSchristos return 0; 10918585484eSchristos } 1092