xref: /dpdk/lib/ring/rte_ring.c (revision 700989f512bbc2ee9758a8a9cb6973cfdeda6f27)
1 /* SPDX-License-Identifier: BSD-3-Clause
2  *
3  * Copyright (c) 2010-2015 Intel Corporation
4  * Copyright (c) 2007,2008 Kip Macy kmacy@freebsd.org
5  * All rights reserved.
6  * Derived from FreeBSD's bufring.h
7  * Used as BSD-3 Licensed with permission from Kip Macy.
8  */
9 
10 #include <stdalign.h>
11 #include <stdio.h>
12 #include <string.h>
13 #include <stdint.h>
14 #include <inttypes.h>
15 #include <errno.h>
16 #include <sys/queue.h>
17 
18 #include <rte_common.h>
19 #include <rte_log.h>
20 #include <rte_memzone.h>
21 #include <rte_malloc.h>
22 #include <rte_eal_memconfig.h>
23 #include <rte_errno.h>
24 #include <rte_string_fns.h>
25 #include <rte_tailq.h>
26 #include <rte_telemetry.h>
27 
28 #include "rte_ring.h"
29 #include "rte_ring_elem.h"
30 
/* Register the log type used by this file, with INFO as the level argument. */
RTE_LOG_REGISTER_DEFAULT(ring_logtype, INFO);
#define RTE_LOGTYPE_RING ring_logtype
/*
 * Emit one log line through the RING log type. The leading "" is
 * concatenated with the first variadic argument, so that argument has to be
 * a string literal.
 */
#define RING_LOG(level, ...) \
	RTE_LOG_LINE(level, RING, "" __VA_ARGS__)

/* List head type used to walk the tailq entries that point at rings. */
TAILQ_HEAD(rte_ring_list, rte_tailq_entry);

/* Tailq in which rte_ring_create_elem() records every ring it creates. */
static struct rte_tailq_elem rte_ring_tailq = {
	.name = RTE_TAILQ_RING_NAME,
};
EAL_REGISTER_TAILQ(rte_ring_tailq)
42 
/*
 * Mask of all flag values accepted when creating or initializing a ring;
 * rte_ring_init() rejects any flag bit that is not part of this mask.
 */
#define RING_F_MASK (RING_F_SP_ENQ | RING_F_SC_DEQ | RING_F_EXACT_SZ | \
		     RING_F_MP_RTS_ENQ | RING_F_MC_RTS_DEQ |	       \
		     RING_F_MP_HTS_ENQ | RING_F_MC_HTS_DEQ)
47 
/*
 * True if x is a power of 2.
 *
 * Zero is explicitly excluded: the bare ((x - 1) & x) == 0 test also holds
 * for 0, which would let a ring with no slots pass the size validation.
 * Note that x is evaluated more than once, so it must be free of side
 * effects.
 */
#define POWEROF2(x) (((x) != 0) && ((((x) - 1) & (x)) == 0))
50 
/*
 * Default divisor for the maximum head/tail distance: rte_ring_init() sets
 * the limit of RTS producers/consumers to capacity / HTD_MAX_DEF, i.e. 1/8
 * of the ring capacity.
 */
#define HTD_MAX_DEF	8
53 
54 /* return the size of memory occupied by a ring */
55 ssize_t
56 rte_ring_get_memsize_elem(unsigned int esize, unsigned int count)
57 {
58 	ssize_t sz;
59 
60 	/* Check if element size is a multiple of 4B */
61 	if (esize % 4 != 0) {
62 		RING_LOG(ERR, "element size is not a multiple of 4");
63 
64 		return -EINVAL;
65 	}
66 
67 	/* count must be a power of 2 */
68 	if ((!POWEROF2(count)) || (count > RTE_RING_SZ_MASK )) {
69 		RING_LOG(ERR,
70 			"Requested number of elements is invalid, must be power of 2, and not exceed %u",
71 			RTE_RING_SZ_MASK);
72 
73 		return -EINVAL;
74 	}
75 
76 	sz = sizeof(struct rte_ring) + (ssize_t)count * esize;
77 	sz = RTE_ALIGN(sz, RTE_CACHE_LINE_SIZE);
78 	return sz;
79 }
80 
/*
 * Memory footprint of a ring of `count` pointer-sized slots; thin wrapper
 * around rte_ring_get_memsize_elem() with the element size fixed to
 * sizeof(void *).
 */
ssize_t
rte_ring_get_memsize(unsigned int count)
{
	const unsigned int ptr_esize = sizeof(void *);

	return rte_ring_get_memsize_elem(ptr_esize, count);
}
87 
88 /*
89  * internal helper function to reset prod/cons head-tail values.
90  */
91 static void
92 reset_headtail(void *p)
93 {
94 	struct rte_ring_headtail *ht;
95 	struct rte_ring_hts_headtail *ht_hts;
96 	struct rte_ring_rts_headtail *ht_rts;
97 
98 	ht = p;
99 	ht_hts = p;
100 	ht_rts = p;
101 
102 	switch (ht->sync_type) {
103 	case RTE_RING_SYNC_MT:
104 	case RTE_RING_SYNC_ST:
105 		ht->head = 0;
106 		ht->tail = 0;
107 		break;
108 	case RTE_RING_SYNC_MT_RTS:
109 		ht_rts->head.raw = 0;
110 		ht_rts->tail.raw = 0;
111 		break;
112 	case RTE_RING_SYNC_MT_HTS:
113 		ht_hts->ht.raw = 0;
114 		break;
115 	default:
116 		/* unknown sync mode */
117 		RTE_ASSERT(0);
118 	}
119 }
120 
121 void
122 rte_ring_reset(struct rte_ring *r)
123 {
124 	reset_headtail(&r->prod);
125 	reset_headtail(&r->cons);
126 }
127 
128 /*
129  * helper function, calculates sync_type values for prod and cons
130  * based on input flags. Returns zero at success or negative
131  * errno value otherwise.
132  */
133 static int
134 get_sync_type(uint32_t flags, enum rte_ring_sync_type *prod_st,
135 	enum rte_ring_sync_type *cons_st)
136 {
137 	static const uint32_t prod_st_flags =
138 		(RING_F_SP_ENQ | RING_F_MP_RTS_ENQ | RING_F_MP_HTS_ENQ);
139 	static const uint32_t cons_st_flags =
140 		(RING_F_SC_DEQ | RING_F_MC_RTS_DEQ | RING_F_MC_HTS_DEQ);
141 
142 	switch (flags & prod_st_flags) {
143 	case 0:
144 		*prod_st = RTE_RING_SYNC_MT;
145 		break;
146 	case RING_F_SP_ENQ:
147 		*prod_st = RTE_RING_SYNC_ST;
148 		break;
149 	case RING_F_MP_RTS_ENQ:
150 		*prod_st = RTE_RING_SYNC_MT_RTS;
151 		break;
152 	case RING_F_MP_HTS_ENQ:
153 		*prod_st = RTE_RING_SYNC_MT_HTS;
154 		break;
155 	default:
156 		return -EINVAL;
157 	}
158 
159 	switch (flags & cons_st_flags) {
160 	case 0:
161 		*cons_st = RTE_RING_SYNC_MT;
162 		break;
163 	case RING_F_SC_DEQ:
164 		*cons_st = RTE_RING_SYNC_ST;
165 		break;
166 	case RING_F_MC_RTS_DEQ:
167 		*cons_st = RTE_RING_SYNC_MT_RTS;
168 		break;
169 	case RING_F_MC_HTS_DEQ:
170 		*cons_st = RTE_RING_SYNC_MT_HTS;
171 		break;
172 	default:
173 		return -EINVAL;
174 	}
175 
176 	return 0;
177 }
178 
179 int
180 rte_ring_init(struct rte_ring *r, const char *name, unsigned int count,
181 	unsigned int flags)
182 {
183 	int ret;
184 
185 	/* compilation-time checks */
186 	RTE_BUILD_BUG_ON((sizeof(struct rte_ring) &
187 			  RTE_CACHE_LINE_MASK) != 0);
188 	RTE_BUILD_BUG_ON((offsetof(struct rte_ring, cons) &
189 			  RTE_CACHE_LINE_MASK) != 0);
190 	RTE_BUILD_BUG_ON((offsetof(struct rte_ring, prod) &
191 			  RTE_CACHE_LINE_MASK) != 0);
192 
193 	RTE_BUILD_BUG_ON(offsetof(struct rte_ring_headtail, sync_type) !=
194 		offsetof(struct rte_ring_hts_headtail, sync_type));
195 	RTE_BUILD_BUG_ON(offsetof(struct rte_ring_headtail, tail) !=
196 		offsetof(struct rte_ring_hts_headtail, ht.pos.tail));
197 
198 	RTE_BUILD_BUG_ON(offsetof(struct rte_ring_headtail, sync_type) !=
199 		offsetof(struct rte_ring_rts_headtail, sync_type));
200 	RTE_BUILD_BUG_ON(offsetof(struct rte_ring_headtail, tail) !=
201 		offsetof(struct rte_ring_rts_headtail, tail.val.pos));
202 
203 	/* future proof flags, only allow supported values */
204 	if (flags & ~RING_F_MASK) {
205 		RING_LOG(ERR,
206 			"Unsupported flags requested %#x", flags);
207 		return -EINVAL;
208 	}
209 
210 	/* init the ring structure */
211 	memset(r, 0, sizeof(*r));
212 	ret = strlcpy(r->name, name, sizeof(r->name));
213 	if (ret < 0 || ret >= (int)sizeof(r->name))
214 		return -ENAMETOOLONG;
215 	r->flags = flags;
216 	ret = get_sync_type(flags, &r->prod.sync_type, &r->cons.sync_type);
217 	if (ret != 0)
218 		return ret;
219 
220 	if (flags & RING_F_EXACT_SZ) {
221 		r->size = rte_align32pow2(count + 1);
222 		r->mask = r->size - 1;
223 		r->capacity = count;
224 	} else {
225 		if ((!POWEROF2(count)) || (count > RTE_RING_SZ_MASK)) {
226 			RING_LOG(ERR,
227 				"Requested size is invalid, must be power of 2, and not exceed the size limit %u",
228 				RTE_RING_SZ_MASK);
229 			return -EINVAL;
230 		}
231 		r->size = count;
232 		r->mask = count - 1;
233 		r->capacity = r->mask;
234 	}
235 
236 	/* set default values for head-tail distance */
237 	if (flags & RING_F_MP_RTS_ENQ)
238 		rte_ring_set_prod_htd_max(r, r->capacity / HTD_MAX_DEF);
239 	if (flags & RING_F_MC_RTS_DEQ)
240 		rte_ring_set_cons_htd_max(r, r->capacity / HTD_MAX_DEF);
241 
242 	return 0;
243 }
244 
245 /* create the ring for a given element size */
246 struct rte_ring *
247 rte_ring_create_elem(const char *name, unsigned int esize, unsigned int count,
248 		int socket_id, unsigned int flags)
249 {
250 	char mz_name[RTE_MEMZONE_NAMESIZE];
251 	struct rte_ring *r;
252 	struct rte_tailq_entry *te;
253 	const struct rte_memzone *mz;
254 	ssize_t ring_size;
255 	int mz_flags = 0;
256 	struct rte_ring_list* ring_list = NULL;
257 	const unsigned int requested_count = count;
258 	int ret;
259 
260 	ring_list = RTE_TAILQ_CAST(rte_ring_tailq.head, rte_ring_list);
261 
262 	/* for an exact size ring, round up from count to a power of two */
263 	if (flags & RING_F_EXACT_SZ)
264 		count = rte_align32pow2(count + 1);
265 
266 	ring_size = rte_ring_get_memsize_elem(esize, count);
267 	if (ring_size < 0) {
268 		rte_errno = -ring_size;
269 		return NULL;
270 	}
271 
272 	ret = snprintf(mz_name, sizeof(mz_name), "%s%s",
273 		RTE_RING_MZ_PREFIX, name);
274 	if (ret < 0 || ret >= (int)sizeof(mz_name)) {
275 		rte_errno = ENAMETOOLONG;
276 		return NULL;
277 	}
278 
279 	te = rte_zmalloc("RING_TAILQ_ENTRY", sizeof(*te), 0);
280 	if (te == NULL) {
281 		RING_LOG(ERR, "Cannot reserve memory for tailq");
282 		rte_errno = ENOMEM;
283 		return NULL;
284 	}
285 
286 	rte_mcfg_tailq_write_lock();
287 
288 	/* reserve a memory zone for this ring. If we can't get rte_config or
289 	 * we are secondary process, the memzone_reserve function will set
290 	 * rte_errno for us appropriately - hence no check in this function
291 	 */
292 	mz = rte_memzone_reserve_aligned(mz_name, ring_size, socket_id,
293 					 mz_flags, alignof(typeof(*r)));
294 	if (mz != NULL) {
295 		r = mz->addr;
296 		/* no need to check return value here, we already checked the
297 		 * arguments above */
298 		rte_ring_init(r, name, requested_count, flags);
299 
300 		te->data = (void *) r;
301 		r->memzone = mz;
302 
303 		TAILQ_INSERT_TAIL(ring_list, te, next);
304 	} else {
305 		r = NULL;
306 		RING_LOG(ERR, "Cannot reserve memory");
307 		rte_free(te);
308 	}
309 	rte_mcfg_tailq_write_unlock();
310 
311 	return r;
312 }
313 
/*
 * Create a ring of pointer-sized elements; see rte_ring_create_elem() for
 * the meaning of the arguments and of the return value.
 */
struct rte_ring *
rte_ring_create(const char *name, unsigned int count, int socket_id,
		unsigned int flags)
{
	const unsigned int ptr_esize = sizeof(void *);

	return rte_ring_create_elem(name, ptr_esize, count, socket_id, flags);
}
322 
323 /* free the ring */
324 void
325 rte_ring_free(struct rte_ring *r)
326 {
327 	struct rte_ring_list *ring_list = NULL;
328 	struct rte_tailq_entry *te;
329 
330 	if (r == NULL)
331 		return;
332 
333 	/*
334 	 * Ring was not created with rte_ring_create,
335 	 * therefore, there is no memzone to free.
336 	 */
337 	if (r->memzone == NULL) {
338 		RING_LOG(ERR,
339 			"Cannot free ring, not created with rte_ring_create()");
340 		return;
341 	}
342 
343 	ring_list = RTE_TAILQ_CAST(rte_ring_tailq.head, rte_ring_list);
344 	rte_mcfg_tailq_write_lock();
345 
346 	/* find out tailq entry */
347 	TAILQ_FOREACH(te, ring_list, next) {
348 		if (te->data == (void *) r)
349 			break;
350 	}
351 
352 	if (te == NULL) {
353 		rte_mcfg_tailq_write_unlock();
354 		return;
355 	}
356 
357 	TAILQ_REMOVE(ring_list, te, next);
358 
359 	rte_mcfg_tailq_write_unlock();
360 
361 	if (rte_memzone_free(r->memzone) != 0)
362 		RING_LOG(ERR, "Cannot free memory");
363 
364 	rte_free(te);
365 }
366 
367 static const char *
368 ring_get_sync_type(const enum rte_ring_sync_type st)
369 {
370 	switch (st) {
371 	case RTE_RING_SYNC_ST:
372 		return "ST";
373 	case RTE_RING_SYNC_MT:
374 		return "MT";
375 	case RTE_RING_SYNC_MT_RTS:
376 		return "MT_RTS";
377 	case RTE_RING_SYNC_MT_HTS:
378 		return "MT_HTS";
379 	default:
380 		return "unknown";
381 	}
382 }
383 
384 static void
385 ring_dump_ht_headtail(FILE *f, const char *prefix,
386 		const struct rte_ring_headtail *ht)
387 {
388 	fprintf(f, "%ssync_type=%s\n", prefix,
389 			ring_get_sync_type(ht->sync_type));
390 	fprintf(f, "%shead=%"PRIu32"\n", prefix, ht->head);
391 	fprintf(f, "%stail=%"PRIu32"\n", prefix, ht->tail);
392 }
393 
394 static void
395 ring_dump_rts_headtail(FILE *f, const char *prefix,
396 		const struct rte_ring_rts_headtail *rts)
397 {
398 	fprintf(f, "%ssync_type=%s\n", prefix,
399 			ring_get_sync_type(rts->sync_type));
400 	fprintf(f, "%shead.pos=%"PRIu32"\n", prefix, rts->head.val.pos);
401 	fprintf(f, "%shead.cnt=%"PRIu32"\n", prefix, rts->head.val.cnt);
402 	fprintf(f, "%stail.pos=%"PRIu32"\n", prefix, rts->tail.val.pos);
403 	fprintf(f, "%stail.cnt=%"PRIu32"\n", prefix, rts->tail.val.cnt);
404 	fprintf(f, "%shtd_max=%"PRIu32"\n", prefix, rts->htd_max);
405 }
406 
407 static void
408 ring_dump_hts_headtail(FILE *f, const char *prefix,
409 		const struct rte_ring_hts_headtail *hts)
410 {
411 	fprintf(f, "%ssync_type=%s\n", prefix,
412 			ring_get_sync_type(hts->sync_type));
413 	fprintf(f, "%shead=%"PRIu32"\n", prefix, hts->ht.pos.head);
414 	fprintf(f, "%stail=%"PRIu32"\n", prefix, hts->ht.pos.tail);
415 }
416 
417 void
418 rte_ring_headtail_dump(FILE *f, const char *prefix,
419 		const struct rte_ring_headtail *r)
420 {
421 	if (f == NULL || r == NULL)
422 		return;
423 
424 	prefix = (prefix != NULL) ? prefix : "";
425 
426 	switch (r->sync_type) {
427 	case RTE_RING_SYNC_ST:
428 	case RTE_RING_SYNC_MT:
429 		ring_dump_ht_headtail(f, prefix, r);
430 		break;
431 	case RTE_RING_SYNC_MT_RTS:
432 		ring_dump_rts_headtail(f, prefix,
433 				(const struct rte_ring_rts_headtail *)r);
434 		break;
435 	case RTE_RING_SYNC_MT_HTS:
436 		ring_dump_hts_headtail(f, prefix,
437 				(const struct rte_ring_hts_headtail *)r);
438 		break;
439 	default:
440 		RING_LOG(ERR, "Invalid ring sync type detected");
441 	}
442 }
443 
444 /* dump the status of the ring on the console */
445 void
446 rte_ring_dump(FILE *f, const struct rte_ring *r)
447 {
448 	if (f == NULL || r == NULL)
449 		return;
450 
451 	fprintf(f, "ring <%s>@%p\n", r->name, r);
452 	fprintf(f, "  flags=%x\n", r->flags);
453 	fprintf(f, "  size=%"PRIu32"\n", r->size);
454 	fprintf(f, "  capacity=%"PRIu32"\n", r->capacity);
455 	fprintf(f, "  used=%u\n", rte_ring_count(r));
456 	fprintf(f, "  avail=%u\n", rte_ring_free_count(r));
457 
458 	rte_ring_headtail_dump(f, "  cons.", &(r->cons));
459 	rte_ring_headtail_dump(f, "  prod.", &(r->prod));
460 }
461 
462 /* dump the status of all rings on the console */
463 void
464 rte_ring_list_dump(FILE *f)
465 {
466 	const struct rte_tailq_entry *te;
467 	struct rte_ring_list *ring_list;
468 
469 	ring_list = RTE_TAILQ_CAST(rte_ring_tailq.head, rte_ring_list);
470 
471 	rte_mcfg_tailq_read_lock();
472 
473 	TAILQ_FOREACH(te, ring_list, next) {
474 		rte_ring_dump(f, (struct rte_ring *) te->data);
475 	}
476 
477 	rte_mcfg_tailq_read_unlock();
478 }
479 
480 /* search a ring from its name */
481 struct rte_ring *
482 rte_ring_lookup(const char *name)
483 {
484 	struct rte_tailq_entry *te;
485 	struct rte_ring *r = NULL;
486 	struct rte_ring_list *ring_list;
487 
488 	ring_list = RTE_TAILQ_CAST(rte_ring_tailq.head, rte_ring_list);
489 
490 	rte_mcfg_tailq_read_lock();
491 
492 	TAILQ_FOREACH(te, ring_list, next) {
493 		r = (struct rte_ring *) te->data;
494 		if (strncmp(name, r->name, RTE_RING_NAMESIZE) == 0)
495 			break;
496 	}
497 
498 	rte_mcfg_tailq_read_unlock();
499 
500 	if (te == NULL) {
501 		rte_errno = ENOENT;
502 		return NULL;
503 	}
504 
505 	return r;
506 }
507 
508 static void
509 ring_walk(void (*func)(struct rte_ring *, void *), void *arg)
510 {
511 	struct rte_ring_list *ring_list;
512 	struct rte_tailq_entry *tailq_entry;
513 
514 	ring_list = RTE_TAILQ_CAST(rte_ring_tailq.head, rte_ring_list);
515 	rte_mcfg_tailq_read_lock();
516 
517 	TAILQ_FOREACH(tailq_entry, ring_list, next) {
518 		(*func)((struct rte_ring *) tailq_entry->data, arg);
519 	}
520 
521 	rte_mcfg_tailq_read_unlock();
522 }
523 
524 static void
525 ring_list_cb(struct rte_ring *r, void *arg)
526 {
527 	struct rte_tel_data *d = (struct rte_tel_data *)arg;
528 
529 	rte_tel_data_add_array_string(d, r->name);
530 }
531 
532 static int
533 ring_handle_list(const char *cmd __rte_unused,
534 		const char *params __rte_unused, struct rte_tel_data *d)
535 {
536 	rte_tel_data_start_array(d, RTE_TEL_STRING_VAL);
537 	ring_walk(ring_list_cb, d);
538 	return 0;
539 }
540 
541 static const char *
542 ring_prod_sync_type_to_name(struct rte_ring *r)
543 {
544 	switch (r->prod.sync_type) {
545 	case RTE_RING_SYNC_MT:
546 		return "MP";
547 	case RTE_RING_SYNC_ST:
548 		return "SP";
549 	case RTE_RING_SYNC_MT_RTS:
550 		return "MP_RTS";
551 	case RTE_RING_SYNC_MT_HTS:
552 		return "MP_HTS";
553 	default:
554 		return "Unknown";
555 	}
556 }
557 
558 static const char *
559 ring_cons_sync_type_to_name(struct rte_ring *r)
560 {
561 	switch (r->cons.sync_type) {
562 	case RTE_RING_SYNC_MT:
563 		return "MC";
564 	case RTE_RING_SYNC_ST:
565 		return "SC";
566 	case RTE_RING_SYNC_MT_RTS:
567 		return "MC_RTS";
568 	case RTE_RING_SYNC_MT_HTS:
569 		return "MC_HTS";
570 	default:
571 		return "Unknown";
572 	}
573 }
574 
/* Argument bundle passed to ring_info_cb() through ring_walk(). */
struct ring_info_cb_arg {
	char *ring_name;	/* name of the ring whose info is requested */
	struct rte_tel_data *d;	/* telemetry dict that receives the fields */
};
579 
580 static void
581 ring_info_cb(struct rte_ring *r, void *arg)
582 {
583 	struct ring_info_cb_arg *ring_arg = (struct ring_info_cb_arg *)arg;
584 	struct rte_tel_data *d = ring_arg->d;
585 	const struct rte_memzone *mz;
586 
587 	if (strncmp(r->name, ring_arg->ring_name, RTE_RING_NAMESIZE))
588 		return;
589 
590 	rte_tel_data_add_dict_string(d, "name", r->name);
591 	rte_tel_data_add_dict_int(d, "socket", r->memzone->socket_id);
592 	rte_tel_data_add_dict_int(d, "flags", r->flags);
593 	rte_tel_data_add_dict_string(d, "producer_type",
594 		ring_prod_sync_type_to_name(r));
595 	rte_tel_data_add_dict_string(d, "consumer_type",
596 		ring_cons_sync_type_to_name(r));
597 	rte_tel_data_add_dict_uint(d, "size", r->size);
598 	rte_tel_data_add_dict_uint_hex(d, "mask", r->mask, 0);
599 	rte_tel_data_add_dict_uint(d, "capacity", r->capacity);
600 	rte_tel_data_add_dict_uint(d, "used_count", rte_ring_count(r));
601 
602 	mz = r->memzone;
603 	if (mz == NULL)
604 		return;
605 	rte_tel_data_add_dict_string(d, "mz_name", mz->name);
606 	rte_tel_data_add_dict_uint(d, "mz_len", mz->len);
607 	rte_tel_data_add_dict_uint(d, "mz_hugepage_sz", mz->hugepage_sz);
608 	rte_tel_data_add_dict_int(d, "mz_socket_id", mz->socket_id);
609 	rte_tel_data_add_dict_uint_hex(d, "mz_flags", mz->flags, 0);
610 }
611 
612 static int
613 ring_handle_info(const char *cmd __rte_unused, const char *params,
614 		struct rte_tel_data *d)
615 {
616 	char name[RTE_RING_NAMESIZE] = {0};
617 	struct ring_info_cb_arg ring_arg;
618 
619 	if (params == NULL || strlen(params) == 0 ||
620 		strlen(params) >= RTE_RING_NAMESIZE)
621 		return -EINVAL;
622 
623 	rte_strlcpy(name, params, RTE_RING_NAMESIZE);
624 
625 	ring_arg.ring_name = name;
626 	ring_arg.d = d;
627 
628 	rte_tel_data_start_dict(d);
629 	ring_walk(ring_info_cb, &ring_arg);
630 
631 	return 0;
632 }
633 
/*
 * Init hook (declared through RTE_INIT) that registers the two ring
 * telemetry commands: "/ring/list" handled by ring_handle_list() and
 * "/ring/info" handled by ring_handle_info().
 */
RTE_INIT(ring_init_telemetry)
{
	rte_telemetry_register_cmd("/ring/list", ring_handle_list,
		"Returns list of available rings. Takes no parameters");
	rte_telemetry_register_cmd("/ring/info", ring_handle_info,
		"Returns ring info. Parameters: ring_name.");
}
641