/* SPDX-License-Identifier: BSD-3-Clause
 * Copyright(c) 2016-2017 Intel Corporation
 */

#include <rte_ring.h>
#include <rte_hash_crc.h>
#include <rte_event_ring.h>
#include "sw_evdev.h"
#include "iq_chunk.h"
#include "event_ring.h"

#define SW_IQS_MASK (SW_IQS_MAX-1)

/* Retrieve the highest priority IQ, or SW_IQS_MAX if no pkts are available.
 * Doing the ctz twice is faster than caching the value due to data
 * dependencies.
 */
#define PKT_MASK_TO_IQ(pkts) \
	(rte_ctz32(pkts | (1 << SW_IQS_MAX)))

#if SW_IQS_MAX != 4
#error Misconfigured PRIO_TO_IQ caused by SW_IQS_MAX value change
#endif
#define PRIO_TO_IQ(prio) (prio >> 6)

#define MAX_PER_IQ_DEQUEUE 48
#define FLOWID_MASK (SW_QID_NUM_FIDS-1)
/* use cheap bit mixing, we only need to lose a few bits */
#define SW_HASH_FLOWID(f) (((f) ^ (f >> 10)) & FLOWID_MASK)
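
/*
 * Worked example of the macros above (illustrative values only): with an
 * 8-bit event priority, PRIO_TO_IQ() maps 0-63 to IQ 0 (highest priority),
 * 64-127 to IQ 1, 128-191 to IQ 2 and 192-255 to IQ 3. PKT_MASK_TO_IQ()
 * ORs in (1 << SW_IQS_MAX) so the trailing-zero count can never run past
 * bit SW_IQS_MAX: a mask of 0x0 yields SW_IQS_MAX, which callers treat as
 * "no IQ has packets".
 */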

static inline uint32_t
sw_schedule_atomic_to_cq(struct sw_evdev *sw, struct sw_qid * const qid,
		uint32_t iq_num, unsigned int count)
{
	struct rte_event qes[MAX_PER_IQ_DEQUEUE]; /* count <= MAX */
	struct rte_event blocked_qes[MAX_PER_IQ_DEQUEUE];
	uint32_t nb_blocked = 0;
	uint32_t i;

	if (count > MAX_PER_IQ_DEQUEUE)
		count = MAX_PER_IQ_DEQUEUE;

	/* This is the QID ID. The QID ID is static, hence it can be
	 * used to identify the stage of processing in history lists etc
	 */
	uint32_t qid_id = qid->id;

	iq_dequeue_burst(sw, &qid->iq[iq_num], qes, count);
	for (i = 0; i < count; i++) {
		const struct rte_event *qe = &qes[i];
		const uint16_t flow_id = SW_HASH_FLOWID(qes[i].flow_id);
		struct sw_fid_t *fid = &qid->fids[flow_id];
		int cq = fid->cq;

		if (cq < 0) {
			uint32_t cq_idx;
			if (qid->cq_next_tx >= qid->cq_num_mapped_cqs)
				qid->cq_next_tx = 0;
			cq_idx = qid->cq_next_tx++;

			cq = qid->cq_map[cq_idx];

			/* find least used */
			int cq_free_cnt = sw->cq_ring_space[cq];
			for (cq_idx = 0; cq_idx < qid->cq_num_mapped_cqs;
					cq_idx++) {
				int test_cq = qid->cq_map[cq_idx];
				int test_cq_free = sw->cq_ring_space[test_cq];
				if (test_cq_free > cq_free_cnt) {
					cq = test_cq;
					cq_free_cnt = test_cq_free;
				}
			}

			fid->cq = cq; /* this pins early */
		}

		if (sw->cq_ring_space[cq] == 0 ||
				sw->ports[cq].inflights == SW_PORT_HIST_LIST) {
			blocked_qes[nb_blocked++] = *qe;
			continue;
		}

		struct sw_port *p = &sw->ports[cq];

		/* at this point we can queue up the packet on the cq_buf */
		fid->pcount++;
		p->cq_buf[p->cq_buf_count++] = *qe;
		p->inflights++;
		sw->cq_ring_space[cq]--;

		int head = (p->hist_head++ & (SW_PORT_HIST_LIST-1));
		p->hist_list[head] = (struct sw_hist_list_entry) {
			.qid = qid_id,
			.fid = flow_id,
		};

		p->stats.tx_pkts++;
		qid->stats.tx_pkts++;
		qid->to_port[cq]++;

		/* if we just filled in the last slot, flush the buffer */
		if (sw->cq_ring_space[cq] == 0) {
			struct rte_event_ring *worker = p->cq_worker_ring;
			rte_event_ring_enqueue_burst(worker, p->cq_buf,
					p->cq_buf_count,
					&sw->cq_ring_space[cq]);
			p->cq_buf_count = 0;
		}
	}
	iq_put_back(sw, &qid->iq[iq_num], blocked_qes, nb_blocked);

	return count - nb_blocked;
}
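
/*
 * Summary of sw_schedule_atomic_to_cq() above: the first event of a flow
 * with no pinned CQ (fid->cq < 0) selects the mapped CQ with the most free
 * ring space and pins the flow to it; later events of the same flow follow
 * that pin until the flow's inflight count (fid->pcount) drains back to
 * zero, at which point __pull_port_lb() releases the pin and the flow may
 * migrate. Events that cannot be placed, because the chosen CQ ring or the
 * port's history list is full, are returned to the IQ via iq_put_back().
 */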

static inline uint32_t
sw_schedule_parallel_to_cq(struct sw_evdev *sw, struct sw_qid * const qid,
		uint32_t iq_num, unsigned int count, int keep_order)
{
	uint32_t i;
	uint32_t cq_idx = qid->cq_next_tx;

	/* This is the QID ID. The QID ID is static, hence it can be
	 * used to identify the stage of processing in history lists etc
	 */
	uint32_t qid_id = qid->id;

	if (count > MAX_PER_IQ_DEQUEUE)
		count = MAX_PER_IQ_DEQUEUE;

	if (keep_order)
		/* only schedule as many as we have reorder buffer entries */
		count = RTE_MIN(count,
				rob_ring_count(qid->reorder_buffer_freelist));

	for (i = 0; i < count; i++) {
		const struct rte_event *qe = iq_peek(&qid->iq[iq_num]);
		uint32_t cq_check_count = 0;
		uint32_t cq;

		/*
		 * for parallel, just send to next available CQ in round-robin
		 * fashion. So scan for an available CQ. If all CQs are full
		 * just return and move on to next QID
		 */
		do {
			if (++cq_check_count > qid->cq_num_mapped_cqs)
				goto exit;
			if (cq_idx >= qid->cq_num_mapped_cqs)
				cq_idx = 0;
			cq = qid->cq_map[cq_idx++];

		} while (sw->ports[cq].inflights == SW_PORT_HIST_LIST ||
				rte_event_ring_free_count(
					sw->ports[cq].cq_worker_ring) == 0);

		struct sw_port *p = &sw->ports[cq];
		if (sw->cq_ring_space[cq] == 0 ||
				p->inflights == SW_PORT_HIST_LIST)
			break;

		sw->cq_ring_space[cq]--;

		qid->stats.tx_pkts++;

		const int head = (p->hist_head & (SW_PORT_HIST_LIST-1));
		p->hist_list[head] = (struct sw_hist_list_entry) {
			.qid = qid_id,
			.fid = SW_HASH_FLOWID(qe->flow_id),
		};

		if (keep_order)
			rob_ring_dequeue(qid->reorder_buffer_freelist,
					(void *)&p->hist_list[head].rob_entry);

		sw->ports[cq].cq_buf[sw->ports[cq].cq_buf_count++] = *qe;
		iq_pop(sw, &qid->iq[iq_num]);

		rte_compiler_barrier();
		p->inflights++;
		p->stats.tx_pkts++;
		p->hist_head++;
	}
exit:
	qid->cq_next_tx = cq_idx;
	return i;
}

static uint32_t
sw_schedule_dir_to_cq(struct sw_evdev *sw, struct sw_qid * const qid,
		uint32_t iq_num, unsigned int count __rte_unused)
{
	uint32_t cq_id = qid->cq_map[0];
	struct sw_port *port = &sw->ports[cq_id];

	/* get max burst enq size for cq_ring */
	uint32_t count_free = sw->cq_ring_space[cq_id];
	if (count_free == 0)
		return 0;

	/* burst dequeue from the QID IQ ring */
	struct sw_iq *iq = &qid->iq[iq_num];
	uint32_t ret = iq_dequeue_burst(sw, iq,
			&port->cq_buf[port->cq_buf_count], count_free);
	port->cq_buf_count += ret;

	/* Update QID, Port and Total TX stats */
	qid->stats.tx_pkts += ret;
	port->stats.tx_pkts += ret;

	/* Subtract credits from cached value */
	sw->cq_ring_space[cq_id] -= ret;

	return ret;
}

static uint32_t
sw_schedule_qid_to_cq(struct sw_evdev *sw)
{
	uint32_t pkts = 0;
	uint32_t qid_idx;

	sw->sched_cq_qid_called++;

	for (qid_idx = 0; qid_idx < sw->qid_count; qid_idx++) {
		struct sw_qid *qid = sw->qids_prioritized[qid_idx];

		int type = qid->type;
		int iq_num = PKT_MASK_TO_IQ(qid->iq_pkt_mask);

		/* zero mapped CQs indicates directed */
		if (iq_num >= SW_IQS_MAX || qid->cq_num_mapped_cqs == 0)
			continue;

		uint32_t pkts_done = 0;
		uint32_t count = iq_count(&qid->iq[iq_num]);

		if (count >= sw->sched_min_burst) {
			if (type == SW_SCHED_TYPE_DIRECT)
				pkts_done += sw_schedule_dir_to_cq(sw, qid,
						iq_num, count);
			else if (type == RTE_SCHED_TYPE_ATOMIC)
				pkts_done += sw_schedule_atomic_to_cq(sw, qid,
						iq_num, count);
			else
				pkts_done += sw_schedule_parallel_to_cq(sw, qid,
						iq_num, count,
						type == RTE_SCHED_TYPE_ORDERED);
		}

		/* Check if the IQ that was polled is now empty, and unset it
		 * in the IQ mask if it's empty.
		 */
		int all_done = (pkts_done == count);

		qid->iq_pkt_mask &= ~(all_done << (iq_num));
		pkts += pkts_done;
	}

	return pkts;
}
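
/*
 * Note on the IQ mask bookkeeping used above: each QID keeps one bit per IQ
 * in iq_pkt_mask. The bit is set whenever an event is enqueued to that IQ
 * (see the pull-port and reorder paths below) and is cleared by
 * sw_schedule_qid_to_cq() only when every event counted in that poll was
 * scheduled, so PKT_MASK_TO_IQ() always lands on the highest-priority IQ
 * that may still hold events.
 */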

/* This function will perform re-ordering of packets, and inject them into
 * the appropriate QID IQ. As LB and DIR QIDs are in the same array, but *NOT*
 * contiguous in that array, this function accepts a "range" of QIDs to scan.
 */
static uint16_t
sw_schedule_reorder(struct sw_evdev *sw, int qid_start, int qid_end)
{
	/* Perform egress reordering */
	struct rte_event *qe;
	uint32_t pkts_iter = 0;

	for (; qid_start < qid_end; qid_start++) {
		struct sw_qid *qid = &sw->qids[qid_start];
		unsigned int i, num_entries_in_use;

		if (qid->type != RTE_SCHED_TYPE_ORDERED)
			continue;

		num_entries_in_use = rob_ring_free_count(
				qid->reorder_buffer_freelist);

		if (num_entries_in_use < sw->sched_min_burst)
			num_entries_in_use = 0;

		for (i = 0; i < num_entries_in_use; i++) {
			struct reorder_buffer_entry *entry;
			int j;

			entry = &qid->reorder_buffer[qid->reorder_buffer_index];

			if (!entry->ready)
				break;

			for (j = 0; j < entry->num_fragments; j++) {
				uint16_t dest_qid;
				uint16_t dest_iq;

				int idx = entry->fragment_index + j;
				qe = &entry->fragments[idx];

				dest_qid = qe->queue_id;
				dest_iq = PRIO_TO_IQ(qe->priority);

				if (dest_qid >= sw->qid_count) {
					sw->stats.rx_dropped++;
					continue;
				}

				pkts_iter++;

				struct sw_qid *q = &sw->qids[dest_qid];
				struct sw_iq *iq = &q->iq[dest_iq];

				/* we checked for space above, so enqueue must
				 * succeed
				 */
				iq_enqueue(sw, iq, qe);
				q->iq_pkt_mask |= (1 << (dest_iq));
				q->iq_pkt_count[dest_iq]++;
				q->stats.rx_pkts++;
			}

			entry->ready = (j != entry->num_fragments);
			entry->num_fragments -= j;
			entry->fragment_index += j;

			if (!entry->ready) {
				entry->fragment_index = 0;

				rob_ring_enqueue(
						qid->reorder_buffer_freelist,
						entry);

				qid->reorder_buffer_index++;
				qid->reorder_buffer_index %= qid->window_size;
			}
		}
	}
	return pkts_iter;
}

static __rte_always_inline void
sw_refill_pp_buf(struct sw_evdev *sw, struct sw_port *port)
{
	RTE_SET_USED(sw);
	struct rte_event_ring *worker = port->rx_worker_ring;
	port->pp_buf_start = 0;
	port->pp_buf_count = rte_event_ring_dequeue_burst(worker, port->pp_buf,
			sw->sched_deq_burst_size, NULL);
}

static __rte_always_inline uint32_t
__pull_port_lb(struct sw_evdev *sw, uint32_t port_id, int allow_reorder)
{
	static struct reorder_buffer_entry dummy_rob;
	uint32_t pkts_iter = 0;
	struct sw_port *port = &sw->ports[port_id];

	/* If shadow ring has 0 pkts, pull from worker ring */
	if (!sw->refill_once_per_iter && port->pp_buf_count == 0)
		sw_refill_pp_buf(sw, port);

	while (port->pp_buf_count) {
		const struct rte_event *qe = &port->pp_buf[port->pp_buf_start];
		struct sw_hist_list_entry *hist_entry = NULL;
		uint8_t flags = qe->op;
		const uint16_t eop = !(flags & QE_FLAG_NOT_EOP);
		int needs_reorder = 0;
		/* if no-reordering, having PARTIAL == NEW */
		if (!allow_reorder && !eop)
			flags = QE_FLAG_VALID;

		uint32_t iq_num = PRIO_TO_IQ(qe->priority);
		struct sw_qid *qid = &sw->qids[qe->queue_id];

		/* now process based on flags. Note that for directed
		 * queues, the enqueue_flush masks off all but the
		 * valid flag. This makes FWD and PARTIAL enqueues just
		 * NEW type, and makes DROPS no-op calls.
		 */
		if ((flags & QE_FLAG_COMPLETE) && port->inflights > 0) {
			const uint32_t hist_tail = port->hist_tail &
					(SW_PORT_HIST_LIST - 1);

			hist_entry = &port->hist_list[hist_tail];
			const uint32_t hist_qid = hist_entry->qid;
			const uint32_t hist_fid = hist_entry->fid;

			struct sw_fid_t *fid =
				&sw->qids[hist_qid].fids[hist_fid];
			fid->pcount -= eop;
			if (fid->pcount == 0)
				fid->cq = -1;

			if (allow_reorder) {
				/* set reorder ready if an ordered QID */
				uintptr_t rob_ptr =
					(uintptr_t)hist_entry->rob_entry;
				const uintptr_t valid = (rob_ptr != 0);
				needs_reorder = valid;
				rob_ptr |=
					((valid - 1) & (uintptr_t)&dummy_rob);
				struct reorder_buffer_entry *tmp_rob_ptr =
					(struct reorder_buffer_entry *)rob_ptr;
				tmp_rob_ptr->ready = eop * needs_reorder;
			}

			port->inflights -= eop;
			port->hist_tail += eop;
		}
		if (flags & QE_FLAG_VALID) {
			port->stats.rx_pkts++;

			if (allow_reorder && needs_reorder) {
				struct reorder_buffer_entry *rob_entry =
						hist_entry->rob_entry;

				/* Although fragmentation is not currently
				 * supported by the eventdev API, we support it
				 * here. Open: How do we alert the user that
				 * they've exceeded max frags?
				 */
				int num_frag = rob_entry->num_fragments;
				if (num_frag == SW_FRAGMENTS_MAX)
					sw->stats.rx_dropped++;
				else {
					int idx = rob_entry->num_fragments++;
					rob_entry->fragments[idx] = *qe;
				}
				goto end_qe;
			}

			/* Use the iq_num from above to push the QE
			 * into the qid at the right priority
			 */

			qid->iq_pkt_mask |= (1 << (iq_num));
			iq_enqueue(sw, &qid->iq[iq_num], qe);
			qid->iq_pkt_count[iq_num]++;
			qid->stats.rx_pkts++;
			pkts_iter++;
		}

end_qe:
		port->pp_buf_start++;
		port->pp_buf_count--;
	} /* while (avail_qes) */

	return pkts_iter;
}

static uint32_t
sw_schedule_pull_port_lb(struct sw_evdev *sw, uint32_t port_id)
{
	return __pull_port_lb(sw, port_id, 1);
}

static uint32_t
sw_schedule_pull_port_no_reorder(struct sw_evdev *sw, uint32_t port_id)
{
	return __pull_port_lb(sw, port_id, 0);
}

static uint32_t
sw_schedule_pull_port_dir(struct sw_evdev *sw, uint32_t port_id)
{
	uint32_t pkts_iter = 0;
	struct sw_port *port = &sw->ports[port_id];

	/* If shadow ring has 0 pkts, pull from worker ring */
	if (!sw->refill_once_per_iter && port->pp_buf_count == 0)
		sw_refill_pp_buf(sw, port);

	while (port->pp_buf_count) {
		const struct rte_event *qe = &port->pp_buf[port->pp_buf_start];
		uint8_t flags = qe->op;

		if ((flags & QE_FLAG_VALID) == 0)
			goto end_qe;

		uint32_t iq_num = PRIO_TO_IQ(qe->priority);
		struct sw_qid *qid = &sw->qids[qe->queue_id];
		struct sw_iq *iq = &qid->iq[iq_num];

		port->stats.rx_pkts++;

		/* Use the iq_num from above to push the QE
		 * into the qid at the right priority
		 */
		qid->iq_pkt_mask |= (1 << (iq_num));
		iq_enqueue(sw, iq, qe);
		qid->iq_pkt_count[iq_num]++;
		qid->stats.rx_pkts++;
		pkts_iter++;

end_qe:
		port->pp_buf_start++;
		port->pp_buf_count--;
	} /* while port->pp_buf_count */

	return pkts_iter;
}
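
/*
 * Note on the branchless ROB handling in __pull_port_lb() above:
 * hist_entry->rob_entry is NULL for events that did not come from an
 * ordered QID, so "valid" is 0 and (valid - 1) becomes an all-ones mask
 * that redirects rob_ptr to the static dummy_rob. The subsequent write of
 * the ready flag then lands in dummy_rob and is harmless, which avoids a
 * branch in the completion path; only genuine reorder-buffer entries are
 * later drained by sw_schedule_reorder().
 */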

int32_t
sw_event_schedule(struct rte_eventdev *dev)
{
	struct sw_evdev *sw = sw_pmd_priv(dev);
	uint32_t in_pkts, out_pkts;
	uint32_t out_pkts_total = 0, in_pkts_total = 0;
	int32_t sched_quanta = sw->sched_quanta;
	uint32_t i;

	sw->sched_called++;
	if (unlikely(!sw->started))
		return -EAGAIN;

	do {
		uint32_t in_pkts_this_iteration = 0;

		/* Pull from rx_ring for ports */
		do {
			in_pkts = 0;
			for (i = 0; i < sw->port_count; i++) {
				/* ack the unlinks in progress as done */
				if (sw->ports[i].unlinks_in_progress)
					sw->ports[i].unlinks_in_progress = 0;

				if (sw->ports[i].is_directed)
					in_pkts += sw_schedule_pull_port_dir(sw, i);
				else if (sw->ports[i].num_ordered_qids > 0)
					in_pkts += sw_schedule_pull_port_lb(sw, i);
				else
					in_pkts += sw_schedule_pull_port_no_reorder(sw, i);
			}

			/* QID scan for re-ordered */
			in_pkts += sw_schedule_reorder(sw, 0,
					sw->qid_count);
			in_pkts_this_iteration += in_pkts;
		} while (in_pkts > 4 &&
				(int)in_pkts_this_iteration < sched_quanta);

		out_pkts = sw_schedule_qid_to_cq(sw);
		out_pkts_total += out_pkts;
		in_pkts_total += in_pkts_this_iteration;

		if (in_pkts == 0 && out_pkts == 0)
			break;
	} while ((int)out_pkts_total < sched_quanta);

	sw->stats.tx_pkts += out_pkts_total;
	sw->stats.rx_pkts += in_pkts_total;

	sw->sched_no_iq_enqueues += (in_pkts_total == 0);
	sw->sched_no_cq_enqueues += (out_pkts_total == 0);

	uint64_t work_done = (in_pkts_total + out_pkts_total) != 0;
	sw->sched_progress_last_iter = work_done;

	uint64_t cqs_scheds_last_iter = 0;

	/* push all the internal buffered QEs in port->cq_ring to the
	 * worker cores: i.e., do the ring transfers in batches.
	 */
	int no_enq = 1;
	for (i = 0; i < sw->port_count; i++) {
		struct sw_port *port = &sw->ports[i];
		struct rte_event_ring *worker = port->cq_worker_ring;

		/* If shadow ring has 0 pkts, pull from worker ring */
		if (sw->refill_once_per_iter && port->pp_buf_count == 0)
			sw_refill_pp_buf(sw, port);

		if (port->cq_buf_count >= sw->sched_min_burst) {
			rte_event_ring_enqueue_burst(worker,
					port->cq_buf,
					port->cq_buf_count,
					&sw->cq_ring_space[i]);
			port->cq_buf_count = 0;
			no_enq = 0;
			cqs_scheds_last_iter |= (1ULL << i);
		} else {
			sw->cq_ring_space[i] =
					rte_event_ring_free_count(worker) -
					port->cq_buf_count;
		}
	}

	if (no_enq) {
		if (unlikely(sw->sched_flush_count > SCHED_NO_ENQ_CYCLE_FLUSH))
			sw->sched_min_burst = 1;
		else
			sw->sched_flush_count++;
	} else {
		if (sw->sched_flush_count)
			sw->sched_flush_count--;
		else
			sw->sched_min_burst = sw->sched_min_burst_size;
	}

	/* Provide stats on which eventdev ports were scheduled events in this
	 * iteration. If more than 64 ports are active, always report that
	 * all eventdev ports have been scheduled events.
	 */
	sw->sched_last_iter_bitmask = cqs_scheds_last_iter;
	if (unlikely(sw->port_count >= 64))
		sw->sched_last_iter_bitmask = UINT64_MAX;

	return work_done ? 0 : -EAGAIN;
}
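
/*
 * Note on the adaptive burst handling in sw_event_schedule() above: when an
 * iteration performs no CQ enqueues, sched_flush_count is incremented and,
 * once it exceeds SCHED_NO_ENQ_CYCLE_FLUSH, sched_min_burst drops to 1 so
 * that even single buffered events get flushed to the workers; once
 * enqueues resume and the counter drains back to zero, sched_min_burst
 * returns to the configured sched_min_burst_size.
 */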