/* SPDX-License-Identifier: BSD-3-Clause
 * Copyright(c) 2016-2017 Intel Corporation
 */

#include <rte_ring.h>
#include <rte_hash_crc.h>
#include <rte_event_ring.h>
#include "sw_evdev.h"
#include "iq_chunk.h"
#include "event_ring.h"

#define SW_IQS_MASK (SW_IQS_MAX-1)

/* Retrieve the highest priority IQ (lowest index with packets available), or
 * SW_IQS_MAX if no IQ has packets. Doing the CTZ twice is faster than caching
 * the value due to data dependencies.
 */
#define PKT_MASK_TO_IQ(pkts) \
	(__builtin_ctz((pkts) | (1 << SW_IQS_MAX)))

#if SW_IQS_MAX != 4
#error Misconfigured PRIO_TO_IQ caused by SW_IQS_MAX value change
#endif
#define PRIO_TO_IQ(prio) ((prio) >> 6)

#define MAX_PER_IQ_DEQUEUE 48
#define FLOWID_MASK (SW_QID_NUM_FIDS-1)
/* use cheap bit mixing, we only need to lose a few bits */
#define SW_HASH_FLOWID(f) (((f) ^ ((f) >> 10)) & FLOWID_MASK)


/* Schedule up to count events from the given IQ of an atomic QID to the CQs
 * of the ports it is mapped to. A flow is pinned to one CQ until its packet
 * count drops to zero; events whose destination CQ or history list is full
 * are put back on the IQ. Returns the number of events scheduled.
 */
static inline uint32_t
sw_schedule_atomic_to_cq(struct sw_evdev *sw, struct sw_qid * const qid,
		uint32_t iq_num, unsigned int count)
{
	struct rte_event qes[MAX_PER_IQ_DEQUEUE]; /* count <= MAX */
	struct rte_event blocked_qes[MAX_PER_IQ_DEQUEUE];
	uint32_t nb_blocked = 0;
	uint32_t i;

	if (count > MAX_PER_IQ_DEQUEUE)
		count = MAX_PER_IQ_DEQUEUE;

	/* This is the QID ID. The QID ID is static, hence it can be
	 * used to identify the stage of processing in history lists etc.
	 */
	uint32_t qid_id = qid->id;

	iq_dequeue_burst(sw, &qid->iq[iq_num], qes, count);
	for (i = 0; i < count; i++) {
		const struct rte_event *qe = &qes[i];
		const uint16_t flow_id = SW_HASH_FLOWID(qes[i].flow_id);
		struct sw_fid_t *fid = &qid->fids[flow_id];
		int cq = fid->cq;

		if (cq < 0) {
			uint32_t cq_idx;
			if (qid->cq_next_tx >= qid->cq_num_mapped_cqs)
				qid->cq_next_tx = 0;
			cq_idx = qid->cq_next_tx++;

			cq = qid->cq_map[cq_idx];

			/* find the least loaded of the mapped CQs */
			int cq_free_cnt = sw->cq_ring_space[cq];
			for (cq_idx = 0; cq_idx < qid->cq_num_mapped_cqs;
					cq_idx++) {
				int test_cq = qid->cq_map[cq_idx];
				int test_cq_free = sw->cq_ring_space[test_cq];
				if (test_cq_free > cq_free_cnt) {
					cq = test_cq;
					cq_free_cnt = test_cq_free;
				}
			}

			fid->cq = cq; /* pin the flow to this CQ early */
		}

		if (sw->cq_ring_space[cq] == 0 ||
				sw->ports[cq].inflights == SW_PORT_HIST_LIST) {
			blocked_qes[nb_blocked++] = *qe;
			continue;
		}

		struct sw_port *p = &sw->ports[cq];

		/* at this point we can queue up the packet on the cq_buf */
		fid->pcount++;
		p->cq_buf[p->cq_buf_count++] = *qe;
		p->inflights++;
		sw->cq_ring_space[cq]--;

		int head = (p->hist_head++ & (SW_PORT_HIST_LIST-1));
		p->hist_list[head].fid = flow_id;
		p->hist_list[head].qid = qid_id;

		p->stats.tx_pkts++;
		qid->stats.tx_pkts++;
		qid->to_port[cq]++;

		/* if we just filled in the last slot, flush the buffer */
		if (sw->cq_ring_space[cq] == 0) {
			struct rte_event_ring *worker = p->cq_worker_ring;
			rte_event_ring_enqueue_burst(worker, p->cq_buf,
					p->cq_buf_count,
					&sw->cq_ring_space[cq]);
			p->cq_buf_count = 0;
		}
	}
	iq_put_back(sw, &qid->iq[iq_num], blocked_qes, nb_blocked);

	return count - nb_blocked;
}

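/* Schedule up to count events from the given IQ of a parallel (ordered or
 * unordered) QID to its mapped CQs in round-robin order. When keep_order is
 * set, a reorder buffer entry is reserved for each event so that egress order
 * can be restored later. Returns the number of events scheduled.
 */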
static inline uint32_t
sw_schedule_parallel_to_cq(struct sw_evdev *sw, struct sw_qid * const qid,
		uint32_t iq_num, unsigned int count, int keep_order)
{
	uint32_t i;
	uint32_t cq_idx = qid->cq_next_tx;

	/* This is the QID ID. The QID ID is static, hence it can be
	 * used to identify the stage of processing in history lists etc.
	 */
	uint32_t qid_id = qid->id;

	if (count > MAX_PER_IQ_DEQUEUE)
		count = MAX_PER_IQ_DEQUEUE;

	if (keep_order)
		/* only schedule as many as we have reorder buffer entries */
		count = RTE_MIN(count,
				rob_ring_count(qid->reorder_buffer_freelist));

	for (i = 0; i < count; i++) {
		const struct rte_event *qe = iq_peek(&qid->iq[iq_num]);
		uint32_t cq_check_count = 0;
		uint32_t cq;

		/*
		 * for parallel QIDs, just send to the next available CQ in
		 * round-robin fashion, so scan for an available CQ. If all
		 * CQs are full, return and move on to the next QID.
		 */
		do {
			if (++cq_check_count > qid->cq_num_mapped_cqs)
				goto exit;
			if (cq_idx >= qid->cq_num_mapped_cqs)
				cq_idx = 0;
			cq = qid->cq_map[cq_idx++];

		} while (sw->ports[cq].inflights == SW_PORT_HIST_LIST ||
				rte_event_ring_free_count(
					sw->ports[cq].cq_worker_ring) == 0);

		struct sw_port *p = &sw->ports[cq];
		if (sw->cq_ring_space[cq] == 0 ||
				p->inflights == SW_PORT_HIST_LIST)
			break;

		sw->cq_ring_space[cq]--;

		qid->stats.tx_pkts++;

		const int head = (p->hist_head & (SW_PORT_HIST_LIST-1));
		p->hist_list[head].fid = SW_HASH_FLOWID(qe->flow_id);
		p->hist_list[head].qid = qid_id;

		if (keep_order)
			rob_ring_dequeue(qid->reorder_buffer_freelist,
					(void *)&p->hist_list[head].rob_entry);

		sw->ports[cq].cq_buf[sw->ports[cq].cq_buf_count++] = *qe;
		iq_pop(sw, &qid->iq[iq_num]);

		rte_compiler_barrier();
		p->inflights++;
		p->stats.tx_pkts++;
		p->hist_head++;
	}
exit:
	qid->cq_next_tx = cq_idx;
	return i;
}

/* Schedule events from the IQ of a directed QID straight into the CQ buffer
 * of its single mapped port (cq_map[0]), limited by the free space cached in
 * cq_ring_space. Returns the number of events scheduled.
 */
static uint32_t
sw_schedule_dir_to_cq(struct sw_evdev *sw, struct sw_qid * const qid,
		uint32_t iq_num, unsigned int count __rte_unused)
{
	uint32_t cq_id = qid->cq_map[0];
	struct sw_port *port = &sw->ports[cq_id];

	/* get max burst enq size for cq_ring */
	uint32_t count_free = sw->cq_ring_space[cq_id];
	if (count_free == 0)
		return 0;

	/* burst dequeue from the QID IQ ring */
	struct sw_iq *iq = &qid->iq[iq_num];
	uint32_t ret = iq_dequeue_burst(sw, iq,
			&port->cq_buf[port->cq_buf_count], count_free);
	port->cq_buf_count += ret;

	/* Update QID and Port TX stats */
	qid->stats.tx_pkts += ret;
	port->stats.tx_pkts += ret;

	/* Subtract credits from cached value */
	sw->cq_ring_space[cq_id] -= ret;

	return ret;
}

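/* Iterate over the QIDs in priority order and schedule events from the
 * highest-priority non-empty IQ of each QID to its mapped CQs, using the
 * directed, atomic or parallel scheduler according to the QID type. Returns
 * the total number of events scheduled.
 */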
static uint32_t
sw_schedule_qid_to_cq(struct sw_evdev *sw)
{
	uint32_t pkts = 0;
	uint32_t qid_idx;

	sw->sched_cq_qid_called++;

	for (qid_idx = 0; qid_idx < sw->qid_count; qid_idx++) {
		struct sw_qid *qid = sw->qids_prioritized[qid_idx];

		int type = qid->type;
		int iq_num = PKT_MASK_TO_IQ(qid->iq_pkt_mask);

		/* zero mapped CQs indicates directed */
		if (iq_num >= SW_IQS_MAX || qid->cq_num_mapped_cqs == 0)
			continue;

		uint32_t pkts_done = 0;
		uint32_t count = iq_count(&qid->iq[iq_num]);

		if (count >= sw->sched_min_burst) {
			if (type == SW_SCHED_TYPE_DIRECT)
				pkts_done += sw_schedule_dir_to_cq(sw, qid,
						iq_num, count);
			else if (type == RTE_SCHED_TYPE_ATOMIC)
				pkts_done += sw_schedule_atomic_to_cq(sw, qid,
						iq_num, count);
			else
				pkts_done += sw_schedule_parallel_to_cq(sw, qid,
						iq_num, count,
						type == RTE_SCHED_TYPE_ORDERED);
		}

		/* Check if the IQ that was polled is now empty, and unset it
		 * in the IQ mask if it is.
		 */
		int all_done = (pkts_done == count);

		qid->iq_pkt_mask &= ~(all_done << (iq_num));
		pkts += pkts_done;
	}

	return pkts;
}

/* This function performs re-ordering of packets, injecting them into the
 * appropriate QID IQ. As LB and DIR QIDs are in the same array, but *NOT*
 * contiguous in that array, this function accepts a "range" of QIDs to scan.
 */
static uint16_t
sw_schedule_reorder(struct sw_evdev *sw, int qid_start, int qid_end)
{
	/* Perform egress reordering */
	struct rte_event *qe;
	uint32_t pkts_iter = 0;

	for (; qid_start < qid_end; qid_start++) {
		struct sw_qid *qid = &sw->qids[qid_start];
		unsigned int i, num_entries_in_use;

		if (qid->type != RTE_SCHED_TYPE_ORDERED)
			continue;

		num_entries_in_use = rob_ring_free_count(
				qid->reorder_buffer_freelist);

		if (num_entries_in_use < sw->sched_min_burst)
			num_entries_in_use = 0;

		for (i = 0; i < num_entries_in_use; i++) {
			struct reorder_buffer_entry *entry;
			int j;

			entry = &qid->reorder_buffer[qid->reorder_buffer_index];

			if (!entry->ready)
				break;

			for (j = 0; j < entry->num_fragments; j++) {
				uint16_t dest_qid;
				uint16_t dest_iq;

				int idx = entry->fragment_index + j;
				qe = &entry->fragments[idx];

				dest_qid = qe->queue_id;
				dest_iq = PRIO_TO_IQ(qe->priority);

				if (dest_qid >= sw->qid_count) {
					sw->stats.rx_dropped++;
					continue;
				}

				pkts_iter++;

				struct sw_qid *q = &sw->qids[dest_qid];
				struct sw_iq *iq = &q->iq[dest_iq];

				/* we checked for space above, so enqueue must
				 * succeed
				 */
				iq_enqueue(sw, iq, qe);
				q->iq_pkt_mask |= (1 << (dest_iq));
				q->iq_pkt_count[dest_iq]++;
				q->stats.rx_pkts++;
			}

			entry->ready = (j != entry->num_fragments);
			entry->num_fragments -= j;
			entry->fragment_index += j;

			if (!entry->ready) {
				entry->fragment_index = 0;

				rob_ring_enqueue(
						qid->reorder_buffer_freelist,
						entry);

				qid->reorder_buffer_index++;
				qid->reorder_buffer_index %= qid->window_size;
			}
		}
	}
	return pkts_iter;
}

/* Refill a port's pre-pull (shadow) buffer from its rx worker ring. */
static __rte_always_inline void
sw_refill_pp_buf(struct sw_evdev *sw, struct sw_port *port)
{
	struct rte_event_ring *worker = port->rx_worker_ring;
	port->pp_buf_start = 0;
	port->pp_buf_count = rte_event_ring_dequeue_burst(worker, port->pp_buf,
			sw->sched_deq_burst_size, NULL);
}

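/* Pull events from a load-balanced port's pre-pull buffer (refilling it from
 * the port's rx worker ring when empty) and enqueue valid events into the
 * destination QID IQ selected by their priority. Completed events retire the
 * oldest history-list entry, releasing the atomic flow pin when its packet
 * count reaches zero and, when allow_reorder is set, marking the associated
 * reorder buffer entry as ready. Returns the number of events enqueued to
 * IQs.
 */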
static __rte_always_inline uint32_t
__pull_port_lb(struct sw_evdev *sw, uint32_t port_id, int allow_reorder)
{
	static struct reorder_buffer_entry dummy_rob;
	uint32_t pkts_iter = 0;
	struct sw_port *port = &sw->ports[port_id];

	/* If shadow ring has 0 pkts, pull from worker ring */
	if (!sw->refill_once_per_iter && port->pp_buf_count == 0)
		sw_refill_pp_buf(sw, port);

	while (port->pp_buf_count) {
		const struct rte_event *qe = &port->pp_buf[port->pp_buf_start];
		struct sw_hist_list_entry *hist_entry = NULL;
		uint8_t flags = qe->op;
		const uint16_t eop = !(flags & QE_FLAG_NOT_EOP);
		int needs_reorder = 0;
		/* if not reordering, a PARTIAL event behaves like NEW */
		if (!allow_reorder && !eop)
			flags = QE_FLAG_VALID;

		/*
		 * Compute the destination IQ and QID for this event. The IQs
		 * are backed by chunks sized so that the enqueue below always
		 * succeeds, so no free-space check is needed here.
		 */
		uint32_t iq_num = PRIO_TO_IQ(qe->priority);
		struct sw_qid *qid = &sw->qids[qe->queue_id];

		/* now process based on flags. Note that for directed
		 * queues, the enqueue_flush masks off all but the
		 * valid flag. This makes FWD and PARTIAL enqueues just
		 * NEW type, and makes DROPS no-op calls.
		 */
		if ((flags & QE_FLAG_COMPLETE) && port->inflights > 0) {
			const uint32_t hist_tail = port->hist_tail &
					(SW_PORT_HIST_LIST - 1);

			hist_entry = &port->hist_list[hist_tail];
			const uint32_t hist_qid = hist_entry->qid;
			const uint32_t hist_fid = hist_entry->fid;

			struct sw_fid_t *fid =
				&sw->qids[hist_qid].fids[hist_fid];
			fid->pcount -= eop;
			if (fid->pcount == 0)
				fid->cq = -1;

			if (allow_reorder) {
				/* set reorder ready if an ordered QID; the
				 * branch-free select below writes to a dummy
				 * ROB entry when this event has none.
				 */
				uintptr_t rob_ptr =
					(uintptr_t)hist_entry->rob_entry;
				const uintptr_t valid = (rob_ptr != 0);
				needs_reorder = valid;
				rob_ptr |=
					((valid - 1) & (uintptr_t)&dummy_rob);
				struct reorder_buffer_entry *tmp_rob_ptr =
					(struct reorder_buffer_entry *)rob_ptr;
				tmp_rob_ptr->ready = eop * needs_reorder;
			}

			port->inflights -= eop;
			port->hist_tail += eop;
		}
		if (flags & QE_FLAG_VALID) {
			port->stats.rx_pkts++;

			if (allow_reorder && needs_reorder) {
				struct reorder_buffer_entry *rob_entry =
						hist_entry->rob_entry;

				hist_entry->rob_entry = NULL;
				/* Although fragmentation is not currently
				 * supported by the eventdev API, we support it
				 * here. Open question: how do we alert the
				 * user that they've exceeded the max number
				 * of fragments?
				 */
				int num_frag = rob_entry->num_fragments;
				if (num_frag == SW_FRAGMENTS_MAX)
					sw->stats.rx_dropped++;
				else {
					int idx = rob_entry->num_fragments++;
					rob_entry->fragments[idx] = *qe;
				}
				goto end_qe;
			}

			/* Use the iq_num from above to push the QE
			 * into the qid at the right priority
			 */
			qid->iq_pkt_mask |= (1 << (iq_num));
			iq_enqueue(sw, &qid->iq[iq_num], qe);
			qid->iq_pkt_count[iq_num]++;
			qid->stats.rx_pkts++;
			pkts_iter++;
		}

end_qe:
		port->pp_buf_start++;
		port->pp_buf_count--;
	} /* while (avail_qes) */

	return pkts_iter;
}

static uint32_t
sw_schedule_pull_port_lb(struct sw_evdev *sw, uint32_t port_id)
{
	return __pull_port_lb(sw, port_id, 1);
}

static uint32_t
sw_schedule_pull_port_no_reorder(struct sw_evdev *sw, uint32_t port_id)
{
	return __pull_port_lb(sw, port_id, 0);
}

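/* Directed-port variant of the pull: valid events from the pre-pull buffer
 * are enqueued straight into the IQ of their destination QID, with no
 * history list or reorder handling. Returns the number of events enqueued.
 */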
static uint32_t
sw_schedule_pull_port_dir(struct sw_evdev *sw, uint32_t port_id)
{
	uint32_t pkts_iter = 0;
	struct sw_port *port = &sw->ports[port_id];

	/* If shadow ring has 0 pkts, pull from worker ring */
	if (!sw->refill_once_per_iter && port->pp_buf_count == 0)
		sw_refill_pp_buf(sw, port);

	while (port->pp_buf_count) {
		const struct rte_event *qe = &port->pp_buf[port->pp_buf_start];
		uint8_t flags = qe->op;

		if ((flags & QE_FLAG_VALID) == 0)
			goto end_qe;

		uint32_t iq_num = PRIO_TO_IQ(qe->priority);
		struct sw_qid *qid = &sw->qids[qe->queue_id];
		struct sw_iq *iq = &qid->iq[iq_num];

		port->stats.rx_pkts++;

		/* Use the iq_num from above to push the QE
		 * into the qid at the right priority
		 */
		qid->iq_pkt_mask |= (1 << (iq_num));
		iq_enqueue(sw, iq, qe);
		qid->iq_pkt_count[iq_num]++;
		qid->stats.rx_pkts++;
		pkts_iter++;

end_qe:
		port->pp_buf_start++;
		port->pp_buf_count--;
	} /* while port->pp_buf_count */

	return pkts_iter;
}

/* Main scheduling loop: pull new events from the producer ports into the QID
 * IQs, perform egress reordering, schedule QID IQs to consumer CQs, and
 * finally flush the per-port CQ buffers to the worker rings in batches.
 */
void
sw_event_schedule(struct rte_eventdev *dev)
{
	struct sw_evdev *sw = sw_pmd_priv(dev);
	uint32_t in_pkts, out_pkts;
	uint32_t out_pkts_total = 0, in_pkts_total = 0;
	int32_t sched_quanta = sw->sched_quanta;
	uint32_t i;

	sw->sched_called++;
	if (unlikely(!sw->started))
		return;

	do {
		uint32_t in_pkts_this_iteration = 0;

		/* Pull from rx_ring for ports */
		do {
			in_pkts = 0;
			for (i = 0; i < sw->port_count; i++) {
				/* ack the unlinks in progress as done */
				if (sw->ports[i].unlinks_in_progress)
					sw->ports[i].unlinks_in_progress = 0;

				if (sw->ports[i].is_directed)
					in_pkts += sw_schedule_pull_port_dir(sw, i);
				else if (sw->ports[i].num_ordered_qids > 0)
					in_pkts += sw_schedule_pull_port_lb(sw, i);
				else
					in_pkts += sw_schedule_pull_port_no_reorder(sw, i);
			}

			/* QID scan for re-ordered events */
			in_pkts += sw_schedule_reorder(sw, 0,
					sw->qid_count);
			in_pkts_this_iteration += in_pkts;
		} while (in_pkts > 4 &&
				(int)in_pkts_this_iteration < sched_quanta);

		out_pkts = sw_schedule_qid_to_cq(sw);
		out_pkts_total += out_pkts;
		in_pkts_total += in_pkts_this_iteration;

		if (in_pkts == 0 && out_pkts == 0)
			break;
	} while ((int)out_pkts_total < sched_quanta);

	sw->stats.tx_pkts += out_pkts_total;
	sw->stats.rx_pkts += in_pkts_total;

	sw->sched_no_iq_enqueues += (in_pkts_total == 0);
	sw->sched_no_cq_enqueues += (out_pkts_total == 0);

	/* push all the internal buffered QEs in port->cq_buf to the
	 * worker cores: aka, do the ring transfers batched.
	 */
	int no_enq = 1;
	for (i = 0; i < sw->port_count; i++) {
		struct sw_port *port = &sw->ports[i];
		struct rte_event_ring *worker = port->cq_worker_ring;

		/* If shadow ring has 0 pkts, pull from worker ring */
		if (sw->refill_once_per_iter && port->pp_buf_count == 0)
			sw_refill_pp_buf(sw, port);

		if (port->cq_buf_count >= sw->sched_min_burst) {
			rte_event_ring_enqueue_burst(worker,
					port->cq_buf,
					port->cq_buf_count,
					&sw->cq_ring_space[i]);
			port->cq_buf_count = 0;
			no_enq = 0;
		} else {
			sw->cq_ring_space[i] =
					rte_event_ring_free_count(worker) -
					port->cq_buf_count;
		}
	}

	/* Adapt the minimum burst size: after a long run of iterations with
	 * no CQ enqueues, drop to a burst of 1 to flush stragglers; restore
	 * the configured minimum once enqueues resume.
	 */
	if (no_enq) {
		if (unlikely(sw->sched_flush_count > SCHED_NO_ENQ_CYCLE_FLUSH))
			sw->sched_min_burst = 1;
		else
			sw->sched_flush_count++;
	} else {
		if (sw->sched_flush_count)
			sw->sched_flush_count--;
		else
			sw->sched_min_burst = sw->sched_min_burst_size;
	}

}