/* SPDX-License-Identifier: BSD-3-Clause
 * Copyright(c) 2016-2017 Intel Corporation
 */

#include <rte_ring.h>
#include <rte_hash_crc.h>
#include <rte_event_ring.h>
#include "sw_evdev.h"
#include "iq_chunk.h"

#define SW_IQS_MASK (SW_IQS_MAX-1)

/* Retrieve the highest-priority IQ (lowest set bit), or SW_IQS_MAX if no
 * packets are available. Doing the CTZ on each call is faster than caching
 * the value due to data dependencies.
 */
#define PKT_MASK_TO_IQ(pkts) \
	(__builtin_ctz(pkts | (1 << SW_IQS_MAX)))

#if SW_IQS_MAX != 4
#error Misconfigured PRIO_TO_IQ caused by SW_IQS_MAX value change
#endif
#define PRIO_TO_IQ(prio) (prio >> 6)

#define MAX_PER_IQ_DEQUEUE 48
#define FLOWID_MASK (SW_QID_NUM_FIDS-1)
/* use cheap bit mixing, we only need to lose a few bits */
#define SW_HASH_FLOWID(f) (((f) ^ (f >> 10)) & FLOWID_MASK)

/* Schedule up to "count" events from an atomic QID's IQ to its mapped CQs.
 * Each flow is pinned to a CQ (picked as the mapped CQ with the most free
 * ring space when the flow is first seen) until its inflight count drops to
 * zero. Events whose CQ ring or port history list is full are put back into
 * the IQ.
 */
static inline uint32_t
sw_schedule_atomic_to_cq(struct sw_evdev *sw, struct sw_qid * const qid,
		uint32_t iq_num, unsigned int count)
{
	struct rte_event qes[MAX_PER_IQ_DEQUEUE]; /* count <= MAX */
	struct rte_event blocked_qes[MAX_PER_IQ_DEQUEUE];
	uint32_t nb_blocked = 0;
	uint32_t i;

	if (count > MAX_PER_IQ_DEQUEUE)
		count = MAX_PER_IQ_DEQUEUE;

	/* This is the QID ID. The QID ID is static, hence it can be
	 * used to identify the stage of processing in history lists etc.
	 */
	uint32_t qid_id = qid->id;

	iq_dequeue_burst(sw, &qid->iq[iq_num], qes, count);
	for (i = 0; i < count; i++) {
		const struct rte_event *qe = &qes[i];
		const uint16_t flow_id = SW_HASH_FLOWID(qes[i].flow_id);
		struct sw_fid_t *fid = &qid->fids[flow_id];
		int cq = fid->cq;

		if (cq < 0) {
			uint32_t cq_idx = qid->cq_next_tx++;
			if (qid->cq_next_tx == qid->cq_num_mapped_cqs)
				qid->cq_next_tx = 0;
			cq = qid->cq_map[cq_idx];

			/* find least used */
			int cq_free_cnt = sw->cq_ring_space[cq];
			for (cq_idx = 0; cq_idx < qid->cq_num_mapped_cqs;
					cq_idx++) {
				int test_cq = qid->cq_map[cq_idx];
				int test_cq_free = sw->cq_ring_space[test_cq];
				if (test_cq_free > cq_free_cnt) {
					cq = test_cq;
					cq_free_cnt = test_cq_free;
				}
			}

			fid->cq = cq; /* this pins early */
		}

		if (sw->cq_ring_space[cq] == 0 ||
				sw->ports[cq].inflights == SW_PORT_HIST_LIST) {
			blocked_qes[nb_blocked++] = *qe;
			continue;
		}

		struct sw_port *p = &sw->ports[cq];

		/* at this point we can queue up the packet on the cq_buf */
		fid->pcount++;
		p->cq_buf[p->cq_buf_count++] = *qe;
		p->inflights++;
		sw->cq_ring_space[cq]--;

		int head = (p->hist_head++ & (SW_PORT_HIST_LIST-1));
		p->hist_list[head].fid = flow_id;
		p->hist_list[head].qid = qid_id;

		p->stats.tx_pkts++;
		qid->stats.tx_pkts++;
		qid->to_port[cq]++;

		/* if we just filled in the last slot, flush the buffer */
		if (sw->cq_ring_space[cq] == 0) {
			struct rte_event_ring *worker = p->cq_worker_ring;
			rte_event_ring_enqueue_burst(worker, p->cq_buf,
					p->cq_buf_count,
					&sw->cq_ring_space[cq]);
			p->cq_buf_count = 0;
		}
	}
	iq_put_back(sw, &qid->iq[iq_num], blocked_qes, nb_blocked);

	return count - nb_blocked;
}
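
/* Schedule events from a parallel/ordered QID's IQ to its mapped CQs.
 * Events are distributed round-robin across the mapped CQs, skipping any CQ
 * whose worker ring is full or whose port history list has no free entries.
 * When keep_order is set (the QID is RTE_SCHED_TYPE_ORDERED), a reorder
 * buffer entry is reserved per event so egress order can be restored later.
 */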
static inline uint32_t
sw_schedule_parallel_to_cq(struct sw_evdev *sw, struct sw_qid * const qid,
		uint32_t iq_num, unsigned int count, int keep_order)
{
	uint32_t i;
	uint32_t cq_idx = qid->cq_next_tx;

	/* This is the QID ID. The QID ID is static, hence it can be
	 * used to identify the stage of processing in history lists etc.
	 */
	uint32_t qid_id = qid->id;

	if (count > MAX_PER_IQ_DEQUEUE)
		count = MAX_PER_IQ_DEQUEUE;

	if (keep_order)
		/* only schedule as many as we have reorder buffer entries */
		count = RTE_MIN(count,
				rte_ring_count(qid->reorder_buffer_freelist));

	for (i = 0; i < count; i++) {
		const struct rte_event *qe = iq_peek(&qid->iq[iq_num]);
		uint32_t cq_check_count = 0;
		uint32_t cq;

		/*
		 * for parallel, just send to next available CQ in round-robin
		 * fashion. So scan for an available CQ. If all CQs are full
		 * just return and move on to the next QID.
		 */
		do {
			if (++cq_check_count > qid->cq_num_mapped_cqs)
				goto exit;
			cq = qid->cq_map[cq_idx];
			if (++cq_idx == qid->cq_num_mapped_cqs)
				cq_idx = 0;
		} while (rte_event_ring_free_count(
				sw->ports[cq].cq_worker_ring) == 0 ||
				sw->ports[cq].inflights == SW_PORT_HIST_LIST);

		struct sw_port *p = &sw->ports[cq];
		if (sw->cq_ring_space[cq] == 0 ||
				p->inflights == SW_PORT_HIST_LIST)
			break;

		sw->cq_ring_space[cq]--;

		qid->stats.tx_pkts++;

		const int head = (p->hist_head & (SW_PORT_HIST_LIST-1));
		p->hist_list[head].fid = SW_HASH_FLOWID(qe->flow_id);
		p->hist_list[head].qid = qid_id;

		if (keep_order)
			rte_ring_sc_dequeue(qid->reorder_buffer_freelist,
					(void *)&p->hist_list[head].rob_entry);

		sw->ports[cq].cq_buf[sw->ports[cq].cq_buf_count++] = *qe;
		iq_pop(sw, &qid->iq[iq_num]);

		rte_compiler_barrier();
		p->inflights++;
		p->stats.tx_pkts++;
		p->hist_head++;
	}
exit:
	qid->cq_next_tx = cq_idx;
	return i;
}

/* Schedule events from a directed QID's IQ: a directed QID maps to exactly
 * one CQ, so burst-dequeue straight into that port's CQ buffer.
 */
static uint32_t
sw_schedule_dir_to_cq(struct sw_evdev *sw, struct sw_qid * const qid,
		uint32_t iq_num, unsigned int count __rte_unused)
{
	uint32_t cq_id = qid->cq_map[0];
	struct sw_port *port = &sw->ports[cq_id];

	/* get max burst enq size for cq_ring */
	uint32_t count_free = sw->cq_ring_space[cq_id];
	if (count_free == 0)
		return 0;

	/* burst dequeue from the QID IQ ring */
	struct sw_iq *iq = &qid->iq[iq_num];
	uint32_t ret = iq_dequeue_burst(sw, iq,
			&port->cq_buf[port->cq_buf_count], count_free);
	port->cq_buf_count += ret;

	/* Update QID, Port and Total TX stats */
	qid->stats.tx_pkts += ret;
	port->stats.tx_pkts += ret;

	/* Subtract credits from cached value */
	sw->cq_ring_space[cq_id] -= ret;

	return ret;
}

/* For each QID, in priority order, schedule events from its highest-priority
 * non-empty IQ to the mapped CQs using the scheduler matching the QID type.
 */
static uint32_t
sw_schedule_qid_to_cq(struct sw_evdev *sw)
{
	uint32_t pkts = 0;
	uint32_t qid_idx;

	sw->sched_cq_qid_called++;

	for (qid_idx = 0; qid_idx < sw->qid_count; qid_idx++) {
		struct sw_qid *qid = sw->qids_prioritized[qid_idx];

		int type = qid->type;
		int iq_num = PKT_MASK_TO_IQ(qid->iq_pkt_mask);

		/* skip this QID if none of its IQs contain packets */
		if (iq_num >= SW_IQS_MAX)
			continue;

		uint32_t pkts_done = 0;
		uint32_t count = iq_count(&qid->iq[iq_num]);

		if (count > 0) {
			if (type == SW_SCHED_TYPE_DIRECT)
				pkts_done += sw_schedule_dir_to_cq(sw, qid,
						iq_num, count);
			else if (type == RTE_SCHED_TYPE_ATOMIC)
				pkts_done += sw_schedule_atomic_to_cq(sw, qid,
						iq_num, count);
			else
				pkts_done += sw_schedule_parallel_to_cq(sw, qid,
						iq_num, count,
						type == RTE_SCHED_TYPE_ORDERED);
		}

		/* Check if the IQ that was polled is now empty, and unset it
		 * in the IQ mask if it is empty.
		 */
		int all_done = (pkts_done == count);

		qid->iq_pkt_mask &= ~(all_done << (iq_num));
		pkts += pkts_done;
	}

	return pkts;
}
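
/* Reorder buffer entry lifecycle: sw_schedule_parallel_to_cq() reserves an
 * entry from reorder_buffer_freelist when scheduling an ordered event,
 * __pull_port_lb() marks the entry ready (and appends any fragments) once
 * the worker completes the event, and sw_schedule_reorder() below drains
 * ready entries in window order back into the destination QID IQs before
 * returning them to the freelist.
 */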
/* This function performs re-ordering of packets, and injects them into the
 * appropriate QID IQ. As LB and DIR QIDs are in the same array, but *NOT*
 * contiguous in that array, this function accepts a "range" of QIDs to scan.
 */
static uint16_t
sw_schedule_reorder(struct sw_evdev *sw, int qid_start, int qid_end)
{
	/* Perform egress reordering */
	struct rte_event *qe;
	uint32_t pkts_iter = 0;

	for (; qid_start < qid_end; qid_start++) {
		struct sw_qid *qid = &sw->qids[qid_start];
		int i, num_entries_in_use;

		if (qid->type != RTE_SCHED_TYPE_ORDERED)
			continue;

		num_entries_in_use = rte_ring_free_count(
					qid->reorder_buffer_freelist);

		for (i = 0; i < num_entries_in_use; i++) {
			struct reorder_buffer_entry *entry;
			int j;

			entry = &qid->reorder_buffer[qid->reorder_buffer_index];

			if (!entry->ready)
				break;

			for (j = 0; j < entry->num_fragments; j++) {
				uint16_t dest_qid;
				uint16_t dest_iq;

				int idx = entry->fragment_index + j;
				qe = &entry->fragments[idx];

				dest_qid = qe->queue_id;
				dest_iq = PRIO_TO_IQ(qe->priority);

				if (dest_qid >= sw->qid_count) {
					sw->stats.rx_dropped++;
					continue;
				}

				pkts_iter++;

				struct sw_qid *q = &sw->qids[dest_qid];
				struct sw_iq *iq = &q->iq[dest_iq];

				/* we checked for space above, so enqueue must
				 * succeed
				 */
				iq_enqueue(sw, iq, qe);
				q->iq_pkt_mask |= (1 << (dest_iq));
				q->iq_pkt_count[dest_iq]++;
				q->stats.rx_pkts++;
			}

			entry->ready = (j != entry->num_fragments);
			entry->num_fragments -= j;
			entry->fragment_index += j;

			if (!entry->ready) {
				entry->fragment_index = 0;

				rte_ring_sp_enqueue(
						qid->reorder_buffer_freelist,
						entry);

				qid->reorder_buffer_index++;
				qid->reorder_buffer_index %= qid->window_size;
			}
		}
	}
	return pkts_iter;
}

static __rte_always_inline void
sw_refill_pp_buf(struct sw_evdev *sw, struct sw_port *port)
{
	RTE_SET_USED(sw);
	struct rte_event_ring *worker = port->rx_worker_ring;
	port->pp_buf_start = 0;
	port->pp_buf_count = rte_event_ring_dequeue_burst(worker, port->pp_buf,
			RTE_DIM(port->pp_buf), NULL);
}
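
/* Pull new/forwarded events from a load-balanced port's RX worker ring and
 * enqueue them into the destination QID IQs. Completions retire the oldest
 * history-list entry: the flow's pin count is decremented (unpinning the
 * flow from its CQ when it reaches zero) and, for ordered QIDs, the
 * associated reorder buffer entry is marked ready. The dummy_rob write below
 * keeps the completion path branchless: when the history entry has no
 * reorder buffer entry, the (discarded) "ready" store targets dummy_rob
 * instead.
 */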
static __rte_always_inline uint32_t
__pull_port_lb(struct sw_evdev *sw, uint32_t port_id, int allow_reorder)
{
	static struct reorder_buffer_entry dummy_rob;
	uint32_t pkts_iter = 0;
	struct sw_port *port = &sw->ports[port_id];

	/* If shadow ring has 0 pkts, pull from worker ring */
	if (port->pp_buf_count == 0)
		sw_refill_pp_buf(sw, port);

	while (port->pp_buf_count) {
		const struct rte_event *qe = &port->pp_buf[port->pp_buf_start];
		struct sw_hist_list_entry *hist_entry = NULL;
		uint8_t flags = qe->op;
		const uint16_t eop = !(flags & QE_FLAG_NOT_EOP);
		int needs_reorder = 0;
		/* if reordering is disabled, treat PARTIAL the same as NEW */
		if (!allow_reorder && !eop)
			flags = QE_FLAG_VALID;

		/*
		 * if we don't have space for this packet in an IQ,
		 * then move on to next queue. Technically, for a
		 * packet that needs reordering, we don't need to check
		 * here, but it simplifies things not to special-case.
		 */
		uint32_t iq_num = PRIO_TO_IQ(qe->priority);
		struct sw_qid *qid = &sw->qids[qe->queue_id];

		/* now process based on flags. Note that for directed
		 * queues, the enqueue_flush masks off all but the
		 * valid flag. This makes FWD and PARTIAL enqueues just
		 * NEW type, and makes DROPS no-op calls.
		 */
		if ((flags & QE_FLAG_COMPLETE) && port->inflights > 0) {
			const uint32_t hist_tail = port->hist_tail &
					(SW_PORT_HIST_LIST - 1);

			hist_entry = &port->hist_list[hist_tail];
			const uint32_t hist_qid = hist_entry->qid;
			const uint32_t hist_fid = hist_entry->fid;

			struct sw_fid_t *fid =
				&sw->qids[hist_qid].fids[hist_fid];
			fid->pcount -= eop;
			if (fid->pcount == 0)
				fid->cq = -1;

			if (allow_reorder) {
				/* set reorder ready if an ordered QID */
				uintptr_t rob_ptr =
					(uintptr_t)hist_entry->rob_entry;
				const uintptr_t valid = (rob_ptr != 0);
				needs_reorder = valid;
				rob_ptr |=
					((valid - 1) & (uintptr_t)&dummy_rob);
				struct reorder_buffer_entry *tmp_rob_ptr =
					(struct reorder_buffer_entry *)rob_ptr;
				tmp_rob_ptr->ready = eop * needs_reorder;
			}

			port->inflights -= eop;
			port->hist_tail += eop;
		}
		if (flags & QE_FLAG_VALID) {
			port->stats.rx_pkts++;

			if (allow_reorder && needs_reorder) {
				struct reorder_buffer_entry *rob_entry =
						hist_entry->rob_entry;

				hist_entry->rob_entry = NULL;
				/* Although fragmentation is not currently
				 * supported by the eventdev API, we support it
				 * here. Open question: how do we alert the
				 * user that they've exceeded max frags?
				 */
				int num_frag = rob_entry->num_fragments;
				if (num_frag == SW_FRAGMENTS_MAX)
					sw->stats.rx_dropped++;
				else {
					int idx = rob_entry->num_fragments++;
					rob_entry->fragments[idx] = *qe;
				}
				goto end_qe;
			}

			/* Use the iq_num from above to push the QE
			 * into the qid at the right priority
			 */
			qid->iq_pkt_mask |= (1 << (iq_num));
			iq_enqueue(sw, &qid->iq[iq_num], qe);
			qid->iq_pkt_count[iq_num]++;
			qid->stats.rx_pkts++;
			pkts_iter++;
		}

end_qe:
		port->pp_buf_start++;
		port->pp_buf_count--;
	} /* while (avail_qes) */

	return pkts_iter;
}

static uint32_t
sw_schedule_pull_port_lb(struct sw_evdev *sw, uint32_t port_id)
{
	return __pull_port_lb(sw, port_id, 1);
}

static uint32_t
sw_schedule_pull_port_no_reorder(struct sw_evdev *sw, uint32_t port_id)
{
	return __pull_port_lb(sw, port_id, 0);
}
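
/* Pull events from a directed port's RX worker ring. Directed events bypass
 * the history list and flow pinning entirely; valid events are pushed
 * straight into the destination QID IQ at the priority-selected IQ level.
 */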
static uint32_t
sw_schedule_pull_port_dir(struct sw_evdev *sw, uint32_t port_id)
{
	uint32_t pkts_iter = 0;
	struct sw_port *port = &sw->ports[port_id];

	/* If shadow ring has 0 pkts, pull from worker ring */
	if (port->pp_buf_count == 0)
		sw_refill_pp_buf(sw, port);

	while (port->pp_buf_count) {
		const struct rte_event *qe = &port->pp_buf[port->pp_buf_start];
		uint8_t flags = qe->op;

		if ((flags & QE_FLAG_VALID) == 0)
			goto end_qe;

		uint32_t iq_num = PRIO_TO_IQ(qe->priority);
		struct sw_qid *qid = &sw->qids[qe->queue_id];
		struct sw_iq *iq = &qid->iq[iq_num];

		port->stats.rx_pkts++;

		/* Use the iq_num from above to push the QE
		 * into the qid at the right priority
		 */
		qid->iq_pkt_mask |= (1 << (iq_num));
		iq_enqueue(sw, iq, qe);
		qid->iq_pkt_count[iq_num]++;
		qid->stats.rx_pkts++;
		pkts_iter++;

end_qe:
		port->pp_buf_start++;
		port->pp_buf_count--;
	} /* while port->pp_buf_count */

	return pkts_iter;
}

/* Scheduler entry point: repeatedly pull events from the ports' RX worker
 * rings into the QID IQs, schedule the IQs to the CQs, then flush the
 * per-port CQ buffers to the worker rings in one batched pass.
 */
void
sw_event_schedule(struct rte_eventdev *dev)
{
	struct sw_evdev *sw = sw_pmd_priv(dev);
	uint32_t in_pkts, out_pkts;
	uint32_t out_pkts_total = 0, in_pkts_total = 0;
	int32_t sched_quanta = sw->sched_quanta;
	uint32_t i;

	sw->sched_called++;
	if (!sw->started)
		return;

	do {
		uint32_t in_pkts_this_iteration = 0;

		/* Pull from rx_ring for ports */
		do {
			in_pkts = 0;
			for (i = 0; i < sw->port_count; i++)
				if (sw->ports[i].is_directed)
					in_pkts += sw_schedule_pull_port_dir(
							sw, i);
				else if (sw->ports[i].num_ordered_qids > 0)
					in_pkts += sw_schedule_pull_port_lb(
							sw, i);
				else
					in_pkts +=
					sw_schedule_pull_port_no_reorder(sw, i);

			/* QID scan for re-ordered */
			in_pkts += sw_schedule_reorder(sw, 0,
					sw->qid_count);
			in_pkts_this_iteration += in_pkts;
		} while (in_pkts > 4 &&
				(int)in_pkts_this_iteration < sched_quanta);

		out_pkts = 0;
		out_pkts += sw_schedule_qid_to_cq(sw);
		out_pkts_total += out_pkts;
		in_pkts_total += in_pkts_this_iteration;

		if (in_pkts == 0 && out_pkts == 0)
			break;
	} while ((int)out_pkts_total < sched_quanta);

	/* push all the internal buffered QEs in port->cq_ring to the
	 * worker cores: aka, do the ring transfers batched.
	 */
	for (i = 0; i < sw->port_count; i++) {
		struct rte_event_ring *worker = sw->ports[i].cq_worker_ring;
		rte_event_ring_enqueue_burst(worker, sw->ports[i].cq_buf,
				sw->ports[i].cq_buf_count,
				&sw->cq_ring_space[i]);
		sw->ports[i].cq_buf_count = 0;
	}

	sw->stats.tx_pkts += out_pkts_total;
	sw->stats.rx_pkts += in_pkts_total;

	sw->sched_no_iq_enqueues += (in_pkts_total == 0);
	sw->sched_no_cq_enqueues += (out_pkts_total == 0);
}
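
/*
 * Illustrative sketch (not part of this driver): sw_event_schedule() is
 * intended to be called repeatedly by a dedicated core. An application
 * driving it through the service core API might look roughly like the
 * following; the exact calls depend on the DPDK version and are an
 * assumption for illustration only.
 *
 *	uint32_t service_id;
 *
 *	if (rte_event_dev_service_id_get(dev_id, &service_id) == 0)
 *		while (!done)
 *			rte_service_run_iter_on_app_lcore(service_id, 1);
 */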