/* SPDX-License-Identifier: BSD-3-Clause
 * Copyright(c) 2016-2017 Intel Corporation
 */

#include <rte_ring.h>
#include <rte_hash_crc.h>
#include <rte_event_ring.h>
#include "sw_evdev.h"
#include "iq_chunk.h"

#define SW_IQS_MASK (SW_IQS_MAX-1)

/* Retrieve the highest priority IQ, or SW_IQS_MAX if no pkts are available.
 * Recomputing the CTZ each time is faster than caching the value due to data
 * dependencies.
 */
#define PKT_MASK_TO_IQ(pkts) \
	(__builtin_ctz(pkts | (1 << SW_IQS_MAX)))

#if SW_IQS_MAX != 4
#error Misconfigured PRIO_TO_IQ caused by SW_IQS_MAX value change
#endif
#define PRIO_TO_IQ(prio) (prio >> 6)

#define MAX_PER_IQ_DEQUEUE 48
#define FLOWID_MASK (SW_QID_NUM_FIDS-1)
/* use cheap bit mixing; we only need to lose a few bits */
#define SW_HASH_FLOWID(f) (((f) ^ (f >> 10)) & FLOWID_MASK)
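
/* Worked examples (illustrative; the values follow from the macros above,
 * with SW_IQS_MAX == 4 as enforced by the #error check):
 *   PRIO_TO_IQ(0)       == 0          (highest priority band, 0..63)
 *   PRIO_TO_IQ(255)     == 3          (lowest priority band, 192..255)
 *   PKT_MASK_TO_IQ(0x6) == 1          (IQ 1 is the best non-empty IQ)
 *   PKT_MASK_TO_IQ(0x0) == SW_IQS_MAX (no IQ holds packets)
 * Note that FLOWID_MASK acts as a modulo only because SW_QID_NUM_FIDS is a
 * power of two.
 */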

static inline uint32_t
sw_schedule_atomic_to_cq(struct sw_evdev *sw, struct sw_qid * const qid,
		uint32_t iq_num, unsigned int count)
{
	struct rte_event qes[MAX_PER_IQ_DEQUEUE]; /* count <= MAX */
	struct rte_event blocked_qes[MAX_PER_IQ_DEQUEUE];
	uint32_t nb_blocked = 0;
	uint32_t i;

	if (count > MAX_PER_IQ_DEQUEUE)
		count = MAX_PER_IQ_DEQUEUE;

	/* This is the QID ID. The QID ID is static, hence it can be
	 * used to identify the stage of processing in history lists etc
	 */
	uint32_t qid_id = qid->id;

	iq_dequeue_burst(sw, &qid->iq[iq_num], qes, count);
	for (i = 0; i < count; i++) {
		const struct rte_event *qe = &qes[i];
		const uint16_t flow_id = SW_HASH_FLOWID(qes[i].flow_id);
		struct sw_fid_t *fid = &qid->fids[flow_id];
		int cq = fid->cq;

		if (cq < 0) {
			uint32_t cq_idx;
			if (qid->cq_next_tx >= qid->cq_num_mapped_cqs)
				qid->cq_next_tx = 0;
			cq_idx = qid->cq_next_tx++;

			cq = qid->cq_map[cq_idx];

			/* find least used */
			int cq_free_cnt = sw->cq_ring_space[cq];
			for (cq_idx = 0; cq_idx < qid->cq_num_mapped_cqs;
					cq_idx++) {
				int test_cq = qid->cq_map[cq_idx];
				int test_cq_free = sw->cq_ring_space[test_cq];
				if (test_cq_free > cq_free_cnt) {
					cq = test_cq;
					cq_free_cnt = test_cq_free;
				}
			}

			/* pin the flow to this CQ until its pcount hits 0 */
			fid->cq = cq;
		}

		if (sw->cq_ring_space[cq] == 0 ||
				sw->ports[cq].inflights == SW_PORT_HIST_LIST) {
			blocked_qes[nb_blocked++] = *qe;
			continue;
		}

		struct sw_port *p = &sw->ports[cq];

		/* at this point we can queue up the packet on the cq_buf */
		fid->pcount++;
		p->cq_buf[p->cq_buf_count++] = *qe;
		p->inflights++;
		sw->cq_ring_space[cq]--;

		int head = (p->hist_head++ & (SW_PORT_HIST_LIST-1));
		p->hist_list[head].fid = flow_id;
		p->hist_list[head].qid = qid_id;

		p->stats.tx_pkts++;
		qid->stats.tx_pkts++;
		qid->to_port[cq]++;

		/* if we just filled in the last slot, flush the buffer */
		if (sw->cq_ring_space[cq] == 0) {
			struct rte_event_ring *worker = p->cq_worker_ring;
			rte_event_ring_enqueue_burst(worker, p->cq_buf,
					p->cq_buf_count,
					&sw->cq_ring_space[cq]);
			p->cq_buf_count = 0;
		}
	}
	iq_put_back(sw, &qid->iq[iq_num], blocked_qes, nb_blocked);

	return count - nb_blocked;
}
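
/* Schedule events from an ordered or parallel QID's IQ to its mapped CQs in
 * round-robin fashion. When keep_order is set (ordered QIDs), each scheduled
 * event also claims a reorder buffer entry from the QID's freelist, so that
 * egress order can later be restored by sw_schedule_reorder().
 */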
static inline uint32_t
sw_schedule_parallel_to_cq(struct sw_evdev *sw, struct sw_qid * const qid,
		uint32_t iq_num, unsigned int count, int keep_order)
{
	uint32_t i;
	uint32_t cq_idx = qid->cq_next_tx;

	/* This is the QID ID. The QID ID is static, hence it can be
	 * used to identify the stage of processing in history lists etc
	 */
	uint32_t qid_id = qid->id;

	if (count > MAX_PER_IQ_DEQUEUE)
		count = MAX_PER_IQ_DEQUEUE;

	if (keep_order)
		/* only schedule as many as we have reorder buffer entries */
		count = RTE_MIN(count,
				rte_ring_count(qid->reorder_buffer_freelist));

	for (i = 0; i < count; i++) {
		const struct rte_event *qe = iq_peek(&qid->iq[iq_num]);
		uint32_t cq_check_count = 0;
		uint32_t cq;

		/*
		 * for parallel, just send to next available CQ in round-robin
		 * fashion. So scan for an available CQ. If all CQs are full
		 * just return and move on to next QID
		 */
		do {
			if (++cq_check_count > qid->cq_num_mapped_cqs)
				goto exit;
			if (cq_idx >= qid->cq_num_mapped_cqs)
				cq_idx = 0;
			cq = qid->cq_map[cq_idx++];

		} while (rte_event_ring_free_count(
				sw->ports[cq].cq_worker_ring) == 0 ||
				sw->ports[cq].inflights == SW_PORT_HIST_LIST);

		struct sw_port *p = &sw->ports[cq];
		if (sw->cq_ring_space[cq] == 0 ||
				p->inflights == SW_PORT_HIST_LIST)
			break;

		sw->cq_ring_space[cq]--;

		qid->stats.tx_pkts++;

		const int head = (p->hist_head & (SW_PORT_HIST_LIST-1));
		p->hist_list[head].fid = SW_HASH_FLOWID(qe->flow_id);
		p->hist_list[head].qid = qid_id;

		if (keep_order)
			rte_ring_sc_dequeue(qid->reorder_buffer_freelist,
					(void *)&p->hist_list[head].rob_entry);

		sw->ports[cq].cq_buf[sw->ports[cq].cq_buf_count++] = *qe;
		iq_pop(sw, &qid->iq[iq_num]);

		rte_compiler_barrier();
		p->inflights++;
		p->stats.tx_pkts++;
		p->hist_head++;
	}
exit:
	qid->cq_next_tx = cq_idx;
	return i;
}

static uint32_t
sw_schedule_dir_to_cq(struct sw_evdev *sw, struct sw_qid * const qid,
		uint32_t iq_num, unsigned int count __rte_unused)
{
	uint32_t cq_id = qid->cq_map[0];
	struct sw_port *port = &sw->ports[cq_id];

	/* get max burst enq size for cq_ring */
	uint32_t count_free = sw->cq_ring_space[cq_id];
	if (count_free == 0)
		return 0;

	/* burst dequeue from the QID IQ ring */
	struct sw_iq *iq = &qid->iq[iq_num];
	uint32_t ret = iq_dequeue_burst(sw, iq,
			&port->cq_buf[port->cq_buf_count], count_free);
	port->cq_buf_count += ret;

	/* Update QID, Port and Total TX stats */
	qid->stats.tx_pkts += ret;
	port->stats.tx_pkts += ret;

	/* Subtract credits from cached value */
	sw->cq_ring_space[cq_id] -= ret;

	return ret;
}
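
/* Scan all prioritized QIDs and move events from each QID's highest-priority
 * non-empty IQ to its mapped CQs, dispatching on the QID's schedule type.
 * Returns the number of events scheduled.
 */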
static uint32_t
sw_schedule_qid_to_cq(struct sw_evdev *sw)
{
	uint32_t pkts = 0;
	uint32_t qid_idx;

	sw->sched_cq_qid_called++;

	for (qid_idx = 0; qid_idx < sw->qid_count; qid_idx++) {
		struct sw_qid *qid = sw->qids_prioritized[qid_idx];

		int type = qid->type;
		int iq_num = PKT_MASK_TO_IQ(qid->iq_pkt_mask);

		/* zero mapped CQs indicates directed */
		if (iq_num >= SW_IQS_MAX || qid->cq_num_mapped_cqs == 0)
			continue;

		uint32_t pkts_done = 0;
		uint32_t count = iq_count(&qid->iq[iq_num]);

		if (count > 0) {
			if (type == SW_SCHED_TYPE_DIRECT)
				pkts_done += sw_schedule_dir_to_cq(sw, qid,
						iq_num, count);
			else if (type == RTE_SCHED_TYPE_ATOMIC)
				pkts_done += sw_schedule_atomic_to_cq(sw, qid,
						iq_num, count);
			else
				pkts_done += sw_schedule_parallel_to_cq(sw, qid,
						iq_num, count,
						type == RTE_SCHED_TYPE_ORDERED);
		}

		/* Check if the IQ that was polled is now empty, and clear its
		 * bit in the IQ mask if it is.
		 */
		int all_done = (pkts_done == count);

		qid->iq_pkt_mask &= ~(all_done << (iq_num));
		pkts += pkts_done;
	}

	return pkts;
}

/* This function performs egress re-ordering of packets and injects them into
 * the appropriate QID IQ. As LB and DIR QIDs are in the same array, but *NOT*
 * contiguous in that array, this function accepts a "range" of QIDs to scan.
 */
static uint16_t
sw_schedule_reorder(struct sw_evdev *sw, int qid_start, int qid_end)
{
	/* Perform egress reordering */
	struct rte_event *qe;
	uint32_t pkts_iter = 0;

	for (; qid_start < qid_end; qid_start++) {
		struct sw_qid *qid = &sw->qids[qid_start];
		int i, num_entries_in_use;

		if (qid->type != RTE_SCHED_TYPE_ORDERED)
			continue;

		num_entries_in_use = rte_ring_free_count(
					qid->reorder_buffer_freelist);

		for (i = 0; i < num_entries_in_use; i++) {
			struct reorder_buffer_entry *entry;
			int j;

			entry = &qid->reorder_buffer[qid->reorder_buffer_index];

			if (!entry->ready)
				break;

			for (j = 0; j < entry->num_fragments; j++) {
				uint16_t dest_qid;
				uint16_t dest_iq;

				int idx = entry->fragment_index + j;
				qe = &entry->fragments[idx];

				dest_qid = qe->queue_id;
				dest_iq = PRIO_TO_IQ(qe->priority);

				if (dest_qid >= sw->qid_count) {
					sw->stats.rx_dropped++;
					continue;
				}

				pkts_iter++;

				struct sw_qid *q = &sw->qids[dest_qid];
				struct sw_iq *iq = &q->iq[dest_iq];

				/* we checked for space above, so enqueue must
				 * succeed
				 */
				iq_enqueue(sw, iq, qe);
				q->iq_pkt_mask |= (1 << (dest_iq));
				q->iq_pkt_count[dest_iq]++;
				q->stats.rx_pkts++;
			}

			entry->ready = (j != entry->num_fragments);
			entry->num_fragments -= j;
			entry->fragment_index += j;

			if (!entry->ready) {
				entry->fragment_index = 0;

				rte_ring_sp_enqueue(
						qid->reorder_buffer_freelist,
						entry);

				qid->reorder_buffer_index++;
				qid->reorder_buffer_index %= qid->window_size;
			}
		}
	}
	return pkts_iter;
}

/* Refill the port's pp_buf with a burst of events from its rx worker ring */
static __rte_always_inline void
sw_refill_pp_buf(struct sw_evdev *sw, struct sw_port *port)
{
	RTE_SET_USED(sw);
	struct rte_event_ring *worker = port->rx_worker_ring;
	port->pp_buf_start = 0;
	port->pp_buf_count = rte_event_ring_dequeue_burst(worker, port->pp_buf,
			RTE_DIM(port->pp_buf), NULL);
}
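
/* Pull events that a load-balanced port's worker has enqueued from its rx
 * worker ring (via the pp_buf shadow buffer), retire the matching history
 * list entries for ops carrying the COMPLETE flag, and enqueue valid events
 * into the destination QID's IQ. allow_reorder selects the extra bookkeeping
 * required for ordered QIDs (marking reorder buffer entries ready).
 */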
static __rte_always_inline uint32_t
__pull_port_lb(struct sw_evdev *sw, uint32_t port_id, int allow_reorder)
{
	static struct reorder_buffer_entry dummy_rob;
	uint32_t pkts_iter = 0;
	struct sw_port *port = &sw->ports[port_id];

	/* If shadow ring has 0 pkts, pull from worker ring */
	if (port->pp_buf_count == 0)
		sw_refill_pp_buf(sw, port);

	while (port->pp_buf_count) {
		const struct rte_event *qe = &port->pp_buf[port->pp_buf_start];
		struct sw_hist_list_entry *hist_entry = NULL;
		uint8_t flags = qe->op;
		const uint16_t eop = !(flags & QE_FLAG_NOT_EOP);
		int needs_reorder = 0;
		/* when not reordering, a PARTIAL op is handled like NEW */
		if (!allow_reorder && !eop)
			flags = QE_FLAG_VALID;

		/*
		 * if we don't have space for this packet in an IQ,
		 * then move on to next queue. Technically, for a
		 * packet that needs reordering, we don't need to check
		 * here, but it simplifies things not to special-case
		 */
		uint32_t iq_num = PRIO_TO_IQ(qe->priority);
		struct sw_qid *qid = &sw->qids[qe->queue_id];

		/* now process based on flags. Note that for directed
		 * queues, the enqueue_flush masks off all but the
		 * valid flag. This makes FWD and PARTIAL enqueues just
		 * NEW type, and makes DROPS no-op calls.
		 */
		if ((flags & QE_FLAG_COMPLETE) && port->inflights > 0) {
			const uint32_t hist_tail = port->hist_tail &
					(SW_PORT_HIST_LIST - 1);

			hist_entry = &port->hist_list[hist_tail];
			const uint32_t hist_qid = hist_entry->qid;
			const uint32_t hist_fid = hist_entry->fid;

			struct sw_fid_t *fid =
				&sw->qids[hist_qid].fids[hist_fid];
			fid->pcount -= eop;
			if (fid->pcount == 0)
				fid->cq = -1;

			if (allow_reorder) {
				/* set reorder ready if an ordered QID */
				uintptr_t rob_ptr =
					(uintptr_t)hist_entry->rob_entry;
				const uintptr_t valid = (rob_ptr != 0);
				needs_reorder = valid;
				/* Branchless select: when there is no ROB
				 * entry (valid == 0), valid - 1 is all-ones,
				 * so rob_ptr is pointed at dummy_rob and the
				 * write below is harmlessly discarded.
				 */
				rob_ptr |=
					((valid - 1) & (uintptr_t)&dummy_rob);
				struct reorder_buffer_entry *tmp_rob_ptr =
					(struct reorder_buffer_entry *)rob_ptr;
				tmp_rob_ptr->ready = eop * needs_reorder;
			}

			port->inflights -= eop;
			port->hist_tail += eop;
		}
		if (flags & QE_FLAG_VALID) {
			port->stats.rx_pkts++;

			if (allow_reorder && needs_reorder) {
				struct reorder_buffer_entry *rob_entry =
						hist_entry->rob_entry;

				hist_entry->rob_entry = NULL;
				/* Although fragmentation is not currently
				 * supported by the eventdev API, we support it
				 * here. Open: How do we alert the user that
				 * they've exceeded max frags?
				 */
				int num_frag = rob_entry->num_fragments;
				if (num_frag == SW_FRAGMENTS_MAX)
					sw->stats.rx_dropped++;
				else {
					int idx = rob_entry->num_fragments++;
					rob_entry->fragments[idx] = *qe;
				}
				goto end_qe;
			}

			/* Use the iq_num from above to push the QE
			 * into the qid at the right priority
			 */

			qid->iq_pkt_mask |= (1 << (iq_num));
			iq_enqueue(sw, &qid->iq[iq_num], qe);
			qid->iq_pkt_count[iq_num]++;
			qid->stats.rx_pkts++;
			pkts_iter++;
		}

end_qe:
		port->pp_buf_start++;
		port->pp_buf_count--;
	} /* while (avail_qes) */

	return pkts_iter;
}

static uint32_t
sw_schedule_pull_port_lb(struct sw_evdev *sw, uint32_t port_id)
{
	return __pull_port_lb(sw, port_id, 1);
}

static uint32_t
sw_schedule_pull_port_no_reorder(struct sw_evdev *sw, uint32_t port_id)
{
	return __pull_port_lb(sw, port_id, 0);
}

static uint32_t
sw_schedule_pull_port_dir(struct sw_evdev *sw, uint32_t port_id)
{
	uint32_t pkts_iter = 0;
	struct sw_port *port = &sw->ports[port_id];

	/* If shadow ring has 0 pkts, pull from worker ring */
	if (port->pp_buf_count == 0)
		sw_refill_pp_buf(sw, port);

	while (port->pp_buf_count) {
		const struct rte_event *qe = &port->pp_buf[port->pp_buf_start];
		uint8_t flags = qe->op;

		if ((flags & QE_FLAG_VALID) == 0)
			goto end_qe;

		uint32_t iq_num = PRIO_TO_IQ(qe->priority);
		struct sw_qid *qid = &sw->qids[qe->queue_id];
		struct sw_iq *iq = &qid->iq[iq_num];

		port->stats.rx_pkts++;

		/* Use the iq_num from above to push the QE
		 * into the qid at the right priority
		 */
		qid->iq_pkt_mask |= (1 << (iq_num));
		iq_enqueue(sw, iq, qe);
		qid->iq_pkt_count[iq_num]++;
		qid->stats.rx_pkts++;
		pkts_iter++;

end_qe:
		port->pp_buf_start++;
		port->pp_buf_count--;
	} /* while port->pp_buf_count */

	return pkts_iter;
}

void
sw_event_schedule(struct rte_eventdev *dev)
{
	struct sw_evdev *sw = sw_pmd_priv(dev);
	uint32_t in_pkts, out_pkts;
	uint32_t out_pkts_total = 0, in_pkts_total = 0;
	int32_t sched_quanta = sw->sched_quanta;
	uint32_t i;

	sw->sched_called++;
	if (unlikely(!sw->started))
		return;

	do {
		uint32_t in_pkts_this_iteration = 0;

		/* Pull from rx_ring for ports */
		do {
			in_pkts = 0;
			for (i = 0; i < sw->port_count; i++) {
				/* ack the unlinks in progress as done */
				if (sw->ports[i].unlinks_in_progress)
					sw->ports[i].unlinks_in_progress = 0;

				if (sw->ports[i].is_directed)
					in_pkts += sw_schedule_pull_port_dir(sw, i);
				else if (sw->ports[i].num_ordered_qids > 0)
					in_pkts += sw_schedule_pull_port_lb(sw, i);
				else
					in_pkts += sw_schedule_pull_port_no_reorder(sw, i);
			}

			/* QID scan for re-ordered */
			in_pkts += sw_schedule_reorder(sw, 0,
					sw->qid_count);
			in_pkts_this_iteration += in_pkts;
		} while (in_pkts > 4 &&
				(int)in_pkts_this_iteration < sched_quanta);

		out_pkts = sw_schedule_qid_to_cq(sw);
		out_pkts_total += out_pkts;
		in_pkts_total += in_pkts_this_iteration;

		if (in_pkts == 0 && out_pkts == 0)
			break;
	} while ((int)out_pkts_total < sched_quanta);

	sw->stats.tx_pkts += out_pkts_total;
	sw->stats.rx_pkts += in_pkts_total;

	sw->sched_no_iq_enqueues += (in_pkts_total == 0);
	sw->sched_no_cq_enqueues += (out_pkts_total == 0);

	/* push all the internally buffered QEs in port->cq_ring to the
	 * worker cores: i.e., do the ring transfers batched.
	 */
	for (i = 0; i < sw->port_count; i++) {
		struct rte_event_ring *worker = sw->ports[i].cq_worker_ring;
		rte_event_ring_enqueue_burst(worker, sw->ports[i].cq_buf,
				sw->ports[i].cq_buf_count,
				&sw->cq_ring_space[i]);
		sw->ports[i].cq_buf_count = 0;
	}
}
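
/*
 * Usage sketch (illustrative only, not part of the driver):
 * sw_event_schedule() must be called repeatedly for events to flow. An
 * application relying on the eventdev service API could, for example, drive
 * it like this:
 *
 *	uint32_t service_id;
 *	if (rte_event_dev_service_id_get(dev_id, &service_id) == 0)
 *		while (running)
 *			rte_service_run_iter_on_app_lcore(service_id, 1);
 *
 * The actual service registration and lcore mapping live outside this file.
 */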