/* SPDX-License-Identifier: BSD-3-Clause
 * Copyright(c) 2016-2017 Intel Corporation
 */

#include <rte_atomic.h>
#include <rte_cycles.h>
#include <rte_event_ring.h>

#include "sw_evdev.h"

#define PORT_ENQUEUE_MAX_BURST_SIZE 64

static inline void
sw_event_release(struct sw_port *p, uint8_t index)
{
	/*
	 * Drops the next outstanding event in our history. Used on dequeue
	 * to clear any history before dequeuing more events.
	 */
	RTE_SET_USED(index);

	/* create drop message */
	struct rte_event ev;
	ev.op = sw_qe_flag_map[RTE_EVENT_OP_RELEASE];

	uint16_t free_count;
	rte_event_ring_enqueue_burst(p->rx_worker_ring, &ev, 1, &free_count);

	/* each release returns one credit */
	p->outstanding_releases--;
	p->inflight_credits++;
}

/*
 * special-case of rte_event_ring enqueue, with overriding the ops member on
 * the events that get written to the ring.
 */
static inline unsigned int
enqueue_burst_with_ops(struct rte_event_ring *r, const struct rte_event *events,
		unsigned int n, uint8_t *ops)
{
	struct rte_event tmp_evs[PORT_ENQUEUE_MAX_BURST_SIZE];
	unsigned int i;

	memcpy(tmp_evs, events, n * sizeof(events[0]));
	for (i = 0; i < n; i++)
		tmp_evs[i].op = ops[i];

	return rte_event_ring_enqueue_burst(r, tmp_evs, n, NULL);
}

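/*
 * Worker enqueue path. NEW events consume port credits: when the port runs
 * short, a credit_update_quanta sized chunk of credits is claimed from the
 * device-wide inflight count, and once the port holds two quanta or more,
 * one quantum is handed back. Events targeting an invalid queue_id have
 * their VALID flag cleared, are counted in rx_dropped and have their credit
 * refunded.
 */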
uint16_t
sw_event_enqueue_burst(void *port, const struct rte_event ev[], uint16_t num)
{
	int32_t i;
	uint8_t new_ops[PORT_ENQUEUE_MAX_BURST_SIZE];
	struct sw_port *p = port;
	struct sw_evdev *sw = (void *)p->sw;
	uint32_t sw_inflights = rte_atomic32_read(&sw->inflights);
	uint32_t credit_update_quanta = sw->credit_update_quanta;
	int new = 0;

	if (num > PORT_ENQUEUE_MAX_BURST_SIZE)
		num = PORT_ENQUEUE_MAX_BURST_SIZE;

	for (i = 0; i < num; i++)
		new += (ev[i].op == RTE_EVENT_OP_NEW);

	if (unlikely(new > 0 && p->inflight_max < sw_inflights))
		return 0;

	if (p->inflight_credits < new) {
		/* check if event enqueue brings port over max threshold */
		if (sw_inflights + credit_update_quanta > sw->nb_events_limit)
			return 0;

		rte_atomic32_add(&sw->inflights, credit_update_quanta);
		p->inflight_credits += (credit_update_quanta);

		/* If there are fewer inflight credits than new events, limit
		 * the number of enqueued events.
		 */
		num = (p->inflight_credits < new) ? p->inflight_credits : new;
	}

	for (i = 0; i < num; i++) {
		int op = ev[i].op;
		int outstanding = p->outstanding_releases > 0;
		const uint8_t invalid_qid = (ev[i].queue_id >= sw->qid_count);

		p->inflight_credits -= (op == RTE_EVENT_OP_NEW);
		p->inflight_credits += (op == RTE_EVENT_OP_RELEASE) *
					outstanding;

		new_ops[i] = sw_qe_flag_map[op];
		new_ops[i] &= ~(invalid_qid << QE_FLAG_VALID_SHIFT);

		/* FWD and RELEASE packets will both resolve to taken (assuming
		 * correct usage of the API), providing very high correct
		 * prediction rate.
		 */
		if ((new_ops[i] & QE_FLAG_COMPLETE) && outstanding)
			p->outstanding_releases--;

		/* error case: branch to avoid touching p->stats */
		if (unlikely(invalid_qid && op != RTE_EVENT_OP_RELEASE)) {
			p->stats.rx_dropped++;
			p->inflight_credits++;
		}
	}

	/* returns number of events actually enqueued */
	uint32_t enq = enqueue_burst_with_ops(p->rx_worker_ring, ev, i,
			new_ops);
	if (p->outstanding_releases == 0 && p->last_dequeue_burst_sz != 0) {
		uint64_t burst_ticks = rte_get_timer_cycles() -
				p->last_dequeue_ticks;
		uint64_t burst_pkt_ticks =
			burst_ticks / p->last_dequeue_burst_sz;
		p->avg_pkt_ticks -= p->avg_pkt_ticks / NUM_SAMPLES;
		p->avg_pkt_ticks += burst_pkt_ticks / NUM_SAMPLES;
		p->last_dequeue_ticks = 0;
	}

	/* Replenish credits if enough releases are performed */
	if (p->inflight_credits >= credit_update_quanta * 2) {
		rte_atomic32_sub(&sw->inflights, credit_update_quanta);
		p->inflight_credits -= credit_update_quanta;
	}

	return enq;
}

uint16_t
sw_event_dequeue_burst(void *port, struct rte_event *ev, uint16_t num,
		uint64_t wait)
{
	RTE_SET_USED(wait);
	struct sw_port *p = (void *)port;
	struct rte_event_ring *ring = p->cq_worker_ring;

	/* check that all previous dequeues have been released */
	if (p->implicit_release) {
		struct sw_evdev *sw = (void *)p->sw;
		uint32_t credit_update_quanta = sw->credit_update_quanta;
		uint16_t out_rels = p->outstanding_releases;
		uint16_t i;
		for (i = 0; i < out_rels; i++)
			sw_event_release(p, i);

		/* Replenish credits if enough releases are performed */
		if (p->inflight_credits >= credit_update_quanta * 2) {
			rte_atomic32_sub(&sw->inflights, credit_update_quanta);
			p->inflight_credits -= credit_update_quanta;
		}
	}

	/* returns number of events actually dequeued */
	uint16_t ndeq = rte_event_ring_dequeue_burst(ring, ev, num, NULL);
	if (unlikely(ndeq == 0)) {
		p->zero_polls++;
		p->total_polls++;
		goto end;
	}

	p->outstanding_releases += ndeq;
	p->last_dequeue_burst_sz = ndeq;
	p->last_dequeue_ticks = rte_get_timer_cycles();
	p->poll_buckets[(ndeq - 1) >> SW_DEQ_STAT_BUCKET_SHIFT]++;
	p->total_polls++;

end:
	return ndeq;
}
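/*
 * Illustrative worker loop (not part of this driver): applications reach the
 * two fast-path functions above through the public eventdev API. dev_id,
 * port_id, next_qid and process() below are placeholders.
 *
 *	struct rte_event evs[16];
 *	uint16_t n = rte_event_dequeue_burst(dev_id, port_id, evs, 16, 0);
 *	for (uint16_t k = 0; k < n; k++) {
 *		process(&evs[k]);
 *		evs[k].op = RTE_EVENT_OP_FORWARD;
 *		evs[k].queue_id = next_qid;
 *	}
 *	uint16_t sent = 0;
 *	while (sent < n)
 *		sent += rte_event_enqueue_burst(dev_id, port_id,
 *				evs + sent, n - sent);
 */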