xref: /dpdk/lib/rcu/rte_rcu_qsbr.c (revision ffe827f38e6e0be8a307d7ef9c0e1347874f0af7)
199a2dd95SBruce Richardson /* SPDX-License-Identifier: BSD-3-Clause
299a2dd95SBruce Richardson  *
399a2dd95SBruce Richardson  * Copyright (c) 2018-2020 Arm Limited
499a2dd95SBruce Richardson  */
599a2dd95SBruce Richardson 
699a2dd95SBruce Richardson #include <stdio.h>
799a2dd95SBruce Richardson #include <string.h>
899a2dd95SBruce Richardson #include <stdint.h>
999a2dd95SBruce Richardson #include <inttypes.h>
1099a2dd95SBruce Richardson #include <errno.h>
1199a2dd95SBruce Richardson 
1299a2dd95SBruce Richardson #include <rte_common.h>
1399a2dd95SBruce Richardson #include <rte_log.h>
1499a2dd95SBruce Richardson #include <rte_memory.h>
1599a2dd95SBruce Richardson #include <rte_malloc.h>
1699a2dd95SBruce Richardson #include <rte_errno.h>
1799a2dd95SBruce Richardson #include <rte_ring_elem.h>
1899a2dd95SBruce Richardson 
1999a2dd95SBruce Richardson #include "rte_rcu_qsbr.h"
2099a2dd95SBruce Richardson #include "rcu_qsbr_pvt.h"
2199a2dd95SBruce Richardson 
220f1dc8cbSTyler Retzlaff #define RCU_LOG(level, ...) \
230f1dc8cbSTyler Retzlaff 	RTE_LOG_LINE_PREFIX(level, RCU, "%s(): ", __func__, __VA_ARGS__)
24ae67895bSDavid Marchand 
2599a2dd95SBruce Richardson /* Get the memory size of QSBR variable */
2699a2dd95SBruce Richardson size_t
2799a2dd95SBruce Richardson rte_rcu_qsbr_get_memsize(uint32_t max_threads)
2899a2dd95SBruce Richardson {
2999a2dd95SBruce Richardson 	size_t sz;
3099a2dd95SBruce Richardson 
3199a2dd95SBruce Richardson 	if (max_threads == 0) {
32ae67895bSDavid Marchand 		RCU_LOG(ERR, "Invalid max_threads %u", max_threads);
3399a2dd95SBruce Richardson 		rte_errno = EINVAL;
3499a2dd95SBruce Richardson 
3599a2dd95SBruce Richardson 		return 1;
3699a2dd95SBruce Richardson 	}
3799a2dd95SBruce Richardson 
3899a2dd95SBruce Richardson 	sz = sizeof(struct rte_rcu_qsbr);
3999a2dd95SBruce Richardson 
4099a2dd95SBruce Richardson 	/* Add the size of quiescent state counter array */
4199a2dd95SBruce Richardson 	sz += sizeof(struct rte_rcu_qsbr_cnt) * max_threads;
4299a2dd95SBruce Richardson 
4399a2dd95SBruce Richardson 	/* Add the size of the registered thread ID bitmap array */
4499a2dd95SBruce Richardson 	sz += __RTE_QSBR_THRID_ARRAY_SIZE(max_threads);
4599a2dd95SBruce Richardson 
4699a2dd95SBruce Richardson 	return sz;
4799a2dd95SBruce Richardson }
4899a2dd95SBruce Richardson 
4999a2dd95SBruce Richardson /* Initialize a quiescent state variable */
5099a2dd95SBruce Richardson int
5199a2dd95SBruce Richardson rte_rcu_qsbr_init(struct rte_rcu_qsbr *v, uint32_t max_threads)
5299a2dd95SBruce Richardson {
5399a2dd95SBruce Richardson 	size_t sz;
5499a2dd95SBruce Richardson 
5599a2dd95SBruce Richardson 	if (v == NULL) {
56ae67895bSDavid Marchand 		RCU_LOG(ERR, "Invalid input parameter");
5799a2dd95SBruce Richardson 		rte_errno = EINVAL;
5899a2dd95SBruce Richardson 
5999a2dd95SBruce Richardson 		return 1;
6099a2dd95SBruce Richardson 	}
6199a2dd95SBruce Richardson 
6299a2dd95SBruce Richardson 	sz = rte_rcu_qsbr_get_memsize(max_threads);
6399a2dd95SBruce Richardson 	if (sz == 1)
6499a2dd95SBruce Richardson 		return 1;
6599a2dd95SBruce Richardson 
6699a2dd95SBruce Richardson 	/* Set all the threads to offline */
6799a2dd95SBruce Richardson 	memset(v, 0, sz);
6899a2dd95SBruce Richardson 	v->max_threads = max_threads;
6999a2dd95SBruce Richardson 	v->num_elems = RTE_ALIGN_MUL_CEIL(max_threads,
7099a2dd95SBruce Richardson 			__RTE_QSBR_THRID_ARRAY_ELM_SIZE) /
7199a2dd95SBruce Richardson 			__RTE_QSBR_THRID_ARRAY_ELM_SIZE;
7299a2dd95SBruce Richardson 	v->token = __RTE_QSBR_CNT_INIT;
7399a2dd95SBruce Richardson 	v->acked_token = __RTE_QSBR_CNT_INIT - 1;
7499a2dd95SBruce Richardson 
7599a2dd95SBruce Richardson 	return 0;
7699a2dd95SBruce Richardson }
7799a2dd95SBruce Richardson 
7899a2dd95SBruce Richardson /* Register a reader thread to report its quiescent state
7999a2dd95SBruce Richardson  * on a QS variable.
8099a2dd95SBruce Richardson  */
8199a2dd95SBruce Richardson int
8299a2dd95SBruce Richardson rte_rcu_qsbr_thread_register(struct rte_rcu_qsbr *v, unsigned int thread_id)
8399a2dd95SBruce Richardson {
848d03152eSDoug Foster 	unsigned int i, id;
858d03152eSDoug Foster 	uint64_t old_bmap;
8699a2dd95SBruce Richardson 
8799a2dd95SBruce Richardson 	if (v == NULL || thread_id >= v->max_threads) {
88ae67895bSDavid Marchand 		RCU_LOG(ERR, "Invalid input parameter");
8999a2dd95SBruce Richardson 		rte_errno = EINVAL;
9099a2dd95SBruce Richardson 
9199a2dd95SBruce Richardson 		return 1;
9299a2dd95SBruce Richardson 	}
9399a2dd95SBruce Richardson 
94ae282b06SDavid Marchand 	__RTE_RCU_IS_LOCK_CNT_ZERO(v, thread_id, ERR, "Lock counter %u",
9599a2dd95SBruce Richardson 				v->qsbr_cnt[thread_id].lock_cnt);
9699a2dd95SBruce Richardson 
9799a2dd95SBruce Richardson 	id = thread_id & __RTE_QSBR_THRID_MASK;
9899a2dd95SBruce Richardson 	i = thread_id >> __RTE_QSBR_THRID_INDEX_SHIFT;
9999a2dd95SBruce Richardson 
1008d03152eSDoug Foster 	/* Add the thread to the bitmap of registered threads */
1018d03152eSDoug Foster 	old_bmap = rte_atomic_fetch_or_explicit(__RTE_QSBR_THRID_ARRAY_ELM(v, i),
102*ffe827f3SAndre Muezerie 						RTE_BIT64(id), rte_memory_order_release);
10399a2dd95SBruce Richardson 
1048d03152eSDoug Foster 	/* Increment the number of threads registered only if the thread was not already
1058d03152eSDoug Foster 	 * registered
10699a2dd95SBruce Richardson 	 */
107*ffe827f3SAndre Muezerie 	if (!(old_bmap & RTE_BIT64(id)))
1088d03152eSDoug Foster 		rte_atomic_fetch_add_explicit(&v->num_threads, 1, rte_memory_order_relaxed);
10999a2dd95SBruce Richardson 
11099a2dd95SBruce Richardson 	return 0;
11199a2dd95SBruce Richardson }
11299a2dd95SBruce Richardson 
11399a2dd95SBruce Richardson /* Remove a reader thread, from the list of threads reporting their
11499a2dd95SBruce Richardson  * quiescent state on a QS variable.
11599a2dd95SBruce Richardson  */
11699a2dd95SBruce Richardson int
11799a2dd95SBruce Richardson rte_rcu_qsbr_thread_unregister(struct rte_rcu_qsbr *v, unsigned int thread_id)
11899a2dd95SBruce Richardson {
1198d03152eSDoug Foster 	unsigned int i, id;
1208d03152eSDoug Foster 	uint64_t old_bmap;
12199a2dd95SBruce Richardson 
12299a2dd95SBruce Richardson 	if (v == NULL || thread_id >= v->max_threads) {
123ae67895bSDavid Marchand 		RCU_LOG(ERR, "Invalid input parameter");
12499a2dd95SBruce Richardson 		rte_errno = EINVAL;
12599a2dd95SBruce Richardson 
12699a2dd95SBruce Richardson 		return 1;
12799a2dd95SBruce Richardson 	}
12899a2dd95SBruce Richardson 
129ae282b06SDavid Marchand 	__RTE_RCU_IS_LOCK_CNT_ZERO(v, thread_id, ERR, "Lock counter %u",
13099a2dd95SBruce Richardson 				v->qsbr_cnt[thread_id].lock_cnt);
13199a2dd95SBruce Richardson 
13299a2dd95SBruce Richardson 	id = thread_id & __RTE_QSBR_THRID_MASK;
13399a2dd95SBruce Richardson 	i = thread_id >> __RTE_QSBR_THRID_INDEX_SHIFT;
13499a2dd95SBruce Richardson 
13599a2dd95SBruce Richardson 	/* Make sure any loads of the shared data structure are
1368d03152eSDoug Foster 	 * completed before removal of the thread from the bitmap of
13799a2dd95SBruce Richardson 	 * reporting threads.
13899a2dd95SBruce Richardson 	 */
1398d03152eSDoug Foster 	old_bmap = rte_atomic_fetch_and_explicit(__RTE_QSBR_THRID_ARRAY_ELM(v, i),
140*ffe827f3SAndre Muezerie 						 ~RTE_BIT64(id), rte_memory_order_release);
14199a2dd95SBruce Richardson 
1428d03152eSDoug Foster 	/* Decrement the number of threads unregistered only if the thread was not already
1438d03152eSDoug Foster 	 * unregistered
14499a2dd95SBruce Richardson 	 */
145*ffe827f3SAndre Muezerie 	if (old_bmap & RTE_BIT64(id))
1468d03152eSDoug Foster 		rte_atomic_fetch_sub_explicit(&v->num_threads, 1, rte_memory_order_relaxed);
14799a2dd95SBruce Richardson 
14899a2dd95SBruce Richardson 	return 0;
14999a2dd95SBruce Richardson }
15099a2dd95SBruce Richardson 
15199a2dd95SBruce Richardson /* Wait till the reader threads have entered quiescent state. */
15299a2dd95SBruce Richardson void
15399a2dd95SBruce Richardson rte_rcu_qsbr_synchronize(struct rte_rcu_qsbr *v, unsigned int thread_id)
15499a2dd95SBruce Richardson {
15599a2dd95SBruce Richardson 	uint64_t t;
15699a2dd95SBruce Richardson 
15799a2dd95SBruce Richardson 	RTE_ASSERT(v != NULL);
15899a2dd95SBruce Richardson 
15999a2dd95SBruce Richardson 	t = rte_rcu_qsbr_start(v);
16099a2dd95SBruce Richardson 
16199a2dd95SBruce Richardson 	/* If the current thread has readside critical section,
16299a2dd95SBruce Richardson 	 * update its quiescent state status.
16399a2dd95SBruce Richardson 	 */
16499a2dd95SBruce Richardson 	if (thread_id != RTE_QSBR_THRID_INVALID)
16599a2dd95SBruce Richardson 		rte_rcu_qsbr_quiescent(v, thread_id);
16699a2dd95SBruce Richardson 
16799a2dd95SBruce Richardson 	/* Wait for other readers to enter quiescent state */
16899a2dd95SBruce Richardson 	rte_rcu_qsbr_check(v, t, true);
16999a2dd95SBruce Richardson }
17099a2dd95SBruce Richardson 
17199a2dd95SBruce Richardson /* Dump the details of a single quiescent state variable to a file. */
17299a2dd95SBruce Richardson int
17399a2dd95SBruce Richardson rte_rcu_qsbr_dump(FILE *f, struct rte_rcu_qsbr *v)
17499a2dd95SBruce Richardson {
17599a2dd95SBruce Richardson 	uint64_t bmap;
17699a2dd95SBruce Richardson 	uint32_t i, t, id;
17799a2dd95SBruce Richardson 
17899a2dd95SBruce Richardson 	if (v == NULL || f == NULL) {
179ae67895bSDavid Marchand 		RCU_LOG(ERR, "Invalid input parameter");
18099a2dd95SBruce Richardson 		rte_errno = EINVAL;
18199a2dd95SBruce Richardson 
18299a2dd95SBruce Richardson 		return 1;
18399a2dd95SBruce Richardson 	}
18499a2dd95SBruce Richardson 
18599a2dd95SBruce Richardson 	fprintf(f, "\nQuiescent State Variable @%p\n", v);
18699a2dd95SBruce Richardson 
18799a2dd95SBruce Richardson 	fprintf(f, "  QS variable memory size = %zu\n",
18899a2dd95SBruce Richardson 				rte_rcu_qsbr_get_memsize(v->max_threads));
18999a2dd95SBruce Richardson 	fprintf(f, "  Given # max threads = %u\n", v->max_threads);
19099a2dd95SBruce Richardson 	fprintf(f, "  Current # threads = %u\n", v->num_threads);
19199a2dd95SBruce Richardson 
19299a2dd95SBruce Richardson 	fprintf(f, "  Registered thread IDs = ");
19399a2dd95SBruce Richardson 	for (i = 0; i < v->num_elems; i++) {
194002a92b9STyler Retzlaff 		bmap = rte_atomic_load_explicit(__RTE_QSBR_THRID_ARRAY_ELM(v, i),
195002a92b9STyler Retzlaff 					rte_memory_order_acquire);
19699a2dd95SBruce Richardson 		id = i << __RTE_QSBR_THRID_INDEX_SHIFT;
19799a2dd95SBruce Richardson 		while (bmap) {
198de0ec3c2STyler Retzlaff 			t = rte_ctz64(bmap);
19999a2dd95SBruce Richardson 			fprintf(f, "%u ", id + t);
20099a2dd95SBruce Richardson 
201*ffe827f3SAndre Muezerie 			bmap &= ~RTE_BIT64(t);
20299a2dd95SBruce Richardson 		}
20399a2dd95SBruce Richardson 	}
20499a2dd95SBruce Richardson 
20599a2dd95SBruce Richardson 	fprintf(f, "\n");
20699a2dd95SBruce Richardson 
20799a2dd95SBruce Richardson 	fprintf(f, "  Token = %" PRIu64 "\n",
208002a92b9STyler Retzlaff 			rte_atomic_load_explicit(&v->token, rte_memory_order_acquire));
20999a2dd95SBruce Richardson 
21099a2dd95SBruce Richardson 	fprintf(f, "  Least Acknowledged Token = %" PRIu64 "\n",
211002a92b9STyler Retzlaff 			rte_atomic_load_explicit(&v->acked_token, rte_memory_order_acquire));
21299a2dd95SBruce Richardson 
21399a2dd95SBruce Richardson 	fprintf(f, "Quiescent State Counts for readers:\n");
21499a2dd95SBruce Richardson 	for (i = 0; i < v->num_elems; i++) {
215002a92b9STyler Retzlaff 		bmap = rte_atomic_load_explicit(__RTE_QSBR_THRID_ARRAY_ELM(v, i),
216002a92b9STyler Retzlaff 					rte_memory_order_acquire);
21799a2dd95SBruce Richardson 		id = i << __RTE_QSBR_THRID_INDEX_SHIFT;
21899a2dd95SBruce Richardson 		while (bmap) {
219de0ec3c2STyler Retzlaff 			t = rte_ctz64(bmap);
22099a2dd95SBruce Richardson 			fprintf(f, "thread ID = %u, count = %" PRIu64 ", lock count = %u\n",
22199a2dd95SBruce Richardson 				id + t,
222002a92b9STyler Retzlaff 				rte_atomic_load_explicit(
22399a2dd95SBruce Richardson 					&v->qsbr_cnt[id + t].cnt,
224002a92b9STyler Retzlaff 					rte_memory_order_relaxed),
225002a92b9STyler Retzlaff 				rte_atomic_load_explicit(
22699a2dd95SBruce Richardson 					&v->qsbr_cnt[id + t].lock_cnt,
227002a92b9STyler Retzlaff 					rte_memory_order_relaxed));
228*ffe827f3SAndre Muezerie 			bmap &= ~RTE_BIT64(t);
22999a2dd95SBruce Richardson 		}
23099a2dd95SBruce Richardson 	}
23199a2dd95SBruce Richardson 
23299a2dd95SBruce Richardson 	return 0;
23399a2dd95SBruce Richardson }
23499a2dd95SBruce Richardson 
23599a2dd95SBruce Richardson /* Create a queue used to store the data structure elements that can
23699a2dd95SBruce Richardson  * be freed later. This queue is referred to as 'defer queue'.
23799a2dd95SBruce Richardson  */
23899a2dd95SBruce Richardson struct rte_rcu_qsbr_dq *
23999a2dd95SBruce Richardson rte_rcu_qsbr_dq_create(const struct rte_rcu_qsbr_dq_parameters *params)
24099a2dd95SBruce Richardson {
24199a2dd95SBruce Richardson 	struct rte_rcu_qsbr_dq *dq;
24299a2dd95SBruce Richardson 	uint32_t qs_fifo_size;
24399a2dd95SBruce Richardson 	unsigned int flags;
24499a2dd95SBruce Richardson 
24599a2dd95SBruce Richardson 	if (params == NULL || params->free_fn == NULL ||
24699a2dd95SBruce Richardson 		params->v == NULL || params->name == NULL ||
24799a2dd95SBruce Richardson 		params->size == 0 || params->esize == 0 ||
24899a2dd95SBruce Richardson 		(params->esize % 4 != 0)) {
249ae67895bSDavid Marchand 		RCU_LOG(ERR, "Invalid input parameter");
25099a2dd95SBruce Richardson 		rte_errno = EINVAL;
25199a2dd95SBruce Richardson 
25299a2dd95SBruce Richardson 		return NULL;
25399a2dd95SBruce Richardson 	}
25499a2dd95SBruce Richardson 	/* If auto reclamation is configured, reclaim limit
25599a2dd95SBruce Richardson 	 * should be a valid value.
25699a2dd95SBruce Richardson 	 */
25799a2dd95SBruce Richardson 	if ((params->trigger_reclaim_limit <= params->size) &&
25899a2dd95SBruce Richardson 	    (params->max_reclaim_size == 0)) {
259ae67895bSDavid Marchand 		RCU_LOG(ERR,
260ae67895bSDavid Marchand 			"Invalid input parameter, size = %u, trigger_reclaim_limit = %u, "
261ae67895bSDavid Marchand 			"max_reclaim_size = %u",
262ae67895bSDavid Marchand 			params->size, params->trigger_reclaim_limit,
26399a2dd95SBruce Richardson 			params->max_reclaim_size);
26499a2dd95SBruce Richardson 		rte_errno = EINVAL;
26599a2dd95SBruce Richardson 
26699a2dd95SBruce Richardson 		return NULL;
26799a2dd95SBruce Richardson 	}
26899a2dd95SBruce Richardson 
26999a2dd95SBruce Richardson 	dq = rte_zmalloc(NULL, sizeof(struct rte_rcu_qsbr_dq),
27099a2dd95SBruce Richardson 			 RTE_CACHE_LINE_SIZE);
27199a2dd95SBruce Richardson 	if (dq == NULL) {
27299a2dd95SBruce Richardson 		rte_errno = ENOMEM;
27399a2dd95SBruce Richardson 
27499a2dd95SBruce Richardson 		return NULL;
27599a2dd95SBruce Richardson 	}
27699a2dd95SBruce Richardson 
27799a2dd95SBruce Richardson 	/* Decide the flags for the ring.
27899a2dd95SBruce Richardson 	 * If MT safety is requested, use RTS for ring enqueue as most
27999a2dd95SBruce Richardson 	 * use cases involve dq-enqueue happening on the control plane.
28099a2dd95SBruce Richardson 	 * Ring dequeue is always HTS due to the possibility of revert.
28199a2dd95SBruce Richardson 	 */
28299a2dd95SBruce Richardson 	flags = RING_F_MP_RTS_ENQ;
28399a2dd95SBruce Richardson 	if (params->flags & RTE_RCU_QSBR_DQ_MT_UNSAFE)
28499a2dd95SBruce Richardson 		flags = RING_F_SP_ENQ;
28599a2dd95SBruce Richardson 	flags |= RING_F_MC_HTS_DEQ;
28699a2dd95SBruce Richardson 	/* round up qs_fifo_size to next power of two that is not less than
28799a2dd95SBruce Richardson 	 * max_size.
28899a2dd95SBruce Richardson 	 */
28999a2dd95SBruce Richardson 	qs_fifo_size = rte_align32pow2(params->size + 1);
29099a2dd95SBruce Richardson 	/* Add token size to ring element size */
29199a2dd95SBruce Richardson 	dq->r = rte_ring_create_elem(params->name,
29299a2dd95SBruce Richardson 			__RTE_QSBR_TOKEN_SIZE + params->esize,
29399a2dd95SBruce Richardson 			qs_fifo_size, SOCKET_ID_ANY, flags);
29499a2dd95SBruce Richardson 	if (dq->r == NULL) {
295ae67895bSDavid Marchand 		RCU_LOG(ERR, "defer queue create failed");
29699a2dd95SBruce Richardson 		rte_free(dq);
29799a2dd95SBruce Richardson 		return NULL;
29899a2dd95SBruce Richardson 	}
29999a2dd95SBruce Richardson 
30099a2dd95SBruce Richardson 	dq->v = params->v;
30199a2dd95SBruce Richardson 	dq->size = params->size;
30299a2dd95SBruce Richardson 	dq->esize = __RTE_QSBR_TOKEN_SIZE + params->esize;
30399a2dd95SBruce Richardson 	dq->trigger_reclaim_limit = params->trigger_reclaim_limit;
30499a2dd95SBruce Richardson 	dq->max_reclaim_size = params->max_reclaim_size;
30599a2dd95SBruce Richardson 	dq->free_fn = params->free_fn;
30699a2dd95SBruce Richardson 	dq->p = params->p;
30799a2dd95SBruce Richardson 
30899a2dd95SBruce Richardson 	return dq;
30999a2dd95SBruce Richardson }
31099a2dd95SBruce Richardson 
31199a2dd95SBruce Richardson /* Enqueue one resource to the defer queue to free after the grace
31299a2dd95SBruce Richardson  * period is over.
31399a2dd95SBruce Richardson  */
31499a2dd95SBruce Richardson int rte_rcu_qsbr_dq_enqueue(struct rte_rcu_qsbr_dq *dq, void *e)
31599a2dd95SBruce Richardson {
31699a2dd95SBruce Richardson 	__rte_rcu_qsbr_dq_elem_t *dq_elem;
31799a2dd95SBruce Richardson 	uint32_t cur_size;
31899a2dd95SBruce Richardson 
31999a2dd95SBruce Richardson 	if (dq == NULL || e == NULL) {
320ae67895bSDavid Marchand 		RCU_LOG(ERR, "Invalid input parameter");
32199a2dd95SBruce Richardson 		rte_errno = EINVAL;
32299a2dd95SBruce Richardson 
32399a2dd95SBruce Richardson 		return 1;
32499a2dd95SBruce Richardson 	}
32599a2dd95SBruce Richardson 
32699a2dd95SBruce Richardson 	char data[dq->esize];
32799a2dd95SBruce Richardson 	dq_elem = (__rte_rcu_qsbr_dq_elem_t *)data;
32899a2dd95SBruce Richardson 	/* Start the grace period */
32999a2dd95SBruce Richardson 	dq_elem->token = rte_rcu_qsbr_start(dq->v);
33099a2dd95SBruce Richardson 
33199a2dd95SBruce Richardson 	/* Reclaim resources if the queue size has hit the reclaim
33299a2dd95SBruce Richardson 	 * limit. This helps the queue from growing too large and
33399a2dd95SBruce Richardson 	 * allows time for reader threads to report their quiescent state.
33499a2dd95SBruce Richardson 	 */
33599a2dd95SBruce Richardson 	cur_size = rte_ring_count(dq->r);
33699a2dd95SBruce Richardson 	if (cur_size > dq->trigger_reclaim_limit) {
337ae67895bSDavid Marchand 		RCU_LOG(INFO, "Triggering reclamation");
33899a2dd95SBruce Richardson 		rte_rcu_qsbr_dq_reclaim(dq, dq->max_reclaim_size,
33999a2dd95SBruce Richardson 						NULL, NULL, NULL);
34099a2dd95SBruce Richardson 	}
34199a2dd95SBruce Richardson 
34299a2dd95SBruce Richardson 	/* Enqueue the token and resource. Generating the token and
34399a2dd95SBruce Richardson 	 * enqueuing (token + resource) on the queue is not an
34499a2dd95SBruce Richardson 	 * atomic operation. When the defer queue is shared by multiple
34599a2dd95SBruce Richardson 	 * writers, this might result in tokens enqueued out of order
34699a2dd95SBruce Richardson 	 * on the queue. So, some tokens might wait longer than they
34799a2dd95SBruce Richardson 	 * are required to be reclaimed.
34899a2dd95SBruce Richardson 	 */
34999a2dd95SBruce Richardson 	memcpy(dq_elem->elem, e, dq->esize - __RTE_QSBR_TOKEN_SIZE);
35099a2dd95SBruce Richardson 	/* Check the status as enqueue might fail since the other threads
35199a2dd95SBruce Richardson 	 * might have used up the freed space.
35299a2dd95SBruce Richardson 	 * Enqueue uses the configured flags when the DQ was created.
35399a2dd95SBruce Richardson 	 */
35499a2dd95SBruce Richardson 	if (rte_ring_enqueue_elem(dq->r, data, dq->esize) != 0) {
355ae67895bSDavid Marchand 		RCU_LOG(ERR, "Enqueue failed");
35699a2dd95SBruce Richardson 		/* Note that the token generated above is not used.
35799a2dd95SBruce Richardson 		 * Other than wasting tokens, it should not cause any
35899a2dd95SBruce Richardson 		 * other issues.
35999a2dd95SBruce Richardson 		 */
360ae67895bSDavid Marchand 		RCU_LOG(INFO, "Skipped enqueuing token = %" PRIu64, dq_elem->token);
36199a2dd95SBruce Richardson 
36299a2dd95SBruce Richardson 		rte_errno = ENOSPC;
36399a2dd95SBruce Richardson 		return 1;
36499a2dd95SBruce Richardson 	}
36599a2dd95SBruce Richardson 
366ae67895bSDavid Marchand 	RCU_LOG(INFO, "Enqueued token = %" PRIu64, dq_elem->token);
36799a2dd95SBruce Richardson 
36899a2dd95SBruce Richardson 	return 0;
36999a2dd95SBruce Richardson }
37099a2dd95SBruce Richardson 
37199a2dd95SBruce Richardson /* Reclaim resources from the defer queue. */
37299a2dd95SBruce Richardson int
37399a2dd95SBruce Richardson rte_rcu_qsbr_dq_reclaim(struct rte_rcu_qsbr_dq *dq, unsigned int n,
37499a2dd95SBruce Richardson 			unsigned int *freed, unsigned int *pending,
37599a2dd95SBruce Richardson 			unsigned int *available)
37699a2dd95SBruce Richardson {
37799a2dd95SBruce Richardson 	uint32_t cnt;
37899a2dd95SBruce Richardson 	__rte_rcu_qsbr_dq_elem_t *dq_elem;
37999a2dd95SBruce Richardson 
38099a2dd95SBruce Richardson 	if (dq == NULL || n == 0) {
381ae67895bSDavid Marchand 		RCU_LOG(ERR, "Invalid input parameter");
38299a2dd95SBruce Richardson 		rte_errno = EINVAL;
38399a2dd95SBruce Richardson 
38499a2dd95SBruce Richardson 		return 1;
38599a2dd95SBruce Richardson 	}
38699a2dd95SBruce Richardson 
38799a2dd95SBruce Richardson 	cnt = 0;
38899a2dd95SBruce Richardson 
38999a2dd95SBruce Richardson 	char data[dq->esize];
39099a2dd95SBruce Richardson 	/* Check reader threads quiescent state and reclaim resources */
39199a2dd95SBruce Richardson 	while (cnt < n &&
39299a2dd95SBruce Richardson 		rte_ring_dequeue_bulk_elem_start(dq->r, &data,
39399a2dd95SBruce Richardson 					dq->esize, 1, available) != 0) {
39499a2dd95SBruce Richardson 		dq_elem = (__rte_rcu_qsbr_dq_elem_t *)data;
39599a2dd95SBruce Richardson 
39699a2dd95SBruce Richardson 		/* Reclaim the resource */
39799a2dd95SBruce Richardson 		if (rte_rcu_qsbr_check(dq->v, dq_elem->token, false) != 1) {
39899a2dd95SBruce Richardson 			rte_ring_dequeue_elem_finish(dq->r, 0);
39999a2dd95SBruce Richardson 			break;
40099a2dd95SBruce Richardson 		}
40199a2dd95SBruce Richardson 		rte_ring_dequeue_elem_finish(dq->r, 1);
40299a2dd95SBruce Richardson 
403ae67895bSDavid Marchand 		RCU_LOG(INFO, "Reclaimed token = %" PRIu64, dq_elem->token);
40499a2dd95SBruce Richardson 
40599a2dd95SBruce Richardson 		dq->free_fn(dq->p, dq_elem->elem, 1);
40699a2dd95SBruce Richardson 
40799a2dd95SBruce Richardson 		cnt++;
40899a2dd95SBruce Richardson 	}
40999a2dd95SBruce Richardson 
410ae67895bSDavid Marchand 	RCU_LOG(INFO, "Reclaimed %u resources", cnt);
41199a2dd95SBruce Richardson 
41299a2dd95SBruce Richardson 	if (freed != NULL)
41399a2dd95SBruce Richardson 		*freed = cnt;
41499a2dd95SBruce Richardson 	if (pending != NULL)
41599a2dd95SBruce Richardson 		*pending = rte_ring_count(dq->r);
41699a2dd95SBruce Richardson 
41799a2dd95SBruce Richardson 	return 0;
41899a2dd95SBruce Richardson }
41999a2dd95SBruce Richardson 
42099a2dd95SBruce Richardson /* Delete a defer queue. */
42199a2dd95SBruce Richardson int
42299a2dd95SBruce Richardson rte_rcu_qsbr_dq_delete(struct rte_rcu_qsbr_dq *dq)
42399a2dd95SBruce Richardson {
42499a2dd95SBruce Richardson 	unsigned int pending;
42599a2dd95SBruce Richardson 
42699a2dd95SBruce Richardson 	if (dq == NULL) {
427ae67895bSDavid Marchand 		RCU_LOG(DEBUG, "Invalid input parameter");
42899a2dd95SBruce Richardson 
42999a2dd95SBruce Richardson 		return 0;
43099a2dd95SBruce Richardson 	}
43199a2dd95SBruce Richardson 
43299a2dd95SBruce Richardson 	/* Reclaim all the resources */
43399a2dd95SBruce Richardson 	rte_rcu_qsbr_dq_reclaim(dq, ~0, NULL, &pending, NULL);
43499a2dd95SBruce Richardson 	if (pending != 0) {
43599a2dd95SBruce Richardson 		rte_errno = EAGAIN;
43699a2dd95SBruce Richardson 
43799a2dd95SBruce Richardson 		return 1;
43899a2dd95SBruce Richardson 	}
43999a2dd95SBruce Richardson 
44099a2dd95SBruce Richardson 	rte_ring_free(dq->r);
44199a2dd95SBruce Richardson 	rte_free(dq);
44299a2dd95SBruce Richardson 
44399a2dd95SBruce Richardson 	return 0;
44499a2dd95SBruce Richardson }
44599a2dd95SBruce Richardson 
446eeded204SDavid Marchand RTE_LOG_REGISTER_DEFAULT(rte_rcu_log_type, ERR);
447