/* SPDX-License-Identifier: BSD-3-Clause
 * Copyright (c) 2018-2020 Arm Limited
 */

#ifndef _RTE_RCU_QSBR_H_
#define _RTE_RCU_QSBR_H_

/**
 * @file
 *
 * RTE Quiescent State Based Reclamation (QSBR).
 *
 * Quiescent State (QS) is any point in the thread execution
 * where the thread does not hold a reference to a data structure
 * in shared memory. While using lock-less data structures, the writer
 * can safely free memory once all the reader threads have entered
 * quiescent state.
 *
 * This library provides the ability for the readers to report quiescent
 * state and for the writers to identify when all the readers have
 * entered quiescent state.
 */
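
/* A minimal sketch of the intended reader/writer split, assuming a single
 * writer and one reader registered with thread ID 0. 'delete_entry()',
 * 'shared_structure' and 'key' are hypothetical application code, not part
 * of this API.
 *
 *   // Writer: unlink the entry, wait for a grace period, then free it.
 *   void *e = delete_entry(shared_structure, key);
 *   uint64_t token = rte_rcu_qsbr_start(v);
 *   rte_rcu_qsbr_check(v, token, true);
 *   free(e);
 *
 *   // Reader: report a quiescent state once per loop iteration, at a point
 *   // where no references to the shared structure are held.
 *   rte_rcu_qsbr_quiescent(v, 0);
 */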

#include <inttypes.h>
#include <stdalign.h>
#include <stdbool.h>
#include <stdio.h>
#include <stdint.h>

#include <rte_common.h>
#include <rte_debug.h>
#include <rte_atomic.h>
#include <rte_ring.h>

#ifdef __cplusplus
extern "C" {
#endif

extern int rte_rcu_log_type;
#define RTE_LOGTYPE_RCU rte_rcu_log_type

#if RTE_LOG_DP_LEVEL >= RTE_LOG_DEBUG
#define __RTE_RCU_DP_LOG(level, ...) \
	RTE_LOG_DP_LINE_PREFIX(level, RCU, "%s(): ", __func__, __VA_ARGS__)
#else
#define __RTE_RCU_DP_LOG(level, ...)
#endif

#if defined(RTE_LIBRTE_RCU_DEBUG)
#define __RTE_RCU_IS_LOCK_CNT_ZERO(v, thread_id, level, ...) do { \
	if (v->qsbr_cnt[thread_id].lock_cnt) \
		RTE_LOG_LINE_PREFIX(level, RCU, "%s(): ", __func__, __VA_ARGS__); \
} while (0)
#else
#define __RTE_RCU_IS_LOCK_CNT_ZERO(v, thread_id, level, ...)
#endif

/* Registered thread IDs are stored as a bitmap in an array of 64b elements.
 * A given thread ID needs to be converted to an index into the array and
 * a bit position within that array element.
 */
#define __RTE_QSBR_THRID_ARRAY_ELM_SIZE (sizeof(RTE_ATOMIC(uint64_t)) * 8)
#define __RTE_QSBR_THRID_ARRAY_SIZE(max_threads) \
	RTE_ALIGN(RTE_ALIGN_MUL_CEIL(max_threads, \
		__RTE_QSBR_THRID_ARRAY_ELM_SIZE) >> 3, RTE_CACHE_LINE_SIZE)
#define __RTE_QSBR_THRID_ARRAY_ELM(v, i) ((uint64_t __rte_atomic *) \
	((struct rte_rcu_qsbr_cnt *)(v + 1) + v->max_threads) + i)
#define __RTE_QSBR_THRID_INDEX_SHIFT 6
#define __RTE_QSBR_THRID_MASK 0x3f
#define RTE_QSBR_THRID_INVALID 0xffffffff
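/* A worked example of the mapping above (illustrative only): thread_id 70
 * lands in array element 70 >> __RTE_QSBR_THRID_INDEX_SHIFT = 1, at bit
 * position 70 & __RTE_QSBR_THRID_MASK = 6 within that 64b element.
 */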

/* Worker thread counter */
struct __rte_cache_aligned rte_rcu_qsbr_cnt {
	RTE_ATOMIC(uint64_t) cnt;
	/**< Quiescent state counter. Value 0 indicates the thread is offline.
	 *   A 64b counter is used to avoid adding more code to address
	 *   counter overflow. Changing this to 32b would require additional
	 *   changes to various APIs.
	 */
	RTE_ATOMIC(uint32_t) lock_cnt;
	/**< Lock counter. Used when RTE_LIBRTE_RCU_DEBUG is enabled */
};

#define __RTE_QSBR_CNT_THR_OFFLINE 0
#define __RTE_QSBR_CNT_INIT 1
#define __RTE_QSBR_CNT_MAX ((uint64_t)~0)
#define __RTE_QSBR_TOKEN_SIZE sizeof(uint64_t)

/* RTE Quiescent State variable structure.
 * This structure has two elements that vary in size based on the
 * 'max_threads' parameter.
 * 1) Quiescent state counter array
 * 2) Registered thread ID array
 */
struct __rte_cache_aligned rte_rcu_qsbr {
	alignas(RTE_CACHE_LINE_SIZE) RTE_ATOMIC(uint64_t) token;
	/**< Counter to allow for multiple concurrent quiescent state queries */
	RTE_ATOMIC(uint64_t) acked_token;
	/**< Least token acked by all the threads in the last call to
	 *   rte_rcu_qsbr_check API.
	 */

	alignas(RTE_CACHE_LINE_SIZE) uint32_t num_elems;
	/**< Number of elements in the thread ID array */
	RTE_ATOMIC(uint32_t) num_threads;
	/**< Number of threads currently using this QS variable */
	uint32_t max_threads;
	/**< Maximum number of threads using this QS variable */

	alignas(RTE_CACHE_LINE_SIZE) struct rte_rcu_qsbr_cnt qsbr_cnt[];
	/**< Quiescent state counter array of 'max_threads' elements */

	/**< Registered thread IDs are stored in a bitmap array,
	 *   after the quiescent state counter array.
	 */
};

/**
 * Callback function called to free the resources.
 *
 * @param p
 *   Pointer provided while creating the defer queue
 * @param e
 *   Pointer to the resource data stored on the defer queue
 * @param n
 *   Number of resources to free. Currently, this is set to 1.
 *
 * @return
 *   None
 */
typedef void (*rte_rcu_qsbr_free_resource_t)(void *p, void *e, unsigned int n);

#define RTE_RCU_QSBR_DQ_NAMESIZE RTE_RING_NAMESIZE

/**
 * Various flags supported.
 */
/**< Enqueue and reclaim operations are multi-thread safe by default.
 *   The callback functions registered to free the resources are
 *   assumed to be multi-thread safe.
 *   Set this flag if multi-thread safety is not required.
 */
#define RTE_RCU_QSBR_DQ_MT_UNSAFE 1

/**
 * Parameters used when creating the defer queue.
 */
struct rte_rcu_qsbr_dq_parameters {
	const char *name;
	/**< Name of the queue. */
	uint32_t flags;
	/**< Flags to control API behaviors */
	uint32_t size;
	/**< Number of entries in the queue. Typically, this will be
	 *   the same as the maximum number of entries supported in the
	 *   lock free data structure.
	 *   Data structures with an unbounded number of entries are not
	 *   currently supported.
	 */
	uint32_t esize;
	/**< Size (in bytes) of each element in the defer queue.
	 *   This has to be a multiple of 4B.
	 */
	uint32_t trigger_reclaim_limit;
	/**< Trigger automatic reclamation after the defer queue
	 *   has at least this many resources waiting. This auto
	 *   reclamation is triggered in the rte_rcu_qsbr_dq_enqueue API
	 *   call.
	 *   If this is greater than 'size', auto reclamation is
	 *   not triggered.
	 *   If this is set to 0, auto reclamation is triggered
	 *   in every call to the rte_rcu_qsbr_dq_enqueue API.
	 */
	uint32_t max_reclaim_size;
	/**< When automatic reclamation is enabled, reclaim at most
	 *   this many resources. This should contain a valid value if
	 *   auto reclamation is on. Setting this to 'size' or greater will
	 *   reclaim all possible resources currently on the defer queue.
	 */
	rte_rcu_qsbr_free_resource_t free_fn;
	/**< Function to call to free the resource. */
	void *p;
	/**< Pointer passed to the free function. Typically, this is the
	 *   pointer to the data structure to which the resource to free
	 *   belongs. This can be NULL.
	 */
	struct rte_rcu_qsbr *v;
	/**< RCU QSBR variable to use for this defer queue */
};

/* RTE defer queue structure.
 * This structure holds the defer queue. The defer queue is used to
 * hold the deleted entries from the data structure that are not
 * yet freed.
 */
struct rte_rcu_qsbr_dq;

/**
 * Return the size of the memory occupied by a Quiescent State variable.
 *
 * @param max_threads
 *   Maximum number of threads reporting quiescent state on this variable.
 * @return
 *   On success - size of memory in bytes required for this QS variable.
 *   On error - 1 with error code set in rte_errno.
 *   Possible rte_errno codes are:
 *   - EINVAL - max_threads is 0
 */
size_t
rte_rcu_qsbr_get_memsize(uint32_t max_threads);

/**
 * Initialize a Quiescent State (QS) variable.
 *
 * @param v
 *   QS variable
 * @param max_threads
 *   Maximum number of threads reporting quiescent state on this variable.
 *   This should be the same value as passed to rte_rcu_qsbr_get_memsize.
 * @return
 *   On success - 0
 *   On error - 1 with error code set in rte_errno.
 *   Possible rte_errno codes are:
 *   - EINVAL - max_threads is 0 or 'v' is NULL.
 */
int
rte_rcu_qsbr_init(struct rte_rcu_qsbr *v, uint32_t max_threads);
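
/* A minimal allocation + initialization sketch, run from a control thread.
 * rte_zmalloc() is from <rte_malloc.h>; 'max_threads' is an application
 * choice used here only for illustration.
 *
 *   uint32_t max_threads = 8;
 *   size_t sz = rte_rcu_qsbr_get_memsize(max_threads);
 *   struct rte_rcu_qsbr *v =
 *       (struct rte_rcu_qsbr *)rte_zmalloc(NULL, sz, RTE_CACHE_LINE_SIZE);
 *   if (v == NULL || rte_rcu_qsbr_init(v, max_threads) != 0)
 *       rte_panic("cannot create QS variable\n");
 */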

/**
 * Register a reader thread to report its quiescent state
 * on a QS variable.
 *
 * This is implemented as a lock-free function. It is multi-thread
 * safe.
 * Any reader thread that wants to report its quiescent state must
 * call this API. This can be called during initialization or as part
 * of the packet processing loop.
 *
 * Note that rte_rcu_qsbr_thread_online must be called before the
 * thread updates its quiescent state using rte_rcu_qsbr_quiescent.
 *
 * @param v
 *   QS variable
 * @param thread_id
 *   Reader thread with this thread ID will report its quiescent state on
 *   the QS variable. thread_id is a value between 0 and (max_threads - 1).
 *   'max_threads' is the parameter passed in 'rte_rcu_qsbr_init' API.
 */
int
rte_rcu_qsbr_thread_register(struct rte_rcu_qsbr *v, unsigned int thread_id);

/**
 * Remove a reader thread from the list of threads reporting their
 * quiescent state on a QS variable.
 *
 * This is implemented as a lock-free function. It is multi-thread safe.
 * This API can be called from the reader threads during shutdown.
 * Ongoing quiescent state queries will stop waiting for the status from this
 * unregistered reader thread.
 *
 * @param v
 *   QS variable
 * @param thread_id
 *   Reader thread with this thread ID will stop reporting its quiescent
 *   state on the QS variable.
 */
int
rte_rcu_qsbr_thread_unregister(struct rte_rcu_qsbr *v, unsigned int thread_id);

/**
 * Add a registered reader thread to the list of threads reporting their
 * quiescent state on a QS variable.
 *
 * This is implemented as a lock-free function. It is multi-thread
 * safe.
 *
 * Any registered reader thread that wants to report its quiescent state must
 * call this API before calling rte_rcu_qsbr_quiescent. This can be called
 * during initialization or as part of the packet processing loop.
 *
 * The reader thread must call rte_rcu_qsbr_thread_offline API, before
 * calling any functions that block, to ensure that rte_rcu_qsbr_check
 * API does not wait indefinitely for the reader thread to update its QS.
 *
 * The reader thread must call rte_rcu_qsbr_thread_online API, after the
 * blocking function call returns, to ensure that rte_rcu_qsbr_check API
 * waits for the reader thread to update its quiescent state.
 *
 * @param v
 *   QS variable
 * @param thread_id
 *   Reader thread with this thread ID will report its quiescent state on
 *   the QS variable.
 */
static __rte_always_inline void
rte_rcu_qsbr_thread_online(struct rte_rcu_qsbr *v, unsigned int thread_id)
{
	uint64_t t;

	RTE_ASSERT(v != NULL && thread_id < v->max_threads);

	__RTE_RCU_IS_LOCK_CNT_ZERO(v, thread_id, ERR, "Lock counter %u",
				v->qsbr_cnt[thread_id].lock_cnt);

	/* Copy the current value of token.
	 * The fence at the end of the function will ensure that
	 * the following store does not move down past any subsequent
	 * load of the shared data structure.
	 */
	t = rte_atomic_load_explicit(&v->token, rte_memory_order_relaxed);

	/* rte_atomic_store_explicit(cnt, rte_memory_order_relaxed) is used to ensure
	 * 'cnt' (64b) is accessed atomically.
	 */
	rte_atomic_store_explicit(&v->qsbr_cnt[thread_id].cnt,
		t, rte_memory_order_relaxed);

	/* The subsequent load of the data structure should not
	 * move above the store. Hence a store-load barrier
	 * is required.
	 * If the load of the data structure moves above the store,
	 * the writer might not see that the reader is online, even though
	 * the reader is referencing the shared data structure.
	 */
	rte_atomic_thread_fence(rte_memory_order_seq_cst);
}

/**
 * Remove a registered reader thread from the list of threads reporting their
 * quiescent state on a QS variable.
 *
 * This is implemented as a lock-free function. It is multi-thread
 * safe.
 *
 * This can be called during initialization or as part of the packet
 * processing loop.
 *
 * The reader thread must call rte_rcu_qsbr_thread_offline API, before
 * calling any functions that block, to ensure that rte_rcu_qsbr_check
 * API does not wait indefinitely for the reader thread to update its QS.
 *
 * @param v
 *   QS variable
 * @param thread_id
 *   rte_rcu_qsbr_check API will not wait for the reader thread with
 *   this thread ID to report its quiescent state on the QS variable.
 */
static __rte_always_inline void
rte_rcu_qsbr_thread_offline(struct rte_rcu_qsbr *v, unsigned int thread_id)
{
	RTE_ASSERT(v != NULL && thread_id < v->max_threads);

	__RTE_RCU_IS_LOCK_CNT_ZERO(v, thread_id, ERR, "Lock counter %u",
				v->qsbr_cnt[thread_id].lock_cnt);

	/* The reader can go offline only after the loads of the
	 * data structure are completed, i.e. no load of the
	 * data structure can move below this store.
	 */

	rte_atomic_store_explicit(&v->qsbr_cnt[thread_id].cnt,
		__RTE_QSBR_CNT_THR_OFFLINE, rte_memory_order_release);
}
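
/* A minimal sketch of bracketing a blocking call with offline/online from a
 * registered reader thread, so that rte_rcu_qsbr_check does not wait on this
 * thread while it is blocked. 'v', 'thread_id' and 'wait_for_config()' are
 * hypothetical names chosen for illustration.
 *
 *   rte_rcu_qsbr_thread_offline(v, thread_id);
 *   wait_for_config();              // any call that may block or sleep
 *   rte_rcu_qsbr_thread_online(v, thread_id);
 *   // references to the shared data structure may be taken again here
 */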

/**
 * Acquire a lock for accessing a shared data structure.
 *
 * This is implemented as a lock-free function. It is multi-thread
 * safe.
 *
 * This API is provided to aid debugging. This should be called before
 * accessing a shared data structure.
 *
 * When RTE_LIBRTE_RCU_DEBUG is enabled, a lock counter is incremented.
 * Similarly, rte_rcu_qsbr_unlock will decrement the counter. The
 * rte_rcu_qsbr_check API will verify that this counter is 0.
 *
 * When RTE_LIBRTE_RCU_DEBUG is disabled, this API will do nothing.
 *
 * @param v
 *   QS variable
 * @param thread_id
 *   Reader thread id
 */
static __rte_always_inline void
rte_rcu_qsbr_lock(__rte_unused struct rte_rcu_qsbr *v,
			__rte_unused unsigned int thread_id)
{
	RTE_ASSERT(v != NULL && thread_id < v->max_threads);

#if defined(RTE_LIBRTE_RCU_DEBUG)
	/* Increment the lock counter */
	rte_atomic_fetch_add_explicit(&v->qsbr_cnt[thread_id].lock_cnt,
				1, rte_memory_order_acquire);
#endif
}

/**
 * Release a lock after accessing a shared data structure.
 *
 * This is implemented as a lock-free function. It is multi-thread
 * safe.
 *
 * This API is provided to aid debugging. This should be called after
 * accessing a shared data structure.
 *
 * When RTE_LIBRTE_RCU_DEBUG is enabled, rte_rcu_qsbr_unlock will
 * decrement a lock counter. rte_rcu_qsbr_check API will verify that this
 * counter is 0.
 *
 * When RTE_LIBRTE_RCU_DEBUG is disabled, this API will do nothing.
 *
 * @param v
 *   QS variable
 * @param thread_id
 *   Reader thread id
 */
static __rte_always_inline void
rte_rcu_qsbr_unlock(__rte_unused struct rte_rcu_qsbr *v,
			__rte_unused unsigned int thread_id)
{
	RTE_ASSERT(v != NULL && thread_id < v->max_threads);

#if defined(RTE_LIBRTE_RCU_DEBUG)
	/* Decrement the lock counter */
	rte_atomic_fetch_sub_explicit(&v->qsbr_cnt[thread_id].lock_cnt,
				1, rte_memory_order_release);

	__RTE_RCU_IS_LOCK_CNT_ZERO(v, thread_id, WARNING,
				"Lock counter %u. Nested locks?",
				v->qsbr_cnt[thread_id].lock_cnt);
#endif
}
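
/* A minimal sketch of the debug lock/unlock usage in a reader; with
 * RTE_LIBRTE_RCU_DEBUG disabled both calls compile to nothing.
 * 'lookup_entry()', 'key' and 'struct my_entry' are hypothetical
 * application names used only for illustration.
 *
 *   rte_rcu_qsbr_lock(v, thread_id);
 *   struct my_entry *e = lookup_entry(key);   // access the shared structure
 *   // ... use 'e' ...
 *   rte_rcu_qsbr_unlock(v, thread_id);
 *   rte_rcu_qsbr_quiescent(v, thread_id);     // only after all unlocks
 */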

/**
 * Ask the reader threads to report the quiescent state
 * status.
 *
 * This is implemented as a lock-free function. It is multi-thread
 * safe and can be called from worker threads.
 *
 * @param v
 *   QS variable
 * @return
 *   - This is the token for this call of the API. This should be
 *     passed to rte_rcu_qsbr_check API.
 */
static __rte_always_inline uint64_t
rte_rcu_qsbr_start(struct rte_rcu_qsbr *v)
{
	uint64_t t;

	RTE_ASSERT(v != NULL);

	/* Release the changes to the shared data structure.
	 * This store release will ensure that changes to any data
	 * structure are visible to the workers before the token
	 * update is visible.
	 */
	t = rte_atomic_fetch_add_explicit(&v->token, 1, rte_memory_order_release) + 1;

	return t;
}

/**
 * Update quiescent state for a reader thread.
 *
 * This is implemented as a lock-free function. It is multi-thread safe.
 * All the reader threads registered to report their quiescent state
 * on the QS variable must call this API.
 *
 * @param v
 *   QS variable
 * @param thread_id
 *   Update the quiescent state for the reader with this thread ID.
 */
static __rte_always_inline void
rte_rcu_qsbr_quiescent(struct rte_rcu_qsbr *v, unsigned int thread_id)
{
	uint64_t t;

	RTE_ASSERT(v != NULL && thread_id < v->max_threads);

	__RTE_RCU_IS_LOCK_CNT_ZERO(v, thread_id, ERR, "Lock counter %u",
				v->qsbr_cnt[thread_id].lock_cnt);

	/* Acquire the changes to the shared data structure released
	 * by rte_rcu_qsbr_start.
	 * Later loads of the shared data structure should not move
	 * above this load. Hence, use load-acquire.
	 */
	t = rte_atomic_load_explicit(&v->token, rte_memory_order_acquire);

	/* Check if there are updates available from the writer.
	 * Inform the writer that updates are visible to this reader.
	 * Prior loads of the shared data structure should not move
	 * beyond this store. Hence use store-release.
	 */
	if (t != rte_atomic_load_explicit(&v->qsbr_cnt[thread_id].cnt, rte_memory_order_relaxed))
		rte_atomic_store_explicit(&v->qsbr_cnt[thread_id].cnt,
					 t, rte_memory_order_release);

	__RTE_RCU_DP_LOG(DEBUG, "%s: update: token = %" PRIu64 ", Thread ID = %d",
		__func__, t, thread_id);
}
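
/* A minimal sketch of a reader thread's lifecycle, assuming 'v' was
 * initialized by the control thread and 'thread_id' was assigned to this
 * reader. 'process_packets()' and 'run' are hypothetical.
 *
 *   rte_rcu_qsbr_thread_register(v, thread_id);
 *   rte_rcu_qsbr_thread_online(v, thread_id);
 *   while (run) {
 *       process_packets();                     // reads the shared structure
 *       rte_rcu_qsbr_quiescent(v, thread_id);  // no references held here
 *   }
 *   rte_rcu_qsbr_thread_offline(v, thread_id);
 *   rte_rcu_qsbr_thread_unregister(v, thread_id);
 */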

/* Check the quiescent state counter for registered threads only, assuming
 * that not all threads have registered.
 */
static __rte_always_inline int
__rte_rcu_qsbr_check_selective(struct rte_rcu_qsbr *v, uint64_t t, bool wait)
{
	uint32_t i, j, id;
	uint64_t bmap;
	uint64_t c;
	RTE_ATOMIC(uint64_t) *reg_thread_id;
	uint64_t acked_token = __RTE_QSBR_CNT_MAX;

	for (i = 0, reg_thread_id = __RTE_QSBR_THRID_ARRAY_ELM(v, 0);
		i < v->num_elems;
		i++, reg_thread_id++) {
		/* Load the current registered thread bit map before
		 * loading the reader thread quiescent state counters.
		 */
		bmap = rte_atomic_load_explicit(reg_thread_id, rte_memory_order_acquire);
		id = i << __RTE_QSBR_THRID_INDEX_SHIFT;

		while (bmap) {
			j = rte_ctz64(bmap);
			__RTE_RCU_DP_LOG(DEBUG,
				"%s: check: token = %" PRIu64 ", wait = %d, Bit Map = 0x%" PRIx64 ", Thread ID = %d",
				__func__, t, wait, bmap, id + j);
			c = rte_atomic_load_explicit(
					&v->qsbr_cnt[id + j].cnt,
					rte_memory_order_acquire);
			__RTE_RCU_DP_LOG(DEBUG,
				"%s: status: token = %" PRIu64 ", wait = %d, Thread QS cnt = %" PRIu64 ", Thread ID = %d",
				__func__, t, wait, c, id + j);

			/* Counter is not checked for wrap-around condition
			 * as it is a 64b counter.
			 */
			if (unlikely(c !=
				__RTE_QSBR_CNT_THR_OFFLINE && c < t)) {
				/* This thread is not in quiescent state */
				if (!wait)
					return 0;

				rte_pause();
				/* This thread might have unregistered.
				 * Re-read the bitmap.
				 */
				bmap = rte_atomic_load_explicit(reg_thread_id,
						rte_memory_order_acquire);

				continue;
			}

			/* This thread is in quiescent state. Use the counter
			 * to find the least acknowledged token among all the
			 * readers.
			 */
			if (c != __RTE_QSBR_CNT_THR_OFFLINE && acked_token > c)
				acked_token = c;
			bmap &= ~(1ULL << j);
		}
	}

	/* All readers are checked, update least acknowledged token.
	 * There might be multiple writers trying to update this. There is
	 * no need to update this very accurately using compare-and-swap.
	 */
	if (acked_token != __RTE_QSBR_CNT_MAX)
		rte_atomic_store_explicit(&v->acked_token, acked_token,
			rte_memory_order_relaxed);

	return 1;
}

/* Check the quiescent state counter for all threads, assuming that
 * all the threads have registered.
 */
static __rte_always_inline int
__rte_rcu_qsbr_check_all(struct rte_rcu_qsbr *v, uint64_t t, bool wait)
{
	uint32_t i;
	struct rte_rcu_qsbr_cnt *cnt;
	uint64_t c;
	uint64_t acked_token = __RTE_QSBR_CNT_MAX;

	for (i = 0, cnt = v->qsbr_cnt; i < v->max_threads; i++, cnt++) {
		__RTE_RCU_DP_LOG(DEBUG,
			"%s: check: token = %" PRIu64 ", wait = %d, Thread ID = %d",
			__func__, t, wait, i);
		while (1) {
			c = rte_atomic_load_explicit(&cnt->cnt, rte_memory_order_acquire);
			__RTE_RCU_DP_LOG(DEBUG,
				"%s: status: token = %" PRIu64 ", wait = %d, Thread QS cnt = %" PRIu64 ", Thread ID = %d",
				__func__, t, wait, c, i);

			/* Counter is not checked for wrap-around condition
			 * as it is a 64b counter.
			 */
			if (likely(c == __RTE_QSBR_CNT_THR_OFFLINE || c >= t))
				break;

			/* This thread is not in quiescent state */
			if (!wait)
				return 0;

			rte_pause();
		}

		/* This thread is in quiescent state. Use the counter to find
		 * the least acknowledged token among all the readers.
		 */
		if (likely(c != __RTE_QSBR_CNT_THR_OFFLINE && acked_token > c))
			acked_token = c;
	}

	/* All readers are checked, update least acknowledged token.
	 * There might be multiple writers trying to update this. There is
	 * no need to update this very accurately using compare-and-swap.
	 */
	if (acked_token != __RTE_QSBR_CNT_MAX)
		rte_atomic_store_explicit(&v->acked_token, acked_token,
			rte_memory_order_relaxed);

	return 1;
}

/**
 * Checks if all the reader threads have entered the quiescent state
 * referenced by token.
 *
 * This is implemented as a lock-free function. It is multi-thread
 * safe and can be called from the worker threads as well.
 *
 * If this API is called with 'wait' set to true, the following
 * factors must be considered:
 *
 * 1) If the calling thread is also reporting the status on the
 * same QS variable, it must update the quiescent state status before
 * calling this API.
 *
 * 2) In addition, while calling from multiple threads, only
 * one of those threads can be reporting the quiescent state status
 * on a given QS variable.
 *
 * @param v
 *   QS variable
 * @param t
 *   Token returned by the rte_rcu_qsbr_start API
 * @param wait
 *   If true, block till all the reader threads have completed entering
 *   the quiescent state referenced by token 't'.
 * @return
 *   - 0 if all reader threads have NOT passed through the specified number
 *     of quiescent states.
 *   - 1 if all reader threads have passed through the specified number
 *     of quiescent states.
 */
static __rte_always_inline int
rte_rcu_qsbr_check(struct rte_rcu_qsbr *v, uint64_t t, bool wait)
{
	uint64_t acked_token;

	RTE_ASSERT(v != NULL);

	/* Check if all the readers have already acknowledged this token */
	acked_token = rte_atomic_load_explicit(&v->acked_token,
						rte_memory_order_relaxed);
	if (likely(t <= acked_token)) {
		__RTE_RCU_DP_LOG(DEBUG,
			"%s: check: token = %" PRIu64 ", wait = %d",
			__func__, t, wait);
		__RTE_RCU_DP_LOG(DEBUG,
			"%s: status: least acked token = %" PRIu64,
			__func__, acked_token);
		return 1;
	}

	if (likely(v->num_threads == v->max_threads))
		return __rte_rcu_qsbr_check_all(v, t, wait);
	else
		return __rte_rcu_qsbr_check_selective(v, t, wait);
}
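
/* A minimal writer-side sketch of a non-blocking grace period, assuming the
 * writer can defer the free until a later iteration of its own loop.
 * 'removed_entry' and 'do_other_work()' are hypothetical.
 *
 *   void *removed_entry;                  // already unlinked from the structure
 *   uint64_t token = rte_rcu_qsbr_start(v);
 *   while (rte_rcu_qsbr_check(v, token, false) == 0)
 *       do_other_work();                  // poll again later instead of blocking
 *   free(removed_entry);                  // all readers have acknowledged 'token'
 */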

/**
 * Wait till the reader threads have entered quiescent state.
 *
 * This is implemented as a lock-free function. It is multi-thread safe.
 * This API can be thought of as a wrapper around rte_rcu_qsbr_start and
 * rte_rcu_qsbr_check APIs.
 *
 * If this API is called from multiple threads, only one of
 * those threads can be reporting the quiescent state status on a
 * given QS variable.
 *
 * @param v
 *   QS variable
 * @param thread_id
 *   Thread ID of the caller if it is registered to report quiescent state
 *   on this QS variable (i.e. the calling thread is also part of the
 *   read-side critical section). If not, pass RTE_QSBR_THRID_INVALID.
 */
void
rte_rcu_qsbr_synchronize(struct rte_rcu_qsbr *v, unsigned int thread_id);

/**
 * Dump the details of a single QS variable to a file.
 *
 * It is NOT multi-thread safe.
 *
 * @param f
 *   A pointer to a file for output
 * @param v
 *   QS variable
 * @return
 *   On success - 0
 *   On error - 1 with error code set in rte_errno.
 *   Possible rte_errno codes are:
 *   - EINVAL - NULL parameters are passed
 */
int
rte_rcu_qsbr_dump(FILE *f, struct rte_rcu_qsbr *v);

/**
 * Create a queue used to store the data structure elements that can
 * be freed later. This queue is referred to as 'defer queue'.
 *
 * @param params
 *   Parameters to create a defer queue.
 * @return
 *   On success - Valid pointer to defer queue
 *   On error - NULL
 *   Possible rte_errno codes are:
 *   - EINVAL - NULL parameters are passed
 *   - ENOMEM - Not enough memory
 */
struct rte_rcu_qsbr_dq *
rte_rcu_qsbr_dq_create(const struct rte_rcu_qsbr_dq_parameters *params);
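
/* A minimal sketch of creating a defer queue for a hypothetical table of
 * 'TABLE_ENTRIES' elements, where each deferred resource is described by a
 * 32b index. 'TABLE_ENTRIES', 'table' and 'free_index_cb' (an application
 * supplied rte_rcu_qsbr_free_resource_t) are hypothetical.
 *
 *   struct rte_rcu_qsbr_dq_parameters params = {
 *       .name = "table_dq",
 *       .size = TABLE_ENTRIES,
 *       .esize = sizeof(uint32_t),
 *       .trigger_reclaim_limit = TABLE_ENTRIES / 8,
 *       .max_reclaim_size = 32,
 *       .free_fn = free_index_cb,
 *       .p = table,
 *       .v = v,
 *   };
 *   struct rte_rcu_qsbr_dq *dq = rte_rcu_qsbr_dq_create(&params);
 */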

/**
 * Enqueue one resource to the defer queue and start the grace period.
 * The resource will be freed later after at least one grace period
 * is over.
 *
 * If the defer queue is full, it will attempt to reclaim resources.
 * It will also reclaim resources at regular intervals to keep
 * the defer queue from growing too big.
 *
 * Multi-thread safety is provided as per the defer queue configuration.
 * When multi-thread safety is requested, it is possible that the
 * resources are not stored in their order of deletion. This results
 * in resources being held in the defer queue longer than they should be.
 *
 * @param dq
 *   Defer queue to allocate an entry from.
 * @param e
 *   Pointer to resource data to copy to the defer queue. The size of
 *   the data to copy is equal to the element size provided when the
 *   defer queue was created.
 * @return
 *   On success - 0
 *   On error - 1 with rte_errno set to
 *   - EINVAL - NULL parameters are passed
 *   - ENOSPC - Defer queue is full. This condition cannot happen
 *		if the defer queue size is equal to (or larger than) the
 *		number of elements in the data structure.
 */
int
rte_rcu_qsbr_dq_enqueue(struct rte_rcu_qsbr_dq *dq, void *e);
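
/* A minimal writer-side sketch using the defer queue created above: unlink an
 * entry from the application's table, enqueue its index for deferred freeing,
 * and reclaim explicitly when convenient. 'entry_index' and the table update
 * are hypothetical application code.
 *
 *   unsigned int freed, pending, available;
 *   uint32_t entry_index;         // index of an entry already unlinked
 *   if (rte_rcu_qsbr_dq_enqueue(dq, &entry_index) != 0) {
 *       // ENOSPC: reclaim explicitly and retry, or handle the failure
 *   }
 *   rte_rcu_qsbr_dq_reclaim(dq, 32, &freed, &pending, &available);
 */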

/**
 * Free resources from the defer queue.
 *
 * This API is multi-thread safe.
 *
 * @param dq
 *   Defer queue to free an entry from.
 * @param n
 *   Maximum number of resources to free.
 * @param freed
 *   Number of resources that were freed.
 * @param pending
 *   Number of resources pending on the defer queue. This number might not
 *   be accurate if multi-thread safety is configured.
 * @param available
 *   Number of resources that can be added to the defer queue.
 *   This number might not be accurate if multi-thread safety is configured.
 * @return
 *   On successful reclamation of at least 1 resource - 0
 *   On error - 1 with rte_errno set to
 *   - EINVAL - NULL parameters are passed
 */
int
rte_rcu_qsbr_dq_reclaim(struct rte_rcu_qsbr_dq *dq, unsigned int n,
	unsigned int *freed, unsigned int *pending, unsigned int *available);

/**
 * Delete a defer queue.
 *
 * It tries to reclaim all the resources on the defer queue.
 * If any of the resources have not completed the grace period,
 * the reclamation stops and returns immediately. The rest of
 * the resources are not reclaimed and the defer queue is not
 * freed.
 *
 * @param dq
 *   Defer queue to delete.
 * @return
 *   On success - 0
 *   On error - 1
 *   Possible rte_errno codes are:
 *   - EAGAIN - Some of the resources have not completed at least 1 grace
 *		period, try again.
 */
int
rte_rcu_qsbr_dq_delete(struct rte_rcu_qsbr_dq *dq);

#ifdef __cplusplus
}
#endif

#endif /* _RTE_RCU_QSBR_H_ */