/*-
 * SPDX-License-Identifier: BSD-2-Clause
 *
 * Copyright (c) 2007-2009 Kip Macy <kmacy@freebsd.org>
 * All rights reserved.
 * Copyright (c) 2024 Arm Ltd
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 * 1. Redistributions of source code must retain the above copyright
 *    notice, this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
 * SUCH DAMAGE.
 */

#ifndef _SYS_BUF_RING_H_
#define _SYS_BUF_RING_H_

#include <sys/param.h>
#include <sys/kassert.h>
#include <machine/atomic.h>
#include <machine/cpu.h>

#if defined(DEBUG_BUFRING) && defined(_KERNEL)
#include <sys/lock.h>
#include <sys/mutex.h>
#endif

/*
 * We only apply the mask to the head and tail values when calculating the
 * index into br_ring to access. This means the upper bits can be used as
 * an epoch to reduce the chance the atomic_cmpset succeeds when it should
 * fail, e.g. when the head wraps while the CPU is in an interrupt. This
 * is a probabilistic fix as there is still a very unlikely chance the
 * value wraps back to the expected value.
 */
struct buf_ring {
	uint32_t	br_prod_head;
	uint32_t	br_prod_tail;
	int		br_prod_size;
	int		br_prod_mask;
	uint64_t	br_drops;
	uint32_t	br_cons_head __aligned(CACHE_LINE_SIZE);
	uint32_t	br_cons_tail;
	int		br_cons_size;
	int		br_cons_mask;
#if defined(DEBUG_BUFRING) && defined(_KERNEL)
	struct mtx	*br_lock;
#endif
	void		*br_ring[0] __aligned(CACHE_LINE_SIZE);
};

/*
 * multi-producer safe lock-free ring buffer enqueue
 */
static __inline int
buf_ring_enqueue(struct buf_ring *br, void *buf)
{
	uint32_t prod_head, prod_next, prod_idx;
	uint32_t cons_tail, mask;

	mask = br->br_prod_mask;
#ifdef DEBUG_BUFRING
	/*
	 * Note: It is possible to encounter an mbuf that was removed
	 * via drbr_peek(), and then re-added via drbr_putback() and
	 * trigger a spurious panic.
	 */
	for (uint32_t i = atomic_load_32(&br->br_cons_head);
	    i != atomic_load_32(&br->br_prod_head); i++)
		if (br->br_ring[i & mask] == buf)
			panic("buf=%p already enqueued at %d prod=%d cons=%d",
			    buf, i, atomic_load_32(&br->br_prod_tail),
			    atomic_load_32(&br->br_cons_tail));
#endif
	critical_enter();
	do {
		/*
		 * br->br_prod_head needs to be read before br->br_cons_tail.
		 * If not then we could perform the dequeue and enqueue
		 * between reading br_cons_tail and reading br_prod_head. This
		 * could give us values where br_cons_head == br_prod_tail
		 * (after masking).
		 *
		 * To work around this use a load acquire. This is just to
		 * ensure ordering within this thread.
		 */
		prod_head = atomic_load_acq_32(&br->br_prod_head);
		prod_next = prod_head + 1;
		cons_tail = atomic_load_acq_32(&br->br_cons_tail);

		if ((int32_t)(cons_tail + br->br_prod_size - prod_next) < 1) {
			if (prod_head == atomic_load_32(&br->br_prod_head) &&
			    cons_tail == atomic_load_32(&br->br_cons_tail)) {
				br->br_drops++;
				critical_exit();
				return (ENOBUFS);
			}
			continue;
		}
	} while (!atomic_cmpset_32(&br->br_prod_head, prod_head, prod_next));
	prod_idx = prod_head & mask;
#ifdef DEBUG_BUFRING
	if (br->br_ring[prod_idx] != NULL)
		panic("dangling value in enqueue");
#endif
	br->br_ring[prod_idx] = buf;

	/*
	 * If there are other enqueues in progress
	 * that preceded us, we need to wait for them
	 * to complete
	 */
	while (atomic_load_32(&br->br_prod_tail) != prod_head)
		cpu_spinwait();
	atomic_store_rel_32(&br->br_prod_tail, prod_next);
	critical_exit();
	return (0);
}

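/*
 * Example usage (an illustrative sketch, not part of this header): a
 * producer must dispose of the buffer itself on ENOBUFS, since a full
 * ring does not take ownership and the drop has already been counted
 * in br_drops. The softc "sc" and its "sc_br" member are hypothetical
 * names.
 *
 *	struct mbuf *m;
 *	...
 *	if (buf_ring_enqueue(sc->sc_br, m) == ENOBUFS)
 *		m_freem(m);
 */
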
/*
 * multi-consumer safe dequeue
 */
static __inline void *
buf_ring_dequeue_mc(struct buf_ring *br)
{
	uint32_t cons_head, cons_next, cons_idx;
	uint32_t prod_tail, mask;
	void *buf;

	critical_enter();
	mask = br->br_cons_mask;
	do {
		/*
		 * As with buf_ring_enqueue ensure we read the head before
		 * the tail. If we read them in the wrong order we may
		 * think the buf_ring is full when it is empty.
		 */
		cons_head = atomic_load_acq_32(&br->br_cons_head);
		cons_next = cons_head + 1;
		prod_tail = atomic_load_acq_32(&br->br_prod_tail);

		if (cons_head == prod_tail) {
			critical_exit();
			return (NULL);
		}
	} while (!atomic_cmpset_32(&br->br_cons_head, cons_head, cons_next));
	cons_idx = cons_head & mask;

	buf = br->br_ring[cons_idx];
#ifdef DEBUG_BUFRING
	br->br_ring[cons_idx] = NULL;
#endif
	/*
	 * If there are other dequeues in progress
	 * that preceded us, we need to wait for them
	 * to complete
	 */
	while (atomic_load_32(&br->br_cons_tail) != cons_head)
		cpu_spinwait();

	atomic_store_rel_32(&br->br_cons_tail, cons_next);
	critical_exit();

	return (buf);
}

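/*
 * Example usage (an illustrative sketch, not part of this header):
 * several consumers can drain the ring concurrently by looping until
 * buf_ring_dequeue_mc() reports an empty ring. process_buf() is a
 * hypothetical stand-in for the per-buffer work.
 *
 *	void *buf;
 *
 *	while ((buf = buf_ring_dequeue_mc(sc->sc_br)) != NULL)
 *		process_buf(buf);
 */
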
/*
 * single-consumer dequeue
 * use where dequeue is protected by a lock
 * e.g. a network driver's tx queue lock
 */
static __inline void *
buf_ring_dequeue_sc(struct buf_ring *br)
{
	uint32_t cons_head, cons_next, cons_idx;
	uint32_t prod_tail, mask;
	void *buf;

	mask = br->br_cons_mask;
	cons_head = atomic_load_32(&br->br_cons_head);
	prod_tail = atomic_load_acq_32(&br->br_prod_tail);

	cons_next = cons_head + 1;

	if (cons_head == prod_tail)
		return (NULL);

	cons_idx = cons_head & mask;
	atomic_store_32(&br->br_cons_head, cons_next);
	buf = br->br_ring[cons_idx];

#ifdef DEBUG_BUFRING
	br->br_ring[cons_idx] = NULL;
#ifdef _KERNEL
	if (!mtx_owned(br->br_lock))
		panic("lock not held on single consumer dequeue");
#endif
	if (atomic_load_32(&br->br_cons_tail) != cons_head)
		panic("inconsistent list cons_tail=%d cons_head=%d",
		    atomic_load_32(&br->br_cons_tail), cons_head);
#endif
	atomic_store_rel_32(&br->br_cons_tail, cons_next);
	return (buf);
}

/*
 * single-consumer advance after a peek
 * use where it is protected by a lock
 * e.g. a network driver's tx queue lock
 */
static __inline void
buf_ring_advance_sc(struct buf_ring *br)
{
	uint32_t cons_head, cons_next, prod_tail;
#ifdef DEBUG_BUFRING
	uint32_t mask;

	mask = br->br_cons_mask;
#endif
	cons_head = atomic_load_32(&br->br_cons_head);
	prod_tail = atomic_load_32(&br->br_prod_tail);

	cons_next = cons_head + 1;
	if (cons_head == prod_tail)
		return;
	atomic_store_32(&br->br_cons_head, cons_next);
#ifdef DEBUG_BUFRING
	br->br_ring[cons_head & mask] = NULL;
#endif
	atomic_store_rel_32(&br->br_cons_tail, cons_next);
}

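/*
 * Example usage (an illustrative sketch, not part of this header): the
 * peek-then-advance pattern under the single-consumer lock, as in a
 * drbr-style transmit path. The head entry is only advanced past once
 * the hypothetical hw_transmit() has accepted it; on failure it stays
 * at the head of the ring for the next attempt. sc, sc_br and
 * sc_tx_lock are hypothetical names.
 *
 *	struct mbuf *m;
 *
 *	mtx_lock(&sc->sc_tx_lock);
 *	while ((m = buf_ring_peek(sc->sc_br)) != NULL) {
 *		if (hw_transmit(sc, m) != 0)
 *			break;
 *		buf_ring_advance_sc(sc->sc_br);
 *	}
 *	mtx_unlock(&sc->sc_tx_lock);
 */
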
/*
 * Used to return a buffer (most likely already there)
 * to the top of the ring. The caller should *not*
 * have used any dequeue to pull it out of the ring
 * but instead should have used the peek() function.
 * This is normally used where the transmit queue
 * of a driver is full, and an mbuf must be returned.
 * Most likely what's in the ring-buffer is what
 * is being put back (since it was not removed), but
 * sometimes the lower transmit function may have
 * done a pullup or other function that will have
 * changed it. As an optimization we always put it
 * back (since jhb says the store is probably cheaper);
 * if we have to do a multi-queue version we will need
 * the compare and an atomic.
 */
static __inline void
buf_ring_putback_sc(struct buf_ring *br, void *new)
{
	uint32_t cons_idx, mask;

	mask = br->br_cons_mask;
	cons_idx = atomic_load_32(&br->br_cons_head) & mask;
	KASSERT(cons_idx != (atomic_load_32(&br->br_prod_tail) & mask),
	    ("Buf-Ring has none in putback"));
	br->br_ring[cons_idx] = new;
}

/*
 * return a pointer to the first entry in the ring
 * without modifying it, or NULL if the ring is empty
 * race-prone if not protected by a lock
 */
static __inline void *
buf_ring_peek(struct buf_ring *br)
{
	uint32_t cons_head, prod_tail, mask;

#if defined(DEBUG_BUFRING) && defined(_KERNEL)
	if ((br->br_lock != NULL) && !mtx_owned(br->br_lock))
		panic("lock not held on single consumer dequeue");
#endif
	mask = br->br_cons_mask;
	prod_tail = atomic_load_acq_32(&br->br_prod_tail);
	cons_head = atomic_load_32(&br->br_cons_head);

	if (cons_head == prod_tail)
		return (NULL);

	return (br->br_ring[cons_head & mask]);
}

static __inline void *
buf_ring_peek_clear_sc(struct buf_ring *br)
{
	uint32_t cons_head, prod_tail, mask;
	void *ret;

#if defined(DEBUG_BUFRING) && defined(_KERNEL)
	if (!mtx_owned(br->br_lock))
		panic("lock not held on single consumer dequeue");
#endif

	mask = br->br_cons_mask;
	prod_tail = atomic_load_acq_32(&br->br_prod_tail);
	cons_head = atomic_load_32(&br->br_cons_head);

	if (cons_head == prod_tail)
		return (NULL);

	ret = br->br_ring[cons_head & mask];
#ifdef DEBUG_BUFRING
	/*
	 * Single consumer, i.e. cons_head will not move while we are
	 * running, so atomic_swap_ptr() is not necessary here.
	 */
	br->br_ring[cons_head & mask] = NULL;
#endif
	return (ret);
}

static __inline int
buf_ring_full(struct buf_ring *br)
{

	return (atomic_load_32(&br->br_prod_head) ==
	    atomic_load_32(&br->br_cons_tail) + br->br_cons_size - 1);
}

static __inline int
buf_ring_empty(struct buf_ring *br)
{

	return (atomic_load_32(&br->br_cons_head) ==
	    atomic_load_32(&br->br_prod_tail));
}

static __inline int
buf_ring_count(struct buf_ring *br)
{
	uint32_t cons_tail, prod_tail;

	cons_tail = atomic_load_32(&br->br_cons_tail);
	prod_tail = atomic_load_32(&br->br_prod_tail);
	return ((br->br_prod_size + prod_tail - cons_tail) & br->br_prod_mask);
}

#ifdef _KERNEL
struct buf_ring *buf_ring_alloc(int count, struct malloc_type *type, int flags,
    struct mtx *);
void buf_ring_free(struct buf_ring *br, struct malloc_type *type);
#else

#include <stdlib.h>

static inline struct buf_ring *
buf_ring_alloc(int count)
{
	struct buf_ring *br;

	KASSERT(powerof2(count), ("buf ring size must be a power of 2"));

	br = calloc(1, sizeof(struct buf_ring) + count * sizeof(void *));
	if (br == NULL)
		return (NULL);
	br->br_prod_size = br->br_cons_size = count;
	br->br_prod_mask = br->br_cons_mask = count - 1;
	br->br_prod_head = br->br_cons_head = 0;
	br->br_prod_tail = br->br_cons_tail = 0;
	return (br);
}

static inline void
buf_ring_free(struct buf_ring *br)
{
	free(br);
}

#endif /* !_KERNEL */
#endif /* _SYS_BUF_RING_H_ */

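/*
 * Example usage (an illustrative sketch, not part of this header): a
 * minimal userland round trip with the libc-backed buf_ring_alloc()
 * above, assuming the program supplies the userland definitions of
 * critical_enter(), critical_exit(), cpu_spinwait() and KASSERT()
 * that this header expects. Note that the ring holds at most
 * count - 1 entries and that count must be a power of 2.
 *
 *	struct buf_ring *br;
 *	int item = 42;
 *
 *	br = buf_ring_alloc(1024);
 *	if (br != NULL) {
 *		if (buf_ring_enqueue(br, &item) == 0)
 *			assert(buf_ring_dequeue_sc(br) == &item);
 *		buf_ring_free(br);
 *	}
 */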