/* SPDX-License-Identifier: BSD-3-Clause
 * Copyright(c) 2016-2017 Intel Corporation
 */

#include <rte_ring.h>
#include <rte_hash_crc.h>
#include <rte_event_ring.h>
#include "sw_evdev.h"
#include "iq_chunk.h"
#include "event_ring.h"

#define SW_IQS_MASK (SW_IQS_MAX-1)

/* Retrieve the highest priority IQ, or SW_IQS_MAX if no pkts are available.
 * Recomputing the CTZ each time is faster than caching the value due to data
 * dependencies
 */
#define PKT_MASK_TO_IQ(pkts) \
	(rte_ctz32(pkts | (1 << SW_IQS_MAX)))
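/* Example: with iq_pkt_mask = 0x6 (IQs 1 and 2 non-empty), OR-ing in the
 * sentinel bit gives 0x16 and the count-trailing-zeros returns 1, the
 * highest-priority non-empty IQ; an empty mask returns SW_IQS_MAX, which
 * callers treat as "nothing to service".
 */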

#if SW_IQS_MAX != 4
#error Misconfigured PRIO_TO_IQ caused by SW_IQS_MAX value change
#endif
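/* Event priority is 8-bit with 0 as the highest priority; the top two bits
 * select one of the four internal IQs, so priorities 0-63 land in IQ 0 and
 * are serviced first.
 */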
#define PRIO_TO_IQ(prio) ((prio) >> 6)

#define MAX_PER_IQ_DEQUEUE 48
#define FLOWID_MASK (SW_QID_NUM_FIDS-1)
/* use cheap bit mixing, we only need to lose a few bits */
#define SW_HASH_FLOWID(f) (((f) ^ ((f) >> 10)) & FLOWID_MASK)
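/* XORing in the bits above the mask spreads them into the masked range, so
 * flow IDs that differ only in their upper bits do not systematically collide
 * on the same sw_fid_t entry.
 */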


static inline uint32_t
sw_schedule_atomic_to_cq(struct sw_evdev *sw, struct sw_qid * const qid,
		uint32_t iq_num, unsigned int count)
{
	struct rte_event qes[MAX_PER_IQ_DEQUEUE]; /* count <= MAX */
	struct rte_event blocked_qes[MAX_PER_IQ_DEQUEUE];
	uint32_t nb_blocked = 0;
	uint32_t i;

	if (count > MAX_PER_IQ_DEQUEUE)
		count = MAX_PER_IQ_DEQUEUE;

	/* This is the QID ID. The QID ID is static, hence it can be
	 * used to identify the stage of processing in history lists etc
	 */
	uint32_t qid_id = qid->id;

	iq_dequeue_burst(sw, &qid->iq[iq_num], qes, count);
	for (i = 0; i < count; i++) {
		const struct rte_event *qe = &qes[i];
		const uint16_t flow_id = SW_HASH_FLOWID(qes[i].flow_id);
		struct sw_fid_t *fid = &qid->fids[flow_id];
		int cq = fid->cq;

		if (cq < 0) {
			uint32_t cq_idx;
			if (qid->cq_next_tx >= qid->cq_num_mapped_cqs)
				qid->cq_next_tx = 0;
			cq_idx = qid->cq_next_tx++;

			cq = qid->cq_map[cq_idx];

			/* find least used */
			int cq_free_cnt = sw->cq_ring_space[cq];
			for (cq_idx = 0; cq_idx < qid->cq_num_mapped_cqs;
					cq_idx++) {
				int test_cq = qid->cq_map[cq_idx];
				int test_cq_free = sw->cq_ring_space[test_cq];
				if (test_cq_free > cq_free_cnt) {
					cq = test_cq;
					cq_free_cnt = test_cq_free;
				}
			}

			fid->cq = cq; /* this pins early */
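			/* The flow remains pinned to this CQ until its
			 * inflight count (fid->pcount) drops back to zero on
			 * the completion path, which resets fid->cq to -1.
			 */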
		}

		if (sw->cq_ring_space[cq] == 0 ||
				sw->ports[cq].inflights == SW_PORT_HIST_LIST) {
			blocked_qes[nb_blocked++] = *qe;
			continue;
		}

		struct sw_port *p = &sw->ports[cq];

		/* at this point we can queue up the packet on the cq_buf */
		fid->pcount++;
		p->cq_buf[p->cq_buf_count++] = *qe;
		p->inflights++;
		sw->cq_ring_space[cq]--;

		int head = (p->hist_head++ & (SW_PORT_HIST_LIST-1));
		p->hist_list[head] = (struct sw_hist_list_entry) {
			.qid = qid_id,
			.fid = flow_id,
		};
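		/* The history list records the (QID, flow) of each event in
		 * flight on this port; .rob_entry is left NULL here, which
		 * the completion path uses to skip reorder handling.
		 */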

		p->stats.tx_pkts++;
		qid->stats.tx_pkts++;
		qid->to_port[cq]++;

		/* if we just filled in the last slot, flush the buffer */
		if (sw->cq_ring_space[cq] == 0) {
			struct rte_event_ring *worker = p->cq_worker_ring;
			rte_event_ring_enqueue_burst(worker, p->cq_buf,
					p->cq_buf_count,
					&sw->cq_ring_space[cq]);
			p->cq_buf_count = 0;
		}
	}
	iq_put_back(sw, &qid->iq[iq_num], blocked_qes, nb_blocked);

	return count - nb_blocked;
}

static inline uint32_t
sw_schedule_parallel_to_cq(struct sw_evdev *sw, struct sw_qid * const qid,
		uint32_t iq_num, unsigned int count, int keep_order)
{
	uint32_t i;
	uint32_t cq_idx = qid->cq_next_tx;

	/* This is the QID ID. The QID ID is static, hence it can be
	 * used to identify the stage of processing in history lists etc
	 */
	uint32_t qid_id = qid->id;

	if (count > MAX_PER_IQ_DEQUEUE)
		count = MAX_PER_IQ_DEQUEUE;

	if (keep_order)
		/* only schedule as many as we have reorder buffer entries */
		count = RTE_MIN(count,
				rob_ring_count(qid->reorder_buffer_freelist));

	for (i = 0; i < count; i++) {
		const struct rte_event *qe = iq_peek(&qid->iq[iq_num]);
		uint32_t cq_check_count = 0;
		uint32_t cq;

		/*
		 * for parallel, just send to next available CQ in round-robin
		 * fashion. So scan for an available CQ. If all CQs are full
		 * just return and move on to next QID
		 */
		do {
			if (++cq_check_count > qid->cq_num_mapped_cqs)
				goto exit;
			if (cq_idx >= qid->cq_num_mapped_cqs)
				cq_idx = 0;
			cq = qid->cq_map[cq_idx++];

		} while (sw->ports[cq].inflights == SW_PORT_HIST_LIST ||
				rte_event_ring_free_count(
					sw->ports[cq].cq_worker_ring) == 0);

		struct sw_port *p = &sw->ports[cq];
		if (sw->cq_ring_space[cq] == 0 ||
				p->inflights == SW_PORT_HIST_LIST)
			break;

		sw->cq_ring_space[cq]--;

		qid->stats.tx_pkts++;

		const int head = (p->hist_head & (SW_PORT_HIST_LIST-1));
		p->hist_list[head] = (struct sw_hist_list_entry) {
			.qid = qid_id,
			.fid = SW_HASH_FLOWID(qe->flow_id),
		};

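		/* For ordered QIDs, a free reorder buffer entry is attached
		 * to this history slot; the completion path marks it ready
		 * and sw_schedule_reorder() later drains ready entries in
		 * window order to restore the original event sequence.
		 */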
		if (keep_order)
			rob_ring_dequeue(qid->reorder_buffer_freelist,
					(void *)&p->hist_list[head].rob_entry);

		sw->ports[cq].cq_buf[sw->ports[cq].cq_buf_count++] = *qe;
		iq_pop(sw, &qid->iq[iq_num]);

		rte_compiler_barrier();
		p->inflights++;
		p->stats.tx_pkts++;
		p->hist_head++;
	}
exit:
	qid->cq_next_tx = cq_idx;
	return i;
}

static uint32_t
sw_schedule_dir_to_cq(struct sw_evdev *sw, struct sw_qid * const qid,
		uint32_t iq_num, unsigned int count __rte_unused)
{
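	/* Directed QIDs are linked to exactly one port, so everything in the
	 * IQ can be burst-copied straight into that port's CQ buffer without
	 * any CQ selection or history-list tracking.
	 */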
	uint32_t cq_id = qid->cq_map[0];
	struct sw_port *port = &sw->ports[cq_id];

	/* get max burst enq size for cq_ring */
	uint32_t count_free = sw->cq_ring_space[cq_id];
	if (count_free == 0)
		return 0;

	/* burst dequeue from the QID IQ ring */
	struct sw_iq *iq = &qid->iq[iq_num];
	uint32_t ret = iq_dequeue_burst(sw, iq,
			&port->cq_buf[port->cq_buf_count], count_free);
	port->cq_buf_count += ret;

	/* Update QID, Port and Total TX stats */
	qid->stats.tx_pkts += ret;
	port->stats.tx_pkts += ret;

	/* Subtract credits from cached value */
	sw->cq_ring_space[cq_id] -= ret;

	return ret;
}

static uint32_t
sw_schedule_qid_to_cq(struct sw_evdev *sw)
{
	uint32_t pkts = 0;
	uint32_t qid_idx;

	sw->sched_cq_qid_called++;

	for (qid_idx = 0; qid_idx < sw->qid_count; qid_idx++) {
		struct sw_qid *qid = sw->qids_prioritized[qid_idx];

		int type = qid->type;
		int iq_num = PKT_MASK_TO_IQ(qid->iq_pkt_mask);

		/* zero mapped CQs indicates directed */
		if (iq_num >= SW_IQS_MAX || qid->cq_num_mapped_cqs == 0)
			continue;

		uint32_t pkts_done = 0;
		uint32_t count = iq_count(&qid->iq[iq_num]);

		if (count >= sw->sched_min_burst) {
			if (type == SW_SCHED_TYPE_DIRECT)
				pkts_done += sw_schedule_dir_to_cq(sw, qid,
						iq_num, count);
			else if (type == RTE_SCHED_TYPE_ATOMIC)
				pkts_done += sw_schedule_atomic_to_cq(sw, qid,
						iq_num, count);
			else
				pkts_done += sw_schedule_parallel_to_cq(sw, qid,
						iq_num, count,
						type == RTE_SCHED_TYPE_ORDERED);
		}

		/* Check if the IQ that was polled is now empty, and unset
		 * its bit in the IQ mask if so.
		 */
		int all_done = (pkts_done == count);

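		/* all_done is 0 or 1: shifting it by iq_num either clears the
		 * serviced IQ's bit (fully drained) or clears nothing.
		 */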
		qid->iq_pkt_mask &= ~(all_done << (iq_num));
		pkts += pkts_done;
	}

	return pkts;
}

/* This function will perform re-ordering of packets, and inject them into
 * the appropriate QID IQ. As LB and DIR QIDs are in the same array, but *NOT*
 * contiguous in that array, this function accepts a "range" of QIDs to scan.
 */
static uint16_t
sw_schedule_reorder(struct sw_evdev *sw, int qid_start, int qid_end)
{
	/* Perform egress reordering */
	struct rte_event *qe;
	uint32_t pkts_iter = 0;

	for (; qid_start < qid_end; qid_start++) {
		struct sw_qid *qid = &sw->qids[qid_start];
		unsigned int i, num_entries_in_use;

		if (qid->type != RTE_SCHED_TYPE_ORDERED)
			continue;

		num_entries_in_use = rob_ring_free_count(
				qid->reorder_buffer_freelist);
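		/* Free space in the freelist ring equals the number of
		 * reorder buffer entries currently handed out to in-flight
		 * events, i.e. the most entries that could be ready here.
		 */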

		if (num_entries_in_use < sw->sched_min_burst)
			num_entries_in_use = 0;

		for (i = 0; i < num_entries_in_use; i++) {
			struct reorder_buffer_entry *entry;
			int j;

			entry = &qid->reorder_buffer[qid->reorder_buffer_index];

			if (!entry->ready)
				break;

			for (j = 0; j < entry->num_fragments; j++) {
				uint16_t dest_qid;
				uint16_t dest_iq;

				int idx = entry->fragment_index + j;
				qe = &entry->fragments[idx];

				dest_qid = qe->queue_id;
				dest_iq = PRIO_TO_IQ(qe->priority);

				if (dest_qid >= sw->qid_count) {
					sw->stats.rx_dropped++;
					continue;
				}

				pkts_iter++;

				struct sw_qid *q = &sw->qids[dest_qid];
				struct sw_iq *iq = &q->iq[dest_iq];

				/* we checked for space above, so enqueue must
				 * succeed
				 */
				iq_enqueue(sw, iq, qe);
				q->iq_pkt_mask |= (1 << (dest_iq));
				q->iq_pkt_count[dest_iq]++;
				q->stats.rx_pkts++;
			}

			entry->ready = (j != entry->num_fragments);
			entry->num_fragments -= j;
			entry->fragment_index += j;

			if (!entry->ready) {
				entry->fragment_index = 0;

				rob_ring_enqueue(
						qid->reorder_buffer_freelist,
						entry);

				qid->reorder_buffer_index++;
				qid->reorder_buffer_index %= qid->window_size;
			}
		}
	}
	return pkts_iter;
}

static __rte_always_inline void
sw_refill_pp_buf(struct sw_evdev *sw, struct sw_port *port)
{
	struct rte_event_ring *worker = port->rx_worker_ring;
	port->pp_buf_start = 0;
	port->pp_buf_count = rte_event_ring_dequeue_burst(worker, port->pp_buf,
			sw->sched_deq_burst_size, NULL);
}

static __rte_always_inline uint32_t
__pull_port_lb(struct sw_evdev *sw, uint32_t port_id, int allow_reorder)
{
	static struct reorder_buffer_entry dummy_rob;
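	/* dummy_rob is a scratch write target: events that have no reorder
	 * buffer entry are redirected here so the ->ready store below can be
	 * issued unconditionally, avoiding a branch in the hot loop.
	 */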
	uint32_t pkts_iter = 0;
	struct sw_port *port = &sw->ports[port_id];

	/* If shadow ring has 0 pkts, pull from worker ring */
	if (!sw->refill_once_per_iter && port->pp_buf_count == 0)
		sw_refill_pp_buf(sw, port);

	while (port->pp_buf_count) {
		const struct rte_event *qe = &port->pp_buf[port->pp_buf_start];
		struct sw_hist_list_entry *hist_entry = NULL;
		uint8_t flags = qe->op;
		const uint16_t eop = !(flags & QE_FLAG_NOT_EOP);
		int needs_reorder = 0;
		/* if reordering is disabled, treat PARTIAL the same as NEW */
		if (!allow_reorder && !eop)
			flags = QE_FLAG_VALID;

		uint32_t iq_num = PRIO_TO_IQ(qe->priority);
		struct sw_qid *qid = &sw->qids[qe->queue_id];

		/* now process based on flags. Note that for directed
		 * queues, the enqueue_flush masks off all but the
		 * valid flag. This makes FWD and PARTIAL enqueues just
		 * NEW type, and makes DROPS no-op calls.
		 */
		if ((flags & QE_FLAG_COMPLETE) && port->inflights > 0) {
			const uint32_t hist_tail = port->hist_tail &
					(SW_PORT_HIST_LIST - 1);

			hist_entry = &port->hist_list[hist_tail];
			const uint32_t hist_qid = hist_entry->qid;
			const uint32_t hist_fid = hist_entry->fid;

			struct sw_fid_t *fid =
				&sw->qids[hist_qid].fids[hist_fid];
			fid->pcount -= eop;
			if (fid->pcount == 0)
				fid->cq = -1;

			if (allow_reorder) {
				/* set reorder ready if an ordered QID */
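				/* Branchless select: valid is 1 when the
				 * history slot holds a ROB pointer, else 0.
				 * (valid - 1) is then 0 or all-ones, so
				 * rob_ptr is left intact or redirected to
				 * dummy_rob, making the ->ready store below
				 * harmless for non-ordered events.
				 */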
				uintptr_t rob_ptr =
					(uintptr_t)hist_entry->rob_entry;
				const uintptr_t valid = (rob_ptr != 0);
				needs_reorder = valid;
				rob_ptr |=
					((valid - 1) & (uintptr_t)&dummy_rob);
				struct reorder_buffer_entry *tmp_rob_ptr =
					(struct reorder_buffer_entry *)rob_ptr;
				tmp_rob_ptr->ready = eop * needs_reorder;
			}

			port->inflights -= eop;
			port->hist_tail += eop;
		}
		if (flags & QE_FLAG_VALID) {
			port->stats.rx_pkts++;

			if (allow_reorder && needs_reorder) {
				struct reorder_buffer_entry *rob_entry =
						hist_entry->rob_entry;

				/* Although fragmentation is not currently
				 * supported by the eventdev API, we support it
				 * here. Open question: how do we alert the
				 * user that they've exceeded max frags?
				 */
				int num_frag = rob_entry->num_fragments;
				if (num_frag == SW_FRAGMENTS_MAX)
					sw->stats.rx_dropped++;
				else {
					int idx = rob_entry->num_fragments++;
					rob_entry->fragments[idx] = *qe;
				}
				goto end_qe;
			}

			/* Use the iq_num from above to push the QE
			 * into the qid at the right priority
			 */

			qid->iq_pkt_mask |= (1 << (iq_num));
			iq_enqueue(sw, &qid->iq[iq_num], qe);
			qid->iq_pkt_count[iq_num]++;
			qid->stats.rx_pkts++;
			pkts_iter++;
		}

end_qe:
		port->pp_buf_start++;
		port->pp_buf_count--;
	} /* while (avail_qes) */

	return pkts_iter;
}

static uint32_t
sw_schedule_pull_port_lb(struct sw_evdev *sw, uint32_t port_id)
{
	return __pull_port_lb(sw, port_id, 1);
}

static uint32_t
sw_schedule_pull_port_no_reorder(struct sw_evdev *sw, uint32_t port_id)
{
	return __pull_port_lb(sw, port_id, 0);
}

static uint32_t
sw_schedule_pull_port_dir(struct sw_evdev *sw, uint32_t port_id)
{
	uint32_t pkts_iter = 0;
	struct sw_port *port = &sw->ports[port_id];

	/* If shadow ring has 0 pkts, pull from worker ring */
	if (!sw->refill_once_per_iter && port->pp_buf_count == 0)
		sw_refill_pp_buf(sw, port);

	while (port->pp_buf_count) {
		const struct rte_event *qe = &port->pp_buf[port->pp_buf_start];
		uint8_t flags = qe->op;

		if ((flags & QE_FLAG_VALID) == 0)
			goto end_qe;

		uint32_t iq_num = PRIO_TO_IQ(qe->priority);
		struct sw_qid *qid = &sw->qids[qe->queue_id];
		struct sw_iq *iq = &qid->iq[iq_num];

		port->stats.rx_pkts++;

		/* Use the iq_num from above to push the QE
		 * into the qid at the right priority
		 */
		qid->iq_pkt_mask |= (1 << (iq_num));
		iq_enqueue(sw, iq, qe);
		qid->iq_pkt_count[iq_num]++;
		qid->stats.rx_pkts++;
		pkts_iter++;

end_qe:
		port->pp_buf_start++;
		port->pp_buf_count--;
	} /* while port->pp_buf_count */

	return pkts_iter;
}

int32_t
sw_event_schedule(struct rte_eventdev *dev)
{
	struct sw_evdev *sw = sw_pmd_priv(dev);
	uint32_t in_pkts, out_pkts;
	uint32_t out_pkts_total = 0, in_pkts_total = 0;
	int32_t sched_quanta = sw->sched_quanta;
	uint32_t i;

	sw->sched_called++;
	if (unlikely(!sw->started))
		return -EAGAIN;

	do {
		uint32_t in_pkts_this_iteration = 0;

		/* Pull from rx_ring for ports */
		do {
			in_pkts = 0;
			for (i = 0; i < sw->port_count; i++) {
				/* ack the unlinks in progress as done */
				if (sw->ports[i].unlinks_in_progress)
					sw->ports[i].unlinks_in_progress = 0;

				if (sw->ports[i].is_directed)
					in_pkts += sw_schedule_pull_port_dir(sw, i);
				else if (sw->ports[i].num_ordered_qids > 0)
					in_pkts += sw_schedule_pull_port_lb(sw, i);
				else
					in_pkts += sw_schedule_pull_port_no_reorder(sw, i);
			}

			/* QID scan for re-ordered */
			in_pkts += sw_schedule_reorder(sw, 0,
					sw->qid_count);
			in_pkts_this_iteration += in_pkts;
		} while (in_pkts > 4 &&
				(int)in_pkts_this_iteration < sched_quanta);

		out_pkts = sw_schedule_qid_to_cq(sw);
		out_pkts_total += out_pkts;
		in_pkts_total += in_pkts_this_iteration;

		if (in_pkts == 0 && out_pkts == 0)
			break;
	} while ((int)out_pkts_total < sched_quanta);

	sw->stats.tx_pkts += out_pkts_total;
	sw->stats.rx_pkts += in_pkts_total;

	sw->sched_no_iq_enqueues += (in_pkts_total == 0);
	sw->sched_no_cq_enqueues += (out_pkts_total == 0);

	uint64_t work_done = (in_pkts_total + out_pkts_total) != 0;
	sw->sched_progress_last_iter = work_done;

	uint64_t cqs_scheds_last_iter = 0;

	/* push all the internal buffered QEs in port->cq_ring to the
	 * worker cores: aka, do the ring transfers batched.
	 */
	int no_enq = 1;
	for (i = 0; i < sw->port_count; i++) {
		struct sw_port *port = &sw->ports[i];
		struct rte_event_ring *worker = port->cq_worker_ring;

		/* If shadow ring has 0 pkts, pull from worker ring */
		if (sw->refill_once_per_iter && port->pp_buf_count == 0)
			sw_refill_pp_buf(sw, port);

		if (port->cq_buf_count >= sw->sched_min_burst) {
			rte_event_ring_enqueue_burst(worker,
					port->cq_buf,
					port->cq_buf_count,
					&sw->cq_ring_space[i]);
			port->cq_buf_count = 0;
			no_enq = 0;
			cqs_scheds_last_iter |= (1ULL << i);
		} else {
			sw->cq_ring_space[i] =
					rte_event_ring_free_count(worker) -
					port->cq_buf_count;
		}
	}

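	/* Adaptive minimum burst: after SCHED_NO_ENQ_CYCLE_FLUSH consecutive
	 * iterations with nothing enqueued to any CQ, drop the minimum burst
	 * to 1 so small buffered batches are flushed promptly; once traffic
	 * resumes and the counter decays, restore the configured minimum.
	 */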
	if (no_enq) {
		if (unlikely(sw->sched_flush_count > SCHED_NO_ENQ_CYCLE_FLUSH))
			sw->sched_min_burst = 1;
		else
			sw->sched_flush_count++;
	} else {
		if (sw->sched_flush_count)
			sw->sched_flush_count--;
		else
			sw->sched_min_burst = sw->sched_min_burst_size;
	}

	/* Provide stats on which eventdev ports were scheduled to in this
	 * iteration. If 64 or more ports are active, always report that all
	 * eventdev ports have had events scheduled to them.
	 */
	sw->sched_last_iter_bitmask = cqs_scheds_last_iter;
	if (unlikely(sw->port_count >= 64))
		sw->sched_last_iter_bitmask = UINT64_MAX;

	return work_done ? 0 : -EAGAIN;
}