xref: /dpdk/lib/ring/rte_ring.c (revision e9fd1ebf981f361844aea9ec94e17f4bda5e1479)
1 /* SPDX-License-Identifier: BSD-3-Clause
2  *
3  * Copyright (c) 2010-2015 Intel Corporation
4  * Copyright (c) 2007,2008 Kip Macy kmacy@freebsd.org
5  * All rights reserved.
6  * Derived from FreeBSD's bufring.h
7  * Used as BSD-3 Licensed with permission from Kip Macy.
8  */
9 
10 #include <stdalign.h>
11 #include <stdio.h>
12 #include <string.h>
13 #include <stdint.h>
14 #include <inttypes.h>
15 #include <errno.h>
16 #include <sys/queue.h>
17 
18 #include <rte_common.h>
19 #include <rte_log.h>
20 #include <rte_memzone.h>
21 #include <rte_malloc.h>
22 #include <rte_eal_memconfig.h>
23 #include <rte_errno.h>
24 #include <rte_string_fns.h>
25 #include <rte_tailq.h>
26 #include <rte_telemetry.h>
27 
28 #include "rte_ring.h"
29 #include "rte_ring_elem.h"
30 
/* Log type used by every RING_LOG() call in this file. */
RTE_LOG_REGISTER_DEFAULT(ring_logtype, INFO);
#define RTE_LOGTYPE_RING ring_logtype
/*
 * Logging helper for this file: forwards to RTE_LOG_LINE() with the RING
 * log type. The leading "" is concatenated with the first variadic
 * argument, so the format has to be a string literal.
 */
#define RING_LOG(level, ...) \
	RTE_LOG_LINE(level, RING, "" __VA_ARGS__)

/* List head type (struct rte_ring_list) whose elements are rte_tailq_entry. */
TAILQ_HEAD(rte_ring_list, rte_tailq_entry);

/*
 * Tailq element for rings, named RTE_TAILQ_RING_NAME. Its head is cast to
 * struct rte_ring_list by the functions in this file that walk or modify
 * the list of rings.
 */
static struct rte_tailq_elem rte_ring_tailq = {
	.name = RTE_TAILQ_RING_NAME,
};
EAL_REGISTER_TAILQ(rte_ring_tailq)
42 
/* mask of all valid flag values to ring_create() */
#define RING_F_MASK (RING_F_SP_ENQ | RING_F_SC_DEQ | RING_F_EXACT_SZ | \
		     RING_F_MP_RTS_ENQ | RING_F_MC_RTS_DEQ |	       \
		     RING_F_MP_HTS_ENQ | RING_F_MC_HTS_DEQ)

/*
 * true if x is a power of 2.
 * Zero is not a power of 2 and is rejected explicitly: the bit trick
 * ((x - 1) & x) == 0 alone would accept it.
 * Note: x is evaluated more than once, pass side-effect free expressions.
 */
#define POWEROF2(x) (((x) != 0) && ((((x) - 1) & (x)) == 0))

/* by default set head/tail distance as 1/8 of ring capacity */
#define HTD_MAX_DEF	8
53 
54 /* return the size of memory occupied by a ring */
55 ssize_t
56 rte_ring_get_memsize_elem(unsigned int esize, unsigned int count)
57 {
58 	ssize_t sz;
59 
60 	/* Check if element size is a multiple of 4B */
61 	if (esize % 4 != 0) {
62 		RING_LOG(ERR, "element size is not a multiple of 4");
63 
64 		return -EINVAL;
65 	}
66 
67 	/* count must be a power of 2 */
68 	if ((!POWEROF2(count)) || (count > RTE_RING_SZ_MASK )) {
69 		RING_LOG(ERR,
70 			"Requested number of elements is invalid, must be power of 2, and not exceed %u",
71 			RTE_RING_SZ_MASK);
72 
73 		return -EINVAL;
74 	}
75 
76 	sz = sizeof(struct rte_ring) + (ssize_t)count * esize;
77 	sz = RTE_ALIGN(sz, RTE_CACHE_LINE_SIZE);
78 	return sz;
79 }
80 
/* size in bytes of a ring whose elements are pointer sized */
ssize_t
rte_ring_get_memsize(unsigned int count)
{
	const unsigned int ptr_esize = sizeof(void *);

	return rte_ring_get_memsize_elem(ptr_esize, count);
}
87 
88 /*
89  * internal helper function to reset prod/cons head-tail values.
90  */
91 static void
92 reset_headtail(void *p)
93 {
94 	struct rte_ring_headtail *ht;
95 	struct rte_ring_hts_headtail *ht_hts;
96 	struct rte_ring_rts_headtail *ht_rts;
97 
98 	ht = p;
99 	ht_hts = p;
100 	ht_rts = p;
101 
102 	switch (ht->sync_type) {
103 	case RTE_RING_SYNC_MT:
104 	case RTE_RING_SYNC_ST:
105 		ht->head = 0;
106 		ht->tail = 0;
107 		break;
108 	case RTE_RING_SYNC_MT_RTS:
109 		ht_rts->head.raw = 0;
110 		ht_rts->tail.raw = 0;
111 		break;
112 	case RTE_RING_SYNC_MT_HTS:
113 		ht_hts->ht.raw = 0;
114 		break;
115 	default:
116 		/* unknown sync mode */
117 		RTE_ASSERT(0);
118 	}
119 }
120 
121 void
122 rte_ring_reset(struct rte_ring *r)
123 {
124 	reset_headtail(&r->prod);
125 	reset_headtail(&r->cons);
126 }
127 
128 /*
129  * helper function, calculates sync_type values for prod and cons
130  * based on input flags. Returns zero at success or negative
131  * errno value otherwise.
132  */
133 static int
134 get_sync_type(uint32_t flags, enum rte_ring_sync_type *prod_st,
135 	enum rte_ring_sync_type *cons_st)
136 {
137 	static const uint32_t prod_st_flags =
138 		(RING_F_SP_ENQ | RING_F_MP_RTS_ENQ | RING_F_MP_HTS_ENQ);
139 	static const uint32_t cons_st_flags =
140 		(RING_F_SC_DEQ | RING_F_MC_RTS_DEQ | RING_F_MC_HTS_DEQ);
141 
142 	switch (flags & prod_st_flags) {
143 	case 0:
144 		*prod_st = RTE_RING_SYNC_MT;
145 		break;
146 	case RING_F_SP_ENQ:
147 		*prod_st = RTE_RING_SYNC_ST;
148 		break;
149 	case RING_F_MP_RTS_ENQ:
150 		*prod_st = RTE_RING_SYNC_MT_RTS;
151 		break;
152 	case RING_F_MP_HTS_ENQ:
153 		*prod_st = RTE_RING_SYNC_MT_HTS;
154 		break;
155 	default:
156 		return -EINVAL;
157 	}
158 
159 	switch (flags & cons_st_flags) {
160 	case 0:
161 		*cons_st = RTE_RING_SYNC_MT;
162 		break;
163 	case RING_F_SC_DEQ:
164 		*cons_st = RTE_RING_SYNC_ST;
165 		break;
166 	case RING_F_MC_RTS_DEQ:
167 		*cons_st = RTE_RING_SYNC_MT_RTS;
168 		break;
169 	case RING_F_MC_HTS_DEQ:
170 		*cons_st = RTE_RING_SYNC_MT_HTS;
171 		break;
172 	default:
173 		return -EINVAL;
174 	}
175 
176 	return 0;
177 }
178 
179 int
180 rte_ring_init(struct rte_ring *r, const char *name, unsigned int count,
181 	unsigned int flags)
182 {
183 	int ret;
184 
185 	/* compilation-time checks */
186 	RTE_BUILD_BUG_ON((sizeof(struct rte_ring) &
187 			  RTE_CACHE_LINE_MASK) != 0);
188 	RTE_BUILD_BUG_ON((offsetof(struct rte_ring, cons) &
189 			  RTE_CACHE_LINE_MASK) != 0);
190 	RTE_BUILD_BUG_ON((offsetof(struct rte_ring, prod) &
191 			  RTE_CACHE_LINE_MASK) != 0);
192 
193 	RTE_BUILD_BUG_ON(offsetof(struct rte_ring_headtail, sync_type) !=
194 		offsetof(struct rte_ring_hts_headtail, sync_type));
195 	RTE_BUILD_BUG_ON(offsetof(struct rte_ring_headtail, tail) !=
196 		offsetof(struct rte_ring_hts_headtail, ht.pos.tail));
197 
198 	RTE_BUILD_BUG_ON(offsetof(struct rte_ring_headtail, sync_type) !=
199 		offsetof(struct rte_ring_rts_headtail, sync_type));
200 	RTE_BUILD_BUG_ON(offsetof(struct rte_ring_headtail, tail) !=
201 		offsetof(struct rte_ring_rts_headtail, tail.val.pos));
202 
203 	/* future proof flags, only allow supported values */
204 	if (flags & ~RING_F_MASK) {
205 		RING_LOG(ERR,
206 			"Unsupported flags requested %#x", flags);
207 		return -EINVAL;
208 	}
209 
210 	/* init the ring structure */
211 	memset(r, 0, sizeof(*r));
212 	ret = strlcpy(r->name, name, sizeof(r->name));
213 	if (ret < 0 || ret >= (int)sizeof(r->name))
214 		return -ENAMETOOLONG;
215 	r->flags = flags;
216 	ret = get_sync_type(flags, &r->prod.sync_type, &r->cons.sync_type);
217 	if (ret != 0)
218 		return ret;
219 
220 	if (flags & RING_F_EXACT_SZ) {
221 		r->size = rte_align32pow2(count + 1);
222 		r->mask = r->size - 1;
223 		r->capacity = count;
224 	} else {
225 		if ((!POWEROF2(count)) || (count > RTE_RING_SZ_MASK)) {
226 			RING_LOG(ERR,
227 				"Requested size is invalid, must be power of 2, and not exceed the size limit %u",
228 				RTE_RING_SZ_MASK);
229 			return -EINVAL;
230 		}
231 		r->size = count;
232 		r->mask = count - 1;
233 		r->capacity = r->mask;
234 	}
235 
236 	/* set default values for head-tail distance */
237 	if (flags & RING_F_MP_RTS_ENQ)
238 		rte_ring_set_prod_htd_max(r, r->capacity / HTD_MAX_DEF);
239 	if (flags & RING_F_MC_RTS_DEQ)
240 		rte_ring_set_cons_htd_max(r, r->capacity / HTD_MAX_DEF);
241 
242 	return 0;
243 }
244 
245 /* create the ring for a given element size */
246 struct rte_ring *
247 rte_ring_create_elem(const char *name, unsigned int esize, unsigned int count,
248 		int socket_id, unsigned int flags)
249 {
250 	char mz_name[RTE_MEMZONE_NAMESIZE];
251 	struct rte_ring *r;
252 	struct rte_tailq_entry *te;
253 	const struct rte_memzone *mz;
254 	ssize_t ring_size;
255 	int mz_flags = 0;
256 	struct rte_ring_list* ring_list = NULL;
257 	const unsigned int requested_count = count;
258 	int ret;
259 
260 	ring_list = RTE_TAILQ_CAST(rte_ring_tailq.head, rte_ring_list);
261 
262 	/* for an exact size ring, round up from count to a power of two */
263 	if (flags & RING_F_EXACT_SZ)
264 		count = rte_align32pow2(count + 1);
265 
266 	ring_size = rte_ring_get_memsize_elem(esize, count);
267 	if (ring_size < 0) {
268 		rte_errno = -ring_size;
269 		return NULL;
270 	}
271 
272 	ret = snprintf(mz_name, sizeof(mz_name), "%s%s",
273 		RTE_RING_MZ_PREFIX, name);
274 	if (ret < 0 || ret >= (int)sizeof(mz_name)) {
275 		rte_errno = ENAMETOOLONG;
276 		return NULL;
277 	}
278 
279 	te = rte_zmalloc("RING_TAILQ_ENTRY", sizeof(*te), 0);
280 	if (te == NULL) {
281 		RING_LOG(ERR, "Cannot reserve memory for tailq");
282 		rte_errno = ENOMEM;
283 		return NULL;
284 	}
285 
286 	rte_mcfg_tailq_write_lock();
287 
288 	/* reserve a memory zone for this ring. If we can't get rte_config or
289 	 * we are secondary process, the memzone_reserve function will set
290 	 * rte_errno for us appropriately - hence no check in this function
291 	 */
292 	mz = rte_memzone_reserve_aligned(mz_name, ring_size, socket_id,
293 					 mz_flags, alignof(typeof(*r)));
294 	if (mz != NULL) {
295 		r = mz->addr;
296 		/* no need to check return value here, we already checked the
297 		 * arguments above */
298 		rte_ring_init(r, name, requested_count, flags);
299 
300 		te->data = (void *) r;
301 		r->memzone = mz;
302 
303 		TAILQ_INSERT_TAIL(ring_list, te, next);
304 	} else {
305 		r = NULL;
306 		RING_LOG(ERR, "Cannot reserve memory");
307 		rte_free(te);
308 	}
309 	rte_mcfg_tailq_write_unlock();
310 
311 	return r;
312 }
313 
/* create a ring whose elements are pointer sized */
struct rte_ring *
rte_ring_create(const char *name, unsigned int count, int socket_id,
		unsigned int flags)
{
	const unsigned int ptr_esize = sizeof(void *);

	return rte_ring_create_elem(name, ptr_esize, count, socket_id, flags);
}
322 
323 /* free the ring */
324 void
325 rte_ring_free(struct rte_ring *r)
326 {
327 	struct rte_ring_list *ring_list = NULL;
328 	struct rte_tailq_entry *te;
329 
330 	if (r == NULL)
331 		return;
332 
333 	/*
334 	 * Ring was not created with rte_ring_create,
335 	 * therefore, there is no memzone to free.
336 	 */
337 	if (r->memzone == NULL) {
338 		RING_LOG(ERR,
339 			"Cannot free ring, not created with rte_ring_create()");
340 		return;
341 	}
342 
343 	ring_list = RTE_TAILQ_CAST(rte_ring_tailq.head, rte_ring_list);
344 	rte_mcfg_tailq_write_lock();
345 
346 	/* find out tailq entry */
347 	TAILQ_FOREACH(te, ring_list, next) {
348 		if (te->data == (void *) r)
349 			break;
350 	}
351 
352 	if (te == NULL) {
353 		rte_mcfg_tailq_write_unlock();
354 		return;
355 	}
356 
357 	TAILQ_REMOVE(ring_list, te, next);
358 
359 	rte_mcfg_tailq_write_unlock();
360 
361 	if (rte_memzone_free(r->memzone) != 0)
362 		RING_LOG(ERR, "Cannot free memory");
363 
364 	rte_free(te);
365 }
366 
367 /* dump the status of the ring on the console */
368 void
369 rte_ring_dump(FILE *f, const struct rte_ring *r)
370 {
371 	fprintf(f, "ring <%s>@%p\n", r->name, r);
372 	fprintf(f, "  flags=%x\n", r->flags);
373 	fprintf(f, "  size=%"PRIu32"\n", r->size);
374 	fprintf(f, "  capacity=%"PRIu32"\n", r->capacity);
375 	fprintf(f, "  ct=%"PRIu32"\n", r->cons.tail);
376 	fprintf(f, "  ch=%"PRIu32"\n", r->cons.head);
377 	fprintf(f, "  pt=%"PRIu32"\n", r->prod.tail);
378 	fprintf(f, "  ph=%"PRIu32"\n", r->prod.head);
379 	fprintf(f, "  used=%u\n", rte_ring_count(r));
380 	fprintf(f, "  avail=%u\n", rte_ring_free_count(r));
381 }
382 
383 /* dump the status of all rings on the console */
384 void
385 rte_ring_list_dump(FILE *f)
386 {
387 	const struct rte_tailq_entry *te;
388 	struct rte_ring_list *ring_list;
389 
390 	ring_list = RTE_TAILQ_CAST(rte_ring_tailq.head, rte_ring_list);
391 
392 	rte_mcfg_tailq_read_lock();
393 
394 	TAILQ_FOREACH(te, ring_list, next) {
395 		rte_ring_dump(f, (struct rte_ring *) te->data);
396 	}
397 
398 	rte_mcfg_tailq_read_unlock();
399 }
400 
401 /* search a ring from its name */
402 struct rte_ring *
403 rte_ring_lookup(const char *name)
404 {
405 	struct rte_tailq_entry *te;
406 	struct rte_ring *r = NULL;
407 	struct rte_ring_list *ring_list;
408 
409 	ring_list = RTE_TAILQ_CAST(rte_ring_tailq.head, rte_ring_list);
410 
411 	rte_mcfg_tailq_read_lock();
412 
413 	TAILQ_FOREACH(te, ring_list, next) {
414 		r = (struct rte_ring *) te->data;
415 		if (strncmp(name, r->name, RTE_RING_NAMESIZE) == 0)
416 			break;
417 	}
418 
419 	rte_mcfg_tailq_read_unlock();
420 
421 	if (te == NULL) {
422 		rte_errno = ENOENT;
423 		return NULL;
424 	}
425 
426 	return r;
427 }
428 
429 static void
430 ring_walk(void (*func)(struct rte_ring *, void *), void *arg)
431 {
432 	struct rte_ring_list *ring_list;
433 	struct rte_tailq_entry *tailq_entry;
434 
435 	ring_list = RTE_TAILQ_CAST(rte_ring_tailq.head, rte_ring_list);
436 	rte_mcfg_tailq_read_lock();
437 
438 	TAILQ_FOREACH(tailq_entry, ring_list, next) {
439 		(*func)((struct rte_ring *) tailq_entry->data, arg);
440 	}
441 
442 	rte_mcfg_tailq_read_unlock();
443 }
444 
445 static void
446 ring_list_cb(struct rte_ring *r, void *arg)
447 {
448 	struct rte_tel_data *d = (struct rte_tel_data *)arg;
449 
450 	rte_tel_data_add_array_string(d, r->name);
451 }
452 
453 static int
454 ring_handle_list(const char *cmd __rte_unused,
455 		const char *params __rte_unused, struct rte_tel_data *d)
456 {
457 	rte_tel_data_start_array(d, RTE_TEL_STRING_VAL);
458 	ring_walk(ring_list_cb, d);
459 	return 0;
460 }
461 
462 static const char *
463 ring_prod_sync_type_to_name(struct rte_ring *r)
464 {
465 	switch (r->prod.sync_type) {
466 	case RTE_RING_SYNC_MT:
467 		return "MP";
468 	case RTE_RING_SYNC_ST:
469 		return "SP";
470 	case RTE_RING_SYNC_MT_RTS:
471 		return "MP_RTS";
472 	case RTE_RING_SYNC_MT_HTS:
473 		return "MP_HTS";
474 	default:
475 		return "Unknown";
476 	}
477 }
478 
479 static const char *
480 ring_cons_sync_type_to_name(struct rte_ring *r)
481 {
482 	switch (r->cons.sync_type) {
483 	case RTE_RING_SYNC_MT:
484 		return "MC";
485 	case RTE_RING_SYNC_ST:
486 		return "SC";
487 	case RTE_RING_SYNC_MT_RTS:
488 		return "MC_RTS";
489 	case RTE_RING_SYNC_MT_HTS:
490 		return "MC_HTS";
491 	default:
492 		return "Unknown";
493 	}
494 }
495 
/* Argument handed to ring_info_cb() through ring_walk(). */
struct ring_info_cb_arg {
	/* name of the ring to report on; only read, never modified */
	const char *ring_name;
	/* telemetry dictionary that receives the ring information */
	struct rte_tel_data *d;
};
500 
501 static void
502 ring_info_cb(struct rte_ring *r, void *arg)
503 {
504 	struct ring_info_cb_arg *ring_arg = (struct ring_info_cb_arg *)arg;
505 	struct rte_tel_data *d = ring_arg->d;
506 	const struct rte_memzone *mz;
507 
508 	if (strncmp(r->name, ring_arg->ring_name, RTE_RING_NAMESIZE))
509 		return;
510 
511 	rte_tel_data_add_dict_string(d, "name", r->name);
512 	rte_tel_data_add_dict_int(d, "socket", r->memzone->socket_id);
513 	rte_tel_data_add_dict_int(d, "flags", r->flags);
514 	rte_tel_data_add_dict_string(d, "producer_type",
515 		ring_prod_sync_type_to_name(r));
516 	rte_tel_data_add_dict_string(d, "consumer_type",
517 		ring_cons_sync_type_to_name(r));
518 	rte_tel_data_add_dict_uint(d, "size", r->size);
519 	rte_tel_data_add_dict_uint_hex(d, "mask", r->mask, 0);
520 	rte_tel_data_add_dict_uint(d, "capacity", r->capacity);
521 	rte_tel_data_add_dict_uint(d, "used_count", rte_ring_count(r));
522 
523 	mz = r->memzone;
524 	if (mz == NULL)
525 		return;
526 	rte_tel_data_add_dict_string(d, "mz_name", mz->name);
527 	rte_tel_data_add_dict_uint(d, "mz_len", mz->len);
528 	rte_tel_data_add_dict_uint(d, "mz_hugepage_sz", mz->hugepage_sz);
529 	rte_tel_data_add_dict_int(d, "mz_socket_id", mz->socket_id);
530 	rte_tel_data_add_dict_uint_hex(d, "mz_flags", mz->flags, 0);
531 }
532 
533 static int
534 ring_handle_info(const char *cmd __rte_unused, const char *params,
535 		struct rte_tel_data *d)
536 {
537 	char name[RTE_RING_NAMESIZE] = {0};
538 	struct ring_info_cb_arg ring_arg;
539 
540 	if (params == NULL || strlen(params) == 0 ||
541 		strlen(params) >= RTE_RING_NAMESIZE)
542 		return -EINVAL;
543 
544 	rte_strlcpy(name, params, RTE_RING_NAMESIZE);
545 
546 	ring_arg.ring_name = name;
547 	ring_arg.d = d;
548 
549 	rte_tel_data_start_dict(d);
550 	ring_walk(ring_info_cb, &ring_arg);
551 
552 	return 0;
553 }
554 
555 RTE_INIT(ring_init_telemetry)
556 {
557 	rte_telemetry_register_cmd("/ring/list", ring_handle_list,
558 		"Returns list of available rings. Takes no parameters");
559 	rte_telemetry_register_cmd("/ring/info", ring_handle_info,
560 		"Returns ring info. Parameters: ring_name.");
561 }
562