/* SPDX-License-Identifier: BSD-3-Clause
 *
 * Copyright (c) 2010-2020 Intel Corporation
 * Copyright (c) 2007-2009 Kip Macy kmacy@freebsd.org
 * All rights reserved.
 * Derived from FreeBSD's bufring.h
 * Used as BSD-3 Licensed with permission from Kip Macy.
 */

#ifndef _RTE_RING_HTS_ELEM_PVT_H_
#define _RTE_RING_HTS_ELEM_PVT_H_

#include <rte_stdatomic.h>

/**
 * @file rte_ring_hts_elem_pvt.h
 * It is not recommended to include this file directly,
 * include <rte_ring.h> instead.
 * Contains internal helper functions for head/tail sync (HTS) ring mode.
 * For more information please refer to <rte_ring_hts.h>.
 */
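
/*
 * Usage sketch (illustration only; the names below come from the public
 * <rte_ring.h> API, not from this file): applications do not call these
 * helpers directly. HTS mode is normally enabled through the ring creation
 * flags, e.g.:
 *
 *	struct rte_ring *r;
 *
 *	r = rte_ring_create("hts_ring", 1024, SOCKET_ID_ANY,
 *		RING_F_MP_HTS_ENQ | RING_F_MC_HTS_DEQ);
 *
 * after which the generic rte_ring_enqueue_burst()/rte_ring_dequeue_burst()
 * calls are routed to the HTS code paths implemented below. The ring name
 * and size shown here are arbitrary example values.
 */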

/**
 * @internal update tail with new value.
 */
static __rte_always_inline void
__rte_ring_hts_update_tail(struct rte_ring_hts_headtail *ht, uint32_t old_tail,
        uint32_t num, uint32_t enqueue)
{
        uint32_t tail;

        RTE_SET_USED(enqueue);

        tail = old_tail + num;
        rte_atomic_store_explicit(&ht->ht.pos.tail, tail, rte_memory_order_release);
}

/**
 * @internal waits until the tail becomes equal to the head.
 * This means no writer/reader is active for that ring.
 * It is supposed to work as a serialization point.
 */
static __rte_always_inline void
__rte_ring_hts_head_wait(const struct rte_ring_hts_headtail *ht,
        union __rte_ring_hts_pos *p)
{
        while (p->pos.head != p->pos.tail) {
                rte_pause();
                p->raw = rte_atomic_load_explicit(&ht->ht.raw, rte_memory_order_acquire);
        }
}

/**
 * @internal This is a helper function that moves the producer/consumer head.
 *
 * @param d
 *   A pointer to the headtail structure with head value to be moved
 * @param s
 *   A pointer to the counter-part headtail structure. Note that this
 *   function only reads tail value from it
 * @param capacity
 *   Either ring capacity value (for producer), or zero (for consumer)
 * @param num
 *   The number of elements we want to move the head value by
 * @param behavior
 *   RTE_RING_QUEUE_FIXED: Move on a fixed number of items
 *   RTE_RING_QUEUE_VARIABLE: Move on as many items as possible
 * @param old_head
 *   Returns head value as it was before the move
 * @param entries
 *   Returns the number of ring entries available BEFORE head was moved
 * @return
 *   Actual number of objects the head was moved by
 *   If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only
 */
static __rte_always_inline uint32_t
__rte_ring_hts_move_head(struct rte_ring_hts_headtail *d,
        const struct rte_ring_headtail *s, uint32_t capacity, unsigned int num,
        enum rte_ring_queue_behavior behavior, uint32_t *old_head,
        uint32_t *entries)
{
        uint32_t n;
        union __rte_ring_hts_pos np, op;

        op.raw = rte_atomic_load_explicit(&d->ht.raw, rte_memory_order_acquire);

        do {
                /* Reset n to the initial burst count */
                n = num;

                /*
                 * wait for tail to be equal to head,
                 * make sure that we read prod head/tail *before*
                 * reading cons tail.
                 */
                __rte_ring_hts_head_wait(d, &op);

                /*
                 * The subtraction is done between two unsigned 32bits value
                 * (the result is always modulo 32 bits even if we have
                 * *old_head > cons_tail). So 'entries' is always between 0
                 * and capacity (which is < size).
                 */
                *entries = capacity + s->tail - op.pos.head;

                /* check that we have enough room in ring */
                if (unlikely(n > *entries))
                        n = (behavior == RTE_RING_QUEUE_FIXED) ?
                                        0 : *entries;

                if (n == 0)
                        break;

                np.pos.tail = op.pos.tail;
                np.pos.head = op.pos.head + n;

                /*
                 * this CAS(ACQUIRE, ACQUIRE) serves as a hoist barrier to prevent:
                 * - OOO reads of cons tail value
                 * - OOO copy of elems from the ring
                 */
        } while (rte_atomic_compare_exchange_strong_explicit(&d->ht.raw,
                        (uint64_t *)(uintptr_t)&op.raw, np.raw,
                        rte_memory_order_acquire,
                        rte_memory_order_acquire) == 0);

        *old_head = op.pos.head;
        return n;
}

/**
 * @internal This function updates the producer head for enqueue
 */
static __rte_always_inline unsigned int
__rte_ring_hts_move_prod_head(struct rte_ring *r, unsigned int num,
        enum rte_ring_queue_behavior behavior, uint32_t *old_head,
        uint32_t *free_entries)
{
        return __rte_ring_hts_move_head(&r->hts_prod, &r->cons,
                        r->capacity, num, behavior, old_head, free_entries);
}

/**
 * @internal This function updates the consumer head for dequeue
 */
static __rte_always_inline unsigned int
__rte_ring_hts_move_cons_head(struct rte_ring *r, unsigned int num,
        enum rte_ring_queue_behavior behavior, uint32_t *old_head,
        uint32_t *entries)
{
        return __rte_ring_hts_move_head(&r->hts_cons, &r->prod,
                        0, num, behavior, old_head, entries);
}

/**
 * @internal Enqueue several objects on the HTS ring.
 *
 * @param r
 *   A pointer to the ring structure.
 * @param obj_table
 *   A pointer to a table of objects.
 * @param esize
 *   The size of ring element, in bytes. It must be a multiple of 4.
 *   This must be the same value used while creating the ring. Otherwise
 *   the results are undefined.
 * @param n
 *   The number of objects to add in the ring from the obj_table.
 * @param behavior
 *   RTE_RING_QUEUE_FIXED: Enqueue a fixed number of items to the ring
 *   RTE_RING_QUEUE_VARIABLE: Enqueue as many items as possible to the ring
 * @param free_space
 *   Returns the amount of space in the ring after the enqueue operation
 *   has finished.
 * @return
 *   Actual number of objects enqueued.
 *   If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
 */
static __rte_always_inline unsigned int
__rte_ring_do_hts_enqueue_elem(struct rte_ring *r, const void *obj_table,
        uint32_t esize, uint32_t n, enum rte_ring_queue_behavior behavior,
        uint32_t *free_space)
{
        uint32_t free, head;

        n = __rte_ring_hts_move_prod_head(r, n, behavior, &head, &free);

        if (n != 0) {
                __rte_ring_enqueue_elems(r, head, obj_table, esize, n);
                __rte_ring_hts_update_tail(&r->hts_prod, head, n, 1);
        }

        if (free_space != NULL)
                *free_space = free - n;
        return n;
}
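
/*
 * Sketch (assumption about the public header, shown for illustration only):
 * the fixed-size bulk enqueue wrapper exposed by <rte_ring_hts.h> is
 * expected to be a thin shim over the helper above, roughly:
 *
 *	static __rte_always_inline unsigned int
 *	rte_ring_mp_hts_enqueue_bulk_elem(struct rte_ring *r,
 *		const void *obj_table, unsigned int esize, unsigned int n,
 *		unsigned int *free_space)
 *	{
 *		return __rte_ring_do_hts_enqueue_elem(r, obj_table, esize, n,
 *				RTE_RING_QUEUE_FIXED, free_space);
 *	}
 *
 * i.e. RTE_RING_QUEUE_FIXED gives all-or-nothing semantics, while a burst
 * variant would pass RTE_RING_QUEUE_VARIABLE instead.
 */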

/**
 * @internal Dequeue several objects from the HTS ring.
 *
 * @param r
 *   A pointer to the ring structure.
 * @param obj_table
 *   A pointer to a table of objects.
 * @param esize
 *   The size of ring element, in bytes. It must be a multiple of 4.
 *   This must be the same value used while creating the ring. Otherwise
 *   the results are undefined.
 * @param n
 *   The number of objects to pull from the ring.
 * @param behavior
 *   RTE_RING_QUEUE_FIXED: Dequeue a fixed number of items from the ring
 *   RTE_RING_QUEUE_VARIABLE: Dequeue as many items as possible from the ring
 * @param available
 *   Returns the number of remaining ring entries after the dequeue has
 *   finished.
 * @return
 *   - Actual number of objects dequeued.
 *     If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
 */
static __rte_always_inline unsigned int
__rte_ring_do_hts_dequeue_elem(struct rte_ring *r, void *obj_table,
        uint32_t esize, uint32_t n, enum rte_ring_queue_behavior behavior,
        uint32_t *available)
{
        uint32_t entries, head;

        n = __rte_ring_hts_move_cons_head(r, n, behavior, &head, &entries);

        if (n != 0) {
                __rte_ring_dequeue_elems(r, head, obj_table, esize, n);
                __rte_ring_hts_update_tail(&r->hts_cons, head, n, 0);
        }

        if (available != NULL)
                *available = entries - n;
        return n;
}

#endif /* _RTE_RING_HTS_ELEM_PVT_H_ */