/*
 * SPDX-License-Identifier: BSD-3-Clause
 * Copyright(c) 2010-2014 Intel Corporation
 * Copyright 2017 Cavium, Inc.
 */

#include "pipeline_common.h"

static __rte_always_inline void
worker_fwd_event(struct rte_event *ev, uint8_t sched)
{
	ev->event_type = RTE_EVENT_TYPE_CPU;
	ev->op = RTE_EVENT_OP_FORWARD;
	ev->sched_type = sched;
}

static __rte_always_inline void
worker_event_enqueue(const uint8_t dev, const uint8_t port,
		struct rte_event *ev)
{
	while (rte_event_enqueue_burst(dev, port, ev, 1) != 1)
		rte_pause();
}

static __rte_always_inline void
worker_event_enqueue_burst(const uint8_t dev, const uint8_t port,
		struct rte_event *ev, const uint16_t nb_rx)
{
	uint16_t enq;

	enq = rte_event_enqueue_burst(dev, port, ev, nb_rx);
	while (enq < nb_rx) {
		enq += rte_event_enqueue_burst(dev, port,
				ev + enq, nb_rx - enq);
	}
}

static __rte_always_inline void
worker_tx_pkt(const uint8_t dev, const uint8_t port, struct rte_event *ev)
{
	exchange_mac(ev->mbuf);
	rte_event_eth_tx_adapter_txq_set(ev->mbuf, 0);
	while (!rte_event_eth_tx_adapter_enqueue(dev, port, ev, 1, 0))
		rte_pause();
}

/* Single stage pipeline workers */
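/*
 * A single stage worker both processes and transmits events. Events that
 * are dequeued with RTE_SCHED_TYPE_ATOMIC have already been through the
 * worker stage and are handed to the Tx adapter; all other events are
 * processed, re-tagged as ATOMIC and forwarded back to the event device.
 * The non all-type-queue variants move the event to the next queue
 * (queue_id + 1), while the *_atq variants reuse the same queue.
 */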
static int
worker_do_tx_single(void *arg)
{
	struct worker_data *data = (struct worker_data *)arg;
	const uint8_t dev = data->dev_id;
	const uint8_t port = data->port_id;
	size_t fwd = 0, received = 0, tx = 0;
	struct rte_event ev;

	while (!fdata->done) {

		if (!rte_event_dequeue_burst(dev, port, &ev, 1, 0)) {
			rte_pause();
			continue;
		}

		received++;

		if (ev.sched_type == RTE_SCHED_TYPE_ATOMIC) {
			worker_tx_pkt(dev, port, &ev);
			tx++;
		} else {
			work();
			ev.queue_id++;
			worker_fwd_event(&ev, RTE_SCHED_TYPE_ATOMIC);
			worker_event_enqueue(dev, port, &ev);
			fwd++;
		}
	}

	if (!cdata.quiet)
		printf(" worker %u thread done. RX=%zu FWD=%zu TX=%zu\n",
				rte_lcore_id(), received, fwd, tx);
	return 0;
}

static int
worker_do_tx_single_atq(void *arg)
{
	struct worker_data *data = (struct worker_data *)arg;
	const uint8_t dev = data->dev_id;
	const uint8_t port = data->port_id;
	size_t fwd = 0, received = 0, tx = 0;
	struct rte_event ev;

	while (!fdata->done) {

		if (!rte_event_dequeue_burst(dev, port, &ev, 1, 0)) {
			rte_pause();
			continue;
		}

		received++;

		if (ev.sched_type == RTE_SCHED_TYPE_ATOMIC) {
			worker_tx_pkt(dev, port, &ev);
			tx++;
		} else {
			work();
			worker_fwd_event(&ev, RTE_SCHED_TYPE_ATOMIC);
			worker_event_enqueue(dev, port, &ev);
			fwd++;
		}
	}

	if (!cdata.quiet)
		printf(" worker %u thread done. RX=%zu FWD=%zu TX=%zu\n",
				rte_lcore_id(), received, fwd, tx);
	return 0;
}
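/*
 * Burst variants of the single stage worker. The event array holds
 * BATCH_SIZE + 1 entries so that the rte_prefetch0() of ev[i + 1].mbuf
 * stays within bounds on the last iteration. Events that have already
 * been transmitted are flipped to RTE_EVENT_OP_RELEASE so that a single
 * enqueue burst releases them while forwarding the rest.
 */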
static int
worker_do_tx_single_burst(void *arg)
{
	struct rte_event ev[BATCH_SIZE + 1];

	struct worker_data *data = (struct worker_data *)arg;
	const uint8_t dev = data->dev_id;
	const uint8_t port = data->port_id;
	size_t fwd = 0, received = 0, tx = 0;

	while (!fdata->done) {
		uint16_t i;
		uint16_t nb_rx = rte_event_dequeue_burst(dev, port, ev,
				BATCH_SIZE, 0);

		if (!nb_rx) {
			rte_pause();
			continue;
		}
		received += nb_rx;

		for (i = 0; i < nb_rx; i++) {
			rte_prefetch0(ev[i + 1].mbuf);
			if (ev[i].sched_type == RTE_SCHED_TYPE_ATOMIC) {

				worker_tx_pkt(dev, port, &ev[i]);
				ev[i].op = RTE_EVENT_OP_RELEASE;
				tx++;

			} else {
				ev[i].queue_id++;
				worker_fwd_event(&ev[i], RTE_SCHED_TYPE_ATOMIC);
			}
			work();
		}

		worker_event_enqueue_burst(dev, port, ev, nb_rx);
		fwd += nb_rx;
	}

	if (!cdata.quiet)
		printf(" worker %u thread done. RX=%zu FWD=%zu TX=%zu\n",
				rte_lcore_id(), received, fwd, tx);
	return 0;
}

static int
worker_do_tx_single_burst_atq(void *arg)
{
	struct rte_event ev[BATCH_SIZE + 1];

	struct worker_data *data = (struct worker_data *)arg;
	const uint8_t dev = data->dev_id;
	const uint8_t port = data->port_id;
	size_t fwd = 0, received = 0, tx = 0;

	while (!fdata->done) {
		uint16_t i;
		uint16_t nb_rx = rte_event_dequeue_burst(dev, port, ev,
				BATCH_SIZE, 0);

		if (!nb_rx) {
			rte_pause();
			continue;
		}

		received += nb_rx;

		for (i = 0; i < nb_rx; i++) {
			rte_prefetch0(ev[i + 1].mbuf);
			if (ev[i].sched_type == RTE_SCHED_TYPE_ATOMIC) {

				worker_tx_pkt(dev, port, &ev[i]);
				ev[i].op = RTE_EVENT_OP_RELEASE;
				tx++;
			} else
				worker_fwd_event(&ev[i], RTE_SCHED_TYPE_ATOMIC);
			work();
		}

		worker_event_enqueue_burst(dev, port, ev, nb_rx);
		fwd += nb_rx;
	}

	if (!cdata.quiet)
		printf(" worker %u thread done. RX=%zu FWD=%zu TX=%zu\n",
				rte_lcore_id(), received, fwd, tx);
	return 0;
}

/* Multi stage Pipeline Workers */
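/*
 * Multi stage workers derive the current stage from the event itself:
 * queue_id % num_stages for the multi-queue variants, or
 * sub_event_type % num_stages for the all-type-queue variants. At the
 * last stage, events arriving with RTE_SCHED_TYPE_ATOMIC are transmitted
 * directly; anything else is re-injected as ATOMIC towards the Tx queue.
 * Earlier stages forward the event to the next stage using the scheduling
 * type selected on the command line (cdata.queue_type).
 */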
static int
worker_do_tx(void *arg)
{
	struct rte_event ev;

	struct worker_data *data = (struct worker_data *)arg;
	const uint8_t dev = data->dev_id;
	const uint8_t port = data->port_id;
	const uint8_t lst_qid = cdata.num_stages - 1;
	size_t fwd = 0, received = 0, tx = 0;

	while (!fdata->done) {

		if (!rte_event_dequeue_burst(dev, port, &ev, 1, 0)) {
			rte_pause();
			continue;
		}

		received++;
		const uint8_t cq_id = ev.queue_id % cdata.num_stages;

		if (cq_id >= lst_qid) {
			if (ev.sched_type == RTE_SCHED_TYPE_ATOMIC) {
				worker_tx_pkt(dev, port, &ev);
				tx++;
				continue;
			}

			worker_fwd_event(&ev, RTE_SCHED_TYPE_ATOMIC);
			ev.queue_id = (cq_id == lst_qid) ?
				cdata.next_qid[ev.queue_id] : ev.queue_id;
		} else {
			ev.queue_id = cdata.next_qid[ev.queue_id];
			worker_fwd_event(&ev, cdata.queue_type);
		}
		work();

		worker_event_enqueue(dev, port, &ev);
		fwd++;
	}

	if (!cdata.quiet)
		printf(" worker %u thread done. RX=%zu FWD=%zu TX=%zu\n",
				rte_lcore_id(), received, fwd, tx);

	return 0;
}

static int
worker_do_tx_atq(void *arg)
{
	struct rte_event ev;

	struct worker_data *data = (struct worker_data *)arg;
	const uint8_t dev = data->dev_id;
	const uint8_t port = data->port_id;
	const uint8_t lst_qid = cdata.num_stages - 1;
	size_t fwd = 0, received = 0, tx = 0;

	while (!fdata->done) {

		if (!rte_event_dequeue_burst(dev, port, &ev, 1, 0)) {
			rte_pause();
			continue;
		}

		received++;
		const uint8_t cq_id = ev.sub_event_type % cdata.num_stages;

		if (cq_id == lst_qid) {
			if (ev.sched_type == RTE_SCHED_TYPE_ATOMIC) {
				worker_tx_pkt(dev, port, &ev);
				tx++;
				continue;
			}

			worker_fwd_event(&ev, RTE_SCHED_TYPE_ATOMIC);
		} else {
			ev.sub_event_type++;
			worker_fwd_event(&ev, cdata.queue_type);
		}
		work();

		worker_event_enqueue(dev, port, &ev);
		fwd++;
	}

	if (!cdata.quiet)
		printf(" worker %u thread done. RX=%zu FWD=%zu TX=%zu\n",
				rte_lcore_id(), received, fwd, tx);

	return 0;
}
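/*
 * Burst variants of the multi stage workers: the same per-stage logic as
 * above, applied to up to BATCH_SIZE events at a time. Transmitted events
 * are marked RTE_EVENT_OP_RELEASE so the trailing enqueue burst releases
 * them together with the forwarded events.
 */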
static int
worker_do_tx_burst(void *arg)
{
	struct rte_event ev[BATCH_SIZE];

	struct worker_data *data = (struct worker_data *)arg;
	uint8_t dev = data->dev_id;
	uint8_t port = data->port_id;
	uint8_t lst_qid = cdata.num_stages - 1;
	size_t fwd = 0, received = 0, tx = 0;

	while (!fdata->done) {
		uint16_t i;
		const uint16_t nb_rx = rte_event_dequeue_burst(dev, port,
				ev, BATCH_SIZE, 0);

		if (nb_rx == 0) {
			rte_pause();
			continue;
		}
		received += nb_rx;

		for (i = 0; i < nb_rx; i++) {
			const uint8_t cq_id = ev[i].queue_id % cdata.num_stages;

			if (cq_id >= lst_qid) {
				if (ev[i].sched_type == RTE_SCHED_TYPE_ATOMIC) {
					worker_tx_pkt(dev, port, &ev[i]);
					tx++;
					ev[i].op = RTE_EVENT_OP_RELEASE;
					continue;
				}
				ev[i].queue_id = (cq_id == lst_qid) ?
					cdata.next_qid[ev[i].queue_id] :
					ev[i].queue_id;

				worker_fwd_event(&ev[i], RTE_SCHED_TYPE_ATOMIC);
			} else {
				ev[i].queue_id = cdata.next_qid[ev[i].queue_id];
				worker_fwd_event(&ev[i], cdata.queue_type);
			}
			work();
		}
		worker_event_enqueue_burst(dev, port, ev, nb_rx);

		fwd += nb_rx;
	}

	if (!cdata.quiet)
		printf(" worker %u thread done. RX=%zu FWD=%zu TX=%zu\n",
				rte_lcore_id(), received, fwd, tx);

	return 0;
}

static int
worker_do_tx_burst_atq(void *arg)
{
	struct rte_event ev[BATCH_SIZE];

	struct worker_data *data = (struct worker_data *)arg;
	uint8_t dev = data->dev_id;
	uint8_t port = data->port_id;
	uint8_t lst_qid = cdata.num_stages - 1;
	size_t fwd = 0, received = 0, tx = 0;

	while (!fdata->done) {
		uint16_t i;

		const uint16_t nb_rx = rte_event_dequeue_burst(dev, port,
				ev, BATCH_SIZE, 0);

		if (nb_rx == 0) {
			rte_pause();
			continue;
		}
		received += nb_rx;

		for (i = 0; i < nb_rx; i++) {
			const uint8_t cq_id = ev[i].sub_event_type %
				cdata.num_stages;

			if (cq_id == lst_qid) {
				if (ev[i].sched_type == RTE_SCHED_TYPE_ATOMIC) {
					worker_tx_pkt(dev, port, &ev[i]);
					tx++;
					ev[i].op = RTE_EVENT_OP_RELEASE;
					continue;
				}

				worker_fwd_event(&ev[i], RTE_SCHED_TYPE_ATOMIC);
			} else {
				ev[i].sub_event_type++;
				worker_fwd_event(&ev[i], cdata.queue_type);
			}
			work();
		}

		worker_event_enqueue_burst(dev, port, ev, nb_rx);
		fwd += nb_rx;
	}

	if (!cdata.quiet)
		printf(" worker %u thread done. RX=%zu FWD=%zu TX=%zu\n",
				rte_lcore_id(), received, fwd, tx);

	return 0;
}
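/*
 * Configure the event device for the worker-Tx-enqueue mode. Without
 * all-type queues this needs num_stages queues per ethdev plus one Tx
 * queue per pipeline; with all-type queues one queue per ethdev is
 * enough. The requested limits are clamped to what the device reports,
 * every worker port is linked to all queues, and rx_stride records how
 * far apart the ingress queues of the per-ethdev pipelines are.
 */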
static int
setup_eventdev_worker_tx_enq(struct worker_data *worker_data)
{
	uint8_t i;
	const uint8_t atq = cdata.all_type_queues ? 1 : 0;
	const uint8_t dev_id = 0;
	const uint8_t nb_ports = cdata.num_workers;
	uint8_t nb_slots = 0;
	uint8_t nb_queues = rte_eth_dev_count_avail();

	/*
	 * When all-type queues are not enabled, use a number of queues equal
	 * to number of stages * eth_dev_count, plus one extra queue per
	 * pipeline for Tx.
	 */
	if (!atq) {
		nb_queues *= cdata.num_stages;
		nb_queues += rte_eth_dev_count_avail();
	}

	struct rte_event_dev_config config = {
		.nb_event_queues = nb_queues,
		.nb_event_ports = nb_ports,
		.nb_events_limit = 4096,
		.nb_event_queue_flows = 1024,
		.nb_event_port_dequeue_depth = 128,
		.nb_event_port_enqueue_depth = 128,
	};
	struct rte_event_port_conf wkr_p_conf = {
		.dequeue_depth = cdata.worker_cq_depth,
		.enqueue_depth = 64,
		.new_event_threshold = 4096,
	};
	struct rte_event_queue_conf wkr_q_conf = {
		.schedule_type = cdata.queue_type,
		.priority = RTE_EVENT_DEV_PRIORITY_NORMAL,
		.nb_atomic_flows = 1024,
		.nb_atomic_order_sequences = 1024,
	};

	int ret, ndev = rte_event_dev_count();

	if (ndev < 1) {
		printf("%d: No Eventdev Devices Found\n", __LINE__);
		return -1;
	}

	struct rte_event_dev_info dev_info;
	ret = rte_event_dev_info_get(dev_id, &dev_info);
	printf("\tEventdev %d: %s\n", dev_id, dev_info.driver_name);

	if (dev_info.max_num_events < config.nb_events_limit)
		config.nb_events_limit = dev_info.max_num_events;
	if (dev_info.max_event_port_dequeue_depth <
			config.nb_event_port_dequeue_depth)
		config.nb_event_port_dequeue_depth =
				dev_info.max_event_port_dequeue_depth;
	if (dev_info.max_event_port_enqueue_depth <
			config.nb_event_port_enqueue_depth)
		config.nb_event_port_enqueue_depth =
				dev_info.max_event_port_enqueue_depth;

	ret = rte_event_dev_configure(dev_id, &config);
	if (ret < 0) {
		printf("%d: Error configuring device\n", __LINE__);
		return -1;
	}

	printf(" Stages:\n");
	for (i = 0; i < nb_queues; i++) {

		if (atq) {

			nb_slots = cdata.num_stages;
			wkr_q_conf.event_queue_cfg =
				RTE_EVENT_QUEUE_CFG_ALL_TYPES;
		} else {
			uint8_t slot;

			nb_slots = cdata.num_stages + 1;
			slot = i % nb_slots;
			wkr_q_conf.schedule_type = slot == cdata.num_stages ?
				RTE_SCHED_TYPE_ATOMIC : cdata.queue_type;
		}

		if (rte_event_queue_setup(dev_id, i, &wkr_q_conf) < 0) {
			printf("%d: error creating qid %d\n", __LINE__, i);
			return -1;
		}
		cdata.qid[i] = i;
		cdata.next_qid[i] = i+1;
		if (cdata.enable_queue_priorities) {
			const uint32_t prio_delta =
				(RTE_EVENT_DEV_PRIORITY_LOWEST) /
				nb_slots;

			/* higher priority for queues closer to tx */
			wkr_q_conf.priority =
				RTE_EVENT_DEV_PRIORITY_LOWEST - prio_delta *
				(i % nb_slots);
		}

		const char *type_str = "Atomic";
		switch (wkr_q_conf.schedule_type) {
		case RTE_SCHED_TYPE_ORDERED:
			type_str = "Ordered";
			break;
		case RTE_SCHED_TYPE_PARALLEL:
			type_str = "Parallel";
			break;
		}
		printf("\tStage %d, Type %s\tPriority = %d\n", i, type_str,
				wkr_q_conf.priority);
	}

	printf("\n");
	if (wkr_p_conf.new_event_threshold > config.nb_events_limit)
		wkr_p_conf.new_event_threshold = config.nb_events_limit;
	if (wkr_p_conf.dequeue_depth > config.nb_event_port_dequeue_depth)
		wkr_p_conf.dequeue_depth = config.nb_event_port_dequeue_depth;
	if (wkr_p_conf.enqueue_depth > config.nb_event_port_enqueue_depth)
		wkr_p_conf.enqueue_depth = config.nb_event_port_enqueue_depth;

	/* set up one port per worker, linking to all stage queues */
	for (i = 0; i < cdata.num_workers; i++) {
		struct worker_data *w = &worker_data[i];
		w->dev_id = dev_id;
		if (rte_event_port_setup(dev_id, i, &wkr_p_conf) < 0) {
			printf("Error setting up port %d\n", i);
			return -1;
		}

		if (rte_event_port_link(dev_id, i, NULL, NULL, 0)
				!= nb_queues) {
			printf("%d: error creating link for port %d\n",
					__LINE__, i);
			return -1;
		}
		w->port_id = i;
	}
	/*
	 * Reduce the load on the ingress event queue by splitting the traffic
	 * across multiple event queues.
	 * For example, with nb_stages = 2 and nb_ethdev = 2:
	 *
	 * nb_queues = (2 * 2) + 2 = 6 (non atq)
	 * rx_stride = 3
	 *
	 * Traffic is split across queue 0 and queue 3, since the queue id
	 * used by the rx adapter is <ethport_id> * <rx_stride>, i.e. in the
	 * above case eth ports 0 and 1 inject packets into event queues 0
	 * and 3 respectively.
	 *
	 * This forms two sets of queue pipelines: 0->1->2->tx and 3->4->5->tx.
	 */
	cdata.rx_stride = atq ? 1 : nb_slots;
	ret = rte_event_dev_service_id_get(dev_id,
				&fdata->evdev_service_id);
	if (ret != -ESRCH && ret != 0) {
		printf("Error getting the service ID\n");
		return -1;
	}
	rte_service_runstate_set(fdata->evdev_service_id, 1);
	rte_service_set_runstate_mapped_check(fdata->evdev_service_id, 0);

	if (rte_event_dev_start(dev_id) < 0)
		rte_exit(EXIT_FAILURE, "Error starting eventdev");

	return dev_id;
}
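/*
 * Rx adapters that have no internal event port must be driven by a
 * service core. Their service IDs are collected in rx_adptr_services and
 * run from the application lcore by service_rx_adapter(), which
 * init_adapters() registers as the "rx_service" component when needed.
 */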
struct rx_adptr_services {
	uint16_t nb_rx_adptrs;
	uint32_t *rx_adpt_arr;
};

static int32_t
service_rx_adapter(void *arg)
{
	int i;
	struct rx_adptr_services *adptr_services = arg;

	for (i = 0; i < adptr_services->nb_rx_adptrs; i++)
		rte_service_run_iter_on_app_lcore(
				adptr_services->rx_adpt_arr[i], 1);
	return 0;
}
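/*
 * Create one Rx adapter per ethdev, feeding event queue
 * <ethport_id> * rx_stride, and a single Tx adapter shared by all ports.
 * When an Rx adapter lacks the INTERNAL_PORT capability its service is
 * added to the rx_service component; otherwise the Rx core mask is
 * cleared, and the software scheduler is dropped if the event device
 * supports distributed scheduling.
 */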
static void
init_adapters(uint16_t nb_ports)
{
	int i;
	int ret;
	uint8_t evdev_id = 0;
	struct rx_adptr_services *adptr_services = NULL;
	struct rte_event_dev_info dev_info;

	ret = rte_event_dev_info_get(evdev_id, &dev_info);
	adptr_services = rte_zmalloc(NULL, sizeof(struct rx_adptr_services), 0);

	struct rte_event_port_conf adptr_p_conf = {
		.dequeue_depth = cdata.worker_cq_depth,
		.enqueue_depth = 64,
		.new_event_threshold = 4096,
	};

	if (adptr_p_conf.new_event_threshold > dev_info.max_num_events)
		adptr_p_conf.new_event_threshold = dev_info.max_num_events;
	if (adptr_p_conf.dequeue_depth > dev_info.max_event_port_dequeue_depth)
		adptr_p_conf.dequeue_depth =
			dev_info.max_event_port_dequeue_depth;
	if (adptr_p_conf.enqueue_depth > dev_info.max_event_port_enqueue_depth)
		adptr_p_conf.enqueue_depth =
			dev_info.max_event_port_enqueue_depth;

	struct rte_event_eth_rx_adapter_queue_conf queue_conf;
	memset(&queue_conf, 0, sizeof(queue_conf));
	queue_conf.ev.sched_type = cdata.queue_type;

	for (i = 0; i < nb_ports; i++) {
		uint32_t cap;
		uint32_t service_id;

		ret = rte_event_eth_rx_adapter_create(i, evdev_id,
				&adptr_p_conf);
		if (ret)
			rte_exit(EXIT_FAILURE,
					"failed to create rx adapter[%d]", i);

		ret = rte_event_eth_rx_adapter_caps_get(evdev_id, i, &cap);
		if (ret)
			rte_exit(EXIT_FAILURE,
					"failed to get event rx adapter "
					"capabilities");

		queue_conf.ev.queue_id = cdata.rx_stride ?
			(i * cdata.rx_stride)
			: (uint8_t)cdata.qid[0];

		ret = rte_event_eth_rx_adapter_queue_add(i, i, -1, &queue_conf);
		if (ret)
			rte_exit(EXIT_FAILURE,
					"Failed to add queues to Rx adapter");

		/* Producer needs to be scheduled. */
		if (!(cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_INTERNAL_PORT)) {
			ret = rte_event_eth_rx_adapter_service_id_get(i,
					&service_id);
			if (ret != -ESRCH && ret != 0) {
				rte_exit(EXIT_FAILURE,
				"Error getting the service ID for rx adptr\n");
			}

			rte_service_runstate_set(service_id, 1);
			rte_service_set_runstate_mapped_check(service_id, 0);

			adptr_services->nb_rx_adptrs++;
			adptr_services->rx_adpt_arr = rte_realloc(
					adptr_services->rx_adpt_arr,
					adptr_services->nb_rx_adptrs *
					sizeof(uint32_t), 0);
			adptr_services->rx_adpt_arr[
				adptr_services->nb_rx_adptrs - 1] =
				service_id;
		}

		ret = rte_event_eth_rx_adapter_start(i);
		if (ret)
			rte_exit(EXIT_FAILURE, "Rx adapter[%d] start failed",
					i);
	}

	/* We already know that the Tx adapter has the INTERNAL port cap. */
	ret = rte_event_eth_tx_adapter_create(cdata.tx_adapter_id, evdev_id,
			&adptr_p_conf);
	if (ret)
		rte_exit(EXIT_FAILURE, "failed to create tx adapter[%d]",
				cdata.tx_adapter_id);

	for (i = 0; i < nb_ports; i++) {
		ret = rte_event_eth_tx_adapter_queue_add(cdata.tx_adapter_id, i,
				-1);
		if (ret)
			rte_exit(EXIT_FAILURE,
					"Failed to add queues to Tx adapter");
	}

	ret = rte_event_eth_tx_adapter_start(cdata.tx_adapter_id);
	if (ret)
		rte_exit(EXIT_FAILURE, "Tx adapter[%d] start failed",
				cdata.tx_adapter_id);

	if (adptr_services->nb_rx_adptrs) {
		struct rte_service_spec service;

		memset(&service, 0, sizeof(struct rte_service_spec));
		snprintf(service.name, sizeof(service.name), "rx_service");
		service.callback = service_rx_adapter;
		service.callback_userdata = (void *)adptr_services;

		int32_t ret = rte_service_component_register(&service,
				&fdata->rxadptr_service_id);
		if (ret)
			rte_exit(EXIT_FAILURE,
				"Rx adapter service register failed");

		rte_service_runstate_set(fdata->rxadptr_service_id, 1);
		rte_service_component_runstate_set(fdata->rxadptr_service_id,
				1);
		rte_service_set_runstate_mapped_check(fdata->rxadptr_service_id,
				0);
	} else {
		/*
		 * No Rx adapter needs a service core: do the distributed
		 * scheduling check before freeing adptr_services so that it
		 * is not read after the free.
		 */
		if (dev_info.event_dev_cap &
				RTE_EVENT_DEV_CAP_DISTRIBUTED_SCHED)
			fdata->cap.scheduler = NULL;

		memset(fdata->rx_core, 0, sizeof(unsigned int) * MAX_NUM_CORE);
		rte_free(adptr_services);
	}
}

/*
 * Validate the core masks against the device capabilities: all-type
 * queues require RTE_EVENT_DEV_CAP_QUEUE_ALL_TYPES, scheduler cores are
 * only needed without distributed scheduling, and Rx cores only when an
 * ethdev lacks an internal event port. Tx cores are never used in this
 * mode since the workers transmit through the Tx adapter themselves.
 */
static void
worker_tx_enq_opt_check(void)
{
	int i;
	int ret;
	uint32_t cap = 0;
	uint8_t rx_needed = 0;
	uint8_t sched_needed = 0;
	struct rte_event_dev_info eventdev_info;

	memset(&eventdev_info, 0, sizeof(struct rte_event_dev_info));
	rte_event_dev_info_get(0, &eventdev_info);

	if (cdata.all_type_queues && !(eventdev_info.event_dev_cap &
				RTE_EVENT_DEV_CAP_QUEUE_ALL_TYPES))
		rte_exit(EXIT_FAILURE,
				"Event dev doesn't support all type queues\n");
	sched_needed = !(eventdev_info.event_dev_cap &
		RTE_EVENT_DEV_CAP_DISTRIBUTED_SCHED);

	RTE_ETH_FOREACH_DEV(i) {
		ret = rte_event_eth_rx_adapter_caps_get(0, i, &cap);
		if (ret)
			rte_exit(EXIT_FAILURE,
				"failed to get event rx adapter capabilities");
		rx_needed |=
			!(cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_INTERNAL_PORT);
	}

	if (cdata.worker_lcore_mask == 0 ||
			(rx_needed && cdata.rx_lcore_mask == 0) ||
			(sched_needed && cdata.sched_lcore_mask == 0)) {
		printf("Core part of pipeline was not assigned any cores. "
			"This will stall the pipeline, please check core masks "
			"(use -h for details on setting core masks):\n"
			"\trx: %"PRIu64"\n\tsched: %"PRIu64
			"\n\tworkers: %"PRIu64"\n", cdata.rx_lcore_mask,
			cdata.sched_lcore_mask, cdata.worker_lcore_mask);
		rte_exit(-1, "Fix core masks\n");
	}

	if (!sched_needed)
		memset(fdata->sched_core, 0,
				sizeof(unsigned int) * MAX_NUM_CORE);
	if (!rx_needed)
		memset(fdata->rx_core, 0,
				sizeof(unsigned int) * MAX_NUM_CORE);

	memset(fdata->tx_core, 0, sizeof(unsigned int) * MAX_NUM_CORE);
}

static worker_loop
get_worker_loop_single_burst(uint8_t atq)
{
	if (atq)
		return worker_do_tx_single_burst_atq;

	return worker_do_tx_single_burst;
}

static worker_loop
get_worker_loop_single_non_burst(uint8_t atq)
{
	if (atq)
		return worker_do_tx_single_atq;

	return worker_do_tx_single;
}

static worker_loop
get_worker_loop_burst(uint8_t atq)
{
	if (atq)
		return worker_do_tx_burst_atq;

	return worker_do_tx_burst;
}

static worker_loop
get_worker_loop_non_burst(uint8_t atq)
{
	if (atq)
		return worker_do_tx_atq;

	return worker_do_tx;
}

static worker_loop
get_worker_single_stage(bool burst)
{
	uint8_t atq = cdata.all_type_queues ? 1 : 0;

	if (burst)
		return get_worker_loop_single_burst(atq);

	return get_worker_loop_single_non_burst(atq);
}

static worker_loop
get_worker_multi_stage(bool burst)
{
	uint8_t atq = cdata.all_type_queues ? 1 : 0;

	if (burst)
		return get_worker_loop_burst(atq);

	return get_worker_loop_non_burst(atq);
}

/* Wire up the callbacks for the worker-Tx-enqueue pipeline mode. */
void
set_worker_tx_enq_setup_data(struct setup_data *caps, bool burst)
{
	if (cdata.num_stages == 1)
		caps->worker = get_worker_single_stage(burst);
	else
		caps->worker = get_worker_multi_stage(burst);

	caps->check_opt = worker_tx_enq_opt_check;
	caps->scheduler = schedule_devices;
	caps->evdev_setup = setup_eventdev_worker_tx_enq;
	caps->adptr_setup = init_adapters;
}