/*-
 *   BSD LICENSE
 *
 *   Copyright(c) 2016-2017 Intel Corporation. All rights reserved.
 *
 *   Redistribution and use in source and binary forms, with or without
 *   modification, are permitted provided that the following conditions
 *   are met:
 *
 *     * Redistributions of source code must retain the above copyright
 *       notice, this list of conditions and the following disclaimer.
 *     * Redistributions in binary form must reproduce the above copyright
 *       notice, this list of conditions and the following disclaimer in
 *       the documentation and/or other materials provided with the
 *       distribution.
 *     * Neither the name of Intel Corporation nor the names of its
 *       contributors may be used to endorse or promote products derived
 *       from this software without specific prior written permission.
 *
 *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */

#include <rte_ring.h>
#include <rte_hash_crc.h>
#include "sw_evdev.h"
#include "iq_ring.h"
#include "event_ring.h"

#define SW_IQS_MASK (SW_IQS_MAX-1)

/* Retrieve the lowest-numbered (i.e. highest priority) IQ that has packets,
 * or SW_IQS_MAX if no packets are available. Doing the CTZ twice is faster
 * than caching the value due to data dependencies.
 */
#define PKT_MASK_TO_IQ(pkts) \
	(__builtin_ctz(pkts | (1 << SW_IQS_MAX)))

#if SW_IQS_MAX != 4
#error Misconfigured PRIO_TO_IQ caused by SW_IQS_MAX value change
#endif
#define PRIO_TO_IQ(prio) (prio >> 6)

#define MAX_PER_IQ_DEQUEUE 48
#define FLOWID_MASK (SW_QID_NUM_FIDS-1)
/* use cheap bit mixing, we only need to lose a few bits */
#define SW_HASH_FLOWID(f) (((f) ^ (f >> 10)) & FLOWID_MASK)
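
/* Worked examples for the macros above (illustrative values only, with
 * SW_IQS_MAX == 4 as enforced by the #error check):
 *   PKT_MASK_TO_IQ(0x0) == 4   no IQ has packets; callers treat a result
 *                              >= SW_IQS_MAX as "nothing to schedule"
 *   PKT_MASK_TO_IQ(0x6) == 1   the lowest set bit wins, i.e. the highest
 *                              priority non-empty IQ
 *   PRIO_TO_IQ(0)   == 0       priorities 0..63 map to IQ 0 (highest)
 *   PRIO_TO_IQ(255) == 3       priorities 192..255 map to IQ 3 (lowest)
 */

/* Schedule up to count events from the given IQ of an atomic QID to the CQs
 * (ports) mapped to that QID. A flow that is not yet pinned to a CQ is
 * assigned to the mapped CQ with the most free ring space, and stays pinned
 * until its inflight count drops to zero. Events whose chosen CQ ring or
 * port history list is full are put back onto the IQ. Returns the number of
 * events successfully buffered towards CQs.
 */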
static inline uint32_t
sw_schedule_atomic_to_cq(struct sw_evdev *sw, struct sw_qid * const qid,
		uint32_t iq_num, unsigned int count)
{
	struct rte_event qes[MAX_PER_IQ_DEQUEUE]; /* count <= MAX */
	struct rte_event blocked_qes[MAX_PER_IQ_DEQUEUE];
	uint32_t nb_blocked = 0;
	uint32_t i;

	if (count > MAX_PER_IQ_DEQUEUE)
		count = MAX_PER_IQ_DEQUEUE;

	/* This is the QID ID. The QID ID is static, hence it can be
	 * used to identify the stage of processing in history lists etc
	 */
	uint32_t qid_id = qid->id;

	iq_ring_dequeue_burst(qid->iq[iq_num], qes, count);
	for (i = 0; i < count; i++) {
		const struct rte_event *qe = &qes[i];
		const uint16_t flow_id = SW_HASH_FLOWID(qes[i].flow_id);
		struct sw_fid_t *fid = &qid->fids[flow_id];
		int cq = fid->cq;

		if (cq < 0) {
			uint32_t cq_idx = qid->cq_next_tx++;
			if (qid->cq_next_tx == qid->cq_num_mapped_cqs)
				qid->cq_next_tx = 0;
			cq = qid->cq_map[cq_idx];

			/* find least used */
			int cq_free_cnt = sw->cq_ring_space[cq];
			for (cq_idx = 0; cq_idx < qid->cq_num_mapped_cqs;
					cq_idx++) {
				int test_cq = qid->cq_map[cq_idx];
				int test_cq_free = sw->cq_ring_space[test_cq];
				if (test_cq_free > cq_free_cnt) {
					cq = test_cq;
					cq_free_cnt = test_cq_free;
				}
			}

			fid->cq = cq; /* this pins early */
		}

		if (sw->cq_ring_space[cq] == 0 ||
				sw->ports[cq].inflights == SW_PORT_HIST_LIST) {
			blocked_qes[nb_blocked++] = *qe;
			continue;
		}

		struct sw_port *p = &sw->ports[cq];

		/* at this point we can queue up the packet on the cq_buf */
		fid->pcount++;
		p->cq_buf[p->cq_buf_count++] = *qe;
		p->inflights++;
		sw->cq_ring_space[cq]--;

		int head = (p->hist_head++ & (SW_PORT_HIST_LIST-1));
		p->hist_list[head].fid = flow_id;
		p->hist_list[head].qid = qid_id;

		p->stats.tx_pkts++;
		qid->stats.tx_pkts++;
		qid->to_port[cq]++;

		/* if we just filled in the last slot, flush the buffer */
		if (sw->cq_ring_space[cq] == 0) {
			struct qe_ring *worker = p->cq_worker_ring;
			qe_ring_enqueue_burst(worker, p->cq_buf,
					p->cq_buf_count,
					&sw->cq_ring_space[cq]);
			p->cq_buf_count = 0;
		}
	}
	iq_ring_put_back(qid->iq[iq_num], blocked_qes, nb_blocked);

	return count - nb_blocked;
}
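
/* Schedule up to count events from the given IQ of an ordered or parallel
 * QID, distributing them across the mapped CQs in round-robin fashion. When
 * keep_order is set (ordered QIDs), a reorder buffer entry is reserved for
 * each event so that egress order can be restored later. Returns the number
 * of events scheduled.
 */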
static inline uint32_t
sw_schedule_parallel_to_cq(struct sw_evdev *sw, struct sw_qid * const qid,
		uint32_t iq_num, unsigned int count, int keep_order)
{
	uint32_t i;
	uint32_t cq_idx = qid->cq_next_tx;

	/* This is the QID ID. The QID ID is static, hence it can be
	 * used to identify the stage of processing in history lists etc
	 */
	uint32_t qid_id = qid->id;

	if (count > MAX_PER_IQ_DEQUEUE)
		count = MAX_PER_IQ_DEQUEUE;

	if (keep_order)
		/* only schedule as many as we have reorder buffer entries */
		count = RTE_MIN(count,
				rte_ring_count(qid->reorder_buffer_freelist));

	for (i = 0; i < count; i++) {
		const struct rte_event *qe = iq_ring_peek(qid->iq[iq_num]);
		uint32_t cq_check_count = 0;
		uint32_t cq;

		/*
		 * for parallel, just send to next available CQ in round-robin
		 * fashion. So scan for an available CQ. If all CQs are full
		 * just return and move on to next QID
		 */
		do {
			if (++cq_check_count > qid->cq_num_mapped_cqs)
				goto exit;
			cq = qid->cq_map[cq_idx];
			if (++cq_idx == qid->cq_num_mapped_cqs)
				cq_idx = 0;
		} while (qe_ring_free_count(sw->ports[cq].cq_worker_ring) == 0 ||
				sw->ports[cq].inflights == SW_PORT_HIST_LIST);

		struct sw_port *p = &sw->ports[cq];
		if (sw->cq_ring_space[cq] == 0 ||
				p->inflights == SW_PORT_HIST_LIST)
			break;

		sw->cq_ring_space[cq]--;

		qid->stats.tx_pkts++;

		const int head = (p->hist_head & (SW_PORT_HIST_LIST-1));
		p->hist_list[head].fid = SW_HASH_FLOWID(qe->flow_id);
		p->hist_list[head].qid = qid_id;

		if (keep_order)
			rte_ring_sc_dequeue(qid->reorder_buffer_freelist,
					(void *)&p->hist_list[head].rob_entry);

		sw->ports[cq].cq_buf[sw->ports[cq].cq_buf_count++] = *qe;
		iq_ring_pop(qid->iq[iq_num]);

		rte_compiler_barrier();
		p->inflights++;
		p->stats.tx_pkts++;
		p->hist_head++;
	}
exit:
	qid->cq_next_tx = cq_idx;
	return i;
}

static uint32_t
sw_schedule_dir_to_cq(struct sw_evdev *sw, struct sw_qid * const qid,
		uint32_t iq_num, unsigned int count __rte_unused)
{
	uint32_t cq_id = qid->cq_map[0];
	struct sw_port *port = &sw->ports[cq_id];

	/* get max burst enq size for cq_ring */
	uint32_t count_free = sw->cq_ring_space[cq_id];
	if (count_free == 0)
		return 0;

	/* burst dequeue from the QID IQ ring */
	struct iq_ring *ring = qid->iq[iq_num];
	uint32_t ret = iq_ring_dequeue_burst(ring,
			&port->cq_buf[port->cq_buf_count], count_free);
	port->cq_buf_count += ret;

	/* Update QID, Port and Total TX stats */
	qid->stats.tx_pkts += ret;
	port->stats.tx_pkts += ret;

	/* Subtract credits from cached value */
	sw->cq_ring_space[cq_id] -= ret;

	return ret;
}
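
/* Top-level QID to CQ scheduling pass: walk the prioritized QID list and,
 * for each QID with packets queued, schedule from its highest priority
 * non-empty IQ to the mapped CQs, using the handler that matches the QID's
 * schedule type (directed, atomic, or ordered/parallel). Returns the total
 * number of events moved towards CQs in this pass.
 */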
static uint32_t
sw_schedule_qid_to_cq(struct sw_evdev *sw)
{
	uint32_t pkts = 0;
	uint32_t qid_idx;

	sw->sched_cq_qid_called++;

	for (qid_idx = 0; qid_idx < sw->qid_count; qid_idx++) {
		struct sw_qid *qid = sw->qids_prioritized[qid_idx];

		int type = qid->type;
		int iq_num = PKT_MASK_TO_IQ(qid->iq_pkt_mask);

		/* an IQ number >= SW_IQS_MAX means no IQ has packets queued */
		if (iq_num >= SW_IQS_MAX)
			continue;

		uint32_t pkts_done = 0;
		uint32_t count = iq_ring_count(qid->iq[iq_num]);

		if (count > 0) {
			if (type == SW_SCHED_TYPE_DIRECT)
				pkts_done += sw_schedule_dir_to_cq(sw, qid,
						iq_num, count);
			else if (type == RTE_SCHED_TYPE_ATOMIC)
				pkts_done += sw_schedule_atomic_to_cq(sw, qid,
						iq_num, count);
			else
				pkts_done += sw_schedule_parallel_to_cq(sw, qid,
						iq_num, count,
						type == RTE_SCHED_TYPE_ORDERED);
		}

		/* Check if the IQ that was polled is now empty, and unset it
		 * in the IQ mask if it's empty.
		 */
		int all_done = (pkts_done == count);

		qid->iq_pkt_mask &= ~(all_done << (iq_num));
		pkts += pkts_done;
	}

	return pkts;
}

/* This function performs re-ordering of packets and injects them into the
 * appropriate QID IQ. As LB and DIR QIDs are in the same array, but *NOT*
 * contiguous in that array, this function accepts a "range" of QIDs to scan.
 */
static uint16_t
sw_schedule_reorder(struct sw_evdev *sw, int qid_start, int qid_end)
{
	/* Perform egress reordering */
	struct rte_event *qe;
	uint32_t pkts_iter = 0;

	for (; qid_start < qid_end; qid_start++) {
		struct sw_qid *qid = &sw->qids[qid_start];
		int i, num_entries_in_use;

		if (qid->type != RTE_SCHED_TYPE_ORDERED)
			continue;

		num_entries_in_use = rte_ring_free_count(
					qid->reorder_buffer_freelist);

		for (i = 0; i < num_entries_in_use; i++) {
			struct reorder_buffer_entry *entry;
			int j;

			entry = &qid->reorder_buffer[qid->reorder_buffer_index];

			if (!entry->ready)
				break;

			for (j = 0; j < entry->num_fragments; j++) {
				uint16_t dest_qid;
				uint16_t dest_iq;

				int idx = entry->fragment_index + j;
				qe = &entry->fragments[idx];

				dest_qid = qe->queue_id;
				dest_iq = PRIO_TO_IQ(qe->priority);

				if (dest_qid >= sw->qid_count) {
					sw->stats.rx_dropped++;
					continue;
				}

				struct sw_qid *dest_qid_ptr =
					&sw->qids[dest_qid];
				const struct iq_ring *dest_iq_ptr =
					dest_qid_ptr->iq[dest_iq];
				if (iq_ring_free_count(dest_iq_ptr) == 0)
					break;

				pkts_iter++;

				struct sw_qid *q = &sw->qids[dest_qid];
				struct iq_ring *r = q->iq[dest_iq];

				/* we checked for space above, so enqueue must
				 * succeed
				 */
				iq_ring_enqueue(r, qe);
				q->iq_pkt_mask |= (1 << (dest_iq));
				q->iq_pkt_count[dest_iq]++;
				q->stats.rx_pkts++;
			}

			entry->ready = (j != entry->num_fragments);
			entry->num_fragments -= j;
			entry->fragment_index += j;

			if (!entry->ready) {
				entry->fragment_index = 0;

				rte_ring_sp_enqueue(
						qid->reorder_buffer_freelist,
						entry);

				qid->reorder_buffer_index++;
				qid->reorder_buffer_index %= qid->window_size;
			}
		}
	}
	return pkts_iter;
}

static __rte_always_inline void
sw_refill_pp_buf(struct sw_evdev *sw, struct sw_port *port)
{
	RTE_SET_USED(sw);
	struct qe_ring *worker = port->rx_worker_ring;
	port->pp_buf_start = 0;
	port->pp_buf_count = qe_ring_dequeue_burst(worker, port->pp_buf,
			RTE_DIM(port->pp_buf));
}
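
/* Pull new and forwarded events from a load-balanced port's worker ring into
 * the destination QID IQs. Completed events release their history-list entry
 * and decrement the flow pin count; when allow_reorder is set, the associated
 * reorder buffer entry (if any) is marked ready for the egress reordering
 * pass, with a static dummy entry used as a branchless sink when the history
 * entry has no ROB pointer. Returns the number of events enqueued into IQs.
 */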
static __rte_always_inline uint32_t
__pull_port_lb(struct sw_evdev *sw, uint32_t port_id, int allow_reorder)
{
	static struct reorder_buffer_entry dummy_rob;
	uint32_t pkts_iter = 0;
	struct sw_port *port = &sw->ports[port_id];

	/* If shadow ring has 0 pkts, pull from worker ring */
	if (port->pp_buf_count == 0)
		sw_refill_pp_buf(sw, port);

	while (port->pp_buf_count) {
		const struct rte_event *qe = &port->pp_buf[port->pp_buf_start];
		struct sw_hist_list_entry *hist_entry = NULL;
		uint8_t flags = qe->op;
		const uint16_t eop = !(flags & QE_FLAG_NOT_EOP);
		int needs_reorder = 0;
		/* if reordering is disabled, treat PARTIAL as NEW */
		if (!allow_reorder && !eop)
			flags = QE_FLAG_VALID;

		/*
		 * if we don't have space for this packet in an IQ,
		 * then move on to next queue. Technically, for a
		 * packet that needs reordering, we don't need to check
		 * here, but it simplifies things not to special-case
		 */
		uint32_t iq_num = PRIO_TO_IQ(qe->priority);
		struct sw_qid *qid = &sw->qids[qe->queue_id];

		if ((flags & QE_FLAG_VALID) &&
				iq_ring_free_count(qid->iq[iq_num]) == 0)
			break;

		/* now process based on flags. Note that for directed
		 * queues, the enqueue_flush masks off all but the
		 * valid flag. This makes FWD and PARTIAL enqueues just
		 * NEW type, and makes DROPS no-op calls.
		 */
		if ((flags & QE_FLAG_COMPLETE) && port->inflights > 0) {
			const uint32_t hist_tail = port->hist_tail &
					(SW_PORT_HIST_LIST - 1);

			hist_entry = &port->hist_list[hist_tail];
			const uint32_t hist_qid = hist_entry->qid;
			const uint32_t hist_fid = hist_entry->fid;

			struct sw_fid_t *fid =
				&sw->qids[hist_qid].fids[hist_fid];
			fid->pcount -= eop;
			if (fid->pcount == 0)
				fid->cq = -1;

			if (allow_reorder) {
				/* set reorder ready if an ordered QID */
				uintptr_t rob_ptr =
					(uintptr_t)hist_entry->rob_entry;
				const uintptr_t valid = (rob_ptr != 0);
				needs_reorder = valid;
				rob_ptr |=
					((valid - 1) & (uintptr_t)&dummy_rob);
				struct reorder_buffer_entry *tmp_rob_ptr =
					(struct reorder_buffer_entry *)rob_ptr;
				tmp_rob_ptr->ready = eop * needs_reorder;
			}

			port->inflights -= eop;
			port->hist_tail += eop;
		}
		if (flags & QE_FLAG_VALID) {
			port->stats.rx_pkts++;

			if (allow_reorder && needs_reorder) {
				struct reorder_buffer_entry *rob_entry =
						hist_entry->rob_entry;

				hist_entry->rob_entry = NULL;
				/* Although fragmentation is not currently
				 * supported by the eventdev API, we support it
				 * here. Open: How do we alert the user that
				 * they've exceeded max frags?
				 */
				int num_frag = rob_entry->num_fragments;
				if (num_frag == SW_FRAGMENTS_MAX)
					sw->stats.rx_dropped++;
				else {
					int idx = rob_entry->num_fragments++;
					rob_entry->fragments[idx] = *qe;
				}
				goto end_qe;
			}

			/* Use the iq_num from above to push the QE
			 * into the qid at the right priority
			 */

			qid->iq_pkt_mask |= (1 << (iq_num));
			iq_ring_enqueue(qid->iq[iq_num], qe);
			qid->iq_pkt_count[iq_num]++;
			qid->stats.rx_pkts++;
			pkts_iter++;
		}

end_qe:
		port->pp_buf_start++;
		port->pp_buf_count--;
	} /* while (avail_qes) */

	return pkts_iter;
}

static uint32_t
sw_schedule_pull_port_lb(struct sw_evdev *sw, uint32_t port_id)
{
	return __pull_port_lb(sw, port_id, 1);
}

static uint32_t
sw_schedule_pull_port_no_reorder(struct sw_evdev *sw, uint32_t port_id)
{
	return __pull_port_lb(sw, port_id, 0);
}

static uint32_t
sw_schedule_pull_port_dir(struct sw_evdev *sw, uint32_t port_id)
{
	uint32_t pkts_iter = 0;
	struct sw_port *port = &sw->ports[port_id];

	/* If shadow ring has 0 pkts, pull from worker ring */
	if (port->pp_buf_count == 0)
		sw_refill_pp_buf(sw, port);

	while (port->pp_buf_count) {
		const struct rte_event *qe = &port->pp_buf[port->pp_buf_start];
		uint8_t flags = qe->op;

		if ((flags & QE_FLAG_VALID) == 0)
			goto end_qe;

		uint32_t iq_num = PRIO_TO_IQ(qe->priority);
		struct sw_qid *qid = &sw->qids[qe->queue_id];
		struct iq_ring *iq_ring = qid->iq[iq_num];

		if (iq_ring_free_count(iq_ring) == 0)
			break; /* move to next port */

		port->stats.rx_pkts++;

		/* Use the iq_num from above to push the QE
		 * into the qid at the right priority
		 */
		qid->iq_pkt_mask |= (1 << (iq_num));
		iq_ring_enqueue(iq_ring, qe);
		qid->iq_pkt_count[iq_num]++;
		qid->stats.rx_pkts++;
		pkts_iter++;

end_qe:
		port->pp_buf_start++;
		port->pp_buf_count--;
	} /* while port->pp_buf_count */

	return pkts_iter;
}
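
/* Main scheduling routine for the sw PMD. Each call pulls events from the
 * ports' rx worker rings into the QID IQs (running the egress reordering
 * pass for ordered QIDs), schedules the IQs out to the CQs, and finally
 * flushes each port's buffered CQ events to its worker ring in a single
 * burst. The inner loops are bounded by sched_quanta, so one call does a
 * bounded amount of work.
 */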
void
sw_event_schedule(struct rte_eventdev *dev)
{
	struct sw_evdev *sw = sw_pmd_priv(dev);
	uint32_t in_pkts, out_pkts;
	uint32_t out_pkts_total = 0, in_pkts_total = 0;
	int32_t sched_quanta = sw->sched_quanta;
	uint32_t i;

	sw->sched_called++;
	if (!sw->started)
		return;

	do {
		uint32_t in_pkts_this_iteration = 0;

		/* Pull from rx_ring for ports */
		do {
			in_pkts = 0;
			for (i = 0; i < sw->port_count; i++)
				if (sw->ports[i].is_directed)
					in_pkts += sw_schedule_pull_port_dir(sw, i);
				else if (sw->ports[i].num_ordered_qids > 0)
					in_pkts += sw_schedule_pull_port_lb(sw, i);
				else
					in_pkts += sw_schedule_pull_port_no_reorder(sw, i);

			/* QID scan for re-ordered */
			in_pkts += sw_schedule_reorder(sw, 0,
					sw->qid_count);
			in_pkts_this_iteration += in_pkts;
		} while (in_pkts > 4 &&
				(int)in_pkts_this_iteration < sched_quanta);

		out_pkts = 0;
		out_pkts += sw_schedule_qid_to_cq(sw);
		out_pkts_total += out_pkts;
		in_pkts_total += in_pkts_this_iteration;

		if (in_pkts == 0 && out_pkts == 0)
			break;
	} while ((int)out_pkts_total < sched_quanta);

	/* push all the internal buffered QEs in port->cq_ring to the
	 * worker cores: i.e., do the ring transfers in batches.
	 */
	for (i = 0; i < sw->port_count; i++) {
		struct qe_ring *worker = sw->ports[i].cq_worker_ring;
		qe_ring_enqueue_burst(worker, sw->ports[i].cq_buf,
				sw->ports[i].cq_buf_count,
				&sw->cq_ring_space[i]);
		sw->ports[i].cq_buf_count = 0;
	}

	sw->stats.tx_pkts += out_pkts_total;
	sw->stats.rx_pkts += in_pkts_total;

	sw->sched_no_iq_enqueues += (in_pkts_total == 0);
	sw->sched_no_cq_enqueues += (out_pkts_total == 0);

}
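
/* Usage sketch (illustrative only, not part of this driver): the scheduler
 * is intended to be driven repeatedly, e.g. from a dedicated core, with each
 * call doing a bounded quantum of work:
 *
 *     while (keep_running)            // hypothetical application loop
 *         sw_event_schedule(dev);
 *
 * In practice this function is registered as the device's schedule callback
 * and is reached through the eventdev API rather than called directly.
 */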