xref: /dpdk/lib/ring/rte_ring_elem_pvt.h (revision e4251abd4a3a1dbf7d613fa407e4d2843708a843)
/* SPDX-License-Identifier: BSD-3-Clause
 *
 * Copyright (c) 2017,2018 HXT-semitech Corporation.
 * Copyright (c) 2007-2009 Kip Macy kmacy@freebsd.org
 * All rights reserved.
 * Derived from FreeBSD's bufring.h
 * Used as BSD-3 Licensed with permission from Kip Macy.
 */

#ifndef _RTE_RING_ELEM_PVT_H_
#define _RTE_RING_ELEM_PVT_H_

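/*
 * GCC 12 and newer can emit false-positive -Wstringop-overflow and
 * -Wstringop-overread warnings for the partial-copy loops in this header,
 * so those warnings are silenced here.
 */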
#if defined(RTE_TOOLCHAIN_GCC) && (GCC_VERSION >= 120000)
#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wstringop-overflow"
#pragma GCC diagnostic ignored "-Wstringop-overread"
#endif

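/*
 * Copy n 32-bit objects from obj_table into the ring slots starting at idx.
 * The in-bounds fast path is unrolled eight elements at a time, with a
 * fall-through switch handling the remainder; the slow path splits the copy
 * in two where it wraps past the end of the ring.
 */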
static __rte_always_inline void
__rte_ring_enqueue_elems_32(void *ring_table, const void *obj_table,
	uint32_t size, uint32_t idx, uint32_t n)
{
	unsigned int i;

	uint32_t *ring = (uint32_t *)ring_table;
	const uint32_t *obj = (const uint32_t *)obj_table;

	if (likely(idx + n <= size)) {
		for (i = 0; i < (n & ~0x7); i += 8, idx += 8) {
			ring[idx] = obj[i];
			ring[idx + 1] = obj[i + 1];
			ring[idx + 2] = obj[i + 2];
			ring[idx + 3] = obj[i + 3];
			ring[idx + 4] = obj[i + 4];
			ring[idx + 5] = obj[i + 5];
			ring[idx + 6] = obj[i + 6];
			ring[idx + 7] = obj[i + 7];
		}
		switch (n & 0x7) {
		case 7:
			ring[idx++] = obj[i++]; /* fallthrough */
		case 6:
			ring[idx++] = obj[i++]; /* fallthrough */
		case 5:
			ring[idx++] = obj[i++]; /* fallthrough */
		case 4:
			ring[idx++] = obj[i++]; /* fallthrough */
		case 3:
			ring[idx++] = obj[i++]; /* fallthrough */
		case 2:
			ring[idx++] = obj[i++]; /* fallthrough */
		case 1:
			ring[idx++] = obj[i++];
		}
	} else {
		for (i = 0; idx < size; i++, idx++)
			ring[idx] = obj[i];
		/* Start at the beginning */
		for (idx = 0; i < n; i++, idx++)
			ring[idx] = obj[i];
	}
}

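/*
 * 64-bit variant of the copy above, unrolled four at a time. obj_table may
 * not be 8-byte aligned, hence the unaligned_uint64_t source pointer.
 */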
static __rte_always_inline void
__rte_ring_enqueue_elems_64(void *ring_table, const void *obj_table,
	uint32_t size, uint32_t idx, uint32_t n)
{
	unsigned int i;

	uint64_t *ring = (uint64_t *)ring_table;
	const unaligned_uint64_t *obj = (const unaligned_uint64_t *)obj_table;

	if (likely(idx + n <= size)) {
		for (i = 0; i < (n & ~0x3); i += 4, idx += 4) {
			ring[idx] = obj[i];
			ring[idx + 1] = obj[i + 1];
			ring[idx + 2] = obj[i + 2];
			ring[idx + 3] = obj[i + 3];
		}
		switch (n & 0x3) {
		case 3:
			ring[idx++] = obj[i++]; /* fallthrough */
		case 2:
			ring[idx++] = obj[i++]; /* fallthrough */
		case 1:
			ring[idx++] = obj[i++];
		}
	} else {
		for (i = 0; idx < size; i++, idx++)
			ring[idx] = obj[i];
		/* Start at the beginning */
		for (idx = 0; i < n; i++, idx++)
			ring[idx] = obj[i];
	}
}

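/*
 * 128-bit variant: each element is copied with memcpy() (32 bytes for the
 * unrolled pair, 16 bytes for a single element), which the compiler can
 * turn into vector loads/stores where available.
 */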
static __rte_always_inline void
__rte_ring_enqueue_elems_128(void *ring_table, const void *obj_table,
	uint32_t size, uint32_t idx, uint32_t n)
{
	unsigned int i;

	rte_int128_t *ring = (rte_int128_t *)ring_table;
	const rte_int128_t *obj = (const rte_int128_t *)obj_table;

	if (likely(idx + n <= size)) {
		for (i = 0; i < (n & ~0x1); i += 2, idx += 2)
			memcpy((void *)(ring + idx),
				(const void *)(obj + i), 32);
		switch (n & 0x1) {
		case 1:
			memcpy((void *)(ring + idx),
				(const void *)(obj + i), 16);
		}
	} else {
		for (i = 0; idx < size; i++, idx++)
			memcpy((void *)(ring + idx),
				(const void *)(obj + i), 16);
		/* Start at the beginning */
		for (idx = 0; i < n; i++, idx++)
			memcpy((void *)(ring + idx),
				(const void *)(obj + i), 16);
	}
}

/* The actual enqueue of elements on the ring.
 * Placed here since identical code is needed in both
 * single- and multi-producer enqueue functions.
 */
static __rte_always_inline void
__rte_ring_do_enqueue_elems(void *ring_table, const void *obj_table,
	uint32_t size, uint32_t idx, uint32_t esize, uint32_t num)
{
	/* 8B and 16B copies implemented individually to retain
	 * the current performance.
	 */
	if (esize == 8)
		__rte_ring_enqueue_elems_64(ring_table, obj_table, size,
				idx, num);
	else if (esize == 16)
		__rte_ring_enqueue_elems_128(ring_table, obj_table, size,
				idx, num);
	else {
		uint32_t scale, nr_idx, nr_num, nr_size;

		/* Normalize to uint32_t */
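		/*
		 * e.g. esize == 12 gives scale == 3: element index idx maps
		 * to uint32_t index idx * 3 and num elements become num * 3
		 * 32-bit words. esize must be a multiple of 4, so the
		 * division below is exact.
		 */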
		scale = esize / sizeof(uint32_t);
		nr_num = num * scale;
		nr_idx = idx * scale;
		nr_size = size * scale;
		__rte_ring_enqueue_elems_32(ring_table, obj_table, nr_size,
				nr_idx, nr_num);
	}
}

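/*
 * The ring's data area starts immediately after the rte_ring structure,
 * hence &r[1] as the base of the element table in the wrappers below.
 */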
static __rte_always_inline void
__rte_ring_enqueue_elems(struct rte_ring *r, uint32_t prod_head,
		const void *obj_table, uint32_t esize, uint32_t num)
{
	__rte_ring_do_enqueue_elems(&r[1], obj_table, r->size,
			prod_head & r->mask, esize, num);
}

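/*
 * The dequeue copy helpers below mirror the enqueue helpers above,
 * copying from the ring slots into the caller's obj_table.
 */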
static __rte_always_inline void
__rte_ring_dequeue_elems_32(void *obj_table, const void *ring_table,
	uint32_t size, uint32_t idx, uint32_t n)
{
	unsigned int i;
	uint32_t *obj = (uint32_t *)obj_table;
	const uint32_t *ring = (const uint32_t *)ring_table;

	if (likely(idx + n <= size)) {
		for (i = 0; i < (n & ~0x7); i += 8, idx += 8) {
			obj[i] = ring[idx];
			obj[i + 1] = ring[idx + 1];
			obj[i + 2] = ring[idx + 2];
			obj[i + 3] = ring[idx + 3];
			obj[i + 4] = ring[idx + 4];
			obj[i + 5] = ring[idx + 5];
			obj[i + 6] = ring[idx + 6];
			obj[i + 7] = ring[idx + 7];
		}
		switch (n & 0x7) {
		case 7:
			obj[i++] = ring[idx++]; /* fallthrough */
		case 6:
			obj[i++] = ring[idx++]; /* fallthrough */
		case 5:
			obj[i++] = ring[idx++]; /* fallthrough */
		case 4:
			obj[i++] = ring[idx++]; /* fallthrough */
		case 3:
			obj[i++] = ring[idx++]; /* fallthrough */
		case 2:
			obj[i++] = ring[idx++]; /* fallthrough */
		case 1:
			obj[i++] = ring[idx++];
		}
	} else {
		for (i = 0; idx < size; i++, idx++)
			obj[i] = ring[idx];
		/* Start at the beginning */
		for (idx = 0; i < n; i++, idx++)
			obj[i] = ring[idx];
	}
}

static __rte_always_inline void
__rte_ring_dequeue_elems_64(void *obj_table, const void *ring_table,
	uint32_t size, uint32_t idx, uint32_t n)
{
	unsigned int i;
	unaligned_uint64_t *obj = (unaligned_uint64_t *)obj_table;
	const uint64_t *ring = (const uint64_t *)ring_table;

	if (likely(idx + n <= size)) {
		for (i = 0; i < (n & ~0x3); i += 4, idx += 4) {
			obj[i] = ring[idx];
			obj[i + 1] = ring[idx + 1];
			obj[i + 2] = ring[idx + 2];
			obj[i + 3] = ring[idx + 3];
		}
		switch (n & 0x3) {
		case 3:
			obj[i++] = ring[idx++]; /* fallthrough */
		case 2:
			obj[i++] = ring[idx++]; /* fallthrough */
		case 1:
			obj[i++] = ring[idx++];
		}
	} else {
		for (i = 0; idx < size; i++, idx++)
			obj[i] = ring[idx];
		/* Start at the beginning */
		for (idx = 0; i < n; i++, idx++)
			obj[i] = ring[idx];
	}
}

static __rte_always_inline void
__rte_ring_dequeue_elems_128(void *obj_table, const void *ring_table,
	uint32_t size, uint32_t idx, uint32_t n)
{
	unsigned int i;
	rte_int128_t *obj = (rte_int128_t *)obj_table;
	const rte_int128_t *ring = (const rte_int128_t *)ring_table;

	if (likely(idx + n <= size)) {
		for (i = 0; i < (n & ~0x1); i += 2, idx += 2)
			memcpy((obj + i), (const void *)(ring + idx), 32);
		switch (n & 0x1) {
		case 1:
			memcpy((obj + i), (const void *)(ring + idx), 16);
		}
	} else {
		for (i = 0; idx < size; i++, idx++)
			memcpy((obj + i), (const void *)(ring + idx), 16);
		/* Start at the beginning */
		for (idx = 0; i < n; i++, idx++)
			memcpy((obj + i), (const void *)(ring + idx), 16);
	}
}

/* The actual dequeue of elements from the ring.
 * Placed here since identical code is needed in both
 * single- and multi-consumer dequeue functions.
 */
static __rte_always_inline void
__rte_ring_do_dequeue_elems(void *obj_table, const void *ring_table,
	uint32_t size, uint32_t idx, uint32_t esize, uint32_t num)
{
	/* 8B and 16B copies implemented individually to retain
	 * the current performance.
	 */
	if (esize == 8)
		__rte_ring_dequeue_elems_64(obj_table, ring_table, size,
				idx, num);
	else if (esize == 16)
		__rte_ring_dequeue_elems_128(obj_table, ring_table, size,
				idx, num);
	else {
		uint32_t scale, nr_idx, nr_num, nr_size;

		/* Normalize to uint32_t */
		scale = esize / sizeof(uint32_t);
		nr_num = num * scale;
		nr_idx = idx * scale;
		nr_size = size * scale;
		__rte_ring_dequeue_elems_32(obj_table, ring_table, nr_size,
				nr_idx, nr_num);
	}
}

static __rte_always_inline void
__rte_ring_dequeue_elems(struct rte_ring *r, uint32_t cons_head,
		void *obj_table, uint32_t esize, uint32_t num)
{
	__rte_ring_do_dequeue_elems(obj_table, &r[1], r->size,
			cons_head & r->mask, esize, num);
}

/* Between one load and the next, the CPU may reorder accesses on weakly
 * ordered memory models (e.g. PowerPC/Arm).
 * There are two choices for users:
 * 1. use an rmb() memory barrier;
 * 2. use one-directional load_acquire/store_release barriers.
 * The choice depends on performance test results.
 */
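/*
 * Illustrative sketch of the two options above (not the code actually
 * used by the included headers):
 *
 *	cons_tail = __atomic_load_n(&r->cons.tail, __ATOMIC_ACQUIRE);
 *
 * versus
 *
 *	cons_tail = r->cons.tail;
 *	rte_smp_rmb();
 */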
#ifdef RTE_USE_C11_MEM_MODEL
#include "rte_ring_c11_pvt.h"
#else
#include "rte_ring_generic_pvt.h"
#endif

/**
 * @internal This function updates the producer head for enqueue
 *
 * @param r
 *   A pointer to the ring structure
 * @param is_sp
 *   Non-zero to use the single-producer head update, zero for multi-producer
 * @param n
 *   The number of elements we will want to enqueue, i.e. how far should the
 *   head be moved
 * @param behavior
 *   RTE_RING_QUEUE_FIXED:    Enqueue a fixed number of items to the ring
 *   RTE_RING_QUEUE_VARIABLE: Enqueue as many items as possible to the ring
 * @param old_head
 *   Returns head value as it was before the move, i.e. where enqueue starts
 * @param new_head
 *   Returns the current/new head value, i.e. where enqueue finishes
 * @param free_entries
 *   Returns the amount of free space in the ring BEFORE head was moved
 * @return
 *   Actual number of objects enqueued.
 *   If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
 */
static __rte_always_inline unsigned int
__rte_ring_move_prod_head(struct rte_ring *r, unsigned int is_sp,
		unsigned int n, enum rte_ring_queue_behavior behavior,
		uint32_t *old_head, uint32_t *new_head,
		uint32_t *free_entries)
{
	return __rte_ring_headtail_move_head(&r->prod, &r->cons, r->capacity,
			is_sp, n, behavior, old_head, new_head, free_entries);
}

/**
 * @internal This function updates the consumer head for dequeue
 *
 * @param r
 *   A pointer to the ring structure
 * @param is_sc
 *   Non-zero to use the single-consumer head update, zero for multi-consumer
 * @param n
 *   The number of elements we will want to dequeue, i.e. how far should the
 *   head be moved
 * @param behavior
 *   RTE_RING_QUEUE_FIXED:    Dequeue a fixed number of items from the ring
 *   RTE_RING_QUEUE_VARIABLE: Dequeue as many items as possible from the ring
 * @param old_head
 *   Returns head value as it was before the move, i.e. where dequeue starts
 * @param new_head
 *   Returns the current/new head value, i.e. where dequeue finishes
 * @param entries
 *   Returns the number of entries in the ring BEFORE head was moved
 * @return
 *   - Actual number of objects dequeued.
 *     If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
 */
static __rte_always_inline unsigned int
__rte_ring_move_cons_head(struct rte_ring *r, unsigned int is_sc,
		unsigned int n, enum rte_ring_queue_behavior behavior,
		uint32_t *old_head, uint32_t *new_head,
		uint32_t *entries)
{
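	/*
	 * Capacity is 0 on the consumer side: the common helper computes
	 * the available entries as capacity + source tail - old head, so a
	 * zero capacity yields prod.tail - cons.head, i.e. the number of
	 * filled entries.
	 */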
	return __rte_ring_headtail_move_head(&r->cons, &r->prod, 0,
			is_sc, n, behavior, old_head, new_head, entries);
}

/**
 * @internal Enqueue several objects on the ring
 *
 * @param r
 *   A pointer to the ring structure.
 * @param obj_table
 *   A pointer to a table of objects.
 * @param esize
 *   The size of ring element, in bytes. It must be a multiple of 4.
 *   This must be the same value used while creating the ring. Otherwise
 *   the results are undefined.
 * @param n
 *   The number of objects to add to the ring from the obj_table.
 * @param behavior
 *   RTE_RING_QUEUE_FIXED:    Enqueue a fixed number of items to the ring
 *   RTE_RING_QUEUE_VARIABLE: Enqueue as many items as possible to the ring
 * @param is_sp
 *   Indicates whether to use single-producer or multi-producer head update
 * @param free_space
 *   Returns the amount of free space in the ring after the enqueue operation
 *   has finished
 * @return
 *   Actual number of objects enqueued.
 *   If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
 */
static __rte_always_inline unsigned int
__rte_ring_do_enqueue_elem(struct rte_ring *r, const void *obj_table,
		unsigned int esize, unsigned int n,
		enum rte_ring_queue_behavior behavior, unsigned int is_sp,
		unsigned int *free_space)
{
	uint32_t prod_head, prod_next;
	uint32_t free_entries;

	n = __rte_ring_move_prod_head(r, is_sp, n, behavior,
			&prod_head, &prod_next, &free_entries);
	if (n == 0)
		goto end;

	__rte_ring_enqueue_elems(r, prod_head, obj_table, esize, n);

	__rte_ring_update_tail(&r->prod, prod_head, prod_next, is_sp, 1);
end:
	if (free_space != NULL)
		*free_space = free_entries - n;
	return n;
}

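/*
 * Usage sketch: a public bulk-enqueue wrapper would typically call the
 * helper above along these lines (my_ring_mp_enqueue_bulk_elem is a
 * hypothetical name; the dequeue path is symmetric via
 * __rte_ring_do_dequeue_elem):
 *
 *	static inline unsigned int
 *	my_ring_mp_enqueue_bulk_elem(struct rte_ring *r, const void *obj_table,
 *			unsigned int esize, unsigned int n,
 *			unsigned int *free_space)
 *	{
 *		return __rte_ring_do_enqueue_elem(r, obj_table, esize, n,
 *				RTE_RING_QUEUE_FIXED, RTE_RING_SYNC_MT,
 *				free_space);
 *	}
 */
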
/**
 * @internal Dequeue several objects from the ring
 *
 * @param r
 *   A pointer to the ring structure.
 * @param obj_table
 *   A pointer to a table of objects.
 * @param esize
 *   The size of ring element, in bytes. It must be a multiple of 4.
 *   This must be the same value used while creating the ring. Otherwise
 *   the results are undefined.
 * @param n
 *   The number of objects to pull from the ring.
 * @param behavior
 *   RTE_RING_QUEUE_FIXED:    Dequeue a fixed number of items from the ring
 *   RTE_RING_QUEUE_VARIABLE: Dequeue as many items as possible from the ring
 * @param is_sc
 *   Indicates whether to use single-consumer or multi-consumer head update
 * @param available
 *   Returns the number of remaining ring entries after the dequeue has
 *   finished
 * @return
 *   - Actual number of objects dequeued.
 *     If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
 */
static __rte_always_inline unsigned int
__rte_ring_do_dequeue_elem(struct rte_ring *r, void *obj_table,
		unsigned int esize, unsigned int n,
		enum rte_ring_queue_behavior behavior, unsigned int is_sc,
		unsigned int *available)
{
	uint32_t cons_head, cons_next;
	uint32_t entries;

	n = __rte_ring_move_cons_head(r, (int)is_sc, n, behavior,
			&cons_head, &cons_next, &entries);
	if (n == 0)
		goto end;

	__rte_ring_dequeue_elems(r, cons_head, obj_table, esize, n);

	__rte_ring_update_tail(&r->cons, cons_head, cons_next, is_sc, 0);

end:
	if (available != NULL)
		*available = entries - n;
	return n;
}

#if defined(RTE_TOOLCHAIN_GCC) && (GCC_VERSION >= 120000)
#pragma GCC diagnostic pop
#endif

#endif /* _RTE_RING_ELEM_PVT_H_ */