/*
 * SPDX-License-Identifier: BSD-3-Clause
 * Copyright(c) 2010-2014 Intel Corporation
 * Copyright 2017 Cavium, Inc.
 */

#include "pipeline_common.h"

static __rte_always_inline void
worker_fwd_event(struct rte_event *ev, uint8_t sched)
{
	ev->event_type = RTE_EVENT_TYPE_CPU;
	ev->op = RTE_EVENT_OP_FORWARD;
	ev->sched_type = sched;
}

static __rte_always_inline void
worker_event_enqueue(const uint8_t dev, const uint8_t port,
		struct rte_event *ev)
{
	while (rte_event_enqueue_burst(dev, port, ev, 1) != 1)
		rte_pause();
}

static __rte_always_inline void
worker_event_enqueue_burst(const uint8_t dev, const uint8_t port,
		struct rte_event *ev, const uint16_t nb_rx)
{
	uint16_t enq;

	enq = rte_event_enqueue_burst(dev, port, ev, nb_rx);
	while (enq < nb_rx) {
		enq += rte_event_enqueue_burst(dev, port,
				ev + enq, nb_rx - enq);
	}
}

static __rte_always_inline void
worker_tx_pkt(const uint8_t dev, const uint8_t port, struct rte_event *ev)
{
	exchange_mac(ev->mbuf);
	rte_event_eth_tx_adapter_txq_set(ev->mbuf, 0);
	while (!rte_event_eth_tx_adapter_enqueue(dev, port, ev, 1))
		rte_pause();
}
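
/*
 * Common worker behaviour: each worker dequeues events, runs work() on them
 * and either forwards them towards the Tx stage as ATOMIC events or, once an
 * event reaches the Tx stage with RTE_SCHED_TYPE_ATOMIC, hands its mbuf to
 * the ethernet Tx adapter via worker_tx_pkt().
 */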

/* Single stage pipeline workers */

static int
worker_do_tx_single(void *arg)
{
	struct worker_data *data = (struct worker_data *)arg;
	const uint8_t dev = data->dev_id;
	const uint8_t port = data->port_id;
	size_t fwd = 0, received = 0, tx = 0;
	struct rte_event ev;

	while (!fdata->done) {

		if (!rte_event_dequeue_burst(dev, port, &ev, 1, 0)) {
			rte_pause();
			continue;
		}

		received++;

		if (ev.sched_type == RTE_SCHED_TYPE_ATOMIC) {
			worker_tx_pkt(dev, port, &ev);
			tx++;
		} else {
			work();
			ev.queue_id++;
			worker_fwd_event(&ev, RTE_SCHED_TYPE_ATOMIC);
			worker_event_enqueue(dev, port, &ev);
			fwd++;
		}
	}

	if (!cdata.quiet)
		printf(" worker %u thread done. RX=%zu FWD=%zu TX=%zu\n",
				rte_lcore_id(), received, fwd, tx);
	return 0;
}

static int
worker_do_tx_single_atq(void *arg)
{
	struct worker_data *data = (struct worker_data *)arg;
	const uint8_t dev = data->dev_id;
	const uint8_t port = data->port_id;
	size_t fwd = 0, received = 0, tx = 0;
	struct rte_event ev;

	while (!fdata->done) {

		if (!rte_event_dequeue_burst(dev, port, &ev, 1, 0)) {
			rte_pause();
			continue;
		}

		received++;

		if (ev.sched_type == RTE_SCHED_TYPE_ATOMIC) {
			worker_tx_pkt(dev, port, &ev);
			tx++;
		} else {
			work();
			worker_fwd_event(&ev, RTE_SCHED_TYPE_ATOMIC);
			worker_event_enqueue(dev, port, &ev);
			fwd++;
		}
	}

	if (!cdata.quiet)
		printf(" worker %u thread done. RX=%zu FWD=%zu TX=%zu\n",
				rte_lcore_id(), received, fwd, tx);
	return 0;
}

static int
worker_do_tx_single_burst(void *arg)
{
	struct rte_event ev[BATCH_SIZE + 1];

	struct worker_data *data = (struct worker_data *)arg;
	const uint8_t dev = data->dev_id;
	const uint8_t port = data->port_id;
	size_t fwd = 0, received = 0, tx = 0;

	while (!fdata->done) {
		uint16_t i;
		uint16_t nb_rx = rte_event_dequeue_burst(dev, port, ev,
				BATCH_SIZE, 0);

		if (!nb_rx) {
			rte_pause();
			continue;
		}
		received += nb_rx;

		for (i = 0; i < nb_rx; i++) {
			rte_prefetch0(ev[i + 1].mbuf);
			if (ev[i].sched_type == RTE_SCHED_TYPE_ATOMIC) {
				worker_tx_pkt(dev, port, &ev[i]);
				ev[i].op = RTE_EVENT_OP_RELEASE;
				tx++;
			} else {
				ev[i].queue_id++;
				worker_fwd_event(&ev[i], RTE_SCHED_TYPE_ATOMIC);
			}
			work();
		}

		worker_event_enqueue_burst(dev, port, ev, nb_rx);
		fwd += nb_rx;
	}

	if (!cdata.quiet)
		printf(" worker %u thread done. RX=%zu FWD=%zu TX=%zu\n",
				rte_lcore_id(), received, fwd, tx);
	return 0;
}

static int
worker_do_tx_single_burst_atq(void *arg)
{
	struct rte_event ev[BATCH_SIZE + 1];

	struct worker_data *data = (struct worker_data *)arg;
	const uint8_t dev = data->dev_id;
	const uint8_t port = data->port_id;
	size_t fwd = 0, received = 0, tx = 0;

	while (!fdata->done) {
		uint16_t i;
		uint16_t nb_rx = rte_event_dequeue_burst(dev, port, ev,
				BATCH_SIZE, 0);

		if (!nb_rx) {
			rte_pause();
			continue;
		}

		received += nb_rx;

		for (i = 0; i < nb_rx; i++) {
			rte_prefetch0(ev[i + 1].mbuf);
			if (ev[i].sched_type == RTE_SCHED_TYPE_ATOMIC) {
				worker_tx_pkt(dev, port, &ev[i]);
				ev[i].op = RTE_EVENT_OP_RELEASE;
				tx++;
			} else
				worker_fwd_event(&ev[i], RTE_SCHED_TYPE_ATOMIC);
			work();
		}

		worker_event_enqueue_burst(dev, port, ev, nb_rx);
		fwd += nb_rx;
	}

	if (!cdata.quiet)
		printf(" worker %u thread done. RX=%zu FWD=%zu TX=%zu\n",
				rte_lcore_id(), received, fwd, tx);
	return 0;
}
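
/*
 * In the multi stage workers below the current stage is derived from the
 * event itself: queue_id % num_stages for the queue-per-stage variants and
 * sub_event_type % num_stages for the all-type-queue (ATQ) variants. Events
 * that have completed the last stage and arrive as RTE_SCHED_TYPE_ATOMIC
 * are transmitted; all other events are forwarded to the next stage.
 */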

/* Multi stage pipeline workers */

static int
worker_do_tx(void *arg)
{
	struct rte_event ev;

	struct worker_data *data = (struct worker_data *)arg;
	const uint8_t dev = data->dev_id;
	const uint8_t port = data->port_id;
	const uint8_t lst_qid = cdata.num_stages - 1;
	size_t fwd = 0, received = 0, tx = 0;

	while (!fdata->done) {

		if (!rte_event_dequeue_burst(dev, port, &ev, 1, 0)) {
			rte_pause();
			continue;
		}

		received++;
		const uint8_t cq_id = ev.queue_id % cdata.num_stages;

		if (cq_id >= lst_qid) {
			if (ev.sched_type == RTE_SCHED_TYPE_ATOMIC) {
				worker_tx_pkt(dev, port, &ev);
				tx++;
				continue;
			}

			worker_fwd_event(&ev, RTE_SCHED_TYPE_ATOMIC);
			ev.queue_id = (cq_id == lst_qid) ?
				cdata.next_qid[ev.queue_id] : ev.queue_id;
		} else {
			ev.queue_id = cdata.next_qid[ev.queue_id];
			worker_fwd_event(&ev, cdata.queue_type);
		}
		work();

		worker_event_enqueue(dev, port, &ev);
		fwd++;
	}

	if (!cdata.quiet)
		printf(" worker %u thread done. RX=%zu FWD=%zu TX=%zu\n",
				rte_lcore_id(), received, fwd, tx);

	return 0;
}

static int
worker_do_tx_atq(void *arg)
{
	struct rte_event ev;

	struct worker_data *data = (struct worker_data *)arg;
	const uint8_t dev = data->dev_id;
	const uint8_t port = data->port_id;
	const uint8_t lst_qid = cdata.num_stages - 1;
	size_t fwd = 0, received = 0, tx = 0;

	while (!fdata->done) {

		if (!rte_event_dequeue_burst(dev, port, &ev, 1, 0)) {
			rte_pause();
			continue;
		}

		received++;
		const uint8_t cq_id = ev.sub_event_type % cdata.num_stages;

		if (cq_id == lst_qid) {
			if (ev.sched_type == RTE_SCHED_TYPE_ATOMIC) {
				worker_tx_pkt(dev, port, &ev);
				tx++;
				continue;
			}

			worker_fwd_event(&ev, RTE_SCHED_TYPE_ATOMIC);
		} else {
			ev.sub_event_type++;
			worker_fwd_event(&ev, cdata.queue_type);
		}
		work();

		worker_event_enqueue(dev, port, &ev);
		fwd++;
	}

	if (!cdata.quiet)
		printf(" worker %u thread done. RX=%zu FWD=%zu TX=%zu\n",
				rte_lcore_id(), received, fwd, tx);

	return 0;
}

static int
worker_do_tx_burst(void *arg)
{
	struct rte_event ev[BATCH_SIZE];

	struct worker_data *data = (struct worker_data *)arg;
	uint8_t dev = data->dev_id;
	uint8_t port = data->port_id;
	uint8_t lst_qid = cdata.num_stages - 1;
	size_t fwd = 0, received = 0, tx = 0;

	while (!fdata->done) {
		uint16_t i;
		const uint16_t nb_rx = rte_event_dequeue_burst(dev, port,
				ev, BATCH_SIZE, 0);

		if (nb_rx == 0) {
			rte_pause();
			continue;
		}
		received += nb_rx;

		for (i = 0; i < nb_rx; i++) {
			const uint8_t cq_id = ev[i].queue_id % cdata.num_stages;

			if (cq_id >= lst_qid) {
				if (ev[i].sched_type == RTE_SCHED_TYPE_ATOMIC) {
					worker_tx_pkt(dev, port, &ev[i]);
					tx++;
					ev[i].op = RTE_EVENT_OP_RELEASE;
					continue;
				}
				ev[i].queue_id = (cq_id == lst_qid) ?
					cdata.next_qid[ev[i].queue_id] :
					ev[i].queue_id;

				worker_fwd_event(&ev[i], RTE_SCHED_TYPE_ATOMIC);
			} else {
				ev[i].queue_id = cdata.next_qid[ev[i].queue_id];
				worker_fwd_event(&ev[i], cdata.queue_type);
			}
			work();
		}
		worker_event_enqueue_burst(dev, port, ev, nb_rx);

		fwd += nb_rx;
	}

	if (!cdata.quiet)
		printf(" worker %u thread done. RX=%zu FWD=%zu TX=%zu\n",
				rte_lcore_id(), received, fwd, tx);

	return 0;
}

static int
worker_do_tx_burst_atq(void *arg)
{
	struct rte_event ev[BATCH_SIZE];

	struct worker_data *data = (struct worker_data *)arg;
	uint8_t dev = data->dev_id;
	uint8_t port = data->port_id;
	uint8_t lst_qid = cdata.num_stages - 1;
	size_t fwd = 0, received = 0, tx = 0;

	while (!fdata->done) {
		uint16_t i;

		const uint16_t nb_rx = rte_event_dequeue_burst(dev, port,
				ev, BATCH_SIZE, 0);

		if (nb_rx == 0) {
			rte_pause();
			continue;
		}
		received += nb_rx;

		for (i = 0; i < nb_rx; i++) {
			const uint8_t cq_id = ev[i].sub_event_type %
				cdata.num_stages;

			if (cq_id == lst_qid) {
				if (ev[i].sched_type == RTE_SCHED_TYPE_ATOMIC) {
					worker_tx_pkt(dev, port, &ev[i]);
					tx++;
					ev[i].op = RTE_EVENT_OP_RELEASE;
					continue;
				}

				worker_fwd_event(&ev[i], RTE_SCHED_TYPE_ATOMIC);
			} else {
				ev[i].sub_event_type++;
				worker_fwd_event(&ev[i], cdata.queue_type);
			}
			work();
		}

		worker_event_enqueue_burst(dev, port, ev, nb_rx);
		fwd += nb_rx;
	}

	if (!cdata.quiet)
		printf(" worker %u thread done. RX=%zu FWD=%zu TX=%zu\n",
				rte_lcore_id(), received, fwd, tx);

	return 0;
}
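
/*
 * Configure event device 0 for worker-tx-enqueue mode: without all-type
 * queues each ethdev pipeline gets num_stages queues plus one Tx queue, with
 * all-type queues each ethdev gets a single queue, and in both cases one
 * event port per worker is linked to every queue. Returns the event device
 * id on success.
 */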

static int
setup_eventdev_worker_tx_enq(struct worker_data *worker_data)
{
	uint8_t i;
	const uint8_t atq = cdata.all_type_queues ? 1 : 0;
	const uint8_t dev_id = 0;
	const uint8_t nb_ports = cdata.num_workers;
	uint8_t nb_slots = 0;
	uint8_t nb_queues = rte_eth_dev_count_avail();

	/*
	 * When all-type queues are not enabled, use num_stages * eth_dev_count
	 * queues plus one extra queue per pipeline for Tx.
	 */
	if (!atq) {
		nb_queues *= cdata.num_stages;
		nb_queues += rte_eth_dev_count_avail();
	}

	struct rte_event_dev_config config = {
			.nb_event_queues = nb_queues,
			.nb_event_ports = nb_ports,
			.nb_events_limit = 4096,
			.nb_event_queue_flows = 1024,
			.nb_event_port_dequeue_depth = 128,
			.nb_event_port_enqueue_depth = 128,
	};
	struct rte_event_port_conf wkr_p_conf = {
			.dequeue_depth = cdata.worker_cq_depth,
			.enqueue_depth = 64,
			.new_event_threshold = 4096,
	};
	struct rte_event_queue_conf wkr_q_conf = {
			.schedule_type = cdata.queue_type,
			.priority = RTE_EVENT_DEV_PRIORITY_NORMAL,
			.nb_atomic_flows = 1024,
			.nb_atomic_order_sequences = 1024,
	};

	int ret, ndev = rte_event_dev_count();

	if (ndev < 1) {
		printf("%d: No Eventdev Devices Found\n", __LINE__);
		return -1;
	}

	struct rte_event_dev_info dev_info;
	ret = rte_event_dev_info_get(dev_id, &dev_info);
	printf("\tEventdev %d: %s\n", dev_id, dev_info.driver_name);

	if (dev_info.max_event_port_dequeue_depth <
			config.nb_event_port_dequeue_depth)
		config.nb_event_port_dequeue_depth =
				dev_info.max_event_port_dequeue_depth;
	if (dev_info.max_event_port_enqueue_depth <
			config.nb_event_port_enqueue_depth)
		config.nb_event_port_enqueue_depth =
				dev_info.max_event_port_enqueue_depth;

	ret = rte_event_dev_configure(dev_id, &config);
	if (ret < 0) {
		printf("%d: Error configuring device\n", __LINE__);
		return -1;
	}
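
	/*
	 * Queue layout: with all-type queues each ethdev gets a single
	 * RTE_EVENT_QUEUE_CFG_ALL_TYPES queue; otherwise each pipeline gets
	 * num_stages queues of the configured type plus one ATOMIC queue
	 * feeding Tx. When queue priorities are enabled, queues closer to Tx
	 * are given higher priority.
	 */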
	printf(" Stages:\n");
	for (i = 0; i < nb_queues; i++) {

		if (atq) {

			nb_slots = cdata.num_stages;
			wkr_q_conf.event_queue_cfg =
				RTE_EVENT_QUEUE_CFG_ALL_TYPES;
		} else {
			uint8_t slot;

			nb_slots = cdata.num_stages + 1;
			slot = i % nb_slots;
			wkr_q_conf.schedule_type = slot == cdata.num_stages ?
				RTE_SCHED_TYPE_ATOMIC : cdata.queue_type;
		}

		if (rte_event_queue_setup(dev_id, i, &wkr_q_conf) < 0) {
			printf("%d: error creating qid %d\n", __LINE__, i);
			return -1;
		}
		cdata.qid[i] = i;
		cdata.next_qid[i] = i + 1;
		if (cdata.enable_queue_priorities) {
			const uint32_t prio_delta =
				(RTE_EVENT_DEV_PRIORITY_LOWEST) / nb_slots;

			/* higher priority for queues closer to tx */
			wkr_q_conf.priority =
				RTE_EVENT_DEV_PRIORITY_LOWEST - prio_delta *
				(i % nb_slots);
		}

		const char *type_str = "Atomic";
		switch (wkr_q_conf.schedule_type) {
		case RTE_SCHED_TYPE_ORDERED:
			type_str = "Ordered";
			break;
		case RTE_SCHED_TYPE_PARALLEL:
			type_str = "Parallel";
			break;
		}
		printf("\tStage %d, Type %s\tPriority = %d\n", i, type_str,
				wkr_q_conf.priority);
	}

	printf("\n");
	if (wkr_p_conf.dequeue_depth > config.nb_event_port_dequeue_depth)
		wkr_p_conf.dequeue_depth = config.nb_event_port_dequeue_depth;
	if (wkr_p_conf.enqueue_depth > config.nb_event_port_enqueue_depth)
		wkr_p_conf.enqueue_depth = config.nb_event_port_enqueue_depth;

	/* set up one port per worker, linking to all stage queues */
	for (i = 0; i < cdata.num_workers; i++) {
		struct worker_data *w = &worker_data[i];
		w->dev_id = dev_id;
		if (rte_event_port_setup(dev_id, i, &wkr_p_conf) < 0) {
			printf("Error setting up port %d\n", i);
			return -1;
		}

		if (rte_event_port_link(dev_id, i, NULL, NULL, 0)
				!= nb_queues) {
			printf("%d: error creating link for port %d\n",
					__LINE__, i);
			return -1;
		}
		w->port_id = i;
	}
	/*
	 * Reduce the load on the ingress event queue by splitting the traffic
	 * across multiple event queues. For example, with nb_stages = 2 and
	 * nb_ethdev = 2:
	 *
	 *	nb_queues = (2 * 2) + 2 = 6 (non atq)
	 *	rx_stride = 3
	 *
	 * Traffic is then split across queue 0 and queue 3, since the queue id
	 * for the rx adapter is chosen as <ethport_id> * <rx_stride>, i.e. in
	 * the above case eth ports 0 and 1 inject packets into event queues 0
	 * and 3 respectively.
	 *
	 * This forms two sets of queue pipelines: 0->1->2->tx and 3->4->5->tx.
	 */
	cdata.rx_stride = atq ? 1 : nb_slots;
	ret = rte_event_dev_service_id_get(dev_id,
			&fdata->evdev_service_id);
	if (ret != -ESRCH && ret != 0) {
		printf("Error getting the service ID\n");
		return -1;
	}
	rte_service_runstate_set(fdata->evdev_service_id, 1);
	rte_service_set_runstate_mapped_check(fdata->evdev_service_id, 0);

	if (rte_event_dev_start(dev_id) < 0)
		rte_exit(EXIT_FAILURE, "Error starting eventdev");

	return dev_id;
}

struct rx_adptr_services {
	uint16_t nb_rx_adptrs;
	uint32_t *rx_adpt_arr;
};

static int32_t
service_rx_adapter(void *arg)
{
	int i;
	struct rx_adptr_services *adptr_services = arg;

	for (i = 0; i < adptr_services->nb_rx_adptrs; i++)
		rte_service_run_iter_on_app_lcore(
				adptr_services->rx_adpt_arr[i], 1);
	return 0;
}
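
/*
 * Create one Rx adapter per ethdev and a single Tx adapter, and connect all
 * ethdev queues to them. Rx adapters that lack the INTERNAL_PORT capability
 * are collected into rx_adptr_services and driven by the rx_service
 * registered below; otherwise the rx service cores are released.
 */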
static void
init_adapters(uint16_t nb_ports)
{
	int i;
	int ret;
	uint8_t evdev_id = 0;
	struct rx_adptr_services *adptr_services = NULL;
	struct rte_event_dev_info dev_info;

	ret = rte_event_dev_info_get(evdev_id, &dev_info);
	adptr_services = rte_zmalloc(NULL, sizeof(struct rx_adptr_services), 0);
	if (adptr_services == NULL)
		rte_exit(EXIT_FAILURE,
				"failed to allocate rx adapter service data");

	struct rte_event_port_conf adptr_p_conf = {
		.dequeue_depth = cdata.worker_cq_depth,
		.enqueue_depth = 64,
		.new_event_threshold = 4096,
	};

	if (adptr_p_conf.dequeue_depth > dev_info.max_event_port_dequeue_depth)
		adptr_p_conf.dequeue_depth =
			dev_info.max_event_port_dequeue_depth;
	if (adptr_p_conf.enqueue_depth > dev_info.max_event_port_enqueue_depth)
		adptr_p_conf.enqueue_depth =
			dev_info.max_event_port_enqueue_depth;

	struct rte_event_eth_rx_adapter_queue_conf queue_conf;
	memset(&queue_conf, 0, sizeof(queue_conf));
	queue_conf.ev.sched_type = cdata.queue_type;

	for (i = 0; i < nb_ports; i++) {
		uint32_t cap;
		uint32_t service_id;

		ret = rte_event_eth_rx_adapter_create(i, evdev_id,
				&adptr_p_conf);
		if (ret)
			rte_exit(EXIT_FAILURE,
					"failed to create rx adapter[%d]", i);

		ret = rte_event_eth_rx_adapter_caps_get(evdev_id, i, &cap);
		if (ret)
			rte_exit(EXIT_FAILURE,
					"failed to get event rx adapter "
					"capabilities");

		queue_conf.ev.queue_id = cdata.rx_stride ?
			(i * cdata.rx_stride)
			: (uint8_t)cdata.qid[0];

		ret = rte_event_eth_rx_adapter_queue_add(i, i, -1, &queue_conf);
		if (ret)
			rte_exit(EXIT_FAILURE,
					"Failed to add queues to Rx adapter");

		/*
		 * The producer needs to be scheduled: Rx adapters without the
		 * INTERNAL_PORT capability are run as a software service.
		 */
		if (!(cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_INTERNAL_PORT)) {
			ret = rte_event_eth_rx_adapter_service_id_get(i,
					&service_id);
			if (ret != -ESRCH && ret != 0) {
				rte_exit(EXIT_FAILURE,
				"Error getting the service ID for rx adptr\n");
			}

			rte_service_runstate_set(service_id, 1);
			rte_service_set_runstate_mapped_check(service_id, 0);

			adptr_services->nb_rx_adptrs++;
			adptr_services->rx_adpt_arr = rte_realloc(
					adptr_services->rx_adpt_arr,
					adptr_services->nb_rx_adptrs *
					sizeof(uint32_t), 0);
			adptr_services->rx_adpt_arr[
				adptr_services->nb_rx_adptrs - 1] =
				service_id;
		}

		ret = rte_event_eth_rx_adapter_start(i);
		if (ret)
			rte_exit(EXIT_FAILURE, "Rx adapter[%d] start failed",
					i);
	}

	/* We already know that the Tx adapter has the INTERNAL port cap. */
	ret = rte_event_eth_tx_adapter_create(cdata.tx_adapter_id, evdev_id,
			&adptr_p_conf);
	if (ret)
		rte_exit(EXIT_FAILURE, "failed to create tx adapter[%d]",
				cdata.tx_adapter_id);

	for (i = 0; i < nb_ports; i++) {
		ret = rte_event_eth_tx_adapter_queue_add(cdata.tx_adapter_id, i,
				-1);
		if (ret)
			rte_exit(EXIT_FAILURE,
					"Failed to add queues to Tx adapter");
	}

	ret = rte_event_eth_tx_adapter_start(cdata.tx_adapter_id);
	if (ret)
		rte_exit(EXIT_FAILURE, "Tx adapter[%d] start failed",
				cdata.tx_adapter_id);

	/*
	 * Decide whether the event device scheduler callback is needed before
	 * adptr_services may be freed below.
	 */
	if (!adptr_services->nb_rx_adptrs && (dev_info.event_dev_cap &
				RTE_EVENT_DEV_CAP_DISTRIBUTED_SCHED))
		fdata->cap.scheduler = NULL;

	if (adptr_services->nb_rx_adptrs) {
		struct rte_service_spec service;

		memset(&service, 0, sizeof(struct rte_service_spec));
		snprintf(service.name, sizeof(service.name), "rx_service");
		service.callback = service_rx_adapter;
		service.callback_userdata = (void *)adptr_services;

		int32_t ret = rte_service_component_register(&service,
				&fdata->rxadptr_service_id);
		if (ret)
			rte_exit(EXIT_FAILURE,
				"Rx adapter service register failed");

		rte_service_runstate_set(fdata->rxadptr_service_id, 1);
		rte_service_component_runstate_set(fdata->rxadptr_service_id,
				1);
		rte_service_set_runstate_mapped_check(fdata->rxadptr_service_id,
				0);
	} else {
		memset(fdata->rx_core, 0, sizeof(unsigned int) * MAX_NUM_CORE);
		rte_free(adptr_services);
	}
}
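
/*
 * Sanity-check the core masks against the device capabilities: a software
 * scheduler core is only needed when the event device does not provide
 * distributed scheduling, and rx cores are only needed when at least one
 * ethdev lacks an internal-port Rx adapter. Tx cores are never needed in
 * this mode, since workers enqueue to the Tx adapter directly.
 */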
static void
worker_tx_enq_opt_check(void)
{
	int i;
	int ret;
	uint32_t cap = 0;
	uint8_t rx_needed = 0;
	uint8_t sched_needed = 0;
	struct rte_event_dev_info eventdev_info;

	memset(&eventdev_info, 0, sizeof(struct rte_event_dev_info));
	rte_event_dev_info_get(0, &eventdev_info);

	if (cdata.all_type_queues && !(eventdev_info.event_dev_cap &
				RTE_EVENT_DEV_CAP_QUEUE_ALL_TYPES))
		rte_exit(EXIT_FAILURE,
				"Event dev doesn't support all type queues\n");
	sched_needed = !(eventdev_info.event_dev_cap &
		RTE_EVENT_DEV_CAP_DISTRIBUTED_SCHED);

	RTE_ETH_FOREACH_DEV(i) {
		ret = rte_event_eth_rx_adapter_caps_get(0, i, &cap);
		if (ret)
			rte_exit(EXIT_FAILURE,
				"failed to get event rx adapter capabilities");
		rx_needed |=
			!(cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_INTERNAL_PORT);
	}

	if (cdata.worker_lcore_mask == 0 ||
			(rx_needed && cdata.rx_lcore_mask == 0) ||
			(sched_needed && cdata.sched_lcore_mask == 0)) {
		printf("Core part of pipeline was not assigned any cores. "
			"This will stall the pipeline, please check core masks "
			"(use -h for details on setting core masks):\n"
			"\trx: %"PRIu64"\n\tsched: %"PRIu64
			"\n\tworkers: %"PRIu64"\n", cdata.rx_lcore_mask,
			cdata.sched_lcore_mask, cdata.worker_lcore_mask);
		rte_exit(-1, "Fix core masks\n");
	}

	if (!sched_needed)
		memset(fdata->sched_core, 0,
				sizeof(unsigned int) * MAX_NUM_CORE);
	if (!rx_needed)
		memset(fdata->rx_core, 0,
				sizeof(unsigned int) * MAX_NUM_CORE);

	memset(fdata->tx_core, 0, sizeof(unsigned int) * MAX_NUM_CORE);
}

static worker_loop
get_worker_loop_single_burst(uint8_t atq)
{
	if (atq)
		return worker_do_tx_single_burst_atq;

	return worker_do_tx_single_burst;
}

static worker_loop
get_worker_loop_single_non_burst(uint8_t atq)
{
	if (atq)
		return worker_do_tx_single_atq;

	return worker_do_tx_single;
}

static worker_loop
get_worker_loop_burst(uint8_t atq)
{
	if (atq)
		return worker_do_tx_burst_atq;

	return worker_do_tx_burst;
}

static worker_loop
get_worker_loop_non_burst(uint8_t atq)
{
	if (atq)
		return worker_do_tx_atq;

	return worker_do_tx;
}

static worker_loop
get_worker_single_stage(bool burst)
{
	uint8_t atq = cdata.all_type_queues ? 1 : 0;

	if (burst)
		return get_worker_loop_single_burst(atq);

	return get_worker_loop_single_non_burst(atq);
}

static worker_loop
get_worker_multi_stage(bool burst)
{
	uint8_t atq = cdata.all_type_queues ? 1 : 0;

	if (burst)
		return get_worker_loop_burst(atq);

	return get_worker_loop_non_burst(atq);
}

/*
 * Select the worker loop matching the configured number of stages, burst
 * mode and queue type, and register the setup callbacks used by the rest of
 * the application for the worker-tx-enqueue mode.
 */
void
set_worker_tx_enq_setup_data(struct setup_data *caps, bool burst)
{
	if (cdata.num_stages == 1)
		caps->worker = get_worker_single_stage(burst);
	else
		caps->worker = get_worker_multi_stage(burst);

	caps->check_opt = worker_tx_enq_opt_check;
	caps->scheduler = schedule_devices;
	caps->evdev_setup = setup_eventdev_worker_tx_enq;
	caps->adptr_setup = init_adapters;
}