xref: /dpdk/lib/ring/rte_ring_peek_zc.h (revision 719834a6849e1daf4a70ff7742bbcc3ae7e25607)
1 /* SPDX-License-Identifier: BSD-3-Clause
2  *
3  * Copyright (c) 2020 Arm Limited
4  * Copyright (c) 2007-2009 Kip Macy kmacy@freebsd.org
5  * All rights reserved.
6  * Derived from FreeBSD's bufring.h
7  * Used as BSD-3 Licensed with permission from Kip Macy.
8  */
9 
10 #ifndef _RTE_RING_PEEK_ZC_H_
11 #define _RTE_RING_PEEK_ZC_H_
12 
13 /**
14  * @file
15  * It is not recommended to include this file directly.
16  * Please include <rte_ring_elem.h> instead.
17  *
18  * Ring Peek Zero Copy APIs
19  * These APIs make it possible to split public enqueue/dequeue API
20  * into 3 parts:
21  * - enqueue/dequeue start
22  * - copy data to/from the ring
23  * - enqueue/dequeue finish
24  * Along with the advantages of the peek APIs, these APIs provide the ability
25  * to avoid copying of the data to temporary area (for ex: array of mbufs
26  * on the stack).
27  *
28  * Note that currently these APIs are available only for two sync modes:
29  * 1) Single Producer/Single Consumer (RTE_RING_SYNC_ST)
30  * 2) Serialized Producer/Serialized Consumer (RTE_RING_SYNC_MT_HTS).
31  * It is user's responsibility to create/init ring with appropriate sync
32  * modes selected.
33  *
34  * Following are some examples showing the API usage.
35  * 1)
36  * struct elem_obj {uint64_t a; uint32_t b, c;};
37  * struct elem_obj *obj;
38  *
39  * // Create ring with sync type RTE_RING_SYNC_ST or RTE_RING_SYNC_MT_HTS
40  * // Reserve space on the ring
 * n = rte_ring_enqueue_zc_bulk_elem_start(r, sizeof(struct elem_obj), 1,
 *					   &zcd, NULL);
 *
 * // Produce the data directly on the ring memory
 * obj = (struct elem_obj *)zcd.ptr1;
 * obj->a = rte_get_a();
 * obj->b = rte_get_b();
 * obj->c = rte_get_c();
 * rte_ring_enqueue_zc_elem_finish(r, n);
49  *
50  * 2)
51  * // Create ring with sync type RTE_RING_SYNC_ST or RTE_RING_SYNC_MT_HTS
52  * // Reserve space on the ring
53  * n = rte_ring_enqueue_zc_burst_start(r, 32, &zcd, NULL);
54  *
55  * // Pkt I/O core polls packets from the NIC
56  * if (n != 0) {
 *	nb_rx = rte_eth_rx_burst(portid, queueid, zcd.ptr1, zcd.n1);
 *	if (nb_rx == zcd.n1 && n != zcd.n1)
 *		nb_rx += rte_eth_rx_burst(portid, queueid,
 *						zcd.ptr2, n - zcd.n1);
61  *
62  *	// Provide packets to the packet processing cores
63  *	rte_ring_enqueue_zc_finish(r, nb_rx);
64  * }
65  *
 * Note that between _start_ and _finish_ no other thread can proceed
 * with an enqueue/dequeue operation until _finish_ completes.
68  */
69 
70 #include <rte_ring_peek_elem_pvt.h>
71 
72 #ifdef __cplusplus
73 extern "C" {
74 #endif
75 
76 /**
77  * Ring zero-copy information structure.
78  *
79  * This structure contains the pointers and length of the space
80  * reserved on the ring storage.
81  */
82 struct __rte_cache_aligned rte_ring_zc_data {
83 	/* Pointer to the first space in the ring */
84 	void *ptr1;
85 	/* Pointer to the second space in the ring if there is wrap-around.
86 	 * It contains valid value only if wrap-around happens.
87 	 */
88 	void *ptr2;
89 	/* Number of elements in the first pointer. If this is equal to
90 	 * the number of elements requested, then ptr2 is NULL.
91 	 * Otherwise, subtracting n1 from number of elements requested
92 	 * will give the number of elements available at ptr2.
93 	 */
94 	unsigned int n1;
95 };
96 
97 static __rte_always_inline void
98 __rte_ring_get_elem_addr(struct rte_ring *r, uint32_t head,
99 	uint32_t esize, uint32_t num, void **dst1, uint32_t *n1, void **dst2)
100 {
101 	uint32_t idx, scale, nr_idx;
102 	uint32_t *ring = (uint32_t *)&r[1];
103 
104 	/* Normalize to uint32_t */
105 	scale = esize / sizeof(uint32_t);
106 	idx = head & r->mask;
107 	nr_idx = idx * scale;
108 
109 	*dst1 = ring + nr_idx;
110 	*n1 = num;
111 
112 	if (idx + num > r->size) {
113 		*n1 = r->size - idx;
114 		*dst2 = ring;
115 	} else {
116 		*dst2 = NULL;
117 	}
118 }
119 
120 /**
121  * @internal This function moves prod head value.
122  */
123 static __rte_always_inline unsigned int
124 __rte_ring_do_enqueue_zc_elem_start(struct rte_ring *r, unsigned int esize,
125 		uint32_t n, enum rte_ring_queue_behavior behavior,
126 		struct rte_ring_zc_data *zcd, unsigned int *free_space)
127 {
128 	uint32_t free, head, next;
129 
130 	switch (r->prod.sync_type) {
131 	case RTE_RING_SYNC_ST:
132 		n = __rte_ring_move_prod_head(r, RTE_RING_SYNC_ST, n,
133 			behavior, &head, &next, &free);
134 		break;
135 	case RTE_RING_SYNC_MT_HTS:
136 		n = __rte_ring_hts_move_prod_head(r, n, behavior, &head, &free);
137 		break;
138 	case RTE_RING_SYNC_MT:
139 	case RTE_RING_SYNC_MT_RTS:
140 	default:
141 		/* unsupported mode, shouldn't be here */
142 		RTE_ASSERT(0);
143 		n = 0;
144 		free = 0;
145 		return n;
146 	}
147 
148 	__rte_ring_get_elem_addr(r, head, esize, n, &zcd->ptr1,
149 		&zcd->n1, &zcd->ptr2);
150 
151 	if (free_space != NULL)
152 		*free_space = free - n;
153 	return n;
154 }
155 
156 /**
157  * Start to enqueue several objects on the ring.
158  * Note that no actual objects are put in the queue by this function,
159  * it just reserves space for the user on the ring.
160  * User has to copy objects into the queue using the returned pointers.
161  * User should call rte_ring_enqueue_zc_elem_finish to complete the
162  * enqueue operation.
163  *
164  * @param r
165  *   A pointer to the ring structure.
166  * @param esize
167  *   The size of ring element, in bytes. It must be a multiple of 4.
168  * @param n
169  *   The number of objects to add in the ring.
170  * @param zcd
171  *   Structure containing the pointers and length of the space
172  *   reserved on the ring storage.
173  * @param free_space
174  *   If non-NULL, returns the amount of space in the ring after the
175  *   reservation operation has finished.
176  * @return
177  *   The number of objects that can be enqueued, either 0 or n
178  */
179 static __rte_always_inline unsigned int
180 rte_ring_enqueue_zc_bulk_elem_start(struct rte_ring *r, unsigned int esize,
181 	unsigned int n, struct rte_ring_zc_data *zcd, unsigned int *free_space)
182 {
183 	return __rte_ring_do_enqueue_zc_elem_start(r, esize, n,
184 			RTE_RING_QUEUE_FIXED, zcd, free_space);
185 }
186 
187 /**
188  * Start to enqueue several pointers to objects on the ring.
189  * Note that no actual pointers are put in the queue by this function,
190  * it just reserves space for the user on the ring.
191  * User has to copy pointers to objects into the queue using the
192  * returned pointers.
193  * User should call rte_ring_enqueue_zc_finish to complete the
194  * enqueue operation.
195  *
196  * @param r
197  *   A pointer to the ring structure.
198  * @param n
199  *   The number of objects to add in the ring.
200  * @param zcd
201  *   Structure containing the pointers and length of the space
202  *   reserved on the ring storage.
203  * @param free_space
204  *   If non-NULL, returns the amount of space in the ring after the
205  *   reservation operation has finished.
206  * @return
207  *   The number of objects that can be enqueued, either 0 or n
208  */
209 static __rte_always_inline unsigned int
210 rte_ring_enqueue_zc_bulk_start(struct rte_ring *r, unsigned int n,
211 	struct rte_ring_zc_data *zcd, unsigned int *free_space)
212 {
213 	return rte_ring_enqueue_zc_bulk_elem_start(r, sizeof(uintptr_t), n,
214 							zcd, free_space);
215 }
216 
217 /**
218  * Start to enqueue several objects on the ring.
219  * Note that no actual objects are put in the queue by this function,
220  * it just reserves space for the user on the ring.
221  * User has to copy objects into the queue using the returned pointers.
222  * User should call rte_ring_enqueue_zc_elem_finish to complete the
223  * enqueue operation.
224  *
225  * @param r
226  *   A pointer to the ring structure.
227  * @param esize
228  *   The size of ring element, in bytes. It must be a multiple of 4.
229  * @param n
230  *   The number of objects to add in the ring.
231  * @param zcd
232  *   Structure containing the pointers and length of the space
233  *   reserved on the ring storage.
234  * @param free_space
235  *   If non-NULL, returns the amount of space in the ring after the
236  *   reservation operation has finished.
237  * @return
238  *   The number of objects that can be enqueued, either 0 or n
239  */
240 static __rte_always_inline unsigned int
241 rte_ring_enqueue_zc_burst_elem_start(struct rte_ring *r, unsigned int esize,
242 	unsigned int n, struct rte_ring_zc_data *zcd, unsigned int *free_space)
243 {
244 	return __rte_ring_do_enqueue_zc_elem_start(r, esize, n,
245 			RTE_RING_QUEUE_VARIABLE, zcd, free_space);
246 }
247 
248 /**
249  * Start to enqueue several pointers to objects on the ring.
250  * Note that no actual pointers are put in the queue by this function,
251  * it just reserves space for the user on the ring.
252  * User has to copy pointers to objects into the queue using the
253  * returned pointers.
254  * User should call rte_ring_enqueue_zc_finish to complete the
255  * enqueue operation.
256  *
257  * @param r
258  *   A pointer to the ring structure.
259  * @param n
260  *   The number of objects to add in the ring.
261  * @param zcd
262  *   Structure containing the pointers and length of the space
263  *   reserved on the ring storage.
264  * @param free_space
265  *   If non-NULL, returns the amount of space in the ring after the
266  *   reservation operation has finished.
267  * @return
268  *   The number of objects that can be enqueued, either 0 or n.
269  */
270 static __rte_always_inline unsigned int
271 rte_ring_enqueue_zc_burst_start(struct rte_ring *r, unsigned int n,
272 	struct rte_ring_zc_data *zcd, unsigned int *free_space)
273 {
274 	return rte_ring_enqueue_zc_burst_elem_start(r, sizeof(uintptr_t), n,
275 							zcd, free_space);
276 }
277 
278 /**
279  * Complete enqueuing several objects on the ring.
280  * Note that number of objects to enqueue should not exceed previous
281  * enqueue_start return value.
282  *
283  * @param r
284  *   A pointer to the ring structure.
285  * @param n
286  *   The number of objects to add to the ring.
287  */
288 static __rte_always_inline void
289 rte_ring_enqueue_zc_elem_finish(struct rte_ring *r, unsigned int n)
290 {
291 	uint32_t tail;
292 
293 	switch (r->prod.sync_type) {
294 	case RTE_RING_SYNC_ST:
295 		n = __rte_ring_st_get_tail(&r->prod, &tail, n);
296 		__rte_ring_st_set_head_tail(&r->prod, tail, n, 1);
297 		break;
298 	case RTE_RING_SYNC_MT_HTS:
299 		n = __rte_ring_hts_get_tail(&r->hts_prod, &tail, n);
300 		__rte_ring_hts_set_head_tail(&r->hts_prod, tail, n, 1);
301 		break;
302 	case RTE_RING_SYNC_MT:
303 	case RTE_RING_SYNC_MT_RTS:
304 	default:
305 		/* unsupported mode, shouldn't be here */
306 		RTE_ASSERT(0);
307 	}
308 }
309 
310 /**
311  * Complete enqueuing several pointers to objects on the ring.
312  * Note that number of objects to enqueue should not exceed previous
313  * enqueue_start return value.
314  *
315  * @param r
316  *   A pointer to the ring structure.
317  * @param n
318  *   The number of pointers to objects to add to the ring.
319  */
320 static __rte_always_inline void
321 rte_ring_enqueue_zc_finish(struct rte_ring *r, unsigned int n)
322 {
323 	rte_ring_enqueue_zc_elem_finish(r, n);
324 }
325 
326 /**
327  * @internal This function moves cons head value and copies up to *n*
328  * objects from the ring to the user provided obj_table.
329  */
330 static __rte_always_inline unsigned int
331 __rte_ring_do_dequeue_zc_elem_start(struct rte_ring *r,
332 	uint32_t esize, uint32_t n, enum rte_ring_queue_behavior behavior,
333 	struct rte_ring_zc_data *zcd, unsigned int *available)
334 {
335 	uint32_t avail, head, next;
336 
337 	switch (r->cons.sync_type) {
338 	case RTE_RING_SYNC_ST:
339 		n = __rte_ring_move_cons_head(r, RTE_RING_SYNC_ST, n,
340 			behavior, &head, &next, &avail);
341 		break;
342 	case RTE_RING_SYNC_MT_HTS:
343 		n = __rte_ring_hts_move_cons_head(r, n, behavior,
344 			&head, &avail);
345 		break;
346 	case RTE_RING_SYNC_MT:
347 	case RTE_RING_SYNC_MT_RTS:
348 	default:
349 		/* unsupported mode, shouldn't be here */
350 		RTE_ASSERT(0);
351 		n = 0;
352 		avail = 0;
353 		return n;
354 	}
355 
356 	__rte_ring_get_elem_addr(r, head, esize, n, &zcd->ptr1,
357 		&zcd->n1, &zcd->ptr2);
358 
359 	if (available != NULL)
360 		*available = avail - n;
361 	return n;
362 }
363 
364 /**
365  * Start to dequeue several objects from the ring.
366  * Note that no actual objects are copied from the queue by this function.
367  * User has to copy objects from the queue using the returned pointers.
368  * User should call rte_ring_dequeue_zc_elem_finish to complete the
369  * dequeue operation.
370  *
371  * @param r
372  *   A pointer to the ring structure.
373  * @param esize
374  *   The size of ring element, in bytes. It must be a multiple of 4.
375  * @param n
376  *   The number of objects to remove from the ring.
377  * @param zcd
378  *   Structure containing the pointers and length of the space
379  *   reserved on the ring storage.
380  * @param available
381  *   If non-NULL, returns the number of remaining ring entries after the
382  *   dequeue has finished.
383  * @return
384  *   The number of objects that can be dequeued, either 0 or n.
385  */
386 static __rte_always_inline unsigned int
387 rte_ring_dequeue_zc_bulk_elem_start(struct rte_ring *r, unsigned int esize,
388 	unsigned int n, struct rte_ring_zc_data *zcd, unsigned int *available)
389 {
390 	return __rte_ring_do_dequeue_zc_elem_start(r, esize, n,
391 			RTE_RING_QUEUE_FIXED, zcd, available);
392 }
393 
394 /**
395  * Start to dequeue several pointers to objects from the ring.
396  * Note that no actual pointers are removed from the queue by this function.
397  * User has to copy pointers to objects from the queue using the
398  * returned pointers.
399  * User should call rte_ring_dequeue_zc_finish to complete the
400  * dequeue operation.
401  *
402  * @param r
403  *   A pointer to the ring structure.
404  * @param n
405  *   The number of objects to remove from the ring.
406  * @param zcd
407  *   Structure containing the pointers and length of the space
408  *   reserved on the ring storage.
409  * @param available
410  *   If non-NULL, returns the number of remaining ring entries after the
411  *   dequeue has finished.
412  * @return
413  *   The number of objects that can be dequeued, either 0 or n.
414  */
415 static __rte_always_inline unsigned int
416 rte_ring_dequeue_zc_bulk_start(struct rte_ring *r, unsigned int n,
417 	struct rte_ring_zc_data *zcd, unsigned int *available)
418 {
419 	return rte_ring_dequeue_zc_bulk_elem_start(r, sizeof(uintptr_t),
420 		n, zcd, available);
421 }
422 
423 /**
424  * Start to dequeue several objects from the ring.
425  * Note that no actual objects are copied from the queue by this function.
426  * User has to copy objects from the queue using the returned pointers.
427  * User should call rte_ring_dequeue_zc_elem_finish to complete the
428  * dequeue operation.
429  *
430  * @param r
431  *   A pointer to the ring structure.
432  * @param esize
433  *   The size of ring element, in bytes. It must be a multiple of 4.
434  *   This must be the same value used while creating the ring. Otherwise
435  *   the results are undefined.
436  * @param n
437  *   The number of objects to dequeue from the ring.
438  * @param zcd
439  *   Structure containing the pointers and length of the space
440  *   reserved on the ring storage.
441  * @param available
442  *   If non-NULL, returns the number of remaining ring entries after the
443  *   dequeue has finished.
444  * @return
445  *   The number of objects that can be dequeued, either 0 or n.
446  */
447 static __rte_always_inline unsigned int
448 rte_ring_dequeue_zc_burst_elem_start(struct rte_ring *r, unsigned int esize,
449 	unsigned int n, struct rte_ring_zc_data *zcd, unsigned int *available)
450 {
451 	return __rte_ring_do_dequeue_zc_elem_start(r, esize, n,
452 			RTE_RING_QUEUE_VARIABLE, zcd, available);
453 }
454 
455 /**
456  * Start to dequeue several pointers to objects from the ring.
457  * Note that no actual pointers are removed from the queue by this function.
458  * User has to copy pointers to objects from the queue using the
459  * returned pointers.
460  * User should call rte_ring_dequeue_zc_finish to complete the
461  * dequeue operation.
462  *
463  * @param r
464  *   A pointer to the ring structure.
465  * @param n
466  *   The number of objects to remove from the ring.
467  * @param zcd
468  *   Structure containing the pointers and length of the space
469  *   reserved on the ring storage.
470  * @param available
471  *   If non-NULL, returns the number of remaining ring entries after the
472  *   dequeue has finished.
473  * @return
474  *   The number of objects that can be dequeued, either 0 or n.
475  */
476 static __rte_always_inline unsigned int
477 rte_ring_dequeue_zc_burst_start(struct rte_ring *r, unsigned int n,
478 		struct rte_ring_zc_data *zcd, unsigned int *available)
479 {
480 	return rte_ring_dequeue_zc_burst_elem_start(r, sizeof(uintptr_t), n,
481 			zcd, available);
482 }
483 
484 /**
485  * Complete dequeuing several objects from the ring.
486  * Note that number of objects to dequeued should not exceed previous
487  * dequeue_start return value.
488  *
489  * @param r
490  *   A pointer to the ring structure.
491  * @param n
492  *   The number of objects to remove from the ring.
493  */
494 static __rte_always_inline void
495 rte_ring_dequeue_zc_elem_finish(struct rte_ring *r, unsigned int n)
496 {
497 	uint32_t tail;
498 
499 	switch (r->cons.sync_type) {
500 	case RTE_RING_SYNC_ST:
501 		n = __rte_ring_st_get_tail(&r->cons, &tail, n);
502 		__rte_ring_st_set_head_tail(&r->cons, tail, n, 0);
503 		break;
504 	case RTE_RING_SYNC_MT_HTS:
505 		n = __rte_ring_hts_get_tail(&r->hts_cons, &tail, n);
506 		__rte_ring_hts_set_head_tail(&r->hts_cons, tail, n, 0);
507 		break;
508 	case RTE_RING_SYNC_MT:
509 	case RTE_RING_SYNC_MT_RTS:
510 	default:
511 		/* unsupported mode, shouldn't be here */
512 		RTE_ASSERT(0);
513 	}
514 }
515 
516 /**
517  * Complete dequeuing several objects from the ring.
518  * Note that number of objects to dequeued should not exceed previous
519  * dequeue_start return value.
520  *
521  * @param r
522  *   A pointer to the ring structure.
523  * @param n
524  *   The number of objects to remove from the ring.
525  */
526 static __rte_always_inline void
527 rte_ring_dequeue_zc_finish(struct rte_ring *r, unsigned int n)
528 {
529 	rte_ring_dequeue_elem_finish(r, n);
530 }
531 
532 #ifdef __cplusplus
533 }
534 #endif
535 
536 #endif /* _RTE_RING_PEEK_ZC_H_ */
537