/* SPDX-License-Identifier: BSD-3-Clause
 * Copyright(c) 2024 Huawei Technologies Co., Ltd
 */

/**
 * @file
 * This file contains implementation of SORING 'datapath' functions.
 *
 * Brief description:
 * ==================
 * enqueue/dequeue works the same as for a conventional rte_ring:
 * any rte_ring sync types can be used, etc.
 * Plus there could be multiple 'stages'.
 * For each stage there is an acquire (start) and release (finish) operation.
 * After some elems are 'acquired', the user can safely assume exclusive
 * possession of these elems until 'release' is done for them.
 * Note that right now the user has to release exactly the same number of
 * elems that were acquired before.
 * After 'release', elems can be 'acquired' by the next stage and/or dequeued
 * (in case of the last stage).
 *
 * Internal structure:
 * ===================
 * In addition to the 'normal' ring of elems, it also has a ring of states of
 * the same size. Each state[] corresponds to exactly one elem[].
 * state[] will be used by acquire/release/dequeue functions to store internal
 * information and should not be accessed by the user directly.
 *
 * How it works:
 * =============
 * 'acquire()' just moves the stage's head (same as rte_ring move_head does),
 * plus it saves in state[stage.cur_head] information about how many elems
 * were acquired, the current head position and a special flag value to
 * indicate that the elems are acquired (SORING_ST_START).
 * Note that 'acquire()' returns to the user a special 'ftoken' that the user
 * has to provide to 'release()' (in fact it is just the position of the
 * current head plus the current stage index).
 * 'release()' extracts the old head value from the provided ftoken and checks
 * that the corresponding 'state[]' contains the expected values (mostly for
 * sanity purposes).
 * Then it marks this state[] with the 'SORING_ST_FINISH' flag to indicate
 * that the given subset of objects was released.
 * After that, it checks whether the old head value equals the current tail
 * value. If yes, it performs the 'finalize()' operation, otherwise
 * 'release()' just returns (without spinning on the stage tail value).
 * As the updated state[] is shared by all threads, some other thread can do
 * 'finalize()' for the given stage.
 * That allows 'release()' to avoid excessive waits on the tail value.
 * The main purpose of the 'finalize()' operation is to walk through 'state[]'
 * from the current stage tail up to its head, check state[] and move the
 * stage tail through elements that are already in SORING_ST_FINISH state.
 * Along with that, the corresponding state[] values are reset to zero.
 * Note that 'finalize()' for a given stage can be done from multiple places:
 * from 'release()' for that stage, from 'acquire()' for the next stage, or
 * even from the consumer's 'dequeue()' - in case the given stage is the last
 * one.
 * So 'finalize()' has to be MT-safe and inside it we have to guarantee that
 * only one thread will update state[] and the stage's tail values.
 */

#include "soring.h"
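/*
 * Illustrative usage sketch (comment only, not compiled): the typical flow
 * for a soring with a single processing stage, built from the public
 * functions defined later in this file. It assumes the ring 'r' was created
 * elsewhere, 'BURST' and objs[] are placeholders, and in a real pipeline
 * each step would normally run on its own lcore:
 *
 *	uint32_t ftoken, n;
 *	void *objs[BURST];
 *
 *	// producer: enqueue objects into the ring
 *	n = rte_soring_enqueue_burst(r, objs, BURST, NULL);
 *
 *	// stage 0 worker: acquire objects, process them in place, release
 *	n = rte_soring_acquire_burst(r, objs, 0, BURST, &ftoken, NULL);
 *	if (n != 0) {
 *		// ... process objs[0 .. n - 1] ...
 *		rte_soring_release(r, NULL, 0, n, ftoken);
 *	}
 *
 *	// consumer: dequeue objects that passed the last stage
 *	n = rte_soring_dequeue_burst(r, objs, BURST, NULL);
 */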
/*
 * Inline functions (fastpath) start here.
 */
static __rte_always_inline uint32_t
__rte_soring_stage_finalize(struct soring_stage_headtail *sht, uint32_t stage,
	union soring_state *rstate, uint32_t rmask, uint32_t maxn)
{
	int32_t rc;
	uint32_t ftkn, head, i, idx, k, n, tail;
	union soring_stage_tail nt, ot;
	union soring_state st;

	/* try to grab exclusive right to update tail value */
	ot.raw = rte_atomic_load_explicit(&sht->tail.raw,
			rte_memory_order_acquire);

	/* other thread already finalizing it for us */
	if (ot.sync != 0)
		return 0;

	nt.pos = ot.pos;
	nt.sync = 1;
	rc = rte_atomic_compare_exchange_strong_explicit(&sht->tail.raw,
			(uint64_t *)(uintptr_t)&ot.raw, nt.raw,
			rte_memory_order_release, rte_memory_order_relaxed);

	/* other thread won the race */
	if (rc == 0)
		return 0;

	/* Ensure the head is read before rstate[] */
	head = rte_atomic_load_explicit(&sht->head, rte_memory_order_relaxed);
	rte_atomic_thread_fence(rte_memory_order_acquire);

	/*
	 * start with current tail and walk through states that are
	 * already finished.
	 */

	n = RTE_MIN(head - ot.pos, maxn);
	for (i = 0, tail = ot.pos; i < n; i += k, tail += k) {

		idx = tail & rmask;
		ftkn = SORING_FTKN_MAKE(tail, stage);

		st.raw = rte_atomic_load_explicit(&rstate[idx].raw,
			rte_memory_order_relaxed);
		if ((st.stnum & SORING_ST_MASK) != SORING_ST_FINISH ||
				st.ftoken != ftkn)
			break;

		k = st.stnum & ~SORING_ST_MASK;
		rte_atomic_store_explicit(&rstate[idx].raw, 0,
				rte_memory_order_relaxed);
	}

	/* release exclusive right to update along with new tail value */
	ot.pos = tail;
	rte_atomic_store_explicit(&sht->tail.raw, ot.raw,
			rte_memory_order_release);

	return i;
}
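/*
 * Worked example of the tail-advance walk in __rte_soring_stage_finalize()
 * above (illustrative numbers): assume the stage tail sits at position 100,
 * its head at 107, and maxn is large enough. Two batches were already
 * released - 3 elems acquired at pos 100 and 2 elems acquired at pos 103 -
 * while the batch acquired at pos 105 is still being processed. The walk
 * finds state[100 & rmask] marked SORING_ST_FINISH with a count of 3 and
 * advances the tail to 103, then finds state[103 & rmask] marked
 * SORING_ST_FINISH with a count of 2 and advances the tail to 105.
 * state[105 & rmask] is still in SORING_ST_START, so the walk stops there,
 * tail value 105 is stored back with the 'sync' flag cleared, and 5 is
 * returned.
 */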
static __rte_always_inline uint32_t
__rte_soring_move_prod_head(struct rte_soring *r, uint32_t num,
	enum rte_ring_queue_behavior behavior, enum rte_ring_sync_type st,
	uint32_t *head, uint32_t *next, uint32_t *free)
{
	uint32_t n;

	switch (st) {
	case RTE_RING_SYNC_ST:
	case RTE_RING_SYNC_MT:
		n = __rte_ring_headtail_move_head(&r->prod.ht, &r->cons.ht,
			r->capacity, st, num, behavior, head, next, free);
		break;
	case RTE_RING_SYNC_MT_RTS:
		n = __rte_ring_rts_move_head(&r->prod.rts, &r->cons.ht,
			r->capacity, num, behavior, head, free);
		*next = *head + n;
		break;
	case RTE_RING_SYNC_MT_HTS:
		n = __rte_ring_hts_move_head(&r->prod.hts, &r->cons.ht,
			r->capacity, num, behavior, head, free);
		*next = *head + n;
		break;
	default:
		/* unsupported mode, shouldn't be here */
		RTE_ASSERT(0);
		*free = 0;
		n = 0;
	}

	return n;
}

static __rte_always_inline uint32_t
__rte_soring_move_cons_head(struct rte_soring *r, uint32_t stage, uint32_t num,
	enum rte_ring_queue_behavior behavior, enum rte_ring_sync_type st,
	uint32_t *head, uint32_t *next, uint32_t *avail)
{
	uint32_t n;

	switch (st) {
	case RTE_RING_SYNC_ST:
	case RTE_RING_SYNC_MT:
		n = __rte_ring_headtail_move_head(&r->cons.ht,
			&r->stage[stage].ht, 0, st, num, behavior,
			head, next, avail);
		break;
	case RTE_RING_SYNC_MT_RTS:
		n = __rte_ring_rts_move_head(&r->cons.rts, &r->stage[stage].ht,
			0, num, behavior, head, avail);
		*next = *head + n;
		break;
	case RTE_RING_SYNC_MT_HTS:
		n = __rte_ring_hts_move_head(&r->cons.hts, &r->stage[stage].ht,
			0, num, behavior, head, avail);
		*next = *head + n;
		break;
	default:
		/* unsupported mode, shouldn't be here */
		RTE_ASSERT(0);
		*avail = 0;
		n = 0;
	}

	return n;
}

static __rte_always_inline void
__rte_soring_update_tail(struct __rte_ring_headtail *rht,
	enum rte_ring_sync_type st, uint32_t head, uint32_t next, uint32_t enq)
{
	uint32_t n;

	switch (st) {
	case RTE_RING_SYNC_ST:
	case RTE_RING_SYNC_MT:
		__rte_ring_update_tail(&rht->ht, head, next, st, enq);
		break;
	case RTE_RING_SYNC_MT_RTS:
		__rte_ring_rts_update_tail(&rht->rts);
		break;
	case RTE_RING_SYNC_MT_HTS:
		n = next - head;
		__rte_ring_hts_update_tail(&rht->hts, head, n, enq);
		break;
	default:
		/* unsupported mode, shouldn't be here */
		RTE_ASSERT(0);
	}
}

static __rte_always_inline uint32_t
__rte_soring_stage_move_head(struct soring_stage_headtail *d,
	const struct rte_ring_headtail *s, uint32_t capacity, uint32_t num,
	enum rte_ring_queue_behavior behavior,
	uint32_t *old_head, uint32_t *new_head, uint32_t *avail)
{
	uint32_t n, tail;

	*old_head = rte_atomic_load_explicit(&d->head,
			rte_memory_order_relaxed);

	do {
		n = num;

		/* Ensure the head is read before tail */
		rte_atomic_thread_fence(rte_memory_order_acquire);

		tail = rte_atomic_load_explicit(&s->tail,
				rte_memory_order_acquire);
		*avail = capacity + tail - *old_head;
		if (n > *avail)
			n = (behavior == RTE_RING_QUEUE_FIXED) ? 0 : *avail;
		if (n == 0)
			return 0;
		*new_head = *old_head + n;
	} while (rte_atomic_compare_exchange_strong_explicit(&d->head,
			old_head, *new_head, rte_memory_order_acq_rel,
			rte_memory_order_relaxed) == 0);

	return n;
}

static __rte_always_inline uint32_t
soring_enqueue(struct rte_soring *r, const void *objs,
	const void *meta, uint32_t n, enum rte_ring_queue_behavior behavior,
	uint32_t *free_space)
{
	enum rte_ring_sync_type st;
	uint32_t nb_free, prod_head, prod_next;

	RTE_ASSERT(r != NULL && r->nb_stage > 0);
	RTE_ASSERT(meta == NULL || r->meta != NULL);

	st = r->prod.ht.sync_type;

	n = __rte_soring_move_prod_head(r, n, behavior, st,
			&prod_head, &prod_next, &nb_free);
	if (n != 0) {
		__rte_ring_do_enqueue_elems(&r[1], objs, r->size,
			prod_head & r->mask, r->esize, n);
		if (meta != NULL)
			__rte_ring_do_enqueue_elems(r->meta, meta, r->size,
				prod_head & r->mask, r->msize, n);
		__rte_soring_update_tail(&r->prod, st, prod_head, prod_next, 1);
	}

	if (free_space != NULL)
		*free_space = nb_free - n;
	return n;
}

static __rte_always_inline uint32_t
soring_dequeue(struct rte_soring *r, void *objs, void *meta,
	uint32_t num, enum rte_ring_queue_behavior behavior,
	uint32_t *available)
{
	enum rte_ring_sync_type st;
	uint32_t entries, cons_head, cons_next, n, ns, reqn;

	RTE_ASSERT(r != NULL && r->nb_stage > 0);
	RTE_ASSERT(meta == NULL || r->meta != NULL);

	ns = r->nb_stage - 1;
	st = r->cons.ht.sync_type;

	/* try to grab exactly @num elems first */
	n = __rte_soring_move_cons_head(r, ns, num, RTE_RING_QUEUE_FIXED, st,
			&cons_head, &cons_next, &entries);
	if (n == 0) {
		/* try to finalize some elems from previous stage */
		n = __rte_soring_stage_finalize(&r->stage[ns].sht, ns,
			r->state, r->mask, 2 * num);
		entries += n;

		/* repeat attempt to grab elems */
		reqn = (behavior == RTE_RING_QUEUE_FIXED) ? num : 0;
		if (entries >= reqn)
			n = __rte_soring_move_cons_head(r, ns, num, behavior,
				st, &cons_head, &cons_next, &entries);
		else
			n = 0;
	}

	/* we have some elems to consume */
	if (n != 0) {
		__rte_ring_do_dequeue_elems(objs, &r[1], r->size,
			cons_head & r->mask, r->esize, n);
		if (meta != NULL)
			__rte_ring_do_dequeue_elems(meta, r->meta, r->size,
				cons_head & r->mask, r->msize, n);
		__rte_soring_update_tail(&r->cons, st, cons_head, cons_next, 0);
	}

	if (available != NULL)
		*available = entries - n;
	return n;
}

/*
 * Verify internal SORING state.
 * WARNING: if the expected value is not equal to the actual one, it means
 * that for whatever reason SORING data consistency is broken. That is a
 * very serious problem that most likely will cause race conditions, memory
 * corruption and program crashes.
 * To ease debugging it, the user might rebuild the ring library with
 * RTE_SORING_DEBUG enabled.
 */
static __rte_always_inline void
soring_verify_state(const struct rte_soring *r, uint32_t stage, uint32_t idx,
	const char *msg, union soring_state val, union soring_state exp)
{
	if (val.raw != exp.raw) {
#ifdef RTE_SORING_DEBUG
		rte_soring_dump(stderr, r);
		rte_panic("line:%d from:%s: soring=%p, stage=%#x, idx=%#x, "
			"expected={.stnum=%#x, .ftoken=%#x}, "
			"actual={.stnum=%#x, .ftoken=%#x};\n",
			__LINE__, msg, r, stage, idx,
			exp.stnum, exp.ftoken,
			val.stnum, val.ftoken);
#else
		SORING_LOG(EMERG, "from:%s: soring=%p, stage=%#x, idx=%#x, "
			"expected={.stnum=%#x, .ftoken=%#x}, "
			"actual={.stnum=%#x, .ftoken=%#x};",
			msg, r, stage, idx,
			exp.stnum, exp.ftoken,
			val.stnum, val.ftoken);
#endif
	}
}

/* check and update state ring at acquire op */
static __rte_always_inline void
acquire_state_update(const struct rte_soring *r, uint32_t stage, uint32_t idx,
	uint32_t ftoken, uint32_t num)
{
	union soring_state st;
	const union soring_state est = {.raw = 0};

	st.raw = rte_atomic_load_explicit(&r->state[idx].raw,
			rte_memory_order_relaxed);
	soring_verify_state(r, stage, idx, __func__, st, est);

	st.ftoken = ftoken;
	st.stnum = (SORING_ST_START | num);

	rte_atomic_store_explicit(&r->state[idx].raw, st.raw,
			rte_memory_order_relaxed);
}

static __rte_always_inline uint32_t
soring_acquire(struct rte_soring *r, void *objs, void *meta,
	uint32_t stage, uint32_t num, enum rte_ring_queue_behavior behavior,
	uint32_t *ftoken, uint32_t *available)
{
	uint32_t avail, head, idx, n, next, reqn;
	struct soring_stage *pstg;
	struct soring_stage_headtail *cons;

	RTE_ASSERT(r != NULL && stage < r->nb_stage);
	RTE_ASSERT(meta == NULL || r->meta != NULL);

	cons = &r->stage[stage].sht;

	if (stage == 0)
		n = __rte_soring_stage_move_head(cons, &r->prod.ht, 0, num,
			behavior, &head, &next, &avail);
	else {
		pstg = r->stage + stage - 1;

		/* try to grab exactly @num elems */
		n = __rte_soring_stage_move_head(cons, &pstg->ht, 0, num,
			RTE_RING_QUEUE_FIXED, &head, &next, &avail);
		if (n == 0) {
			/* try to finalize some elems from previous stage */
			n = __rte_soring_stage_finalize(&pstg->sht, stage - 1,
				r->state, r->mask, 2 * num);
			avail += n;

			/* repeat attempt to grab elems */
			reqn = (behavior == RTE_RING_QUEUE_FIXED) ? num : 0;
			if (avail >= reqn)
				n = __rte_soring_stage_move_head(cons,
					&pstg->ht, 0, num, behavior, &head,
					&next, &avail);
			else
				n = 0;
		}
	}

	if (n != 0) {

		idx = head & r->mask;
		*ftoken = SORING_FTKN_MAKE(head, stage);

		/* check and update state value */
		acquire_state_update(r, stage, idx, *ftoken, n);

		/* copy elems that are ready for given stage */
		__rte_ring_do_dequeue_elems(objs, &r[1], r->size, idx,
			r->esize, n);
		if (meta != NULL)
			__rte_ring_do_dequeue_elems(meta, r->meta,
				r->size, idx, r->msize, n);
	}

	if (available != NULL)
		*available = avail - n;
	return n;
}

static __rte_always_inline void
soring_release(struct rte_soring *r, const void *objs,
	const void *meta, uint32_t stage, uint32_t n, uint32_t ftoken)
{
	uint32_t idx, pos, tail;
	struct soring_stage *stg;
	union soring_state st;

	const union soring_state est = {
		.stnum = (SORING_ST_START | n),
		.ftoken = ftoken,
	};

	RTE_ASSERT(r != NULL && stage < r->nb_stage);
	RTE_ASSERT(meta == NULL || r->meta != NULL);

	stg = r->stage + stage;

	pos = SORING_FTKN_POS(ftoken, stage);
	idx = pos & r->mask;
	st.raw = rte_atomic_load_explicit(&r->state[idx].raw,
			rte_memory_order_relaxed);

	/* check state ring contents */
	soring_verify_state(r, stage, idx, __func__, st, est);

	/* update contents of the ring, if necessary */
	if (objs != NULL)
		__rte_ring_do_enqueue_elems(&r[1], objs, r->size, idx,
			r->esize, n);
	if (meta != NULL)
		__rte_ring_do_enqueue_elems(r->meta, meta, r->size, idx,
			r->msize, n);

	/* set state to FINISH, make sure it is not reordered */
	rte_atomic_thread_fence(rte_memory_order_release);

	st.stnum = SORING_ST_FINISH | n;
	rte_atomic_store_explicit(&r->state[idx].raw, st.raw,
			rte_memory_order_relaxed);

	/* try to do finalize(), if appropriate */
	tail = rte_atomic_load_explicit(&stg->sht.tail.pos,
			rte_memory_order_relaxed);
	if (tail == pos)
		__rte_soring_stage_finalize(&stg->sht, stage, r->state, r->mask,
			r->capacity);
}

/*
 * Public functions (data-path) start here.
 */
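/*
 * Note on naming: the 'x' flavours below (releasx, enqueux, dequeux, acquirx)
 * additionally copy per-element metadata through their extra 'meta' argument;
 * the plain flavours pass NULL for it.
 */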
490 */ 491 492 void 493 rte_soring_release(struct rte_soring *r, const void *objs, 494 uint32_t stage, uint32_t n, uint32_t ftoken) 495 { 496 soring_release(r, objs, NULL, stage, n, ftoken); 497 } 498 499 500 void 501 rte_soring_releasx(struct rte_soring *r, const void *objs, 502 const void *meta, uint32_t stage, uint32_t n, uint32_t ftoken) 503 { 504 soring_release(r, objs, meta, stage, n, ftoken); 505 } 506 507 uint32_t 508 rte_soring_enqueue_bulk(struct rte_soring *r, const void *objs, uint32_t n, 509 uint32_t *free_space) 510 { 511 return soring_enqueue(r, objs, NULL, n, RTE_RING_QUEUE_FIXED, 512 free_space); 513 } 514 515 uint32_t 516 rte_soring_enqueux_bulk(struct rte_soring *r, const void *objs, 517 const void *meta, uint32_t n, uint32_t *free_space) 518 { 519 return soring_enqueue(r, objs, meta, n, RTE_RING_QUEUE_FIXED, 520 free_space); 521 } 522 523 uint32_t 524 rte_soring_enqueue_burst(struct rte_soring *r, const void *objs, uint32_t n, 525 uint32_t *free_space) 526 { 527 return soring_enqueue(r, objs, NULL, n, RTE_RING_QUEUE_VARIABLE, 528 free_space); 529 } 530 531 uint32_t 532 rte_soring_enqueux_burst(struct rte_soring *r, const void *objs, 533 const void *meta, uint32_t n, uint32_t *free_space) 534 { 535 return soring_enqueue(r, objs, meta, n, RTE_RING_QUEUE_VARIABLE, 536 free_space); 537 } 538 539 uint32_t 540 rte_soring_dequeue_bulk(struct rte_soring *r, void *objs, uint32_t num, 541 uint32_t *available) 542 { 543 return soring_dequeue(r, objs, NULL, num, RTE_RING_QUEUE_FIXED, 544 available); 545 } 546 547 uint32_t 548 rte_soring_dequeux_bulk(struct rte_soring *r, void *objs, void *meta, 549 uint32_t num, uint32_t *available) 550 { 551 return soring_dequeue(r, objs, meta, num, RTE_RING_QUEUE_FIXED, 552 available); 553 } 554 555 uint32_t 556 rte_soring_dequeue_burst(struct rte_soring *r, void *objs, uint32_t num, 557 uint32_t *available) 558 { 559 return soring_dequeue(r, objs, NULL, num, RTE_RING_QUEUE_VARIABLE, 560 available); 561 } 562 563 uint32_t 564 rte_soring_dequeux_burst(struct rte_soring *r, void *objs, void *meta, 565 uint32_t num, uint32_t *available) 566 { 567 return soring_dequeue(r, objs, meta, num, RTE_RING_QUEUE_VARIABLE, 568 available); 569 } 570 571 uint32_t 572 rte_soring_acquire_bulk(struct rte_soring *r, void *objs, 573 uint32_t stage, uint32_t num, uint32_t *ftoken, uint32_t *available) 574 { 575 return soring_acquire(r, objs, NULL, stage, num, 576 RTE_RING_QUEUE_FIXED, ftoken, available); 577 } 578 579 uint32_t 580 rte_soring_acquirx_bulk(struct rte_soring *r, void *objs, void *meta, 581 uint32_t stage, uint32_t num, uint32_t *ftoken, uint32_t *available) 582 { 583 return soring_acquire(r, objs, meta, stage, num, 584 RTE_RING_QUEUE_FIXED, ftoken, available); 585 } 586 587 uint32_t 588 rte_soring_acquire_burst(struct rte_soring *r, void *objs, 589 uint32_t stage, uint32_t num, uint32_t *ftoken, uint32_t *available) 590 { 591 return soring_acquire(r, objs, NULL, stage, num, 592 RTE_RING_QUEUE_VARIABLE, ftoken, available); 593 } 594 595 uint32_t 596 rte_soring_acquirx_burst(struct rte_soring *r, void *objs, void *meta, 597 uint32_t stage, uint32_t num, uint32_t *ftoken, uint32_t *available) 598 { 599 return soring_acquire(r, objs, meta, stage, num, 600 RTE_RING_QUEUE_VARIABLE, ftoken, available); 601 } 602 603 unsigned int 604 rte_soring_count(const struct rte_soring *r) 605 { 606 uint32_t prod_tail = r->prod.ht.tail; 607 uint32_t cons_tail = r->cons.ht.tail; 608 uint32_t count = (prod_tail - cons_tail) & r->mask; 609 return (count > r->capacity) ? 

unsigned int
rte_soring_free_count(const struct rte_soring *r)
{
	return r->capacity - rte_soring_count(r);
}