1 /* 2 * SPDX-License-Identifier: BSD-3-Clause 3 * Copyright(c) 2010-2014 Intel Corporation 4 * Copyright 2017 Cavium, Inc. 5 */ 6 7 #include "pipeline_common.h" 8 9 static __rte_always_inline void 10 worker_fwd_event(struct rte_event *ev, uint8_t sched) 11 { 12 ev->event_type = RTE_EVENT_TYPE_CPU; 13 ev->op = RTE_EVENT_OP_FORWARD; 14 ev->sched_type = sched; 15 } 16 17 static __rte_always_inline void 18 worker_event_enqueue(const uint8_t dev, const uint8_t port, 19 struct rte_event *ev) 20 { 21 while (rte_event_enqueue_burst(dev, port, ev, 1) != 1) 22 rte_pause(); 23 } 24 25 static __rte_always_inline void 26 worker_event_enqueue_burst(const uint8_t dev, const uint8_t port, 27 struct rte_event *ev, const uint16_t nb_rx) 28 { 29 uint16_t enq; 30 31 enq = rte_event_enqueue_burst(dev, port, ev, nb_rx); 32 while (enq < nb_rx) { 33 enq += rte_event_enqueue_burst(dev, port, 34 ev + enq, nb_rx - enq); 35 } 36 } 37 38 static __rte_always_inline void 39 worker_tx_pkt(struct rte_mbuf *mbuf) 40 { 41 exchange_mac(mbuf); 42 while (rte_eth_tx_burst(mbuf->port, 0, &mbuf, 1) != 1) 43 rte_pause(); 44 } 45 46 /* Single stage pipeline workers */ 47 48 static int 49 worker_do_tx_single(void *arg) 50 { 51 struct worker_data *data = (struct worker_data *)arg; 52 const uint8_t dev = data->dev_id; 53 const uint8_t port = data->port_id; 54 size_t fwd = 0, received = 0, tx = 0; 55 struct rte_event ev; 56 57 while (!fdata->done) { 58 59 if (!rte_event_dequeue_burst(dev, port, &ev, 1, 0)) { 60 rte_pause(); 61 continue; 62 } 63 64 received++; 65 66 if (ev.sched_type == RTE_SCHED_TYPE_ATOMIC) { 67 worker_tx_pkt(ev.mbuf); 68 tx++; 69 continue; 70 } 71 work(); 72 ev.queue_id++; 73 worker_fwd_event(&ev, RTE_SCHED_TYPE_ATOMIC); 74 worker_event_enqueue(dev, port, &ev); 75 fwd++; 76 } 77 78 if (!cdata.quiet) 79 printf(" worker %u thread done. RX=%zu FWD=%zu TX=%zu\n", 80 rte_lcore_id(), received, fwd, tx); 81 return 0; 82 } 83 84 static int 85 worker_do_tx_single_atq(void *arg) 86 { 87 struct worker_data *data = (struct worker_data *)arg; 88 const uint8_t dev = data->dev_id; 89 const uint8_t port = data->port_id; 90 size_t fwd = 0, received = 0, tx = 0; 91 struct rte_event ev; 92 93 while (!fdata->done) { 94 95 if (!rte_event_dequeue_burst(dev, port, &ev, 1, 0)) { 96 rte_pause(); 97 continue; 98 } 99 100 received++; 101 102 if (ev.sched_type == RTE_SCHED_TYPE_ATOMIC) { 103 worker_tx_pkt(ev.mbuf); 104 tx++; 105 continue; 106 } 107 work(); 108 worker_fwd_event(&ev, RTE_SCHED_TYPE_ATOMIC); 109 worker_event_enqueue(dev, port, &ev); 110 fwd++; 111 } 112 113 if (!cdata.quiet) 114 printf(" worker %u thread done. RX=%zu FWD=%zu TX=%zu\n", 115 rte_lcore_id(), received, fwd, tx); 116 return 0; 117 } 118 119 static int 120 worker_do_tx_single_burst(void *arg) 121 { 122 struct rte_event ev[BATCH_SIZE + 1]; 123 124 struct worker_data *data = (struct worker_data *)arg; 125 const uint8_t dev = data->dev_id; 126 const uint8_t port = data->port_id; 127 size_t fwd = 0, received = 0, tx = 0; 128 129 while (!fdata->done) { 130 uint16_t i; 131 uint16_t nb_rx = rte_event_dequeue_burst(dev, port, ev, 132 BATCH_SIZE, 0); 133 134 if (!nb_rx) { 135 rte_pause(); 136 continue; 137 } 138 received += nb_rx; 139 140 for (i = 0; i < nb_rx; i++) { 141 rte_prefetch0(ev[i + 1].mbuf); 142 if (ev[i].sched_type == RTE_SCHED_TYPE_ATOMIC) { 143 144 worker_tx_pkt(ev[i].mbuf); 145 ev[i].op = RTE_EVENT_OP_RELEASE; 146 tx++; 147 148 } else { 149 ev[i].queue_id++; 150 worker_fwd_event(&ev[i], RTE_SCHED_TYPE_ATOMIC); 151 } 152 work(); 153 } 154 155 worker_event_enqueue_burst(dev, port, ev, nb_rx); 156 fwd += nb_rx; 157 } 158 159 if (!cdata.quiet) 160 printf(" worker %u thread done. RX=%zu FWD=%zu TX=%zu\n", 161 rte_lcore_id(), received, fwd, tx); 162 return 0; 163 } 164 165 static int 166 worker_do_tx_single_burst_atq(void *arg) 167 { 168 struct rte_event ev[BATCH_SIZE + 1]; 169 170 struct worker_data *data = (struct worker_data *)arg; 171 const uint8_t dev = data->dev_id; 172 const uint8_t port = data->port_id; 173 size_t fwd = 0, received = 0, tx = 0; 174 175 while (!fdata->done) { 176 uint16_t i; 177 uint16_t nb_rx = rte_event_dequeue_burst(dev, port, ev, 178 BATCH_SIZE, 0); 179 180 if (!nb_rx) { 181 rte_pause(); 182 continue; 183 } 184 185 received += nb_rx; 186 187 for (i = 0; i < nb_rx; i++) { 188 rte_prefetch0(ev[i + 1].mbuf); 189 if (ev[i].sched_type == RTE_SCHED_TYPE_ATOMIC) { 190 191 worker_tx_pkt(ev[i].mbuf); 192 ev[i].op = RTE_EVENT_OP_RELEASE; 193 tx++; 194 } else 195 worker_fwd_event(&ev[i], RTE_SCHED_TYPE_ATOMIC); 196 work(); 197 } 198 199 worker_event_enqueue_burst(dev, port, ev, nb_rx); 200 fwd += nb_rx; 201 } 202 203 if (!cdata.quiet) 204 printf(" worker %u thread done. RX=%zu FWD=%zu TX=%zu\n", 205 rte_lcore_id(), received, fwd, tx); 206 return 0; 207 } 208 209 /* Multi stage Pipeline Workers */ 210 211 static int 212 worker_do_tx(void *arg) 213 { 214 struct rte_event ev; 215 216 struct worker_data *data = (struct worker_data *)arg; 217 const uint8_t dev = data->dev_id; 218 const uint8_t port = data->port_id; 219 const uint8_t lst_qid = cdata.num_stages - 1; 220 size_t fwd = 0, received = 0, tx = 0; 221 222 223 while (!fdata->done) { 224 225 if (!rte_event_dequeue_burst(dev, port, &ev, 1, 0)) { 226 rte_pause(); 227 continue; 228 } 229 230 received++; 231 const uint8_t cq_id = ev.queue_id % cdata.num_stages; 232 233 if (cq_id >= lst_qid) { 234 if (ev.sched_type == RTE_SCHED_TYPE_ATOMIC) { 235 worker_tx_pkt(ev.mbuf); 236 tx++; 237 continue; 238 } 239 240 worker_fwd_event(&ev, RTE_SCHED_TYPE_ATOMIC); 241 ev.queue_id = (cq_id == lst_qid) ? 242 cdata.next_qid[ev.queue_id] : ev.queue_id; 243 } else { 244 ev.queue_id = cdata.next_qid[ev.queue_id]; 245 worker_fwd_event(&ev, cdata.queue_type); 246 } 247 work(); 248 249 worker_event_enqueue(dev, port, &ev); 250 fwd++; 251 } 252 253 if (!cdata.quiet) 254 printf(" worker %u thread done. RX=%zu FWD=%zu TX=%zu\n", 255 rte_lcore_id(), received, fwd, tx); 256 257 return 0; 258 } 259 260 static int 261 worker_do_tx_atq(void *arg) 262 { 263 struct rte_event ev; 264 265 struct worker_data *data = (struct worker_data *)arg; 266 const uint8_t dev = data->dev_id; 267 const uint8_t port = data->port_id; 268 const uint8_t lst_qid = cdata.num_stages - 1; 269 size_t fwd = 0, received = 0, tx = 0; 270 271 while (!fdata->done) { 272 273 if (!rte_event_dequeue_burst(dev, port, &ev, 1, 0)) { 274 rte_pause(); 275 continue; 276 } 277 278 received++; 279 const uint8_t cq_id = ev.sub_event_type % cdata.num_stages; 280 281 if (cq_id == lst_qid) { 282 if (ev.sched_type == RTE_SCHED_TYPE_ATOMIC) { 283 worker_tx_pkt(ev.mbuf); 284 tx++; 285 continue; 286 } 287 288 worker_fwd_event(&ev, RTE_SCHED_TYPE_ATOMIC); 289 } else { 290 ev.sub_event_type++; 291 worker_fwd_event(&ev, cdata.queue_type); 292 } 293 work(); 294 295 worker_event_enqueue(dev, port, &ev); 296 fwd++; 297 } 298 299 if (!cdata.quiet) 300 printf(" worker %u thread done. RX=%zu FWD=%zu TX=%zu\n", 301 rte_lcore_id(), received, fwd, tx); 302 303 return 0; 304 } 305 306 static int 307 worker_do_tx_burst(void *arg) 308 { 309 struct rte_event ev[BATCH_SIZE]; 310 311 struct worker_data *data = (struct worker_data *)arg; 312 uint8_t dev = data->dev_id; 313 uint8_t port = data->port_id; 314 uint8_t lst_qid = cdata.num_stages - 1; 315 size_t fwd = 0, received = 0, tx = 0; 316 317 while (!fdata->done) { 318 uint16_t i; 319 const uint16_t nb_rx = rte_event_dequeue_burst(dev, port, 320 ev, BATCH_SIZE, 0); 321 322 if (nb_rx == 0) { 323 rte_pause(); 324 continue; 325 } 326 received += nb_rx; 327 328 for (i = 0; i < nb_rx; i++) { 329 const uint8_t cq_id = ev[i].queue_id % cdata.num_stages; 330 331 if (cq_id >= lst_qid) { 332 if (ev[i].sched_type == RTE_SCHED_TYPE_ATOMIC) { 333 worker_tx_pkt(ev[i].mbuf); 334 tx++; 335 ev[i].op = RTE_EVENT_OP_RELEASE; 336 continue; 337 } 338 ev[i].queue_id = (cq_id == lst_qid) ? 339 cdata.next_qid[ev[i].queue_id] : 340 ev[i].queue_id; 341 342 worker_fwd_event(&ev[i], RTE_SCHED_TYPE_ATOMIC); 343 } else { 344 ev[i].queue_id = cdata.next_qid[ev[i].queue_id]; 345 worker_fwd_event(&ev[i], cdata.queue_type); 346 } 347 work(); 348 } 349 worker_event_enqueue_burst(dev, port, ev, nb_rx); 350 351 fwd += nb_rx; 352 } 353 354 if (!cdata.quiet) 355 printf(" worker %u thread done. RX=%zu FWD=%zu TX=%zu\n", 356 rte_lcore_id(), received, fwd, tx); 357 358 return 0; 359 } 360 361 static int 362 worker_do_tx_burst_atq(void *arg) 363 { 364 struct rte_event ev[BATCH_SIZE]; 365 366 struct worker_data *data = (struct worker_data *)arg; 367 uint8_t dev = data->dev_id; 368 uint8_t port = data->port_id; 369 uint8_t lst_qid = cdata.num_stages - 1; 370 size_t fwd = 0, received = 0, tx = 0; 371 372 while (!fdata->done) { 373 uint16_t i; 374 375 const uint16_t nb_rx = rte_event_dequeue_burst(dev, port, 376 ev, BATCH_SIZE, 0); 377 378 if (nb_rx == 0) { 379 rte_pause(); 380 continue; 381 } 382 received += nb_rx; 383 384 for (i = 0; i < nb_rx; i++) { 385 const uint8_t cq_id = ev[i].sub_event_type % 386 cdata.num_stages; 387 388 if (cq_id == lst_qid) { 389 if (ev[i].sched_type == RTE_SCHED_TYPE_ATOMIC) { 390 worker_tx_pkt(ev[i].mbuf); 391 tx++; 392 ev[i].op = RTE_EVENT_OP_RELEASE; 393 continue; 394 } 395 396 worker_fwd_event(&ev[i], RTE_SCHED_TYPE_ATOMIC); 397 } else { 398 ev[i].sub_event_type++; 399 worker_fwd_event(&ev[i], cdata.queue_type); 400 } 401 work(); 402 } 403 404 worker_event_enqueue_burst(dev, port, ev, nb_rx); 405 fwd += nb_rx; 406 } 407 408 if (!cdata.quiet) 409 printf(" worker %u thread done. RX=%zu FWD=%zu TX=%zu\n", 410 rte_lcore_id(), received, fwd, tx); 411 412 return 0; 413 } 414 415 static int 416 setup_eventdev_worker_tx(struct cons_data *cons_data, 417 struct worker_data *worker_data) 418 { 419 RTE_SET_USED(cons_data); 420 uint8_t i; 421 const uint8_t atq = cdata.all_type_queues ? 1 : 0; 422 const uint8_t dev_id = 0; 423 const uint8_t nb_ports = cdata.num_workers; 424 uint8_t nb_slots = 0; 425 uint8_t nb_queues = rte_eth_dev_count(); 426 427 /* 428 * In case where all type queues are not enabled, use queues equal to 429 * number of stages * eth_dev_count and one extra queue per pipeline 430 * for Tx. 431 */ 432 if (!atq) { 433 nb_queues *= cdata.num_stages; 434 nb_queues += rte_eth_dev_count(); 435 } 436 437 struct rte_event_dev_config config = { 438 .nb_event_queues = nb_queues, 439 .nb_event_ports = nb_ports, 440 .nb_events_limit = 4096, 441 .nb_event_queue_flows = 1024, 442 .nb_event_port_dequeue_depth = 128, 443 .nb_event_port_enqueue_depth = 128, 444 }; 445 struct rte_event_port_conf wkr_p_conf = { 446 .dequeue_depth = cdata.worker_cq_depth, 447 .enqueue_depth = 64, 448 .new_event_threshold = 4096, 449 }; 450 struct rte_event_queue_conf wkr_q_conf = { 451 .schedule_type = cdata.queue_type, 452 .priority = RTE_EVENT_DEV_PRIORITY_NORMAL, 453 .nb_atomic_flows = 1024, 454 .nb_atomic_order_sequences = 1024, 455 }; 456 457 int ret, ndev = rte_event_dev_count(); 458 459 if (ndev < 1) { 460 printf("%d: No Eventdev Devices Found\n", __LINE__); 461 return -1; 462 } 463 464 465 struct rte_event_dev_info dev_info; 466 ret = rte_event_dev_info_get(dev_id, &dev_info); 467 printf("\tEventdev %d: %s\n", dev_id, dev_info.driver_name); 468 469 if (dev_info.max_event_port_dequeue_depth < 470 config.nb_event_port_dequeue_depth) 471 config.nb_event_port_dequeue_depth = 472 dev_info.max_event_port_dequeue_depth; 473 if (dev_info.max_event_port_enqueue_depth < 474 config.nb_event_port_enqueue_depth) 475 config.nb_event_port_enqueue_depth = 476 dev_info.max_event_port_enqueue_depth; 477 478 ret = rte_event_dev_configure(dev_id, &config); 479 if (ret < 0) { 480 printf("%d: Error configuring device\n", __LINE__); 481 return -1; 482 } 483 484 printf(" Stages:\n"); 485 for (i = 0; i < nb_queues; i++) { 486 487 if (atq) { 488 489 nb_slots = cdata.num_stages; 490 wkr_q_conf.event_queue_cfg = 491 RTE_EVENT_QUEUE_CFG_ALL_TYPES; 492 } else { 493 uint8_t slot; 494 495 nb_slots = cdata.num_stages + 1; 496 slot = i % nb_slots; 497 wkr_q_conf.schedule_type = slot == cdata.num_stages ? 498 RTE_SCHED_TYPE_ATOMIC : cdata.queue_type; 499 } 500 501 if (rte_event_queue_setup(dev_id, i, &wkr_q_conf) < 0) { 502 printf("%d: error creating qid %d\n", __LINE__, i); 503 return -1; 504 } 505 cdata.qid[i] = i; 506 cdata.next_qid[i] = i+1; 507 if (cdata.enable_queue_priorities) { 508 const uint32_t prio_delta = 509 (RTE_EVENT_DEV_PRIORITY_LOWEST) / 510 nb_slots; 511 512 /* higher priority for queues closer to tx */ 513 wkr_q_conf.priority = 514 RTE_EVENT_DEV_PRIORITY_LOWEST - prio_delta * 515 (i % nb_slots); 516 } 517 518 const char *type_str = "Atomic"; 519 switch (wkr_q_conf.schedule_type) { 520 case RTE_SCHED_TYPE_ORDERED: 521 type_str = "Ordered"; 522 break; 523 case RTE_SCHED_TYPE_PARALLEL: 524 type_str = "Parallel"; 525 break; 526 } 527 printf("\tStage %d, Type %s\tPriority = %d\n", i, type_str, 528 wkr_q_conf.priority); 529 } 530 531 printf("\n"); 532 if (wkr_p_conf.dequeue_depth > config.nb_event_port_dequeue_depth) 533 wkr_p_conf.dequeue_depth = config.nb_event_port_dequeue_depth; 534 if (wkr_p_conf.enqueue_depth > config.nb_event_port_enqueue_depth) 535 wkr_p_conf.enqueue_depth = config.nb_event_port_enqueue_depth; 536 537 /* set up one port per worker, linking to all stage queues */ 538 for (i = 0; i < cdata.num_workers; i++) { 539 struct worker_data *w = &worker_data[i]; 540 w->dev_id = dev_id; 541 if (rte_event_port_setup(dev_id, i, &wkr_p_conf) < 0) { 542 printf("Error setting up port %d\n", i); 543 return -1; 544 } 545 546 if (rte_event_port_link(dev_id, i, NULL, NULL, 0) 547 != nb_queues) { 548 printf("%d: error creating link for port %d\n", 549 __LINE__, i); 550 return -1; 551 } 552 w->port_id = i; 553 } 554 /* 555 * Reduce the load on ingress event queue by splitting the traffic 556 * across multiple event queues. 557 * for example, nb_stages = 2 and nb_ethdev = 2 then 558 * 559 * nb_queues = (2 * 2) + 2 = 6 (non atq) 560 * rx_stride = 3 561 * 562 * So, traffic is split across queue 0 and queue 3 since queue id for 563 * rx adapter is chosen <ethport_id> * <rx_stride> i.e in the above 564 * case eth port 0, 1 will inject packets into event queue 0, 3 565 * respectively. 566 * 567 * This forms two set of queue pipelines 0->1->2->tx and 3->4->5->tx. 568 */ 569 cdata.rx_stride = atq ? 1 : nb_slots; 570 ret = rte_event_dev_service_id_get(dev_id, 571 &fdata->evdev_service_id); 572 if (ret != -ESRCH && ret != 0) { 573 printf("Error getting the service ID\n"); 574 return -1; 575 } 576 rte_service_runstate_set(fdata->evdev_service_id, 1); 577 rte_service_set_runstate_mapped_check(fdata->evdev_service_id, 0); 578 if (rte_event_dev_start(dev_id) < 0) { 579 printf("Error starting eventdev\n"); 580 return -1; 581 } 582 583 return dev_id; 584 } 585 586 587 struct rx_adptr_services { 588 uint16_t nb_rx_adptrs; 589 uint32_t *rx_adpt_arr; 590 }; 591 592 static int32_t 593 service_rx_adapter(void *arg) 594 { 595 int i; 596 struct rx_adptr_services *adptr_services = arg; 597 598 for (i = 0; i < adptr_services->nb_rx_adptrs; i++) 599 rte_service_run_iter_on_app_lcore( 600 adptr_services->rx_adpt_arr[i], 1); 601 return 0; 602 } 603 604 static void 605 init_rx_adapter(uint16_t nb_ports) 606 { 607 int i; 608 int ret; 609 uint8_t evdev_id = 0; 610 struct rx_adptr_services *adptr_services = NULL; 611 struct rte_event_dev_info dev_info; 612 613 ret = rte_event_dev_info_get(evdev_id, &dev_info); 614 adptr_services = rte_zmalloc(NULL, sizeof(struct rx_adptr_services), 0); 615 616 struct rte_event_port_conf rx_p_conf = { 617 .dequeue_depth = 8, 618 .enqueue_depth = 8, 619 .new_event_threshold = 1200, 620 }; 621 622 if (rx_p_conf.dequeue_depth > dev_info.max_event_port_dequeue_depth) 623 rx_p_conf.dequeue_depth = dev_info.max_event_port_dequeue_depth; 624 if (rx_p_conf.enqueue_depth > dev_info.max_event_port_enqueue_depth) 625 rx_p_conf.enqueue_depth = dev_info.max_event_port_enqueue_depth; 626 627 628 struct rte_event_eth_rx_adapter_queue_conf queue_conf = { 629 .ev.sched_type = cdata.queue_type, 630 }; 631 632 for (i = 0; i < nb_ports; i++) { 633 uint32_t cap; 634 uint32_t service_id; 635 636 ret = rte_event_eth_rx_adapter_create(i, evdev_id, &rx_p_conf); 637 if (ret) 638 rte_exit(EXIT_FAILURE, 639 "failed to create rx adapter[%d]", 640 cdata.rx_adapter_id); 641 642 ret = rte_event_eth_rx_adapter_caps_get(evdev_id, i, &cap); 643 if (ret) 644 rte_exit(EXIT_FAILURE, 645 "failed to get event rx adapter " 646 "capabilities"); 647 648 queue_conf.ev.queue_id = cdata.rx_stride ? 649 (i * cdata.rx_stride) 650 : (uint8_t)cdata.qid[0]; 651 652 ret = rte_event_eth_rx_adapter_queue_add(i, i, -1, &queue_conf); 653 if (ret) 654 rte_exit(EXIT_FAILURE, 655 "Failed to add queues to Rx adapter"); 656 657 658 /* Producer needs to be scheduled. */ 659 if (!(cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_INTERNAL_PORT)) { 660 ret = rte_event_eth_rx_adapter_service_id_get(i, 661 &service_id); 662 if (ret != -ESRCH && ret != 0) { 663 rte_exit(EXIT_FAILURE, 664 "Error getting the service ID for rx adptr\n"); 665 } 666 667 rte_service_runstate_set(service_id, 1); 668 rte_service_set_runstate_mapped_check(service_id, 0); 669 670 adptr_services->nb_rx_adptrs++; 671 adptr_services->rx_adpt_arr = rte_realloc( 672 adptr_services->rx_adpt_arr, 673 adptr_services->nb_rx_adptrs * 674 sizeof(uint32_t), 0); 675 adptr_services->rx_adpt_arr[ 676 adptr_services->nb_rx_adptrs - 1] = 677 service_id; 678 } 679 680 ret = rte_event_eth_rx_adapter_start(i); 681 if (ret) 682 rte_exit(EXIT_FAILURE, "Rx adapter[%d] start failed", 683 cdata.rx_adapter_id); 684 } 685 686 if (adptr_services->nb_rx_adptrs) { 687 struct rte_service_spec service; 688 689 memset(&service, 0, sizeof(struct rte_service_spec)); 690 snprintf(service.name, sizeof(service.name), "rx_service"); 691 service.callback = service_rx_adapter; 692 service.callback_userdata = (void *)adptr_services; 693 694 int32_t ret = rte_service_component_register(&service, 695 &fdata->rxadptr_service_id); 696 if (ret) 697 rte_exit(EXIT_FAILURE, 698 "Rx adapter[%d] service register failed", 699 cdata.rx_adapter_id); 700 701 rte_service_runstate_set(fdata->rxadptr_service_id, 1); 702 rte_service_component_runstate_set(fdata->rxadptr_service_id, 703 1); 704 rte_service_set_runstate_mapped_check(fdata->rxadptr_service_id, 705 0); 706 } else { 707 memset(fdata->rx_core, 0, sizeof(unsigned int) * MAX_NUM_CORE); 708 rte_free(adptr_services); 709 } 710 711 if (!adptr_services->nb_rx_adptrs && fdata->cap.consumer == NULL && 712 (dev_info.event_dev_cap & 713 RTE_EVENT_DEV_CAP_DISTRIBUTED_SCHED)) 714 fdata->cap.scheduler = NULL; 715 716 if (dev_info.event_dev_cap & RTE_EVENT_DEV_CAP_DISTRIBUTED_SCHED) 717 memset(fdata->sched_core, 0, 718 sizeof(unsigned int) * MAX_NUM_CORE); 719 } 720 721 static void 722 worker_tx_opt_check(void) 723 { 724 int i; 725 int ret; 726 uint32_t cap = 0; 727 uint8_t rx_needed = 0; 728 struct rte_event_dev_info eventdev_info; 729 730 memset(&eventdev_info, 0, sizeof(struct rte_event_dev_info)); 731 rte_event_dev_info_get(0, &eventdev_info); 732 733 if (cdata.all_type_queues && !(eventdev_info.event_dev_cap & 734 RTE_EVENT_DEV_CAP_QUEUE_ALL_TYPES)) 735 rte_exit(EXIT_FAILURE, 736 "Event dev doesn't support all type queues\n"); 737 738 for (i = 0; i < rte_eth_dev_count(); i++) { 739 ret = rte_event_eth_rx_adapter_caps_get(0, i, &cap); 740 if (ret) 741 rte_exit(EXIT_FAILURE, 742 "failed to get event rx adapter " 743 "capabilities"); 744 rx_needed |= 745 !(cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_INTERNAL_PORT); 746 } 747 748 if (cdata.worker_lcore_mask == 0 || 749 (rx_needed && cdata.rx_lcore_mask == 0) || 750 (cdata.sched_lcore_mask == 0 && 751 !(eventdev_info.event_dev_cap & 752 RTE_EVENT_DEV_CAP_DISTRIBUTED_SCHED))) { 753 printf("Core part of pipeline was not assigned any cores. " 754 "This will stall the pipeline, please check core masks " 755 "(use -h for details on setting core masks):\n" 756 "\trx: %"PRIu64"\n\ttx: %"PRIu64"\n\tsched: %"PRIu64 757 "\n\tworkers: %"PRIu64"\n", 758 cdata.rx_lcore_mask, cdata.tx_lcore_mask, 759 cdata.sched_lcore_mask, 760 cdata.worker_lcore_mask); 761 rte_exit(-1, "Fix core masks\n"); 762 } 763 } 764 765 static worker_loop 766 get_worker_loop_single_burst(uint8_t atq) 767 { 768 if (atq) 769 return worker_do_tx_single_burst_atq; 770 771 return worker_do_tx_single_burst; 772 } 773 774 static worker_loop 775 get_worker_loop_single_non_burst(uint8_t atq) 776 { 777 if (atq) 778 return worker_do_tx_single_atq; 779 780 return worker_do_tx_single; 781 } 782 783 static worker_loop 784 get_worker_loop_burst(uint8_t atq) 785 { 786 if (atq) 787 return worker_do_tx_burst_atq; 788 789 return worker_do_tx_burst; 790 } 791 792 static worker_loop 793 get_worker_loop_non_burst(uint8_t atq) 794 { 795 if (atq) 796 return worker_do_tx_atq; 797 798 return worker_do_tx; 799 } 800 801 static worker_loop 802 get_worker_single_stage(bool burst) 803 { 804 uint8_t atq = cdata.all_type_queues ? 1 : 0; 805 806 if (burst) 807 return get_worker_loop_single_burst(atq); 808 809 return get_worker_loop_single_non_burst(atq); 810 } 811 812 static worker_loop 813 get_worker_multi_stage(bool burst) 814 { 815 uint8_t atq = cdata.all_type_queues ? 1 : 0; 816 817 if (burst) 818 return get_worker_loop_burst(atq); 819 820 return get_worker_loop_non_burst(atq); 821 } 822 823 void 824 set_worker_tx_setup_data(struct setup_data *caps, bool burst) 825 { 826 if (cdata.num_stages == 1) 827 caps->worker = get_worker_single_stage(burst); 828 else 829 caps->worker = get_worker_multi_stage(burst); 830 831 memset(fdata->tx_core, 0, sizeof(unsigned int) * MAX_NUM_CORE); 832 833 caps->check_opt = worker_tx_opt_check; 834 caps->consumer = NULL; 835 caps->scheduler = schedule_devices; 836 caps->evdev_setup = setup_eventdev_worker_tx; 837 caps->adptr_setup = init_rx_adapter; 838 } 839