/* SPDX-License-Identifier: BSD-3-Clause
 *
 * Copyright (c) 2010-2020 Intel Corporation
 * Copyright (c) 2007-2009 Kip Macy kmacy@freebsd.org
 * All rights reserved.
 * Derived from FreeBSD's bufring.h
 * Used as BSD-3 Licensed with permission from Kip Macy.
 */

#ifndef _RTE_RING_RTS_ELEM_PVT_H_
#define _RTE_RING_RTS_ELEM_PVT_H_

/**
 * @file rte_ring_rts_elem_pvt.h
 * It is not recommended to include this file directly,
 * include <rte_ring.h> instead.
 * Contains internal helper functions for Relaxed Tail Sync (RTS) ring mode.
 * For more information please refer to <rte_ring_rts.h>.
 */

/**
 * @internal This function updates tail values.
 *
 * Every completed enqueue/dequeue increments tail.cnt by one; tail.pos is
 * advanced to the current head.pos only when the incremented tail.cnt
 * becomes equal to head.cnt, i.e. when the caller is the last of the
 * in-flight operations that acquired a head position.
 */
static __rte_always_inline void
__rte_ring_rts_update_tail(struct rte_ring_rts_headtail *ht)
{
	union __rte_ring_rts_poscnt h, ot, nt;

	/*
	 * If there are other enqueues/dequeues in progress that
	 * might precede us, then don't update tail with new value.
	 */

	ot.raw = rte_atomic_load_explicit(&ht->tail.raw, rte_memory_order_acquire);

	do {
		/* on 32-bit systems we have to do atomic read here */
		h.raw = rte_atomic_load_explicit(&ht->head.raw, rte_memory_order_relaxed);

		/* bump the completion counter; move pos forward only if last */
		nt.raw = ot.raw;
		if (++nt.val.cnt == h.val.cnt)
			nt.val.pos = h.val.pos;

	/* on CAS failure 'ot' is reloaded with the current tail value */
	} while (rte_atomic_compare_exchange_strong_explicit(&ht->tail.raw,
			(uint64_t *)(uintptr_t)&ot.raw, nt.raw,
			rte_memory_order_release, rte_memory_order_acquire) == 0);
}

/**
 * @internal This function waits till head/tail distance wouldn't
 * exceed pre-defined max value.
 *
 * Spins with rte_pause() while the distance between the head position
 * held in 'h' and the committed tail is greater than ht->htd_max,
 * re-reading the head on each iteration.
 *
 * @param ht
 *   Headtail structure whose tail (and htd_max limit) is observed.
 * @param h
 *   [in/out] Caller's head snapshot; refreshed from ht->head while waiting.
 */
static __rte_always_inline void
__rte_ring_rts_head_wait(const struct rte_ring_rts_headtail *ht,
	union __rte_ring_rts_poscnt *h)
{
	uint32_t max;

	max = ht->htd_max;

	/* unsigned subtraction stays correct across 32-bit position wrap */
	while (h->val.pos - ht->tail.val.pos > max) {
		rte_pause();
		h->raw = rte_atomic_load_explicit(&ht->head.raw, rte_memory_order_acquire);
	}
}

/**
 * @internal This is a helper function that moves the producer/consumer head
 *
 * @param d
 *   A pointer to the headtail structure with head value to be moved
 * @param s
 *   A pointer to the counter-part headtail structure. Note that this
 *   function only reads tail value from it
 * @param capacity
 *   Either ring capacity value (for producer), or zero (for consumer)
 *   Indicates whether multi-thread safe path is needed or not
 * @param num
 *   The number of elements we want to move head value on
 * @param behavior
 *   RTE_RING_QUEUE_FIXED:    Move on a fixed number of items
 *   RTE_RING_QUEUE_VARIABLE: Move on as many items as possible
 * @param old_head
 *   Returns head value as it was before the move
 * @param entries
 *   Returns the number of ring entries available BEFORE head was moved
 * @return
 *   Actual number of objects the head was moved on
 *   If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only
 */
static __rte_always_inline uint32_t
__rte_ring_rts_move_head(struct rte_ring_rts_headtail *d,
	const struct rte_ring_headtail *s, uint32_t capacity, uint32_t num,
	enum rte_ring_queue_behavior behavior, uint32_t *old_head,
	uint32_t *entries)
{
	uint32_t n;
	union __rte_ring_rts_poscnt nh, oh;

	oh.raw = rte_atomic_load_explicit(&d->head.raw,
			rte_memory_order_acquire);

	do {
		/* Reset n to the initial burst count */
		n = num;

		/*
		 * wait for prod head/tail distance,
		 * make sure that we read prod head *before*
		 * reading cons tail.
		 */
		__rte_ring_rts_head_wait(d, &oh);

		/*
		 * The subtraction is done between two unsigned 32bits value
		 * (the result is always modulo 32 bits even if we have
		 * *old_head > cons_tail). So 'entries' is always between 0
		 * and capacity (which is < size).
		 */
		*entries = capacity + s->tail - oh.val.pos;

		/* check that we have enough room in ring */
		if (unlikely(n > *entries))
			n = (behavior == RTE_RING_QUEUE_FIXED) ?
					0 : *entries;

		if (n == 0)
			break;

		/* cnt records one more in-flight head move; it is matched by
		 * the cnt increment in __rte_ring_rts_update_tail().
		 */
		nh.val.pos = oh.val.pos + n;
		nh.val.cnt = oh.val.cnt + 1;

		/*
		 * this CAS(ACQUIRE, ACQUIRE) serves as a hoist barrier to prevent:
		 * - OOO reads of cons tail value
		 * - OOO copy of elems to the ring
		 */
	} while (rte_atomic_compare_exchange_strong_explicit(&d->head.raw,
			(uint64_t *)(uintptr_t)&oh.raw, nh.raw,
			rte_memory_order_acquire,
			rte_memory_order_acquire) == 0);

	*old_head = oh.val.pos;
	return n;
}

/**
 * @internal This function updates the producer head for enqueue.
 *
 * Thin wrapper over __rte_ring_rts_move_head() with the producer
 * headtail, the consumer tail and the ring capacity.
 */
static __rte_always_inline uint32_t
__rte_ring_rts_move_prod_head(struct rte_ring *r, uint32_t num,
	enum rte_ring_queue_behavior behavior, uint32_t *old_head,
	uint32_t *free_entries)
{
	return __rte_ring_rts_move_head(&r->rts_prod, &r->cons,
			r->capacity, num, behavior, old_head, free_entries);
}

/**
 * @internal This function updates the consumer head for dequeue
 *
 * Thin wrapper over __rte_ring_rts_move_head() with the consumer
 * headtail, the producer tail and zero capacity.
 */
static __rte_always_inline unsigned int
__rte_ring_rts_move_cons_head(struct rte_ring *r, uint32_t num,
	enum rte_ring_queue_behavior behavior, uint32_t *old_head,
	uint32_t *entries)
{
	return __rte_ring_rts_move_head(&r->rts_cons, &r->prod,
			0, num, behavior, old_head, entries);
}

/**
 * @internal Enqueue several objects on the RTS ring.
 *
 * @param r
 *   A pointer to the ring structure.
 * @param obj_table
 *   A pointer to a table of objects.
 * @param esize
 *   The size of ring element, in bytes. It must be a multiple of 4.
 *   This must be the same value used while creating the ring. Otherwise
 *   the results are undefined.
 * @param n
 *   The number of objects to add in the ring from the obj_table.
 * @param behavior
 *   RTE_RING_QUEUE_FIXED:    Enqueue a fixed number of items from a ring
 *   RTE_RING_QUEUE_VARIABLE: Enqueue as many items as possible from ring
 * @param free_space
 *   returns the amount of space after the enqueue operation has finished
 * @return
 *   Actual number of objects enqueued.
 *   If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
 */
static __rte_always_inline unsigned int
__rte_ring_do_rts_enqueue_elem(struct rte_ring *r, const void *obj_table,
	uint32_t esize, uint32_t n, enum rte_ring_queue_behavior behavior,
	uint32_t *free_space)
{
	uint32_t free, head;

	/* reserve [head, head+n) first, then copy, then publish via tail */
	n = __rte_ring_rts_move_prod_head(r, n, behavior, &head, &free);

	if (n != 0) {
		__rte_ring_enqueue_elems(r, head, obj_table, esize, n);
		__rte_ring_rts_update_tail(&r->rts_prod);
	}

	if (free_space != NULL)
		*free_space = free - n;
	return n;
}

/**
 * @internal Dequeue several objects from the RTS ring.
 *
 * @param r
 *   A pointer to the ring structure.
 * @param obj_table
 *   A pointer to a table of objects.
 * @param esize
 *   The size of ring element, in bytes. It must be a multiple of 4.
 *   This must be the same value used while creating the ring. Otherwise
 *   the results are undefined.
 * @param n
 *   The number of objects to pull from the ring.
 * @param behavior
 *   RTE_RING_QUEUE_FIXED:    Dequeue a fixed number of items from a ring
 *   RTE_RING_QUEUE_VARIABLE: Dequeue as many items as possible from ring
 * @param available
 *   returns the number of remaining ring entries after the dequeue has finished
 * @return
 *   - Actual number of objects dequeued.
 *     If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
 */
static __rte_always_inline unsigned int
__rte_ring_do_rts_dequeue_elem(struct rte_ring *r, void *obj_table,
	uint32_t esize, uint32_t n, enum rte_ring_queue_behavior behavior,
	uint32_t *available)
{
	uint32_t entries, head;

	/* reserve [head, head+n) first, then copy out, then publish via tail */
	n = __rte_ring_rts_move_cons_head(r, n, behavior, &head, &entries);

	if (n != 0) {
		__rte_ring_dequeue_elems(r, head, obj_table, esize, n);
		__rte_ring_rts_update_tail(&r->rts_cons);
	}

	if (available != NULL)
		*available = entries - n;
	return n;
}

#endif /* _RTE_RING_RTS_ELEM_PVT_H_ */