/* SPDX-License-Identifier: BSD-3-Clause
 * Copyright (c) 2018-2020 Arm Limited
 */

#ifndef _RTE_RCU_QSBR_H_
#define _RTE_RCU_QSBR_H_

/**
 * @file
 *
 * RTE Quiescent State Based Reclamation (QSBR).
 *
 * A Quiescent State (QS) is any point in the thread execution
 * where the thread does not hold a reference to a data structure
 * in shared memory. While using lock-less data structures, the writer
 * can safely free memory once all the reader threads have entered
 * quiescent state.
 *
 * This library provides the ability for the readers to report quiescent
 * state and for the writers to identify when all the readers have
 * entered quiescent state.
 */

#include <inttypes.h>
#include <stdalign.h>
#include <stdbool.h>
#include <stdio.h>
#include <stdint.h>

#include <rte_common.h>
#include <rte_debug.h>
#include <rte_atomic.h>
#include <rte_ring.h>

#ifdef __cplusplus
extern "C" {
#endif

extern int rte_rcu_log_type;
#define RTE_LOGTYPE_RCU rte_rcu_log_type

#if RTE_LOG_DP_LEVEL >= RTE_LOG_DEBUG
#define __RTE_RCU_DP_LOG(level, ...) \
	RTE_LOG_DP_LINE_PREFIX(level, RCU, "%s(): ", __func__, __VA_ARGS__)
#else
#define __RTE_RCU_DP_LOG(level, ...)
#endif

#if defined(RTE_LIBRTE_RCU_DEBUG)
#define __RTE_RCU_IS_LOCK_CNT_ZERO(v, thread_id, level, ...) do { \
	if (v->qsbr_cnt[thread_id].lock_cnt) \
		RTE_LOG_LINE_PREFIX(level, RCU, "%s(): ", __func__, __VA_ARGS__); \
} while (0)
#else
#define __RTE_RCU_IS_LOCK_CNT_ZERO(v, thread_id, level, ...)
#endif

/* Registered thread IDs are stored as a bitmap in an array of 64b
 * elements. A given thread ID is converted to an index into the array
 * and a bit position within that array element.
 */
#define __RTE_QSBR_THRID_ARRAY_ELM_SIZE (sizeof(RTE_ATOMIC(uint64_t)) * 8)
#define __RTE_QSBR_THRID_ARRAY_SIZE(max_threads) \
	RTE_ALIGN(RTE_ALIGN_MUL_CEIL(max_threads, \
		__RTE_QSBR_THRID_ARRAY_ELM_SIZE) >> 3, RTE_CACHE_LINE_SIZE)
#define __RTE_QSBR_THRID_ARRAY_ELM(v, i) ((uint64_t __rte_atomic *) \
	((struct rte_rcu_qsbr_cnt *)(v + 1) + v->max_threads) + i)
#define __RTE_QSBR_THRID_INDEX_SHIFT 6
#define __RTE_QSBR_THRID_MASK 0x3f
#define RTE_QSBR_THRID_INVALID 0xffffffff

/* Worker thread counter */
struct __rte_cache_aligned rte_rcu_qsbr_cnt {
	RTE_ATOMIC(uint64_t) cnt;
	/**< Quiescent state counter. Value 0 indicates the thread is offline.
	 *   A 64b counter is used to avoid adding more code to address
	 *   counter overflow. Changing this to 32b would require additional
	 *   changes to various APIs.
	 */
	RTE_ATOMIC(uint32_t) lock_cnt;
	/**< Lock counter. Used when RTE_LIBRTE_RCU_DEBUG is enabled */
};

#define __RTE_QSBR_CNT_THR_OFFLINE 0
#define __RTE_QSBR_CNT_INIT 1
#define __RTE_QSBR_CNT_MAX ((uint64_t)~0)
#define __RTE_QSBR_TOKEN_SIZE sizeof(uint64_t)
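
/* Illustrative sketch (not part of the API): how a thread ID maps onto
 * the registered-thread bitmap using the shift and mask defined above.
 * For example, thread_id 70 lands in array element 1, bit position 6.
 *
 *	unsigned int thread_id = 70;
 *	uint32_t elem = thread_id >> __RTE_QSBR_THRID_INDEX_SHIFT;  (70 / 64 = 1)
 *	uint32_t bit  = thread_id & __RTE_QSBR_THRID_MASK;          (70 % 64 = 6)
 */
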
/* RTE Quiescent State variable structure.
 * This structure has two elements that vary in size based on the
 * 'max_threads' parameter.
 * 1) Quiescent state counter array
 * 2) Registered thread ID array
 */
struct __rte_cache_aligned rte_rcu_qsbr {
	alignas(RTE_CACHE_LINE_SIZE) RTE_ATOMIC(uint64_t) token;
	/**< Counter to allow for multiple concurrent quiescent state queries */
	RTE_ATOMIC(uint64_t) acked_token;
	/**< Least token acked by all the threads in the last call to
	 *   rte_rcu_qsbr_check API.
	 */

	alignas(RTE_CACHE_LINE_SIZE) uint32_t num_elems;
	/**< Number of elements in the thread ID array */
	RTE_ATOMIC(uint32_t) num_threads;
	/**< Number of threads currently using this QS variable */
	uint32_t max_threads;
	/**< Maximum number of threads using this QS variable */

	alignas(RTE_CACHE_LINE_SIZE) struct rte_rcu_qsbr_cnt qsbr_cnt[];
	/**< Quiescent state counter array of 'max_threads' elements */

	/* Registered thread IDs are stored in a bitmap array,
	 * after the quiescent state counter array.
	 */
};

/**
 * Callback function called to free the resources.
 *
 * @param p
 *   Pointer provided while creating the defer queue
 * @param e
 *   Pointer to the resource data stored on the defer queue
 * @param n
 *   Number of resources to free. Currently, this is set to 1.
 *
 * @return
 *   None
 */
typedef void (*rte_rcu_qsbr_free_resource_t)(void *p, void *e, unsigned int n);

#define RTE_RCU_QSBR_DQ_NAMESIZE RTE_RING_NAMESIZE

/**
 * Various flags supported.
 */
/**
 * Enqueue and reclaim operations are multi-thread safe by default.
 * The callback functions registered to free the resources are
 * assumed to be multi-thread safe.
 * Set this flag if multi-thread safety is not required.
 */
#define RTE_RCU_QSBR_DQ_MT_UNSAFE 1

/**
 * Parameters used when creating the defer queue.
 */
struct rte_rcu_qsbr_dq_parameters {
	const char *name;
	/**< Name of the queue. */
	uint32_t flags;
	/**< Flags to control API behaviors */
	uint32_t size;
	/**< Number of entries in the queue. Typically, this will be
	 *   the same as the maximum number of entries supported in the
	 *   lock free data structure.
	 *   Data structures with an unbounded number of entries are not
	 *   currently supported.
	 */
	uint32_t esize;
	/**< Size (in bytes) of each element in the defer queue.
	 *   This has to be a multiple of 4B.
	 */
	uint32_t trigger_reclaim_limit;
	/**< Trigger automatic reclamation after the defer queue
	 *   has at least this many resources waiting. This auto
	 *   reclamation is triggered in rte_rcu_qsbr_dq_enqueue API
	 *   call.
	 *   If this is greater than 'size', auto reclamation is
	 *   not triggered.
	 *   If this is set to 0, auto reclamation is triggered
	 *   in every call to rte_rcu_qsbr_dq_enqueue API.
	 */
	uint32_t max_reclaim_size;
	/**< When automatic reclamation is enabled, reclaim at most
	 *   this many resources. This should contain a valid value, if
	 *   auto reclamation is on. Setting this to 'size' or greater will
	 *   reclaim all possible resources currently on the defer queue.
	 */
	rte_rcu_qsbr_free_resource_t free_fn;
	/**< Function to call to free the resource. */
	void *p;
	/**< Pointer passed to the free function. Typically, this is the
	 *   pointer to the data structure to which the resource to free
	 *   belongs. This can be NULL.
	 */
	struct rte_rcu_qsbr *v;
	/**< RCU QSBR variable to use for this defer queue */
};

/* RTE defer queue structure.
 * This structure holds the defer queue. The defer queue is used to
 * hold the deleted entries from the data structure that are not
 * yet freed.
 */
struct rte_rcu_qsbr_dq;
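
/* Illustrative sketch (not part of this header): filling the parameters
 * and creating a defer queue. 'test_free_fn' and 'hash_table' are
 * hypothetical application names; 'v' is an initialized QS variable.
 *
 *	struct rte_rcu_qsbr_dq_parameters params = {0};
 *	char dq_name[RTE_RCU_QSBR_DQ_NAMESIZE] = "hash_dq";
 *
 *	params.name = dq_name;
 *	params.size = 1024;              number of entries in the data structure
 *	params.esize = sizeof(uint64_t); must be a multiple of 4B
 *	params.trigger_reclaim_limit = 64;
 *	params.max_reclaim_size = 32;
 *	params.free_fn = test_free_fn;
 *	params.p = hash_table;
 *	params.v = v;
 *
 *	struct rte_rcu_qsbr_dq *dq = rte_rcu_qsbr_dq_create(&params);
 *	if (dq == NULL)
 *		handle rte_errno (EINVAL or ENOMEM)
 */
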
/**
 * Return the size of the memory occupied by a Quiescent State variable.
 *
 * @param max_threads
 *   Maximum number of threads reporting quiescent state on this variable.
 * @return
 *   On success - size of memory in bytes required for this QS variable.
 *   On error - 1 with error code set in rte_errno.
 *   Possible rte_errno codes are:
 *   - EINVAL - max_threads is 0
 */
size_t
rte_rcu_qsbr_get_memsize(uint32_t max_threads);

/**
 * Initialize a Quiescent State (QS) variable.
 *
 * @param v
 *   QS variable
 * @param max_threads
 *   Maximum number of threads reporting quiescent state on this variable.
 *   This should be the same value as passed to rte_rcu_qsbr_get_memsize.
 * @return
 *   On success - 0
 *   On error - 1 with error code set in rte_errno.
 *   Possible rte_errno codes are:
 *   - EINVAL - max_threads is 0 or 'v' is NULL.
 */
int
rte_rcu_qsbr_init(struct rte_rcu_qsbr *v, uint32_t max_threads);

/**
 * Register a reader thread to report its quiescent state
 * on a QS variable.
 *
 * This is implemented as a lock-free function. It is multi-thread
 * safe.
 * Any reader thread that wants to report its quiescent state must
 * call this API. This can be called during initialization or as part
 * of the packet processing loop.
 *
 * Note that rte_rcu_qsbr_thread_online must be called before the
 * thread updates its quiescent state using rte_rcu_qsbr_quiescent.
 *
 * @param v
 *   QS variable
 * @param thread_id
 *   Reader thread with this thread ID will report its quiescent state on
 *   the QS variable. thread_id is a value between 0 and (max_threads - 1).
 *   'max_threads' is the parameter passed in 'rte_rcu_qsbr_init' API.
 * @return
 *   On success - 0
 *   On error - 1 with error code set in rte_errno.
 */
int
rte_rcu_qsbr_thread_register(struct rte_rcu_qsbr *v, unsigned int thread_id);

/**
 * Remove a reader thread from the list of threads reporting their
 * quiescent state on a QS variable.
 *
 * This is implemented as a lock-free function. It is multi-thread safe.
 * This API can be called from the reader threads during shutdown.
 * Ongoing quiescent state queries will stop waiting for the status from this
 * unregistered reader thread.
 *
 * @param v
 *   QS variable
 * @param thread_id
 *   Reader thread with this thread ID will stop reporting its quiescent
 *   state on the QS variable.
 * @return
 *   On success - 0
 *   On error - 1 with error code set in rte_errno.
 */
int
rte_rcu_qsbr_thread_unregister(struct rte_rcu_qsbr *v, unsigned int thread_id);
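
/* Illustrative sketch (not part of this header): allocating and
 * initializing a QS variable for up to 8 reader threads, then
 * registering one reader. Assumes <rte_malloc.h> is included.
 *
 *	size_t sz = rte_rcu_qsbr_get_memsize(8);
 *	struct rte_rcu_qsbr *v = rte_zmalloc("qsbr", sz, RTE_CACHE_LINE_SIZE);
 *	if (v == NULL)
 *		handle allocation failure
 *	rte_rcu_qsbr_init(v, 8);
 *
 * Then, in each reader thread, with a unique thread_id in [0, 7]:
 *
 *	rte_rcu_qsbr_thread_register(v, thread_id);
 *	rte_rcu_qsbr_thread_online(v, thread_id);
 */
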
/**
 * Add a registered reader thread to the list of threads reporting their
 * quiescent state on a QS variable.
 *
 * This is implemented as a lock-free function. It is multi-thread
 * safe.
 *
 * Any registered reader thread that wants to report its quiescent state must
 * call this API before calling rte_rcu_qsbr_quiescent. This can be called
 * during initialization or as part of the packet processing loop.
 *
 * The reader thread must call rte_rcu_qsbr_thread_offline API, before
 * calling any functions that block, to ensure that rte_rcu_qsbr_check
 * API does not wait indefinitely for the reader thread to update its QS.
 *
 * The reader thread must call rte_rcu_qsbr_thread_online API, after the
 * blocking function call returns, to ensure that rte_rcu_qsbr_check API
 * waits for the reader thread to update its quiescent state.
 *
 * @param v
 *   QS variable
 * @param thread_id
 *   Reader thread with this thread ID will report its quiescent state on
 *   the QS variable.
 */
static __rte_always_inline void
rte_rcu_qsbr_thread_online(struct rte_rcu_qsbr *v, unsigned int thread_id)
{
	uint64_t t;

	RTE_ASSERT(v != NULL && thread_id < v->max_threads);

	__RTE_RCU_IS_LOCK_CNT_ZERO(v, thread_id, ERR, "Lock counter %u",
				v->qsbr_cnt[thread_id].lock_cnt);

	/* Copy the current value of token.
	 * The fence at the end of the function will ensure that
	 * the following will not move down after the load of any shared
	 * data structure.
	 */
	t = rte_atomic_load_explicit(&v->token, rte_memory_order_relaxed);

	/* rte_atomic_store_explicit(cnt, rte_memory_order_relaxed) is used to ensure
	 * 'cnt' (64b) is accessed atomically.
	 */
	rte_atomic_store_explicit(&v->qsbr_cnt[thread_id].cnt,
		t, rte_memory_order_relaxed);

	/* The subsequent load of the data structure should not
	 * move above the store. Hence a store-load barrier
	 * is required.
	 * If the load of the data structure moves above the store,
	 * the writer might not see that the reader is online, even though
	 * the reader is referencing the shared data structure.
	 */
	rte_atomic_thread_fence(rte_memory_order_seq_cst);
}

/**
 * Remove a registered reader thread from the list of threads reporting their
 * quiescent state on a QS variable.
 *
 * This is implemented as a lock-free function. It is multi-thread
 * safe.
 *
 * This can be called during initialization or as part of the packet
 * processing loop.
 *
 * The reader thread must call rte_rcu_qsbr_thread_offline API, before
 * calling any functions that block, to ensure that rte_rcu_qsbr_check
 * API does not wait indefinitely for the reader thread to update its QS.
 *
 * @param v
 *   QS variable
 * @param thread_id
 *   rte_rcu_qsbr_check API will not wait for the reader thread with
 *   this thread ID to report its quiescent state on the QS variable.
 */
static __rte_always_inline void
rte_rcu_qsbr_thread_offline(struct rte_rcu_qsbr *v, unsigned int thread_id)
{
	RTE_ASSERT(v != NULL && thread_id < v->max_threads);

	__RTE_RCU_IS_LOCK_CNT_ZERO(v, thread_id, ERR, "Lock counter %u",
				v->qsbr_cnt[thread_id].lock_cnt);

	/* The reader can go offline only after the load of the
	 * data structure is completed, i.e. any load of the
	 * data structure cannot move after this store.
	 */
	rte_atomic_store_explicit(&v->qsbr_cnt[thread_id].cnt,
		__RTE_QSBR_CNT_THR_OFFLINE, rte_memory_order_release);
}
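
/* Illustrative sketch (not part of this header): a reader thread
 * bracketing a blocking call so that the writer does not wait on it.
 * 'wait_for_work()' is a hypothetical blocking function.
 *
 *	rte_rcu_qsbr_thread_offline(v, thread_id);
 *	wait_for_work();        may block for an arbitrary time
 *	rte_rcu_qsbr_thread_online(v, thread_id);
 *
 * After the online call returns, it is safe to reference the shared
 * data structure again.
 */
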
/**
 * Acquire a lock for accessing a shared data structure.
 *
 * This is implemented as a lock-free function. It is multi-thread
 * safe.
 *
 * This API is provided to aid debugging. It should be called before
 * accessing a shared data structure.
 *
 * When RTE_LIBRTE_RCU_DEBUG is enabled, a lock counter is incremented.
 * Similarly, rte_rcu_qsbr_unlock will decrement the counter. The
 * rte_rcu_qsbr_check API will verify that this counter is 0.
 *
 * When RTE_LIBRTE_RCU_DEBUG is disabled, this API will do nothing.
 *
 * @param v
 *   QS variable
 * @param thread_id
 *   Reader thread id
 */
static __rte_always_inline void
rte_rcu_qsbr_lock(__rte_unused struct rte_rcu_qsbr *v,
			__rte_unused unsigned int thread_id)
{
	RTE_ASSERT(v != NULL && thread_id < v->max_threads);

#if defined(RTE_LIBRTE_RCU_DEBUG)
	/* Increment the lock counter */
	rte_atomic_fetch_add_explicit(&v->qsbr_cnt[thread_id].lock_cnt,
				1, rte_memory_order_acquire);
#endif
}

/**
 * Release a lock after accessing a shared data structure.
 *
 * This is implemented as a lock-free function. It is multi-thread
 * safe.
 *
 * This API is provided to aid debugging. It should be called after
 * accessing a shared data structure.
 *
 * When RTE_LIBRTE_RCU_DEBUG is enabled, rte_rcu_qsbr_unlock will
 * decrement a lock counter. rte_rcu_qsbr_check API will verify that this
 * counter is 0.
 *
 * When RTE_LIBRTE_RCU_DEBUG is disabled, this API will do nothing.
 *
 * @param v
 *   QS variable
 * @param thread_id
 *   Reader thread id
 */
static __rte_always_inline void
rte_rcu_qsbr_unlock(__rte_unused struct rte_rcu_qsbr *v,
			__rte_unused unsigned int thread_id)
{
	RTE_ASSERT(v != NULL && thread_id < v->max_threads);

#if defined(RTE_LIBRTE_RCU_DEBUG)
	/* Decrement the lock counter */
	rte_atomic_fetch_sub_explicit(&v->qsbr_cnt[thread_id].lock_cnt,
				1, rte_memory_order_release);

	__RTE_RCU_IS_LOCK_CNT_ZERO(v, thread_id, WARNING,
				"Lock counter %u. Nested locks?",
				v->qsbr_cnt[thread_id].lock_cnt);
#endif
}
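
/* Illustrative sketch (not part of this header): bracketing a read-side
 * access with the debug lock/unlock helpers. 'lookup(table, key)' is a
 * hypothetical read-side access to the shared data structure.
 *
 *	rte_rcu_qsbr_lock(v, thread_id);
 *	entry = lookup(table, key);     read-side critical section
 *	rte_rcu_qsbr_unlock(v, thread_id);
 *
 * With RTE_LIBRTE_RCU_DEBUG disabled these calls compile to nothing,
 * so they can be left in production code.
 */
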
/**
 * Ask the reader threads to report the quiescent state
 * status.
 *
 * This is implemented as a lock-free function. It is multi-thread
 * safe and can be called from worker threads.
 *
 * @param v
 *   QS variable
 * @return
 *   - This is the token for this call of the API. This should be
 *     passed to rte_rcu_qsbr_check API.
 */
static __rte_always_inline uint64_t
rte_rcu_qsbr_start(struct rte_rcu_qsbr *v)
{
	uint64_t t;

	RTE_ASSERT(v != NULL);

	/* Release the changes to the shared data structure.
	 * This store release will ensure that changes to any data
	 * structure are visible to the workers before the token
	 * update is visible.
	 */
	t = rte_atomic_fetch_add_explicit(&v->token, 1, rte_memory_order_release) + 1;

	return t;
}

/**
 * Update quiescent state for a reader thread.
 *
 * This is implemented as a lock-free function. It is multi-thread safe.
 * All the reader threads registered to report their quiescent state
 * on the QS variable must call this API.
 *
 * @param v
 *   QS variable
 * @param thread_id
 *   Update the quiescent state for the reader with this thread ID.
 */
static __rte_always_inline void
rte_rcu_qsbr_quiescent(struct rte_rcu_qsbr *v, unsigned int thread_id)
{
	uint64_t t;

	RTE_ASSERT(v != NULL && thread_id < v->max_threads);

	__RTE_RCU_IS_LOCK_CNT_ZERO(v, thread_id, ERR, "Lock counter %u",
				v->qsbr_cnt[thread_id].lock_cnt);

	/* Acquire the changes to the shared data structure released
	 * by rte_rcu_qsbr_start.
	 * Later loads of the shared data structure should not move
	 * above this load. Hence, use load-acquire.
	 */
	t = rte_atomic_load_explicit(&v->token, rte_memory_order_acquire);

	/* Check if there are updates available from the writer.
	 * Inform the writer that updates are visible to this reader.
	 * Prior loads of the shared data structure should not move
	 * beyond this store. Hence use store-release.
	 */
	if (t != rte_atomic_load_explicit(&v->qsbr_cnt[thread_id].cnt, rte_memory_order_relaxed))
		rte_atomic_store_explicit(&v->qsbr_cnt[thread_id].cnt,
					 t, rte_memory_order_release);

	/* The __RTE_RCU_DP_LOG macro already prefixes the function name,
	 * so the message does not repeat __func__.
	 */
	__RTE_RCU_DP_LOG(DEBUG, "update: token = %" PRIu64 ", Thread ID = %d",
		t, thread_id);
}
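
/* Illustrative sketch (not part of this header): a reader thread's
 * packet processing loop reporting quiescent state once per iteration.
 * 'process_packets()' and 'force_quit' are hypothetical application
 * names.
 *
 *	while (!force_quit) {
 *		process_packets();      references the shared data structure
 *		rte_rcu_qsbr_quiescent(v, thread_id);
 *	}
 *	rte_rcu_qsbr_thread_offline(v, thread_id);
 *	rte_rcu_qsbr_thread_unregister(v, thread_id);
 */
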
/* Check the quiescent state counter for registered threads only, assuming
 * that not all threads have registered.
 */
static __rte_always_inline int
__rte_rcu_qsbr_check_selective(struct rte_rcu_qsbr *v, uint64_t t, bool wait)
{
	uint32_t i, j, id;
	uint64_t bmap;
	uint64_t c;
	RTE_ATOMIC(uint64_t) *reg_thread_id;
	uint64_t acked_token = __RTE_QSBR_CNT_MAX;

	for (i = 0, reg_thread_id = __RTE_QSBR_THRID_ARRAY_ELM(v, 0);
		i < v->num_elems;
		i++, reg_thread_id++) {
		/* Load the current registered thread bit map before
		 * loading the reader thread quiescent state counters.
		 */
		bmap = rte_atomic_load_explicit(reg_thread_id, rte_memory_order_acquire);
		id = i << __RTE_QSBR_THRID_INDEX_SHIFT;

		while (bmap) {
			j = rte_ctz64(bmap);
			__RTE_RCU_DP_LOG(DEBUG,
				"check: token = %" PRIu64 ", wait = %d, Bit Map = 0x%" PRIx64 ", Thread ID = %d",
				t, wait, bmap, id + j);
			c = rte_atomic_load_explicit(
					&v->qsbr_cnt[id + j].cnt,
					rte_memory_order_acquire);
			__RTE_RCU_DP_LOG(DEBUG,
				"status: token = %" PRIu64 ", wait = %d, Thread QS cnt = %" PRIu64 ", Thread ID = %d",
				t, wait, c, id + j);

			/* Counter is not checked for wrap-around condition
			 * as it is a 64b counter.
			 */
			if (unlikely(c !=
				__RTE_QSBR_CNT_THR_OFFLINE && c < t)) {
				/* This thread is not in quiescent state */
				if (!wait)
					return 0;

				rte_pause();
				/* This thread might have unregistered.
				 * Re-read the bitmap.
				 */
				bmap = rte_atomic_load_explicit(reg_thread_id,
						rte_memory_order_acquire);

				continue;
			}

			/* This thread is in quiescent state. Use the counter
			 * to find the least acknowledged token among all the
			 * readers.
			 */
			if (c != __RTE_QSBR_CNT_THR_OFFLINE && acked_token > c)
				acked_token = c;

			/* Use a 64b constant; '1UL' is only 32b wide on
			 * 32-bit targets and shifting it by up to 63 bits
			 * would be undefined.
			 */
			bmap &= ~(UINT64_C(1) << j);
		}
	}

	/* All readers are checked, update least acknowledged token.
	 * There might be multiple writers trying to update this. There is
	 * no need to update this very accurately using compare-and-swap.
	 */
	if (acked_token != __RTE_QSBR_CNT_MAX)
		rte_atomic_store_explicit(&v->acked_token, acked_token,
			rte_memory_order_relaxed);

	return 1;
}

/* Check the quiescent state counter for all threads, assuming that
 * all the threads have registered.
 */
static __rte_always_inline int
__rte_rcu_qsbr_check_all(struct rte_rcu_qsbr *v, uint64_t t, bool wait)
{
	uint32_t i;
	struct rte_rcu_qsbr_cnt *cnt;
	uint64_t c;
	uint64_t acked_token = __RTE_QSBR_CNT_MAX;

	for (i = 0, cnt = v->qsbr_cnt; i < v->max_threads; i++, cnt++) {
		__RTE_RCU_DP_LOG(DEBUG,
			"check: token = %" PRIu64 ", wait = %d, Thread ID = %d",
			t, wait, i);
		while (1) {
			c = rte_atomic_load_explicit(&cnt->cnt, rte_memory_order_acquire);
			__RTE_RCU_DP_LOG(DEBUG,
				"status: token = %" PRIu64 ", wait = %d, Thread QS cnt = %" PRIu64 ", Thread ID = %d",
				t, wait, c, i);

			/* Counter is not checked for wrap-around condition
			 * as it is a 64b counter.
			 */
			if (likely(c == __RTE_QSBR_CNT_THR_OFFLINE || c >= t))
				break;

			/* This thread is not in quiescent state */
			if (!wait)
				return 0;

			rte_pause();
		}

		/* This thread is in quiescent state. Use the counter to find
		 * the least acknowledged token among all the readers.
		 */
		if (likely(c != __RTE_QSBR_CNT_THR_OFFLINE && acked_token > c))
			acked_token = c;
	}

	/* All readers are checked, update least acknowledged token.
	 * There might be multiple writers trying to update this. There is
	 * no need to update this very accurately using compare-and-swap.
	 */
	if (acked_token != __RTE_QSBR_CNT_MAX)
		rte_atomic_store_explicit(&v->acked_token, acked_token,
			rte_memory_order_relaxed);

	return 1;
}

/**
 * Checks if all the reader threads have entered the quiescent state
 * referenced by token.
 *
 * This is implemented as a lock-free function. It is multi-thread
 * safe and can be called from the worker threads as well.
 *
 * If this API is called with 'wait' set to true, the following
 * factors must be considered:
 *
 * 1) If the calling thread is also reporting the status on the
 * same QS variable, it must update the quiescent state status, before
 * calling this API.
 *
 * 2) In addition, while calling from multiple threads, only
 * one of those threads can be reporting the quiescent state status
 * on a given QS variable.
 *
 * @param v
 *   QS variable
 * @param t
 *   Token returned by rte_rcu_qsbr_start API
 * @param wait
 *   If true, block till all the reader threads have completed entering
 *   the quiescent state referenced by token 't'.
 * @return
 *   - 0 if all reader threads have NOT passed through the specified number
 *     of quiescent states.
 *   - 1 if all reader threads have passed through the specified number
 *     of quiescent states.
 */
static __rte_always_inline int
rte_rcu_qsbr_check(struct rte_rcu_qsbr *v, uint64_t t, bool wait)
{
	uint64_t acked_token;

	RTE_ASSERT(v != NULL);

	/* Check if all the readers have already acknowledged this token */
	acked_token = rte_atomic_load_explicit(&v->acked_token,
		rte_memory_order_relaxed);
	if (likely(t <= acked_token)) {
		__RTE_RCU_DP_LOG(DEBUG,
			"check: token = %" PRIu64 ", wait = %d",
			t, wait);
		__RTE_RCU_DP_LOG(DEBUG,
			"status: least acked token = %" PRIu64,
			acked_token);
		return 1;
	}

	if (likely(v->num_threads == v->max_threads))
		return __rte_rcu_qsbr_check_all(v, t, wait);
	else
		return __rte_rcu_qsbr_check_selective(v, t, wait);
}
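
/* Illustrative sketch (not part of this header): a writer deleting an
 * entry from a lock-free data structure and freeing it after a grace
 * period. 'delete_entry()' and 'free_entry()' are hypothetical helpers.
 *
 *	entry = delete_entry(table, key);   readers can no longer find it
 *	uint64_t token = rte_rcu_qsbr_start(v);
 *	... optionally do other work while readers catch up ...
 *	rte_rcu_qsbr_check(v, token, true); blocks until all readers report QS
 *	free_entry(entry);                  now safe to free
 */
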
/**
 * Wait till the reader threads have entered quiescent state.
 *
 * This is implemented as a lock-free function. It is multi-thread safe.
 * This API can be thought of as a wrapper around rte_rcu_qsbr_start and
 * rte_rcu_qsbr_check APIs.
 *
 * If this API is called from multiple threads, only one of
 * those threads can be reporting the quiescent state status on a
 * given QS variable.
 *
 * @param v
 *   QS variable
 * @param thread_id
 *   Thread ID of the caller if it is registered to report quiescent state
 *   on this QS variable (i.e. the calling thread is also part of the
 *   readside critical section). If not, pass RTE_QSBR_THRID_INVALID.
 */
void
rte_rcu_qsbr_synchronize(struct rte_rcu_qsbr *v, unsigned int thread_id);

/**
 * Dump the details of a single QS variable to a file.
 *
 * It is NOT multi-thread safe.
 *
 * @param f
 *   A pointer to a file for output
 * @param v
 *   QS variable
 * @return
 *   On success - 0
 *   On error - 1 with error code set in rte_errno.
 *   Possible rte_errno codes are:
 *   - EINVAL - NULL parameters are passed
 */
int
rte_rcu_qsbr_dump(FILE *f, struct rte_rcu_qsbr *v);

/**
 * Create a queue used to store the data structure elements that can
 * be freed later. This queue is referred to as 'defer queue'.
 *
 * @param params
 *   Parameters to create a defer queue.
 * @return
 *   On success - Valid pointer to defer queue
 *   On error - NULL
 *   Possible rte_errno codes are:
 *   - EINVAL - NULL parameters are passed
 *   - ENOMEM - Not enough memory
 */
struct rte_rcu_qsbr_dq *
rte_rcu_qsbr_dq_create(const struct rte_rcu_qsbr_dq_parameters *params);

/**
 * Enqueue one resource to the defer queue and start the grace period.
 * The resource will be freed later after at least one grace period
 * is over.
 *
 * If the defer queue is full, it will attempt to reclaim resources.
 * It will also reclaim resources at regular intervals to prevent
 * the defer queue from growing too big.
 *
 * Multi-thread safety is determined by the defer queue configuration.
 * When multi-thread safety is requested, it is possible that the
 * resources are not stored in their order of deletion. This results
 * in resources being held in the defer queue longer than necessary.
 *
 * @param dq
 *   Defer queue to allocate an entry from.
 * @param e
 *   Pointer to resource data to copy to the defer queue. The size of
 *   the data to copy is equal to the element size provided when the
 *   defer queue was created.
 * @return
 *   On success - 0
 *   On error - 1 with rte_errno set to
 *   - EINVAL - NULL parameters are passed
 *   - ENOSPC - Defer queue is full. This condition can not happen
 *     if the defer queue size is equal to (or larger than) the
 *     number of elements in the data structure.
 */
int
rte_rcu_qsbr_dq_enqueue(struct rte_rcu_qsbr_dq *dq, void *e);
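
/* Illustrative sketch (not part of this header): the writer path using
 * the defer queue instead of managing grace periods by hand. 'entry' is
 * the resource removed from a hypothetical lock-free table; its size
 * matches the 'esize' used at queue creation.
 *
 *	if (rte_rcu_qsbr_dq_enqueue(dq, &entry) != 0)
 *		handle rte_errno (EINVAL or ENOSPC)
 *
 * Resources can also be reclaimed explicitly, e.g. from a control thread:
 *
 *	unsigned int freed, pending, available;
 *	rte_rcu_qsbr_dq_reclaim(dq, 32, &freed, &pending, &available);
 */
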
/**
 * Free resources from the defer queue.
 *
 * This API is multi-thread safe.
 *
 * @param dq
 *   Defer queue to free an entry from.
 * @param n
 *   Maximum number of resources to free.
 * @param freed
 *   Number of resources that were freed.
 * @param pending
 *   Number of resources pending on the defer queue. This number might not
 *   be accurate if multi-thread safety is configured.
 * @param available
 *   Number of resources that can be added to the defer queue.
 *   This number might not be accurate if multi-thread safety is configured.
 * @return
 *   On successful reclamation of at least 1 resource - 0
 *   On error - 1 with rte_errno set to
 *   - EINVAL - NULL parameters are passed
 */
int
rte_rcu_qsbr_dq_reclaim(struct rte_rcu_qsbr_dq *dq, unsigned int n,
	unsigned int *freed, unsigned int *pending, unsigned int *available);

/**
 * Delete a defer queue.
 *
 * It tries to reclaim all the resources on the defer queue.
 * If any of the resources have not completed the grace period,
 * the reclamation stops and returns immediately. The rest of
 * the resources are not reclaimed and the defer queue is not
 * freed.
 *
 * @param dq
 *   Defer queue to delete.
 * @return
 *   On success - 0
 *   On error - 1
 *   Possible rte_errno codes are:
 *   - EAGAIN - Some of the resources have not completed at least 1 grace
 *     period, try again.
 */
int
rte_rcu_qsbr_dq_delete(struct rte_rcu_qsbr_dq *dq);

#ifdef __cplusplus
}
#endif

#endif /* _RTE_RCU_QSBR_H_ */