/* SPDX-License-Identifier: BSD-3-Clause
 * Copyright(c) 2024 Huawei Technologies Co., Ltd
 */

/**
 * @file
 * This file contains the implementation of the SORING 'datapath' functions.
 *
 * Brief description:
 * ==================
 * enqueue/dequeue work the same way as for a conventional rte_ring:
 * any rte_ring sync type can be used, etc.
 * In addition there can be multiple 'stages'.
 * For each stage there is an acquire (start) and a release (finish) operation.
 * After some elems are 'acquired', the user can safely assume exclusive
 * possession of these elems until 'release' is done for them.
 * Note that right now the user has to release exactly the same number of
 * elems that were acquired before.
 * After 'release', elems can be 'acquired' by the next stage and/or dequeued
 * (in case of the last stage).
 *
 * Internal structure:
 * ===================
 * In addition to the 'normal' ring of elems, it also has a ring of states of
 * the same size. Each state[] corresponds to exactly one elem[].
 * state[] is used by the acquire/release/dequeue functions to store internal
 * information and should not be accessed by the user directly.
 *
 * How it works:
 * =============
 * 'acquire()' just moves the stage's head (same as rte_ring move_head does),
 * plus it saves in state[stage.cur_head] how many elems were acquired, the
 * current head position and a special flag value to indicate that the elems
 * are acquired (SORING_ST_START).
 * Note that 'acquire()' returns to the user a special 'ftoken' that the user
 * has to provide to 'release()' (in fact it is just the position of the
 * current head combined with the current stage index).
 * 'release()' extracts the old head value from the provided ftoken and checks
 * that the corresponding 'state[]' contains the expected values (mostly for
 * sanity purposes).
 * Then it marks this state[] with the 'SORING_ST_FINISH' flag to indicate
 * that the given subset of objects was released.
 * After that, it checks whether the old head value equals the current tail
 * value: if yes, it performs the 'finalize()' operation, otherwise 'release()'
 * just returns (without spinning on the stage tail value).
 * As the updated state[] is shared by all threads, some other thread can do
 * 'finalize()' for the given stage.
 * That allows 'release()' to avoid excessive waits on the tail value.
 * The main purpose of the 'finalize()' operation is to walk through 'state[]'
 * from the current stage tail up to its head, check state[] and move the
 * stage tail past elements that are already in the SORING_ST_FINISH state.
 * Along with that, the corresponding state[] values are reset to zero.
 * Note that 'finalize()' for a given stage can be done from multiple places:
 * from 'release()' for that stage, from 'acquire()' for the next stage, or
 * even from the consumer's 'dequeue()' in case the given stage is the last
 * one.
 * So 'finalize()' has to be MT-safe, and inside it we have to guarantee that
 * only one thread will update state[] and the stage's tail values.
 */
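
/*
 * Minimal usage sketch (illustrative only, kept as a comment): a pipeline
 * with a single processing stage, where the soring 'r' is assumed to have
 * been created and initialised elsewhere (via the soring setup API, which
 * is not part of this file) and where each elem is a plain pointer.
 *
 *	uint32_t n, ftoken;
 *	void *objs[32];
 *
 *	// producer thread: make elems available to stage 0
 *	n = rte_soring_enqueue_burst(r, objs, RTE_DIM(objs), NULL);
 *
 *	// stage 0 worker thread: acquire, process in place, release
 *	n = rte_soring_acquire_burst(r, objs, 0, RTE_DIM(objs), &ftoken, NULL);
 *	if (n != 0) {
 *		// ... process objs[0..n-1] ...
 *		rte_soring_release(r, NULL, 0, n, ftoken);
 *	}
 *
 *	// consumer thread: dequeue elems released by the last stage
 *	n = rte_soring_dequeue_burst(r, objs, RTE_DIM(objs), NULL);
 */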

#include "soring.h"

/*
 * Inline functions (fastpath) start here.
 */
static __rte_always_inline uint32_t
__rte_soring_stage_finalize(struct soring_stage_headtail *sht, uint32_t stage,
	union soring_state *rstate, uint32_t rmask, uint32_t maxn)
{
	int32_t rc;
	uint32_t ftkn, head, i, idx, k, n, tail;
	union soring_stage_tail nt, ot;
	union soring_state st;

	/* try to grab exclusive right to update tail value */
	ot.raw = rte_atomic_load_explicit(&sht->tail.raw,
			rte_memory_order_acquire);

	/* other thread already finalizing it for us */
	if (ot.sync != 0)
		return 0;

	nt.pos = ot.pos;
	nt.sync = 1;
	rc = rte_atomic_compare_exchange_strong_explicit(&sht->tail.raw,
		(uint64_t *)(uintptr_t)&ot.raw, nt.raw,
		rte_memory_order_release, rte_memory_order_relaxed);

	/* other thread won the race */
	if (rc == 0)
		return 0;

	/* Ensure the head is read before rstate[] */
	head = rte_atomic_load_explicit(&sht->head, rte_memory_order_relaxed);
	rte_atomic_thread_fence(rte_memory_order_acquire);

	/*
	 * start with current tail and walk through states that are
	 * already finished.
	 */

	n = RTE_MIN(head - ot.pos, maxn);
	for (i = 0, tail = ot.pos; i < n; i += k, tail += k) {

		idx = tail & rmask;
		ftkn = SORING_FTKN_MAKE(tail, stage);

		st.raw = rte_atomic_load_explicit(&rstate[idx].raw,
			rte_memory_order_relaxed);
		if ((st.stnum & SORING_ST_MASK) != SORING_ST_FINISH ||
				st.ftoken != ftkn)
			break;

		k = st.stnum & ~SORING_ST_MASK;
		rte_atomic_store_explicit(&rstate[idx].raw, 0,
				rte_memory_order_relaxed);
	}

	/* release exclusive right to update along with new tail value */
	ot.pos = tail;
	rte_atomic_store_explicit(&sht->tail.raw, ot.raw,
			rte_memory_order_release);

	return i;
}
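
/*
 * Illustrative state[] lifecycle for one acquired batch at a given stage
 * (the numbers are example values, not from a real run). Suppose a stage
 * acquires num == 32 elems at head position 'pos', so idx == pos & mask
 * and ftkn == SORING_FTKN_MAKE(pos, stage):
 *
 *	initially:          state[idx] == {.stnum = 0, .ftoken = 0}
 *	after acquire():    state[idx] == {.stnum = SORING_ST_START | 32,
 *	                                   .ftoken = ftkn}
 *	after release():    state[idx] == {.stnum = SORING_ST_FINISH | 32,
 *	                                   .ftoken = ftkn}
 *	after finalize():   state[idx] == {.stnum = 0, .ftoken = 0} and the
 *	                    stage tail has moved past these 32 elems
 */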

static __rte_always_inline uint32_t
__rte_soring_move_prod_head(struct rte_soring *r, uint32_t num,
	enum rte_ring_queue_behavior behavior, enum rte_ring_sync_type st,
	uint32_t *head, uint32_t *next, uint32_t *free)
{
	uint32_t n;

	switch (st) {
	case RTE_RING_SYNC_ST:
	case RTE_RING_SYNC_MT:
		n = __rte_ring_headtail_move_head(&r->prod.ht, &r->cons.ht,
			r->capacity, st, num, behavior, head, next, free);
		break;
	case RTE_RING_SYNC_MT_RTS:
		n = __rte_ring_rts_move_head(&r->prod.rts, &r->cons.ht,
			r->capacity, num, behavior, head, free);
		*next = *head + n;
		break;
	case RTE_RING_SYNC_MT_HTS:
		n = __rte_ring_hts_move_head(&r->prod.hts, &r->cons.ht,
			r->capacity, num, behavior, head, free);
		*next = *head + n;
		break;
	default:
		/* unsupported mode, shouldn't be here */
		RTE_ASSERT(0);
		*free = 0;
		n = 0;
	}

	return n;
}

static __rte_always_inline uint32_t
__rte_soring_move_cons_head(struct rte_soring *r, uint32_t stage, uint32_t num,
	enum rte_ring_queue_behavior behavior, enum rte_ring_sync_type st,
	uint32_t *head, uint32_t *next, uint32_t *avail)
{
	uint32_t n;

	switch (st) {
	case RTE_RING_SYNC_ST:
	case RTE_RING_SYNC_MT:
		n = __rte_ring_headtail_move_head(&r->cons.ht,
			&r->stage[stage].ht, 0, st, num, behavior,
			head, next, avail);
		break;
	case RTE_RING_SYNC_MT_RTS:
		n = __rte_ring_rts_move_head(&r->cons.rts, &r->stage[stage].ht,
			0, num, behavior, head, avail);
		*next = *head + n;
		break;
	case RTE_RING_SYNC_MT_HTS:
		n = __rte_ring_hts_move_head(&r->cons.hts, &r->stage[stage].ht,
			0, num, behavior, head, avail);
		*next = *head + n;
		break;
	default:
		/* unsupported mode, shouldn't be here */
		RTE_ASSERT(0);
		*avail = 0;
		n = 0;
	}

	return n;
}

static __rte_always_inline void
__rte_soring_update_tail(struct __rte_ring_headtail *rht,
	enum rte_ring_sync_type st, uint32_t head, uint32_t next, uint32_t enq)
{
	uint32_t n;

	switch (st) {
	case RTE_RING_SYNC_ST:
	case RTE_RING_SYNC_MT:
		__rte_ring_update_tail(&rht->ht, head, next, st, enq);
		break;
	case RTE_RING_SYNC_MT_RTS:
		__rte_ring_rts_update_tail(&rht->rts);
		break;
	case RTE_RING_SYNC_MT_HTS:
		n = next - head;
		__rte_ring_hts_update_tail(&rht->hts, head, n, enq);
		break;
	default:
		/* unsupported mode, shouldn't be here */
		RTE_ASSERT(0);
	}
}

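/*
 * Note on __rte_soring_stage_move_head() below: the only caller in this
 * file (soring_acquire()) passes capacity == 0, so *avail reduces to
 * (tail - *old_head), i.e. the number of elems the preceding head/tail
 * pair has already made visible but this stage has not yet acquired.
 * For example (illustrative values), tail == 1010 and *old_head == 1004
 * give *avail == 6.
 */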
static __rte_always_inline uint32_t
__rte_soring_stage_move_head(struct soring_stage_headtail *d,
	const struct rte_ring_headtail *s, uint32_t capacity, uint32_t num,
	enum rte_ring_queue_behavior behavior,
	uint32_t *old_head, uint32_t *new_head, uint32_t *avail)
{
	uint32_t n, tail;

	*old_head = rte_atomic_load_explicit(&d->head,
			rte_memory_order_relaxed);

	do {
		n = num;

		/* Ensure the head is read before tail */
		rte_atomic_thread_fence(rte_memory_order_acquire);

		tail = rte_atomic_load_explicit(&s->tail,
				rte_memory_order_acquire);
		*avail = capacity + tail - *old_head;
		if (n > *avail)
			n = (behavior == RTE_RING_QUEUE_FIXED) ? 0 : *avail;
		if (n == 0)
			return 0;
		*new_head = *old_head + n;
	} while (rte_atomic_compare_exchange_strong_explicit(&d->head,
			old_head, *new_head, rte_memory_order_acq_rel,
			rte_memory_order_relaxed) == 0);

	return n;
}

static __rte_always_inline uint32_t
soring_enqueue(struct rte_soring *r, const void *objs,
	const void *meta, uint32_t n, enum rte_ring_queue_behavior behavior,
	uint32_t *free_space)
{
	enum rte_ring_sync_type st;
	uint32_t nb_free, prod_head, prod_next;

	RTE_ASSERT(r != NULL && r->nb_stage > 0);
	RTE_ASSERT(meta == NULL || r->meta != NULL);

	st = r->prod.ht.sync_type;

	n = __rte_soring_move_prod_head(r, n, behavior, st,
			&prod_head, &prod_next, &nb_free);
	if (n != 0) {
		__rte_ring_do_enqueue_elems(&r[1], objs, r->size,
			prod_head & r->mask, r->esize, n);
		if (meta != NULL)
			__rte_ring_do_enqueue_elems(r->meta, meta, r->size,
				prod_head & r->mask, r->msize, n);
		__rte_soring_update_tail(&r->prod, st, prod_head, prod_next, 1);
	}

	if (free_space != NULL)
		*free_space = nb_free - n;
	return n;
}

static __rte_always_inline uint32_t
soring_dequeue(struct rte_soring *r, void *objs, void *meta,
	uint32_t num, enum rte_ring_queue_behavior behavior,
	uint32_t *available)
{
	enum rte_ring_sync_type st;
	uint32_t entries, cons_head, cons_next, n, ns, reqn;

	RTE_ASSERT(r != NULL && r->nb_stage > 0);
	RTE_ASSERT(meta == NULL || r->meta != NULL);

	ns = r->nb_stage - 1;
	st = r->cons.ht.sync_type;

	/* try to grab exactly @num elems first */
	n = __rte_soring_move_cons_head(r, ns, num, RTE_RING_QUEUE_FIXED, st,
			&cons_head, &cons_next, &entries);
	if (n == 0) {
		/* try to finalize some elems from previous stage */
		n = __rte_soring_stage_finalize(&r->stage[ns].sht, ns,
			r->state, r->mask, 2 * num);
		entries += n;

		/* repeat attempt to grab elems */
		reqn = (behavior == RTE_RING_QUEUE_FIXED) ? num : 0;
		if (entries >= reqn)
			n = __rte_soring_move_cons_head(r, ns, num, behavior,
				st, &cons_head, &cons_next, &entries);
		else
			n = 0;
	}

	/* we have some elems to consume */
	if (n != 0) {
		__rte_ring_do_dequeue_elems(objs, &r[1], r->size,
			cons_head & r->mask, r->esize, n);
		if (meta != NULL)
			__rte_ring_do_dequeue_elems(meta, r->meta, r->size,
				cons_head & r->mask, r->msize, n);
		__rte_soring_update_tail(&r->cons, st, cons_head, cons_next, 0);
	}

	if (available != NULL)
		*available = entries - n;
	return n;
}

/*
 * Verify internal SORING state.
 * WARNING: if the expected value is not equal to the actual one, it means
 * that for whatever reason SORING data consistency is broken. That is a
 * very serious problem that will most likely cause race conditions, memory
 * corruption or a program crash.
 * To ease debugging, the user might rebuild the ring library with
 * RTE_SORING_DEBUG enabled.
 */
static __rte_always_inline void
soring_verify_state(const struct rte_soring *r, uint32_t stage, uint32_t idx,
	const char *msg, union soring_state val, union soring_state exp)
{
	if (val.raw != exp.raw) {
#ifdef RTE_SORING_DEBUG
		rte_soring_dump(stderr, r);
		rte_panic("line:%d from:%s: soring=%p, stage=%#x, idx=%#x, "
			"expected={.stnum=%#x, .ftoken=%#x}, "
			"actual={.stnum=%#x, .ftoken=%#x};\n",
			__LINE__, msg, r, stage, idx,
			exp.stnum, exp.ftoken,
			val.stnum, val.ftoken);
#else
		SORING_LOG(EMERG, "from:%s: soring=%p, stage=%#x, idx=%#x, "
			"expected={.stnum=%#x, .ftoken=%#x}, "
			"actual={.stnum=%#x, .ftoken=%#x};",
			msg, r, stage, idx,
			exp.stnum, exp.ftoken,
			val.stnum, val.ftoken);
#endif
	}
}

/* check and update state ring at acquire op */
static __rte_always_inline void
acquire_state_update(const struct rte_soring *r, uint32_t stage, uint32_t idx,
	uint32_t ftoken, uint32_t num)
{
	union soring_state st;
	const union soring_state est = {.raw = 0};

	st.raw = rte_atomic_load_explicit(&r->state[idx].raw,
			rte_memory_order_relaxed);
	soring_verify_state(r, stage, idx, __func__, st, est);

	st.ftoken = ftoken;
	st.stnum = (SORING_ST_START | num);

	rte_atomic_store_explicit(&r->state[idx].raw, st.raw,
			rte_memory_order_relaxed);
}

static __rte_always_inline uint32_t
soring_acquire(struct rte_soring *r, void *objs, void *meta,
	uint32_t stage, uint32_t num, enum rte_ring_queue_behavior behavior,
	uint32_t *ftoken, uint32_t *available)
{
	uint32_t avail, head, idx, n, next, reqn;
	struct soring_stage *pstg;
	struct soring_stage_headtail *cons;

	RTE_ASSERT(r != NULL && stage < r->nb_stage);
	RTE_ASSERT(meta == NULL || r->meta != NULL);

	cons = &r->stage[stage].sht;

	if (stage == 0)
		n = __rte_soring_stage_move_head(cons, &r->prod.ht, 0, num,
			behavior, &head, &next, &avail);
	else {
		pstg = r->stage + stage - 1;

		/* try to grab exactly @num elems */
		n = __rte_soring_stage_move_head(cons, &pstg->ht, 0, num,
			RTE_RING_QUEUE_FIXED, &head, &next, &avail);
		if (n == 0) {
			/* try to finalize some elems from previous stage */
			n = __rte_soring_stage_finalize(&pstg->sht, stage - 1,
				r->state, r->mask, 2 * num);
			avail += n;

			/* repeat attempt to grab elems */
			reqn = (behavior == RTE_RING_QUEUE_FIXED) ? num : 0;
			if (avail >= reqn)
				n = __rte_soring_stage_move_head(cons,
					&pstg->ht, 0, num, behavior, &head,
					&next, &avail);
			else
				n = 0;
		}
	}

	if (n != 0) {

		idx = head & r->mask;
		*ftoken = SORING_FTKN_MAKE(head, stage);

		/* check and update state value */
		acquire_state_update(r, stage, idx, *ftoken, n);

		/* copy elems that are ready for given stage */
		__rte_ring_do_dequeue_elems(objs, &r[1], r->size, idx,
				r->esize, n);
		if (meta != NULL)
			__rte_ring_do_dequeue_elems(meta, r->meta,
				r->size, idx, r->msize, n);
	}

	if (available != NULL)
		*available = avail - n;
	return n;
}

static __rte_always_inline void
soring_release(struct rte_soring *r, const void *objs,
	const void *meta, uint32_t stage, uint32_t n, uint32_t ftoken)
{
	uint32_t idx, pos, tail;
	struct soring_stage *stg;
	union soring_state st;

	const union soring_state est = {
		.stnum = (SORING_ST_START | n),
		.ftoken = ftoken,
	};

	RTE_ASSERT(r != NULL && stage < r->nb_stage);
	RTE_ASSERT(meta == NULL || r->meta != NULL);

	stg = r->stage + stage;

	pos = SORING_FTKN_POS(ftoken, stage);
	idx = pos & r->mask;
	st.raw = rte_atomic_load_explicit(&r->state[idx].raw,
			rte_memory_order_relaxed);

	/* check state ring contents */
	soring_verify_state(r, stage, idx, __func__, st, est);

	/* update contents of the ring, if necessary */
	if (objs != NULL)
		__rte_ring_do_enqueue_elems(&r[1], objs, r->size, idx,
			r->esize, n);
	if (meta != NULL)
		__rte_ring_do_enqueue_elems(r->meta, meta, r->size, idx,
			r->msize, n);

	/* set state to FINISH, make sure it is not reordered */
	rte_atomic_thread_fence(rte_memory_order_release);

	st.stnum = SORING_ST_FINISH | n;
	rte_atomic_store_explicit(&r->state[idx].raw, st.raw,
			rte_memory_order_relaxed);

	/* try to do finalize(), if appropriate */
	tail = rte_atomic_load_explicit(&stg->sht.tail.pos,
			rte_memory_order_relaxed);
	if (tail == pos)
		__rte_soring_stage_finalize(&stg->sht, stage, r->state, r->mask,
				r->capacity);
}
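
/*
 * Illustrative example of the 'finalize elsewhere' behaviour described in
 * the file header (positions are example values): suppose thread A acquires
 * elems [100, 131] and thread B acquires [132, 163] for the same stage, and
 * B calls release() first. B's release() only marks its state[] entry as
 * SORING_ST_FINISH, since the stage tail is still 100, not 132. When A later
 * calls release(), tail == 100 == A's position, so A's release() runs
 * finalize(), which walks state[] and advances the stage tail past both
 * batches to 164.
 */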

/*
 * Public functions (data-path) start here.
 */

void
rte_soring_release(struct rte_soring *r, const void *objs,
	uint32_t stage, uint32_t n, uint32_t ftoken)
{
	soring_release(r, objs, NULL, stage, n, ftoken);
}

void
rte_soring_releasx(struct rte_soring *r, const void *objs,
	const void *meta, uint32_t stage, uint32_t n, uint32_t ftoken)
{
	soring_release(r, objs, meta, stage, n, ftoken);
}

uint32_t
rte_soring_enqueue_bulk(struct rte_soring *r, const void *objs, uint32_t n,
	uint32_t *free_space)
{
	return soring_enqueue(r, objs, NULL, n, RTE_RING_QUEUE_FIXED,
			free_space);
}

uint32_t
rte_soring_enqueux_bulk(struct rte_soring *r, const void *objs,
	const void *meta, uint32_t n, uint32_t *free_space)
{
	return soring_enqueue(r, objs, meta, n, RTE_RING_QUEUE_FIXED,
			free_space);
}

uint32_t
rte_soring_enqueue_burst(struct rte_soring *r, const void *objs, uint32_t n,
	uint32_t *free_space)
{
	return soring_enqueue(r, objs, NULL, n, RTE_RING_QUEUE_VARIABLE,
			free_space);
}

uint32_t
rte_soring_enqueux_burst(struct rte_soring *r, const void *objs,
	const void *meta, uint32_t n, uint32_t *free_space)
{
	return soring_enqueue(r, objs, meta, n, RTE_RING_QUEUE_VARIABLE,
			free_space);
}

uint32_t
rte_soring_dequeue_bulk(struct rte_soring *r, void *objs, uint32_t num,
	uint32_t *available)
{
	return soring_dequeue(r, objs, NULL, num, RTE_RING_QUEUE_FIXED,
			available);
}

uint32_t
rte_soring_dequeux_bulk(struct rte_soring *r, void *objs, void *meta,
	uint32_t num, uint32_t *available)
{
	return soring_dequeue(r, objs, meta, num, RTE_RING_QUEUE_FIXED,
			available);
}

uint32_t
rte_soring_dequeue_burst(struct rte_soring *r, void *objs, uint32_t num,
	uint32_t *available)
{
	return soring_dequeue(r, objs, NULL, num, RTE_RING_QUEUE_VARIABLE,
			available);
}

uint32_t
rte_soring_dequeux_burst(struct rte_soring *r, void *objs, void *meta,
	uint32_t num, uint32_t *available)
{
	return soring_dequeue(r, objs, meta, num, RTE_RING_QUEUE_VARIABLE,
			available);
}

uint32_t
rte_soring_acquire_bulk(struct rte_soring *r, void *objs,
	uint32_t stage, uint32_t num, uint32_t *ftoken, uint32_t *available)
{
	return soring_acquire(r, objs, NULL, stage, num,
			RTE_RING_QUEUE_FIXED, ftoken, available);
}

uint32_t
rte_soring_acquirx_bulk(struct rte_soring *r, void *objs, void *meta,
	uint32_t stage, uint32_t num, uint32_t *ftoken, uint32_t *available)
{
	return soring_acquire(r, objs, meta, stage, num,
			RTE_RING_QUEUE_FIXED, ftoken, available);
}

uint32_t
rte_soring_acquire_burst(struct rte_soring *r, void *objs,
	uint32_t stage, uint32_t num, uint32_t *ftoken, uint32_t *available)
{
	return soring_acquire(r, objs, NULL, stage, num,
			RTE_RING_QUEUE_VARIABLE, ftoken, available);
}

uint32_t
rte_soring_acquirx_burst(struct rte_soring *r, void *objs, void *meta,
	uint32_t stage, uint32_t num, uint32_t *ftoken, uint32_t *available)
{
	return soring_acquire(r, objs, meta, stage, num,
			RTE_RING_QUEUE_VARIABLE, ftoken, available);
}

unsigned int
rte_soring_count(const struct rte_soring *r)
{
	uint32_t prod_tail = r->prod.ht.tail;
	uint32_t cons_tail = r->cons.ht.tail;
	uint32_t count = (prod_tail - cons_tail) & r->mask;
	return (count > r->capacity) ? r->capacity : count;
}

unsigned int
rte_soring_free_count(const struct rte_soring *r)
{
	return r->capacity - rte_soring_count(r);
}
617