xref: /dpdk/lib/ring/rte_ring_rts_elem_pvt.h (revision 3197a1ff2a2a6bef224cd51f835f135be3776f23)
1 /* SPDX-License-Identifier: BSD-3-Clause
2  *
3  * Copyright (c) 2010-2020 Intel Corporation
4  * Copyright (c) 2007-2009 Kip Macy kmacy@freebsd.org
5  * All rights reserved.
6  * Derived from FreeBSD's bufring.h
7  * Used as BSD-3 Licensed with permission from Kip Macy.
8  */
9 
10 #ifndef _RTE_RING_RTS_ELEM_PVT_H_
11 #define _RTE_RING_RTS_ELEM_PVT_H_
12 
13 /**
14  * @file rte_ring_rts_elem_pvt.h
15  * It is not recommended to include this file directly,
16  * include <rte_ring.h> instead.
17  * Contains internal helper functions for Relaxed Tail Sync (RTS) ring mode.
18  * For more information please refer to <rte_ring_rts.h>.
19  */
20 
21 /**
22  * @internal This function updates tail values.
23  */
24 static __rte_always_inline void
25 __rte_ring_rts_update_tail(struct rte_ring_rts_headtail *ht)
26 {
27 	union __rte_ring_rts_poscnt h, ot, nt;
28 
29 	/*
30 	 * If there are other enqueues/dequeues in progress that
31 	 * might preceded us, then don't update tail with new value.
32 	 */
33 
34 	ot.raw = rte_atomic_load_explicit(&ht->tail.raw, rte_memory_order_acquire);
35 
36 	do {
37 		/* on 32-bit systems we have to do atomic read here */
38 		h.raw = rte_atomic_load_explicit(&ht->head.raw, rte_memory_order_relaxed);
39 
40 		nt.raw = ot.raw;
41 		if (++nt.val.cnt == h.val.cnt)
42 			nt.val.pos = h.val.pos;
43 
44 	} while (rte_atomic_compare_exchange_strong_explicit(&ht->tail.raw,
45 			(uint64_t *)(uintptr_t)&ot.raw, nt.raw,
46 			rte_memory_order_release, rte_memory_order_acquire) == 0);
47 }
48 
49 /**
50  * @internal This function waits till head/tail distance wouldn't
51  * exceed pre-defined max value.
52  */
53 static __rte_always_inline void
54 __rte_ring_rts_head_wait(const struct rte_ring_rts_headtail *ht,
55 	union __rte_ring_rts_poscnt *h)
56 {
57 	uint32_t max;
58 
59 	max = ht->htd_max;
60 
61 	while (h->val.pos - ht->tail.val.pos > max) {
62 		rte_pause();
63 		h->raw = rte_atomic_load_explicit(&ht->head.raw, rte_memory_order_acquire);
64 	}
65 }
66 
67 /**
68  * @internal This is a helper function that moves the producer/consumer head
69  *
70  * @param d
71  *   A pointer to the headtail structure with head value to be moved
72  * @param s
73  *   A pointer to the counter-part headtail structure. Note that this
74  *   function only reads tail value from it
75  * @param capacity
76  *   Either ring capacity value (for producer), or zero (for consumer)
77  *   Indicates whether multi-thread safe path is needed or not
78  * @param num
79  *   The number of elements we want to move head value on
80  * @param behavior
81  *   RTE_RING_QUEUE_FIXED:    Move on a fixed number of items
82  *   RTE_RING_QUEUE_VARIABLE: Move on as many items as possible
83  * @param old_head
84  *   Returns head value as it was before the move
85  * @param entries
86  *   Returns the number of ring entries available BEFORE head was moved
87  * @return
88  *   Actual number of objects the head was moved on
89  *   If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only
90  */
91 static __rte_always_inline uint32_t
92 __rte_ring_rts_move_head(struct rte_ring_rts_headtail *d,
93 	const struct rte_ring_headtail *s, uint32_t capacity, uint32_t num,
94 	enum rte_ring_queue_behavior behavior, uint32_t *old_head,
95 	uint32_t *entries)
96 {
97 	uint32_t n;
98 	union __rte_ring_rts_poscnt nh, oh;
99 
100 	oh.raw = rte_atomic_load_explicit(&d->head.raw,
101 			rte_memory_order_acquire);
102 
103 	do {
104 		/* Reset n to the initial burst count */
105 		n = num;
106 
107 		/*
108 		 * wait for prod head/tail distance,
109 		 * make sure that we read prod head *before*
110 		 * reading cons tail.
111 		 */
112 		__rte_ring_rts_head_wait(d, &oh);
113 
114 		/*
115 		 *  The subtraction is done between two unsigned 32bits value
116 		 * (the result is always modulo 32 bits even if we have
117 		 * *old_head > cons_tail). So 'entries' is always between 0
118 		 * and capacity (which is < size).
119 		 */
120 		*entries = capacity + s->tail - oh.val.pos;
121 
122 		/* check that we have enough room in ring */
123 		if (unlikely(n > *entries))
124 			n = (behavior == RTE_RING_QUEUE_FIXED) ?
125 					0 : *entries;
126 
127 		if (n == 0)
128 			break;
129 
130 		nh.val.pos = oh.val.pos + n;
131 		nh.val.cnt = oh.val.cnt + 1;
132 
133 	/*
134 	 * this CAS(ACQUIRE, ACQUIRE) serves as a hoist barrier to prevent:
135 	 *  - OOO reads of cons tail value
136 	 *  - OOO copy of elems to the ring
137 	 */
138 	} while (rte_atomic_compare_exchange_strong_explicit(&d->head.raw,
139 			(uint64_t *)(uintptr_t)&oh.raw, nh.raw,
140 			rte_memory_order_acquire,
141 			rte_memory_order_acquire) == 0);
142 
143 	*old_head = oh.val.pos;
144 	return n;
145 }
146 
147 /**
148  * @internal This function updates the producer head for enqueue.
149  */
150 static __rte_always_inline uint32_t
151 __rte_ring_rts_move_prod_head(struct rte_ring *r, uint32_t num,
152 	enum rte_ring_queue_behavior behavior, uint32_t *old_head,
153 	uint32_t *free_entries)
154 {
155 	return __rte_ring_rts_move_head(&r->rts_prod, &r->cons,
156 			r->capacity, num, behavior, old_head, free_entries);
157 }
158 
159 /**
160  * @internal This function updates the consumer head for dequeue
161  */
162 static __rte_always_inline unsigned int
163 __rte_ring_rts_move_cons_head(struct rte_ring *r, uint32_t num,
164 	enum rte_ring_queue_behavior behavior, uint32_t *old_head,
165 	uint32_t *entries)
166 {
167 	return __rte_ring_rts_move_head(&r->rts_cons, &r->prod,
168 			0, num, behavior, old_head, entries);
169 }
170 
171 /**
172  * @internal Enqueue several objects on the RTS ring.
173  *
174  * @param r
175  *   A pointer to the ring structure.
176  * @param obj_table
177  *   A pointer to a table of objects.
178  * @param esize
179  *   The size of ring element, in bytes. It must be a multiple of 4.
180  *   This must be the same value used while creating the ring. Otherwise
181  *   the results are undefined.
182  * @param n
183  *   The number of objects to add in the ring from the obj_table.
184  * @param behavior
185  *   RTE_RING_QUEUE_FIXED:    Enqueue a fixed number of items from a ring
186  *   RTE_RING_QUEUE_VARIABLE: Enqueue as many items as possible from ring
187  * @param free_space
188  *   returns the amount of space after the enqueue operation has finished
189  * @return
190  *   Actual number of objects enqueued.
191  *   If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
192  */
193 static __rte_always_inline unsigned int
194 __rte_ring_do_rts_enqueue_elem(struct rte_ring *r, const void *obj_table,
195 	uint32_t esize, uint32_t n, enum rte_ring_queue_behavior behavior,
196 	uint32_t *free_space)
197 {
198 	uint32_t free, head;
199 
200 	n =  __rte_ring_rts_move_prod_head(r, n, behavior, &head, &free);
201 
202 	if (n != 0) {
203 		__rte_ring_enqueue_elems(r, head, obj_table, esize, n);
204 		__rte_ring_rts_update_tail(&r->rts_prod);
205 	}
206 
207 	if (free_space != NULL)
208 		*free_space = free - n;
209 	return n;
210 }
211 
212 /**
213  * @internal Dequeue several objects from the RTS ring.
214  *
215  * @param r
216  *   A pointer to the ring structure.
217  * @param obj_table
218  *   A pointer to a table of objects.
219  * @param esize
220  *   The size of ring element, in bytes. It must be a multiple of 4.
221  *   This must be the same value used while creating the ring. Otherwise
222  *   the results are undefined.
223  * @param n
224  *   The number of objects to pull from the ring.
225  * @param behavior
226  *   RTE_RING_QUEUE_FIXED:    Dequeue a fixed number of items from a ring
227  *   RTE_RING_QUEUE_VARIABLE: Dequeue as many items as possible from ring
228  * @param available
229  *   returns the number of remaining ring entries after the dequeue has finished
230  * @return
231  *   - Actual number of objects dequeued.
232  *     If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
233  */
234 static __rte_always_inline unsigned int
235 __rte_ring_do_rts_dequeue_elem(struct rte_ring *r, void *obj_table,
236 	uint32_t esize, uint32_t n, enum rte_ring_queue_behavior behavior,
237 	uint32_t *available)
238 {
239 	uint32_t entries, head;
240 
241 	n = __rte_ring_rts_move_cons_head(r, n, behavior, &head, &entries);
242 
243 	if (n != 0) {
244 		__rte_ring_dequeue_elems(r, head, obj_table, esize, n);
245 		__rte_ring_rts_update_tail(&r->rts_cons);
246 	}
247 
248 	if (available != NULL)
249 		*available = entries - n;
250 	return n;
251 }
252 
253 #endif /* _RTE_RING_RTS_ELEM_PVT_H_ */
254