/* SPDX-License-Identifier: BSD-3-Clause
 * Copyright(c) 2024 Huawei Technologies Co., Ltd
 */

/**
 * @file
 * This file contains the implementation of the SORING 'datapath' functions.
 *
 * Brief description:
 * ==================
 * enqueue/dequeue work the same way as for a conventional rte_ring:
 * any rte_ring sync type can be used, etc.
 * In addition, there can be multiple 'stages'.
 * For each stage there is an acquire (start) and release (finish) operation.
 * After some elems are 'acquired', the user can safely assume exclusive
 * possession of these elems until 'release' is done for them.
 * Note that right now the user has to release exactly the same number of
 * elems as were acquired.
 * After 'release', elems can be 'acquired' by the next stage and/or dequeued
 * (in case of the last stage).
 * A usage sketch is given after this comment.
 *
 * Internal structure:
 * ===================
 * In addition to the 'normal' ring of elems, there is also a ring of states
 * of the same size. Each state[] corresponds to exactly one elem[].
 * state[] is used by the acquire/release/dequeue functions to store internal
 * information and should not be accessed by the user directly.
 *
 * How it works:
 * =============
 * 'acquire()' just moves the stage's head (same as rte_ring move_head does),
 * plus it saves in state[stage.cur_head] information about how many elems
 * were acquired, the current head position and a special flag value to
 * indicate that the elems are acquired (SORING_ST_START).
 * Note that 'acquire()' returns to the user a special 'ftoken' that the user
 * has to provide to 'release()' (in fact it is just the current head position
 * combined with the current stage index).
 * 'release()' extracts the old head value from the provided ftoken and checks
 * that the corresponding 'state[]' contains the expected values (mostly for
 * sanity purposes).
 * Then it marks this state[] with the 'SORING_ST_FINISH' flag to indicate
 * that the given subset of objects was released.
 * After that, it checks whether the old head value equals the current tail
 * value. If yes, it performs the 'finalize()' operation, otherwise
 * 'release()' just returns (without spinning on the stage tail value).
 * As the updated state[] is shared by all threads, some other thread can do
 * 'finalize()' for the given stage.
 * That allows 'release()' to avoid excessive waits on the tail value.
 * The main purpose of the 'finalize()' operation is to walk through 'state[]'
 * from the current stage tail up to its head, check state[] and move the
 * stage tail past elements that are already in the SORING_ST_FINISH state.
 * Along with that, the corresponding state[] values are reset to zero.
 * Note that 'finalize()' for a given stage can be done from multiple places:
 * from 'release()' for that stage, from 'acquire()' for the next stage, or
 * even from the consumer's 'dequeue()' - in case the given stage is the
 * last one.
 * So 'finalize()' has to be MT-safe, and inside it we have to guarantee that
 * only one thread will update state[] and the stage's tail values.
 */
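
/*
 * Minimal usage sketch (illustrative only, not part of the library build):
 * a single-stage pipeline with a producer, one worker thread for stage 0
 * and a consumer. It assumes a SORING created elsewhere with one stage and
 * pointer-sized elements; BURST is an arbitrary application-defined burst
 * size, process() is a placeholder for application code (not a DPDK API),
 * and error handling is omitted.
 *
 *	void *objs[BURST];
 *	uint32_t n, ftoken;
 *
 *	// producer: add fresh objects to the ring
 *	n = rte_soring_enqueue_burst(r, objs, BURST, NULL);
 *
 *	// stage 0 worker: acquire, process in place, then release
 *	n = rte_soring_acquire_burst(r, objs, 0, BURST, &ftoken, NULL);
 *	if (n != 0) {
 *		process(objs, n);
 *		rte_soring_release(r, NULL, 0, n, ftoken);
 *	}
 *
 *	// consumer: dequeue objects released by the last stage
 *	n = rte_soring_dequeue_burst(r, objs, BURST, NULL);
 */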
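
/*
 * For reference, the life cycle of a single state[] entry, as implemented
 * by acquire_state_update(), soring_release() and
 * __rte_soring_stage_finalize() below (num is the number of elems covered
 * by the entry):
 *
 *	0 (idle)
 *	  --acquire()-->  {.ftoken = SORING_FTKN_MAKE(head, stage),
 *	                   .stnum = SORING_ST_START | num}
 *	  --release()-->  {.ftoken unchanged,
 *	                   .stnum = SORING_ST_FINISH | num}
 *	  --finalize()--> 0 (idle), stage tail advanced by num
 */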

#include "soring.h"

/*
 * Inline functions (fastpath) start here.
 */
static __rte_always_inline uint32_t
__rte_soring_stage_finalize(struct soring_stage_headtail *sht, uint32_t stage,
	union soring_state *rstate, uint32_t rmask, uint32_t maxn)
{
	int32_t rc;
	uint32_t ftkn, head, i, idx, k, n, tail;
	union soring_stage_tail nt, ot;
	union soring_state st;

	/* try to grab exclusive right to update tail value */
	ot.raw = rte_atomic_load_explicit(&sht->tail.raw,
			rte_memory_order_acquire);

	/* other thread already finalizing it for us */
	if (ot.sync != 0)
		return 0;

	nt.pos = ot.pos;
	nt.sync = 1;
	rc = rte_atomic_compare_exchange_strong_explicit(&sht->tail.raw,
		(uint64_t *)(uintptr_t)&ot.raw, nt.raw,
		rte_memory_order_release, rte_memory_order_relaxed);

	/* other thread won the race */
	if (rc == 0)
		return 0;

	/* Ensure the head is read before rstate[] */
	head = rte_atomic_load_explicit(&sht->head, rte_memory_order_relaxed);
	rte_atomic_thread_fence(rte_memory_order_acquire);

	/*
	 * start with current tail and walk through states that are
	 * already finished.
	 */

	n = RTE_MIN(head - ot.pos, maxn);
	for (i = 0, tail = ot.pos; i < n; i += k, tail += k) {

		idx = tail & rmask;
		ftkn = SORING_FTKN_MAKE(tail, stage);

		st.raw = rte_atomic_load_explicit(&rstate[idx].raw,
			rte_memory_order_relaxed);
		if ((st.stnum & SORING_ST_MASK) != SORING_ST_FINISH ||
				st.ftoken != ftkn)
			break;

		k = st.stnum & ~SORING_ST_MASK;
		rte_atomic_store_explicit(&rstate[idx].raw, 0,
				rte_memory_order_relaxed);
	}


	/* release exclusive right to update along with new tail value */
	ot.pos = tail;
	rte_atomic_store_explicit(&sht->tail.raw, ot.raw,
			rte_memory_order_release);

	return i;
}

static __rte_always_inline uint32_t
__rte_soring_move_prod_head(struct rte_soring *r, uint32_t num,
	enum rte_ring_queue_behavior behavior, enum rte_ring_sync_type st,
	uint32_t *head, uint32_t *next, uint32_t *free)
{
	uint32_t n;

	switch (st) {
	case RTE_RING_SYNC_ST:
	case RTE_RING_SYNC_MT:
		n = __rte_ring_headtail_move_head(&r->prod.ht, &r->cons.ht,
			r->capacity, st, num, behavior, head, next, free);
		break;
	case RTE_RING_SYNC_MT_RTS:
		n = __rte_ring_rts_move_head(&r->prod.rts, &r->cons.ht,
			r->capacity, num, behavior, head, free);
		*next = *head + n;
		break;
	case RTE_RING_SYNC_MT_HTS:
		n = __rte_ring_hts_move_head(&r->prod.hts, &r->cons.ht,
			r->capacity, num, behavior, head, free);
		*next = *head + n;
		break;
	default:
		/* unsupported mode, shouldn't be here */
		RTE_ASSERT(0);
		*free = 0;
		n = 0;
	}

	return n;
}

static __rte_always_inline uint32_t
__rte_soring_move_cons_head(struct rte_soring *r, uint32_t stage, uint32_t num,
	enum rte_ring_queue_behavior behavior, enum rte_ring_sync_type st,
	uint32_t *head, uint32_t *next, uint32_t *avail)
{
	uint32_t n;

	switch (st) {
	case RTE_RING_SYNC_ST:
	case RTE_RING_SYNC_MT:
		n = __rte_ring_headtail_move_head(&r->cons.ht,
			&r->stage[stage].ht, 0, st, num, behavior,
			head, next, avail);
		break;
	case RTE_RING_SYNC_MT_RTS:
		n = __rte_ring_rts_move_head(&r->cons.rts, &r->stage[stage].ht,
			0, num, behavior, head, avail);
		*next = *head + n;
		break;
	case RTE_RING_SYNC_MT_HTS:
		n = __rte_ring_hts_move_head(&r->cons.hts, &r->stage[stage].ht,
			0, num, behavior, head, avail);
		*next = *head + n;
		break;
	default:
		/* unsupported mode, shouldn't be here */
		RTE_ASSERT(0);
		*avail = 0;
		n = 0;
	}

	return n;
}

static __rte_always_inline void
__rte_soring_update_tail(struct __rte_ring_headtail *rht,
	enum rte_ring_sync_type st, uint32_t head, uint32_t next, uint32_t enq)
{
	uint32_t n;

	switch (st) {
	case RTE_RING_SYNC_ST:
	case RTE_RING_SYNC_MT:
		__rte_ring_update_tail(&rht->ht, head, next, st, enq);
		break;
	case RTE_RING_SYNC_MT_RTS:
		__rte_ring_rts_update_tail(&rht->rts);
		break;
	case RTE_RING_SYNC_MT_HTS:
		n = next - head;
		__rte_ring_hts_update_tail(&rht->hts, head, n, enq);
		break;
	default:
		/* unsupported mode, shouldn't be here */
		RTE_ASSERT(0);
	}
}

static __rte_always_inline uint32_t
__rte_soring_stage_move_head(struct soring_stage_headtail *d,
	const struct rte_ring_headtail *s, uint32_t capacity, uint32_t num,
	enum rte_ring_queue_behavior behavior,
	uint32_t *old_head, uint32_t *new_head, uint32_t *avail)
{
	uint32_t n, tail;

	*old_head = rte_atomic_load_explicit(&d->head,
			rte_memory_order_relaxed);

	do {
		n = num;

		/* Ensure the head is read before tail */
		rte_atomic_thread_fence(rte_memory_order_acquire);

		tail = rte_atomic_load_explicit(&s->tail,
				rte_memory_order_acquire);
		*avail = capacity + tail - *old_head;
		if (n > *avail)
			n = (behavior == RTE_RING_QUEUE_FIXED) ? 0 : *avail;
		if (n == 0)
			return 0;
		*new_head = *old_head + n;
	} while (rte_atomic_compare_exchange_strong_explicit(&d->head,
			old_head, *new_head, rte_memory_order_acq_rel,
			rte_memory_order_relaxed) == 0);

	return n;
}

static __rte_always_inline uint32_t
soring_enqueue(struct rte_soring *r, const void *objs,
	const void *meta, uint32_t n, enum rte_ring_queue_behavior behavior,
	uint32_t *free_space)
{
	enum rte_ring_sync_type st;
	uint32_t nb_free, prod_head, prod_next;

	RTE_ASSERT(r != NULL && r->nb_stage > 0);
	RTE_ASSERT(meta == NULL || r->meta != NULL);

	st = r->prod.ht.sync_type;

	n = __rte_soring_move_prod_head(r, n, behavior, st,
			&prod_head, &prod_next, &nb_free);
	if (n != 0) {
		__rte_ring_do_enqueue_elems(&r[1], objs, r->size,
			prod_head & r->mask, r->esize, n);
		if (meta != NULL)
			__rte_ring_do_enqueue_elems(r->meta, meta, r->size,
				prod_head & r->mask, r->msize, n);
		__rte_soring_update_tail(&r->prod, st, prod_head, prod_next, 1);
	}

	if (free_space != NULL)
		*free_space = nb_free - n;
	return n;
}

static __rte_always_inline uint32_t
soring_dequeue(struct rte_soring *r, void *objs, void *meta,
	uint32_t num, enum rte_ring_queue_behavior behavior,
	uint32_t *available)
{
	enum rte_ring_sync_type st;
	uint32_t entries, cons_head, cons_next, n, ns, reqn;

	RTE_ASSERT(r != NULL && r->nb_stage > 0);
	RTE_ASSERT(meta == NULL || r->meta != NULL);

	ns = r->nb_stage - 1;
	st = r->cons.ht.sync_type;

	/* try to grab exactly @num elems first */
	n = __rte_soring_move_cons_head(r, ns, num, RTE_RING_QUEUE_FIXED, st,
			&cons_head, &cons_next, &entries);
	if (n == 0) {
		/* try to finalize some elems from previous stage */
		n = __rte_soring_stage_finalize(&r->stage[ns].sht, ns,
			r->state, r->mask, 2 * num);
		entries += n;

		/* repeat attempt to grab elems */
		reqn = (behavior == RTE_RING_QUEUE_FIXED) ? num : 0;
		if (entries >= reqn)
			n = __rte_soring_move_cons_head(r, ns, num, behavior,
				st, &cons_head, &cons_next, &entries);
		else
			n = 0;
	}

	/* we have some elems to consume */
	if (n != 0) {
		__rte_ring_do_dequeue_elems(objs, &r[1], r->size,
			cons_head & r->mask, r->esize, n);
		if (meta != NULL)
			__rte_ring_do_dequeue_elems(meta, r->meta, r->size,
				cons_head & r->mask, r->msize, n);
		__rte_soring_update_tail(&r->cons, st, cons_head, cons_next, 0);
	}

	if (available != NULL)
		*available = entries - n;
	return n;
}

/*
 * Verify internal SORING state.
 * WARNING: if the expected value is not equal to the actual one, it means
 * that for whatever reason SORING data consistency is broken. That is a very
 * serious problem that will most likely cause race conditions, memory
 * corruption or a program crash.
 * To ease debugging, the user can rebuild the ring library with
 * RTE_SORING_DEBUG enabled.
 */
static __rte_always_inline void
soring_verify_state(const struct rte_soring *r, uint32_t stage, uint32_t idx,
	const char *msg, union soring_state val, union soring_state exp)
{
	if (val.raw != exp.raw) {
#ifdef RTE_SORING_DEBUG
		rte_soring_dump(stderr, r);
		rte_panic("line:%d from:%s: soring=%p, stage=%#x, idx=%#x, "
			"expected={.stnum=%#x, .ftoken=%#x}, "
			"actual={.stnum=%#x, .ftoken=%#x};\n",
			__LINE__, msg, r, stage, idx,
			exp.stnum, exp.ftoken,
			val.stnum, val.ftoken);
#else
		SORING_LOG(EMERG, "from:%s: soring=%p, stage=%#x, idx=%#x, "
			"expected={.stnum=%#x, .ftoken=%#x}, "
			"actual={.stnum=%#x, .ftoken=%#x};",
			msg, r, stage, idx,
			exp.stnum, exp.ftoken,
			val.stnum, val.ftoken);
#endif
	}
}

/* check and update state ring at acquire op */
static __rte_always_inline void
acquire_state_update(const struct rte_soring *r, uint32_t stage, uint32_t idx,
	uint32_t ftoken, uint32_t num)
{
	union soring_state st;
	const union soring_state est = {.raw = 0};

	st.raw = rte_atomic_load_explicit(&r->state[idx].raw,
			rte_memory_order_relaxed);
	soring_verify_state(r, stage, idx, __func__, st, est);

	st.ftoken = ftoken;
	st.stnum = (SORING_ST_START | num);

	rte_atomic_store_explicit(&r->state[idx].raw, st.raw,
			rte_memory_order_relaxed);
}

static __rte_always_inline uint32_t
soring_acquire(struct rte_soring *r, void *objs, void *meta,
	uint32_t stage, uint32_t num, enum rte_ring_queue_behavior behavior,
	uint32_t *ftoken, uint32_t *available)
{
	uint32_t avail, head, idx, n, next, reqn;
	struct soring_stage *pstg;
	struct soring_stage_headtail *cons;

	RTE_ASSERT(r != NULL && stage < r->nb_stage);
	RTE_ASSERT(meta == NULL || r->meta != NULL);

	cons = &r->stage[stage].sht;

	if (stage == 0)
		n = __rte_soring_stage_move_head(cons, &r->prod.ht, 0, num,
			behavior, &head, &next, &avail);
	else {
		pstg = r->stage + stage - 1;

		/* try to grab exactly @num elems */
		n = __rte_soring_stage_move_head(cons, &pstg->ht, 0, num,
			RTE_RING_QUEUE_FIXED, &head, &next, &avail);
		if (n == 0) {
			/* try to finalize some elems from previous stage */
			n = __rte_soring_stage_finalize(&pstg->sht, stage - 1,
				r->state, r->mask, 2 * num);
			avail += n;

			/* repeat attempt to grab elems */
			reqn = (behavior == RTE_RING_QUEUE_FIXED) ? num : 0;
			if (avail >= reqn)
				n = __rte_soring_stage_move_head(cons,
					&pstg->ht, 0, num, behavior, &head,
					&next, &avail);
			else
				n = 0;
		}
	}

	if (n != 0) {

		idx = head & r->mask;
		*ftoken = SORING_FTKN_MAKE(head, stage);

		/* check and update state value */
		acquire_state_update(r, stage, idx, *ftoken, n);

		/* copy elems that are ready for given stage */
		__rte_ring_do_dequeue_elems(objs, &r[1], r->size, idx,
				r->esize, n);
		if (meta != NULL)
			__rte_ring_do_dequeue_elems(meta, r->meta,
				r->size, idx, r->msize, n);
	}

	if (available != NULL)
		*available = avail - n;
	return n;
}

static __rte_always_inline void
soring_release(struct rte_soring *r, const void *objs,
	const void *meta, uint32_t stage, uint32_t n, uint32_t ftoken)
{
	uint32_t idx, pos, tail;
	struct soring_stage *stg;
	union soring_state st;

	const union soring_state est = {
		.stnum = (SORING_ST_START | n),
		.ftoken = ftoken,
	};

	RTE_ASSERT(r != NULL && stage < r->nb_stage);
	RTE_ASSERT(meta == NULL || r->meta != NULL);

	stg = r->stage + stage;

	pos = SORING_FTKN_POS(ftoken, stage);
	idx = pos & r->mask;
	st.raw = rte_atomic_load_explicit(&r->state[idx].raw,
			rte_memory_order_relaxed);

	/* check state ring contents */
	soring_verify_state(r, stage, idx, __func__, st, est);

	/* update contents of the ring, if necessary */
	if (objs != NULL)
		__rte_ring_do_enqueue_elems(&r[1], objs, r->size, idx,
			r->esize, n);
	if (meta != NULL)
		__rte_ring_do_enqueue_elems(r->meta, meta, r->size, idx,
			r->msize, n);

	/* set state to FINISH, make sure it is not reordered */
	rte_atomic_thread_fence(rte_memory_order_release);

	st.stnum = SORING_ST_FINISH | n;
	rte_atomic_store_explicit(&r->state[idx].raw, st.raw,
			rte_memory_order_relaxed);

	/* try to do finalize(), if appropriate */
	tail = rte_atomic_load_explicit(&stg->sht.tail.pos,
			rte_memory_order_relaxed);
	if (tail == pos)
		__rte_soring_stage_finalize(&stg->sht, stage, r->state, r->mask,
				r->capacity);
}

/*
 * Public functions (data-path) start here.
 */

void
rte_soring_release(struct rte_soring *r, const void *objs,
	uint32_t stage, uint32_t n, uint32_t ftoken)
{
	soring_release(r, objs, NULL, stage, n, ftoken);
}


void
rte_soring_releasx(struct rte_soring *r, const void *objs,
	const void *meta, uint32_t stage, uint32_t n, uint32_t ftoken)
{
	soring_release(r, objs, meta, stage, n, ftoken);
}

uint32_t
rte_soring_enqueue_bulk(struct rte_soring *r, const void *objs, uint32_t n,
	uint32_t *free_space)
{
	return soring_enqueue(r, objs, NULL, n, RTE_RING_QUEUE_FIXED,
			free_space);
}

uint32_t
rte_soring_enqueux_bulk(struct rte_soring *r, const void *objs,
	const void *meta, uint32_t n, uint32_t *free_space)
{
	return soring_enqueue(r, objs, meta, n, RTE_RING_QUEUE_FIXED,
			free_space);
}

uint32_t
rte_soring_enqueue_burst(struct rte_soring *r, const void *objs, uint32_t n,
	uint32_t *free_space)
{
	return soring_enqueue(r, objs, NULL, n, RTE_RING_QUEUE_VARIABLE,
			free_space);
}

uint32_t
rte_soring_enqueux_burst(struct rte_soring *r, const void *objs,
	const void *meta, uint32_t n, uint32_t *free_space)
{
	return soring_enqueue(r, objs, meta, n, RTE_RING_QUEUE_VARIABLE,
			free_space);
}

uint32_t
rte_soring_dequeue_bulk(struct rte_soring *r, void *objs, uint32_t num,
	uint32_t *available)
{
	return soring_dequeue(r, objs, NULL, num, RTE_RING_QUEUE_FIXED,
			available);
}

uint32_t
rte_soring_dequeux_bulk(struct rte_soring *r, void *objs, void *meta,
	uint32_t num, uint32_t *available)
{
	return soring_dequeue(r, objs, meta, num, RTE_RING_QUEUE_FIXED,
			available);
}

uint32_t
rte_soring_dequeue_burst(struct rte_soring *r, void *objs, uint32_t num,
	uint32_t *available)
{
	return soring_dequeue(r, objs, NULL, num, RTE_RING_QUEUE_VARIABLE,
			available);
}

uint32_t
rte_soring_dequeux_burst(struct rte_soring *r, void *objs, void *meta,
	uint32_t num, uint32_t *available)
{
	return soring_dequeue(r, objs, meta, num, RTE_RING_QUEUE_VARIABLE,
			available);
}

uint32_t
rte_soring_acquire_bulk(struct rte_soring *r, void *objs,
	uint32_t stage, uint32_t num, uint32_t *ftoken, uint32_t *available)
{
	return soring_acquire(r, objs, NULL, stage, num,
			RTE_RING_QUEUE_FIXED, ftoken, available);
}

uint32_t
rte_soring_acquirx_bulk(struct rte_soring *r, void *objs, void *meta,
	uint32_t stage, uint32_t num, uint32_t *ftoken, uint32_t *available)
{
	return soring_acquire(r, objs, meta, stage, num,
			RTE_RING_QUEUE_FIXED, ftoken, available);
}

uint32_t
rte_soring_acquire_burst(struct rte_soring *r, void *objs,
	uint32_t stage, uint32_t num, uint32_t *ftoken, uint32_t *available)
{
	return soring_acquire(r, objs, NULL, stage, num,
			RTE_RING_QUEUE_VARIABLE, ftoken, available);
}

uint32_t
rte_soring_acquirx_burst(struct rte_soring *r, void *objs, void *meta,
	uint32_t stage, uint32_t num, uint32_t *ftoken, uint32_t *available)
{
	return soring_acquire(r, objs, meta, stage, num,
			RTE_RING_QUEUE_VARIABLE, ftoken, available);
}

unsigned int
rte_soring_count(const struct rte_soring *r)
{
	uint32_t prod_tail = r->prod.ht.tail;
	uint32_t cons_tail = r->cons.ht.tail;
	uint32_t count = (prod_tail - cons_tail) & r->mask;
	return (count > r->capacity) ? r->capacity : count;
}

unsigned int
rte_soring_free_count(const struct rte_soring *r)
{
	return r->capacity - rte_soring_count(r);
}