1 /* SPDX-License-Identifier: BSD-3-Clause 2 * 3 * Copyright (c) 2010-2020 Intel Corporation 4 * Copyright (c) 2007-2009 Kip Macy kmacy@freebsd.org 5 * All rights reserved. 6 * Derived from FreeBSD's bufring.h 7 * Used as BSD-3 Licensed with permission from Kip Macy. 8 */ 9 10 #ifndef _RTE_RING_RTS_H_ 11 #define _RTE_RING_RTS_H_ 12 13 /** 14 * @file rte_ring_rts.h 15 * It is not recommended to include this file directly. 16 * Please include <rte_ring.h> instead. 17 * 18 * Contains functions for Relaxed Tail Sync (RTS) ring mode. 19 * The main idea remains the same as for our original MP/MC synchronization 20 * mechanism. 21 * The main difference is that tail value is increased not 22 * by every thread that finished enqueue/dequeue, 23 * but only by the current last one doing enqueue/dequeue. 24 * That allows threads to skip spinning on tail value, 25 * leaving actual tail value change to last thread at a given instance. 26 * RTS requires 2 64-bit CAS for each enqueue(/dequeue) operation: 27 * one for head update, second for tail update. 28 * As a gain it allows thread to avoid spinning/waiting on tail value. 29 * In comparison original MP/MC algorithm requires one 32-bit CAS 30 * for head update and waiting/spinning on tail value. 31 * 32 * Brief outline: 33 * - introduce update counter (cnt) for both head and tail. 34 * - increment head.cnt for each head.value update 35 * - write head.value and head.cnt atomically (64-bit CAS) 36 * - move tail.value ahead only when tail.cnt + 1 == head.cnt 37 * (indicating that this is the last thread updating the tail) 38 * - increment tail.cnt when each enqueue/dequeue op finishes 39 * (no matter if tail.value going to change or not) 40 * - write tail.value and tail.cnt atomically (64-bit CAS) 41 * 42 * To avoid producer/consumer starvation: 43 * - limit max allowed distance between head and tail value (HTD_MAX). 44 * I.E. thread is allowed to proceed with changing head.value, 45 * only when: head.value - tail.value <= HTD_MAX 46 * HTD_MAX is an optional parameter. 47 * With HTD_MAX == 0 we'll have fully serialized ring - 48 * i.e. only one thread at a time will be able to enqueue/dequeue 49 * to/from the ring. 50 * With HTD_MAX >= ring.capacity - no limitation. 51 * By default HTD_MAX == ring.capacity / 8. 52 */ 53 54 #include <rte_ring_rts_elem_pvt.h> 55 56 #ifdef __cplusplus 57 extern "C" { 58 #endif 59 60 /** 61 * Enqueue several objects on the RTS ring (multi-producers safe). 62 * 63 * @param r 64 * A pointer to the ring structure. 65 * @param obj_table 66 * A pointer to a table of objects. 67 * @param esize 68 * The size of ring element, in bytes. It must be a multiple of 4. 69 * This must be the same value used while creating the ring. Otherwise 70 * the results are undefined. 71 * @param n 72 * The number of objects to add in the ring from the obj_table. 73 * @param free_space 74 * if non-NULL, returns the amount of space in the ring after the 75 * enqueue operation has finished. 76 * @return 77 * The number of objects enqueued, either 0 or n 78 */ 79 static __rte_always_inline unsigned int 80 rte_ring_mp_rts_enqueue_bulk_elem(struct rte_ring *r, const void *obj_table, 81 unsigned int esize, unsigned int n, unsigned int *free_space) 82 { 83 return __rte_ring_do_rts_enqueue_elem(r, obj_table, esize, n, 84 RTE_RING_QUEUE_FIXED, free_space); 85 } 86 87 /** 88 * Dequeue several objects from an RTS ring (multi-consumers safe). 89 * 90 * @param r 91 * A pointer to the ring structure. 92 * @param obj_table 93 * A pointer to a table of objects that will be filled. 94 * @param esize 95 * The size of ring element, in bytes. It must be a multiple of 4. 96 * This must be the same value used while creating the ring. Otherwise 97 * the results are undefined. 98 * @param n 99 * The number of objects to dequeue from the ring to the obj_table. 100 * @param available 101 * If non-NULL, returns the number of remaining ring entries after the 102 * dequeue has finished. 103 * @return 104 * The number of objects dequeued, either 0 or n 105 */ 106 static __rte_always_inline unsigned int 107 rte_ring_mc_rts_dequeue_bulk_elem(struct rte_ring *r, void *obj_table, 108 unsigned int esize, unsigned int n, unsigned int *available) 109 { 110 return __rte_ring_do_rts_dequeue_elem(r, obj_table, esize, n, 111 RTE_RING_QUEUE_FIXED, available); 112 } 113 114 /** 115 * Enqueue several objects on the RTS ring (multi-producers safe). 116 * 117 * @param r 118 * A pointer to the ring structure. 119 * @param obj_table 120 * A pointer to a table of objects. 121 * @param esize 122 * The size of ring element, in bytes. It must be a multiple of 4. 123 * This must be the same value used while creating the ring. Otherwise 124 * the results are undefined. 125 * @param n 126 * The number of objects to add in the ring from the obj_table. 127 * @param free_space 128 * if non-NULL, returns the amount of space in the ring after the 129 * enqueue operation has finished. 130 * @return 131 * - n: Actual number of objects enqueued. 132 */ 133 static __rte_always_inline unsigned int 134 rte_ring_mp_rts_enqueue_burst_elem(struct rte_ring *r, const void *obj_table, 135 unsigned int esize, unsigned int n, unsigned int *free_space) 136 { 137 return __rte_ring_do_rts_enqueue_elem(r, obj_table, esize, n, 138 RTE_RING_QUEUE_VARIABLE, free_space); 139 } 140 141 /** 142 * Dequeue several objects from an RTS ring (multi-consumers safe). 143 * When the requested objects are more than the available objects, 144 * only dequeue the actual number of objects. 145 * 146 * @param r 147 * A pointer to the ring structure. 148 * @param obj_table 149 * A pointer to a table of objects that will be filled. 150 * @param esize 151 * The size of ring element, in bytes. It must be a multiple of 4. 152 * This must be the same value used while creating the ring. Otherwise 153 * the results are undefined. 154 * @param n 155 * The number of objects to dequeue from the ring to the obj_table. 156 * @param available 157 * If non-NULL, returns the number of remaining ring entries after the 158 * dequeue has finished. 159 * @return 160 * - n: Actual number of objects dequeued, 0 if ring is empty 161 */ 162 static __rte_always_inline unsigned int 163 rte_ring_mc_rts_dequeue_burst_elem(struct rte_ring *r, void *obj_table, 164 unsigned int esize, unsigned int n, unsigned int *available) 165 { 166 return __rte_ring_do_rts_dequeue_elem(r, obj_table, esize, n, 167 RTE_RING_QUEUE_VARIABLE, available); 168 } 169 170 /** 171 * Enqueue several objects on the RTS ring (multi-producers safe). 172 * 173 * @param r 174 * A pointer to the ring structure. 175 * @param obj_table 176 * A pointer to a table of void * pointers (objects). 177 * @param n 178 * The number of objects to add in the ring from the obj_table. 179 * @param free_space 180 * if non-NULL, returns the amount of space in the ring after the 181 * enqueue operation has finished. 182 * @return 183 * The number of objects enqueued, either 0 or n 184 */ 185 static __rte_always_inline unsigned int 186 rte_ring_mp_rts_enqueue_bulk(struct rte_ring *r, void * const *obj_table, 187 unsigned int n, unsigned int *free_space) 188 { 189 return rte_ring_mp_rts_enqueue_bulk_elem(r, obj_table, 190 sizeof(uintptr_t), n, free_space); 191 } 192 193 /** 194 * Dequeue several objects from an RTS ring (multi-consumers safe). 195 * 196 * @param r 197 * A pointer to the ring structure. 198 * @param obj_table 199 * A pointer to a table of void * pointers (objects) that will be filled. 200 * @param n 201 * The number of objects to dequeue from the ring to the obj_table. 202 * @param available 203 * If non-NULL, returns the number of remaining ring entries after the 204 * dequeue has finished. 205 * @return 206 * The number of objects dequeued, either 0 or n 207 */ 208 static __rte_always_inline unsigned int 209 rte_ring_mc_rts_dequeue_bulk(struct rte_ring *r, void **obj_table, 210 unsigned int n, unsigned int *available) 211 { 212 return rte_ring_mc_rts_dequeue_bulk_elem(r, obj_table, 213 sizeof(uintptr_t), n, available); 214 } 215 216 /** 217 * Enqueue several objects on the RTS ring (multi-producers safe). 218 * 219 * @param r 220 * A pointer to the ring structure. 221 * @param obj_table 222 * A pointer to a table of void * pointers (objects). 223 * @param n 224 * The number of objects to add in the ring from the obj_table. 225 * @param free_space 226 * if non-NULL, returns the amount of space in the ring after the 227 * enqueue operation has finished. 228 * @return 229 * - n: Actual number of objects enqueued. 230 */ 231 static __rte_always_inline unsigned int 232 rte_ring_mp_rts_enqueue_burst(struct rte_ring *r, void * const *obj_table, 233 unsigned int n, unsigned int *free_space) 234 { 235 return rte_ring_mp_rts_enqueue_burst_elem(r, obj_table, 236 sizeof(uintptr_t), n, free_space); 237 } 238 239 /** 240 * Dequeue several objects from an RTS ring (multi-consumers safe). 241 * When the requested objects are more than the available objects, 242 * only dequeue the actual number of objects. 243 * 244 * @param r 245 * A pointer to the ring structure. 246 * @param obj_table 247 * A pointer to a table of void * pointers (objects) that will be filled. 248 * @param n 249 * The number of objects to dequeue from the ring to the obj_table. 250 * @param available 251 * If non-NULL, returns the number of remaining ring entries after the 252 * dequeue has finished. 253 * @return 254 * - n: Actual number of objects dequeued, 0 if ring is empty 255 */ 256 static __rte_always_inline unsigned int 257 rte_ring_mc_rts_dequeue_burst(struct rte_ring *r, void **obj_table, 258 unsigned int n, unsigned int *available) 259 { 260 return rte_ring_mc_rts_dequeue_burst_elem(r, obj_table, 261 sizeof(uintptr_t), n, available); 262 } 263 264 /** 265 * Return producer max Head-Tail-Distance (HTD). 266 * 267 * @param r 268 * A pointer to the ring structure. 269 * @return 270 * Producer HTD value, if producer is set in appropriate sync mode, 271 * or UINT32_MAX otherwise. 272 */ 273 static inline uint32_t 274 rte_ring_get_prod_htd_max(const struct rte_ring *r) 275 { 276 if (r->prod.sync_type == RTE_RING_SYNC_MT_RTS) 277 return r->rts_prod.htd_max; 278 return UINT32_MAX; 279 } 280 281 /** 282 * Set producer max Head-Tail-Distance (HTD). 283 * Note that producer has to use appropriate sync mode (RTS). 284 * 285 * @param r 286 * A pointer to the ring structure. 287 * @param v 288 * new HTD value to setup. 289 * @return 290 * Zero on success, or negative error code otherwise. 291 */ 292 static inline int 293 rte_ring_set_prod_htd_max(struct rte_ring *r, uint32_t v) 294 { 295 if (r->prod.sync_type != RTE_RING_SYNC_MT_RTS) 296 return -ENOTSUP; 297 298 r->rts_prod.htd_max = v; 299 return 0; 300 } 301 302 /** 303 * Return consumer max Head-Tail-Distance (HTD). 304 * 305 * @param r 306 * A pointer to the ring structure. 307 * @return 308 * Consumer HTD value, if consumer is set in appropriate sync mode, 309 * or UINT32_MAX otherwise. 310 */ 311 static inline uint32_t 312 rte_ring_get_cons_htd_max(const struct rte_ring *r) 313 { 314 if (r->cons.sync_type == RTE_RING_SYNC_MT_RTS) 315 return r->rts_cons.htd_max; 316 return UINT32_MAX; 317 } 318 319 /** 320 * Set consumer max Head-Tail-Distance (HTD). 321 * Note that consumer has to use appropriate sync mode (RTS). 322 * 323 * @param r 324 * A pointer to the ring structure. 325 * @param v 326 * new HTD value to setup. 327 * @return 328 * Zero on success, or negative error code otherwise. 329 */ 330 static inline int 331 rte_ring_set_cons_htd_max(struct rte_ring *r, uint32_t v) 332 { 333 if (r->cons.sync_type != RTE_RING_SYNC_MT_RTS) 334 return -ENOTSUP; 335 336 r->rts_cons.htd_max = v; 337 return 0; 338 } 339 340 #ifdef __cplusplus 341 } 342 #endif 343 344 #endif /* _RTE_RING_RTS_H_ */ 345