/* SPDX-License-Identifier: BSD-3-Clause
 * Copyright(c) 2024 Huawei Technologies Co., Ltd
 */

/**
 * @file
 * This file contains implementation of SORING 'datapath' functions.
 *
 * Brief description:
 * ==================
 * enqueue/dequeue works the same as for conventional rte_ring:
 * any rte_ring sync types can be used, etc.
 * Plus there could be multiple 'stages'.
 * For each stage there is an acquire (start) and release (finish) operation.
 * After some elems are 'acquired', the user can safely assume exclusive
 * possession of these elems till 'release' for them is done.
 * Note that right now the user has to release exactly the same number of
 * elems that were acquired before.
 * After 'release', elems can be 'acquired' by the next stage and/or dequeued
 * (in case of the last stage).
 *
 * Internal structure:
 * ===================
 * In addition to the 'normal' ring of elems, it also has a ring of states of
 * the same size. Each state[] corresponds to exactly one elem[].
 * state[] will be used by acquire/release/dequeue functions to store internal
 * information and should not be accessed by the user directly.
 *
 * How it works:
 * =============
 * 'acquire()' just moves the stage's head (same as rte_ring move_head does),
 * plus it saves in state[stage.cur_head] information about how many elems
 * were acquired, the current head position and a special flag value to
 * indicate that the elems are acquired (SORING_ST_START).
 * Note that 'acquire()' returns to the user a special 'ftoken' that the user
 * has to provide for 'release()' (in fact it is just the position of the
 * current head plus the current stage index).
 * 'release()' extracts the old head value from the provided ftoken and checks
 * that the corresponding 'state[]' contains the expected values (mostly for
 * sanity purposes).
 * Then it marks this state[] with the 'SORING_ST_FINISH' flag to indicate
 * that the given subset of objects was released.
 * After that, it checks whether the old head value equals the current stage
 * tail value. If yes, it performs the 'finalize()' operation, otherwise
 * 'release()' just returns (without spinning on the stage tail value).
 * As the updated state[] is shared by all threads, some other thread can do
 * 'finalize()' for the given stage.
 * That allows 'release()' to avoid excessive waits on the tail value.
 * The main purpose of the 'finalize()' operation is to walk through 'state[]'
 * from the current stage tail up to its head, check state[] and move the
 * stage tail through elements that are already in SORING_ST_FINISH state.
 * Along with that, the corresponding state[] values are reset to zero.
 * Note that 'finalize()' for a given stage can be done from multiple places:
 * from 'release()' for that stage, from 'acquire()' for the next stage, or
 * even from the consumer's 'dequeue()' - in case the given stage is the
 * last one.
 * So 'finalize()' has to be MT-safe and inside it we have to guarantee that
 * only one thread will update state[] and the stage's tail values.
 */

#include "soring.h"

/*
 * Inline functions (fastpath) start here.
 */
static __rte_always_inline uint32_t
__rte_soring_stage_finalize(struct soring_stage_headtail *sht, uint32_t stage,
	union soring_state *rstate, uint32_t rmask, uint32_t maxn)
{
	int32_t rc;
	uint32_t ftkn, head, i, idx, k, n, tail;
	union soring_stage_tail nt, ot;
	union soring_state st;

	/* try to grab exclusive right to update tail value */
	ot.raw = rte_atomic_load_explicit(&sht->tail.raw,
			rte_memory_order_acquire);

	/* other thread already finalizing it for us */
	if (ot.sync != 0)
		return 0;

	nt.pos = ot.pos;
	nt.sync = 1;
	rc = rte_atomic_compare_exchange_strong_explicit(&sht->tail.raw,
			(uint64_t *)(uintptr_t)&ot.raw, nt.raw,
			rte_memory_order_release, rte_memory_order_relaxed);

	/* other thread won the race */
	if (rc == 0)
		return 0;

	/* Ensure the head is read before rstate[] */
	head = rte_atomic_load_explicit(&sht->head, rte_memory_order_relaxed);
	rte_atomic_thread_fence(rte_memory_order_acquire);

	/*
	 * start with current tail and walk through states that are
	 * already finished.
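	 * Each state[] entry in that range was set by 'release()' and must
	 * contain the SORING_ST_FINISH flag together with the expected
	 * ftoken; the walk stops at the first entry that does not match.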
	 */

	n = RTE_MIN(head - ot.pos, maxn);
	for (i = 0, tail = ot.pos; i < n; i += k, tail += k) {

		idx = tail & rmask;
		ftkn = SORING_FTKN_MAKE(tail, stage);

		st.raw = rte_atomic_load_explicit(&rstate[idx].raw,
				rte_memory_order_relaxed);
		if ((st.stnum & SORING_ST_MASK) != SORING_ST_FINISH ||
				st.ftoken != ftkn)
			break;

		k = st.stnum & ~SORING_ST_MASK;
		rte_atomic_store_explicit(&rstate[idx].raw, 0,
				rte_memory_order_relaxed);
	}


	/* release exclusive right to update along with new tail value */
	ot.pos = tail;
	rte_atomic_store_explicit(&sht->tail.raw, ot.raw,
			rte_memory_order_release);

	return i;
}

static __rte_always_inline uint32_t
__rte_soring_move_prod_head(struct rte_soring *r, uint32_t num,
	enum rte_ring_queue_behavior behavior, enum rte_ring_sync_type st,
	uint32_t *head, uint32_t *next, uint32_t *free)
{
	uint32_t n;

	switch (st) {
	case RTE_RING_SYNC_ST:
	case RTE_RING_SYNC_MT:
		n = __rte_ring_headtail_move_head(&r->prod.ht, &r->cons.ht,
				r->capacity, st, num, behavior, head, next, free);
		break;
	case RTE_RING_SYNC_MT_RTS:
		n = __rte_ring_rts_move_head(&r->prod.rts, &r->cons.ht,
				r->capacity, num, behavior, head, free);
		*next = *head + n;
		break;
	case RTE_RING_SYNC_MT_HTS:
		n = __rte_ring_hts_move_head(&r->prod.hts, &r->cons.ht,
				r->capacity, num, behavior, head, free);
		*next = *head + n;
		break;
	default:
		/* unsupported mode, shouldn't be here */
		RTE_ASSERT(0);
		*free = 0;
		n = 0;
	}

	return n;
}

static __rte_always_inline uint32_t
__rte_soring_move_cons_head(struct rte_soring *r, uint32_t stage, uint32_t num,
	enum rte_ring_queue_behavior behavior, enum rte_ring_sync_type st,
	uint32_t *head, uint32_t *next, uint32_t *avail)
{
	uint32_t n;

	switch (st) {
	case RTE_RING_SYNC_ST:
	case RTE_RING_SYNC_MT:
		n = __rte_ring_headtail_move_head(&r->cons.ht,
				&r->stage[stage].ht, 0, st, num, behavior,
				head, next, avail);
		break;
	case RTE_RING_SYNC_MT_RTS:
		n = __rte_ring_rts_move_head(&r->cons.rts, &r->stage[stage].ht,
				0, num, behavior, head, avail);
		*next = *head + n;
		break;
	case RTE_RING_SYNC_MT_HTS:
		n = __rte_ring_hts_move_head(&r->cons.hts, &r->stage[stage].ht,
				0, num, behavior, head, avail);
		*next = *head + n;
		break;
	default:
		/* unsupported mode, shouldn't be here */
		RTE_ASSERT(0);
		*avail = 0;
		n = 0;
	}

	return n;
}

static __rte_always_inline void
__rte_soring_update_tail(struct __rte_ring_headtail *rht,
	enum rte_ring_sync_type st, uint32_t head, uint32_t next, uint32_t enq)
{
	uint32_t n;

	switch (st) {
	case RTE_RING_SYNC_ST:
	case RTE_RING_SYNC_MT:
		__rte_ring_update_tail(&rht->ht, head, next, st, enq);
		break;
	case RTE_RING_SYNC_MT_RTS:
		__rte_ring_rts_update_tail(&rht->rts);
		break;
	case RTE_RING_SYNC_MT_HTS:
		n = next - head;
		__rte_ring_hts_update_tail(&rht->hts, head, n, enq);
		break;
	default:
		/* unsupported mode, shouldn't be here */
		RTE_ASSERT(0);
	}
}

static __rte_always_inline uint32_t
__rte_soring_stage_move_head(struct soring_stage_headtail *d,
	const struct rte_ring_headtail *s, uint32_t capacity, uint32_t num,
	enum rte_ring_queue_behavior behavior,
	uint32_t *old_head, uint32_t *new_head, uint32_t *avail)
{
	uint32_t n, tail;

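	/*
	 * Reserve elems by advancing the stage head with a CAS loop,
	 * bounded by the tail of the preceding stage (or the producer).
	 */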
	*old_head = rte_atomic_load_explicit(&d->head,
			rte_memory_order_relaxed);

	do {
		n = num;

		/* Ensure the head is read before tail */
		rte_atomic_thread_fence(rte_memory_order_acquire);

		tail = rte_atomic_load_explicit(&s->tail,
				rte_memory_order_acquire);
		*avail = capacity + tail - *old_head;
		if (n > *avail)
			n = (behavior == RTE_RING_QUEUE_FIXED) ? 0 : *avail;
		if (n == 0)
			return 0;
		*new_head = *old_head + n;
	} while (rte_atomic_compare_exchange_strong_explicit(&d->head,
			old_head, *new_head, rte_memory_order_acq_rel,
			rte_memory_order_relaxed) == 0);

	return n;
}

static __rte_always_inline uint32_t
soring_enqueue(struct rte_soring *r, const void *objs,
	const void *meta, uint32_t n, enum rte_ring_queue_behavior behavior,
	uint32_t *free_space)
{
	enum rte_ring_sync_type st;
	uint32_t nb_free, prod_head, prod_next;

	RTE_ASSERT(r != NULL && r->nb_stage > 0);
	RTE_ASSERT(meta == NULL || r->meta != NULL);

	st = r->prod.ht.sync_type;

	n = __rte_soring_move_prod_head(r, n, behavior, st,
			&prod_head, &prod_next, &nb_free);
	if (n != 0) {
		__rte_ring_do_enqueue_elems(&r[1], objs, r->size,
				prod_head & r->mask, r->esize, n);
		if (meta != NULL)
			__rte_ring_do_enqueue_elems(r->meta, meta, r->size,
					prod_head & r->mask, r->msize, n);
		__rte_soring_update_tail(&r->prod, st, prod_head, prod_next, 1);
	}

	if (free_space != NULL)
		*free_space = nb_free - n;
	return n;
}

static __rte_always_inline uint32_t
soring_dequeue(struct rte_soring *r, void *objs, void *meta,
	uint32_t num, enum rte_ring_queue_behavior behavior,
	uint32_t *available)
{
	enum rte_ring_sync_type st;
	uint32_t entries, cons_head, cons_next, n, ns, reqn;

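	/* dequeue always consumes from behind the last stage's tail */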
	RTE_ASSERT(r != NULL && r->nb_stage > 0);
	RTE_ASSERT(meta == NULL || r->meta != NULL);

	ns = r->nb_stage - 1;
	st = r->cons.ht.sync_type;

	/* try to grab exactly @num elems first */
	n = __rte_soring_move_cons_head(r, ns, num, RTE_RING_QUEUE_FIXED, st,
			&cons_head, &cons_next, &entries);
	if (n == 0) {
		/* try to finalize some elems from previous stage */
		n = __rte_soring_stage_finalize(&r->stage[ns].sht, ns,
				r->state, r->mask, 2 * num);
		entries += n;

		/* repeat attempt to grab elems */
		reqn = (behavior == RTE_RING_QUEUE_FIXED) ? num : 0;
		if (entries >= reqn)
			n = __rte_soring_move_cons_head(r, ns, num, behavior,
					st, &cons_head, &cons_next, &entries);
		else
			n = 0;
	}

	/* we have some elems to consume */
	if (n != 0) {
		__rte_ring_do_dequeue_elems(objs, &r[1], r->size,
				cons_head & r->mask, r->esize, n);
		if (meta != NULL)
			__rte_ring_do_dequeue_elems(meta, r->meta, r->size,
					cons_head & r->mask, r->msize, n);
		__rte_soring_update_tail(&r->cons, st, cons_head, cons_next, 0);
	}

	if (available != NULL)
		*available = entries - n;
	return n;
}

/*
 * Verify internal SORING state.
 * WARNING: if the expected value is not equal to the actual one, it means
 * that for whatever reason SORING data consistency is broken. That is a very
 * serious problem that most likely will cause race conditions, memory
 * corruption or a program crash.
 * To ease debugging of it, the user might rebuild the ring library with
 * RTE_SORING_DEBUG enabled.
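 * With RTE_SORING_DEBUG enabled a detected mismatch also dumps the ring
 * contents and terminates via rte_panic(); otherwise only an EMERG level
 * message is logged.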
 */
static __rte_always_inline void
soring_verify_state(const struct rte_soring *r, uint32_t stage, uint32_t idx,
	const char *msg, union soring_state val, union soring_state exp)
{
	if (val.raw != exp.raw) {
#ifdef RTE_SORING_DEBUG
		rte_soring_dump(stderr, r);
		rte_panic("line:%d from:%s: soring=%p, stage=%#x, idx=%#x, "
			"expected={.stnum=%#x, .ftoken=%#x}, "
			"actual={.stnum=%#x, .ftoken=%#x};\n",
			__LINE__, msg, r, stage, idx,
			exp.stnum, exp.ftoken,
			val.stnum, val.ftoken);
#else
		SORING_LOG(EMERG, "from:%s: soring=%p, stage=%#x, idx=%#x, "
			"expected={.stnum=%#x, .ftoken=%#x}, "
			"actual={.stnum=%#x, .ftoken=%#x};",
			msg, r, stage, idx,
			exp.stnum, exp.ftoken,
			val.stnum, val.ftoken);
#endif
	}
}

/* check and update state ring at acquire op */
static __rte_always_inline void
acquire_state_update(const struct rte_soring *r, uint32_t stage, uint32_t idx,
	uint32_t ftoken, uint32_t num)
{
	union soring_state st;
	const union soring_state est = {.raw = 0};

	st.raw = rte_atomic_load_explicit(&r->state[idx].raw,
			rte_memory_order_relaxed);
	soring_verify_state(r, stage, idx, __func__, st, est);

	st.ftoken = ftoken;
	st.stnum = (SORING_ST_START | num);

	rte_atomic_store_explicit(&r->state[idx].raw, st.raw,
			rte_memory_order_relaxed);
}

static __rte_always_inline uint32_t
soring_acquire(struct rte_soring *r, void *objs, void *meta,
	uint32_t stage, uint32_t num, enum rte_ring_queue_behavior behavior,
	uint32_t *ftoken, uint32_t *available)
{
	uint32_t avail, head, idx, n, next, reqn;
	struct soring_stage *pstg;
	struct soring_stage_headtail *cons;

	RTE_ASSERT(r != NULL && stage < r->nb_stage);
	RTE_ASSERT(meta == NULL || r->meta != NULL);

	cons = &r->stage[stage].sht;

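	/*
	 * stage 0 acquires elems already enqueued by the producer;
	 * later stages acquire elems released by the preceding stage.
	 */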
	if (stage == 0)
		n = __rte_soring_stage_move_head(cons, &r->prod.ht, 0, num,
				behavior, &head, &next, &avail);
	else {
		pstg = r->stage + stage - 1;

		/* try to grab exactly @num elems */
		n = __rte_soring_stage_move_head(cons, &pstg->ht, 0, num,
				RTE_RING_QUEUE_FIXED, &head, &next, &avail);
		if (n == 0) {
			/* try to finalize some elems from previous stage */
			n = __rte_soring_stage_finalize(&pstg->sht, stage - 1,
					r->state, r->mask, 2 * num);
			avail += n;

			/* repeat attempt to grab elems */
			reqn = (behavior == RTE_RING_QUEUE_FIXED) ? num : 0;
			if (avail >= reqn)
				n = __rte_soring_stage_move_head(cons,
						&pstg->ht, 0, num, behavior, &head,
						&next, &avail);
			else
				n = 0;
		}
	}

	if (n != 0) {

		idx = head & r->mask;
		*ftoken = SORING_FTKN_MAKE(head, stage);

		/* check and update state value */
		acquire_state_update(r, stage, idx, *ftoken, n);

		/* copy elems that are ready for given stage */
		__rte_ring_do_dequeue_elems(objs, &r[1], r->size, idx,
				r->esize, n);
		if (meta != NULL)
			__rte_ring_do_dequeue_elems(meta, r->meta,
					r->size, idx, r->msize, n);
	}

	if (available != NULL)
		*available = avail - n;
	return n;
}

static __rte_always_inline void
soring_release(struct rte_soring *r, const void *objs,
	const void *meta, uint32_t stage, uint32_t n, uint32_t ftoken)
{
	uint32_t idx, pos, tail;
	struct soring_stage *stg;
	union soring_state st;

	const union soring_state est = {
		.stnum = (SORING_ST_START | n),
		.ftoken = ftoken,
	};

	RTE_ASSERT(r != NULL && stage < r->nb_stage);
	RTE_ASSERT(meta == NULL || r->meta != NULL);

	stg = r->stage + stage;

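	/* recover the head position recorded by acquire() from the ftoken */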
	pos = SORING_FTKN_POS(ftoken, stage);
	idx = pos & r->mask;
	st.raw = rte_atomic_load_explicit(&r->state[idx].raw,
			rte_memory_order_relaxed);

	/* check state ring contents */
	soring_verify_state(r, stage, idx, __func__, st, est);

	/* update contents of the ring, if necessary */
	if (objs != NULL)
		__rte_ring_do_enqueue_elems(&r[1], objs, r->size, idx,
				r->esize, n);
	if (meta != NULL)
		__rte_ring_do_enqueue_elems(r->meta, meta, r->size, idx,
				r->msize, n);

	/* set state to FINISH, make sure it is not reordered */
	rte_atomic_thread_fence(rte_memory_order_release);

	st.stnum = SORING_ST_FINISH | n;
	rte_atomic_store_explicit(&r->state[idx].raw, st.raw,
			rte_memory_order_relaxed);

	/* try to do finalize(), if appropriate */
	tail = rte_atomic_load_explicit(&stg->sht.tail.pos,
			rte_memory_order_relaxed);
	if (tail == pos)
		__rte_soring_stage_finalize(&stg->sht, stage, r->state, r->mask,
				r->capacity);
}

/*
 * Public functions (data-path) start here.
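 *
 * A minimal usage sketch of the data-path API below (a hypothetical ring
 * with a single processing stage; creation, sizing and error handling are
 * omitted):
 *
 *	n = rte_soring_enqueue_burst(r, objs, n, NULL);
 *	...
 *	k = rte_soring_acquire_burst(r, objs, 0, n, &ftoken, NULL);
 *	if (k != 0) {
 *		... process the k acquired objects ...
 *		rte_soring_release(r, NULL, 0, k, ftoken);
 *	}
 *	...
 *	k = rte_soring_dequeue_burst(r, objs, n, NULL);
 *
 * Passing NULL as 'objs' to release() leaves the ring contents unchanged.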
 */

void
rte_soring_release(struct rte_soring *r, const void *objs,
	uint32_t stage, uint32_t n, uint32_t ftoken)
{
	soring_release(r, objs, NULL, stage, n, ftoken);
}


void
rte_soring_releasx(struct rte_soring *r, const void *objs,
	const void *meta, uint32_t stage, uint32_t n, uint32_t ftoken)
{
	soring_release(r, objs, meta, stage, n, ftoken);
}

uint32_t
rte_soring_enqueue_bulk(struct rte_soring *r, const void *objs, uint32_t n,
	uint32_t *free_space)
{
	return soring_enqueue(r, objs, NULL, n, RTE_RING_QUEUE_FIXED,
			free_space);
}

uint32_t
rte_soring_enqueux_bulk(struct rte_soring *r, const void *objs,
	const void *meta, uint32_t n, uint32_t *free_space)
{
	return soring_enqueue(r, objs, meta, n, RTE_RING_QUEUE_FIXED,
			free_space);
}

uint32_t
rte_soring_enqueue_burst(struct rte_soring *r, const void *objs, uint32_t n,
	uint32_t *free_space)
{
	return soring_enqueue(r, objs, NULL, n, RTE_RING_QUEUE_VARIABLE,
			free_space);
}

uint32_t
rte_soring_enqueux_burst(struct rte_soring *r, const void *objs,
	const void *meta, uint32_t n, uint32_t *free_space)
{
	return soring_enqueue(r, objs, meta, n, RTE_RING_QUEUE_VARIABLE,
			free_space);
}

uint32_t
rte_soring_dequeue_bulk(struct rte_soring *r, void *objs, uint32_t num,
	uint32_t *available)
{
	return soring_dequeue(r, objs, NULL, num, RTE_RING_QUEUE_FIXED,
			available);
}

uint32_t
rte_soring_dequeux_bulk(struct rte_soring *r, void *objs, void *meta,
	uint32_t num, uint32_t *available)
{
	return soring_dequeue(r, objs, meta, num, RTE_RING_QUEUE_FIXED,
			available);
}

uint32_t
rte_soring_dequeue_burst(struct rte_soring *r, void *objs, uint32_t num,
	uint32_t *available)
{
	return soring_dequeue(r, objs, NULL, num, RTE_RING_QUEUE_VARIABLE,
			available);
}

uint32_t
rte_soring_dequeux_burst(struct rte_soring *r, void *objs, void *meta,
	uint32_t num, uint32_t *available)
{
	return soring_dequeue(r, objs, meta, num, RTE_RING_QUEUE_VARIABLE,
			available);
}

uint32_t
rte_soring_acquire_bulk(struct rte_soring *r, void *objs,
	uint32_t stage, uint32_t num, uint32_t *ftoken, uint32_t *available)
{
	return soring_acquire(r, objs, NULL, stage, num,
			RTE_RING_QUEUE_FIXED, ftoken, available);
}

uint32_t
rte_soring_acquirx_bulk(struct rte_soring *r, void *objs, void *meta,
	uint32_t stage, uint32_t num, uint32_t *ftoken, uint32_t *available)
{
	return soring_acquire(r, objs, meta, stage, num,
			RTE_RING_QUEUE_FIXED, ftoken, available);
}

uint32_t
rte_soring_acquire_burst(struct rte_soring *r, void *objs,
	uint32_t stage, uint32_t num, uint32_t *ftoken, uint32_t *available)
{
	return soring_acquire(r, objs, NULL, stage, num,
			RTE_RING_QUEUE_VARIABLE, ftoken, available);
}

uint32_t
rte_soring_acquirx_burst(struct rte_soring *r, void *objs, void *meta,
	uint32_t stage, uint32_t num, uint32_t *ftoken, uint32_t *available)
{
	return soring_acquire(r, objs, meta, stage, num,
			RTE_RING_QUEUE_VARIABLE, ftoken, available);
}

unsigned int
rte_soring_count(const struct rte_soring *r)
{
	uint32_t prod_tail = r->prod.ht.tail;
	uint32_t cons_tail = r->cons.ht.tail;
	uint32_t count = (prod_tail - cons_tail) & r->mask;
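	/* count is computed modulo ring size, so cap it at capacity */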
	return (count > r->capacity) ? r->capacity : count;
}

unsigned int
rte_soring_free_count(const struct rte_soring *r)
{
	return r->capacity - rte_soring_count(r);
}