1 /* 2 * SPDX-License-Identifier: BSD-3-Clause 3 * Copyright(c) 2010-2014 Intel Corporation 4 * Copyright 2017 Cavium, Inc. 5 */ 6 7 #include "pipeline_common.h" 8 9 static __rte_always_inline void 10 worker_fwd_event(struct rte_event *ev, uint8_t sched) 11 { 12 ev->event_type = RTE_EVENT_TYPE_CPU; 13 ev->op = RTE_EVENT_OP_FORWARD; 14 ev->sched_type = sched; 15 } 16 17 static __rte_always_inline void 18 worker_event_enqueue(const uint8_t dev, const uint8_t port, 19 struct rte_event *ev) 20 { 21 while (rte_event_enqueue_burst(dev, port, ev, 1) != 1) 22 rte_pause(); 23 } 24 25 static __rte_always_inline void 26 worker_event_enqueue_burst(const uint8_t dev, const uint8_t port, 27 struct rte_event *ev, const uint16_t nb_rx) 28 { 29 uint16_t enq; 30 31 enq = rte_event_enqueue_burst(dev, port, ev, nb_rx); 32 while (enq < nb_rx) { 33 enq += rte_event_enqueue_burst(dev, port, 34 ev + enq, nb_rx - enq); 35 } 36 } 37 38 static __rte_always_inline void 39 worker_tx_pkt(const uint8_t dev, const uint8_t port, struct rte_event *ev) 40 { 41 exchange_mac(ev->mbuf); 42 rte_event_eth_tx_adapter_txq_set(ev->mbuf, 0); 43 while (!rte_event_eth_tx_adapter_enqueue(dev, port, ev, 1, 0)) 44 rte_pause(); 45 } 46 47 /* Single stage pipeline workers */ 48 49 static int 50 worker_do_tx_single(void *arg) 51 { 52 struct worker_data *data = (struct worker_data *)arg; 53 const uint8_t dev = data->dev_id; 54 const uint8_t port = data->port_id; 55 size_t fwd = 0, received = 0, tx = 0; 56 struct rte_event ev; 57 58 while (!fdata->done) { 59 60 if (!rte_event_dequeue_burst(dev, port, &ev, 1, 0)) { 61 rte_pause(); 62 continue; 63 } 64 65 received++; 66 67 if (ev.sched_type == RTE_SCHED_TYPE_ATOMIC) { 68 worker_tx_pkt(dev, port, &ev); 69 tx++; 70 } else { 71 work(); 72 ev.queue_id++; 73 worker_fwd_event(&ev, RTE_SCHED_TYPE_ATOMIC); 74 worker_event_enqueue(dev, port, &ev); 75 fwd++; 76 } 77 } 78 79 if (!cdata.quiet) 80 printf(" worker %u thread done. RX=%zu FWD=%zu TX=%zu\n", 81 rte_lcore_id(), received, fwd, tx); 82 return 0; 83 } 84 85 static int 86 worker_do_tx_single_atq(void *arg) 87 { 88 struct worker_data *data = (struct worker_data *)arg; 89 const uint8_t dev = data->dev_id; 90 const uint8_t port = data->port_id; 91 size_t fwd = 0, received = 0, tx = 0; 92 struct rte_event ev; 93 94 while (!fdata->done) { 95 96 if (!rte_event_dequeue_burst(dev, port, &ev, 1, 0)) { 97 rte_pause(); 98 continue; 99 } 100 101 received++; 102 103 if (ev.sched_type == RTE_SCHED_TYPE_ATOMIC) { 104 worker_tx_pkt(dev, port, &ev); 105 tx++; 106 } else { 107 work(); 108 worker_fwd_event(&ev, RTE_SCHED_TYPE_ATOMIC); 109 worker_event_enqueue(dev, port, &ev); 110 fwd++; 111 } 112 } 113 114 if (!cdata.quiet) 115 printf(" worker %u thread done. RX=%zu FWD=%zu TX=%zu\n", 116 rte_lcore_id(), received, fwd, tx); 117 return 0; 118 } 119 120 static int 121 worker_do_tx_single_burst(void *arg) 122 { 123 struct rte_event ev[BATCH_SIZE + 1]; 124 125 struct worker_data *data = (struct worker_data *)arg; 126 const uint8_t dev = data->dev_id; 127 const uint8_t port = data->port_id; 128 size_t fwd = 0, received = 0, tx = 0; 129 130 while (!fdata->done) { 131 uint16_t i; 132 uint16_t nb_rx = rte_event_dequeue_burst(dev, port, ev, 133 BATCH_SIZE, 0); 134 135 if (!nb_rx) { 136 rte_pause(); 137 continue; 138 } 139 received += nb_rx; 140 141 for (i = 0; i < nb_rx; i++) { 142 rte_prefetch0(ev[i + 1].mbuf); 143 if (ev[i].sched_type == RTE_SCHED_TYPE_ATOMIC) { 144 145 worker_tx_pkt(dev, port, &ev[i]); 146 ev[i].op = RTE_EVENT_OP_RELEASE; 147 tx++; 148 149 } else { 150 ev[i].queue_id++; 151 worker_fwd_event(&ev[i], RTE_SCHED_TYPE_ATOMIC); 152 } 153 work(); 154 } 155 156 worker_event_enqueue_burst(dev, port, ev, nb_rx); 157 fwd += nb_rx; 158 } 159 160 if (!cdata.quiet) 161 printf(" worker %u thread done. RX=%zu FWD=%zu TX=%zu\n", 162 rte_lcore_id(), received, fwd, tx); 163 return 0; 164 } 165 166 static int 167 worker_do_tx_single_burst_atq(void *arg) 168 { 169 struct rte_event ev[BATCH_SIZE + 1]; 170 171 struct worker_data *data = (struct worker_data *)arg; 172 const uint8_t dev = data->dev_id; 173 const uint8_t port = data->port_id; 174 size_t fwd = 0, received = 0, tx = 0; 175 176 while (!fdata->done) { 177 uint16_t i; 178 uint16_t nb_rx = rte_event_dequeue_burst(dev, port, ev, 179 BATCH_SIZE, 0); 180 181 if (!nb_rx) { 182 rte_pause(); 183 continue; 184 } 185 186 received += nb_rx; 187 188 for (i = 0; i < nb_rx; i++) { 189 rte_prefetch0(ev[i + 1].mbuf); 190 if (ev[i].sched_type == RTE_SCHED_TYPE_ATOMIC) { 191 192 worker_tx_pkt(dev, port, &ev[i]); 193 ev[i].op = RTE_EVENT_OP_RELEASE; 194 tx++; 195 } else 196 worker_fwd_event(&ev[i], RTE_SCHED_TYPE_ATOMIC); 197 work(); 198 } 199 200 worker_event_enqueue_burst(dev, port, ev, nb_rx); 201 fwd += nb_rx; 202 } 203 204 if (!cdata.quiet) 205 printf(" worker %u thread done. RX=%zu FWD=%zu TX=%zu\n", 206 rte_lcore_id(), received, fwd, tx); 207 return 0; 208 } 209 210 /* Multi stage Pipeline Workers */ 211 212 static int 213 worker_do_tx(void *arg) 214 { 215 struct rte_event ev; 216 217 struct worker_data *data = (struct worker_data *)arg; 218 const uint8_t dev = data->dev_id; 219 const uint8_t port = data->port_id; 220 const uint8_t lst_qid = cdata.num_stages - 1; 221 size_t fwd = 0, received = 0, tx = 0; 222 223 224 while (!fdata->done) { 225 226 if (!rte_event_dequeue_burst(dev, port, &ev, 1, 0)) { 227 rte_pause(); 228 continue; 229 } 230 231 received++; 232 const uint8_t cq_id = ev.queue_id % cdata.num_stages; 233 234 if (cq_id >= lst_qid) { 235 if (ev.sched_type == RTE_SCHED_TYPE_ATOMIC) { 236 worker_tx_pkt(dev, port, &ev); 237 tx++; 238 continue; 239 } 240 241 worker_fwd_event(&ev, RTE_SCHED_TYPE_ATOMIC); 242 ev.queue_id = (cq_id == lst_qid) ? 243 cdata.next_qid[ev.queue_id] : ev.queue_id; 244 } else { 245 ev.queue_id = cdata.next_qid[ev.queue_id]; 246 worker_fwd_event(&ev, cdata.queue_type); 247 } 248 work(); 249 250 worker_event_enqueue(dev, port, &ev); 251 fwd++; 252 } 253 254 if (!cdata.quiet) 255 printf(" worker %u thread done. RX=%zu FWD=%zu TX=%zu\n", 256 rte_lcore_id(), received, fwd, tx); 257 258 return 0; 259 } 260 261 static int 262 worker_do_tx_atq(void *arg) 263 { 264 struct rte_event ev; 265 266 struct worker_data *data = (struct worker_data *)arg; 267 const uint8_t dev = data->dev_id; 268 const uint8_t port = data->port_id; 269 const uint8_t lst_qid = cdata.num_stages - 1; 270 size_t fwd = 0, received = 0, tx = 0; 271 272 while (!fdata->done) { 273 274 if (!rte_event_dequeue_burst(dev, port, &ev, 1, 0)) { 275 rte_pause(); 276 continue; 277 } 278 279 received++; 280 const uint8_t cq_id = ev.sub_event_type % cdata.num_stages; 281 282 if (cq_id == lst_qid) { 283 if (ev.sched_type == RTE_SCHED_TYPE_ATOMIC) { 284 worker_tx_pkt(dev, port, &ev); 285 tx++; 286 continue; 287 } 288 289 worker_fwd_event(&ev, RTE_SCHED_TYPE_ATOMIC); 290 } else { 291 ev.sub_event_type++; 292 worker_fwd_event(&ev, cdata.queue_type); 293 } 294 work(); 295 296 worker_event_enqueue(dev, port, &ev); 297 fwd++; 298 } 299 300 if (!cdata.quiet) 301 printf(" worker %u thread done. RX=%zu FWD=%zu TX=%zu\n", 302 rte_lcore_id(), received, fwd, tx); 303 304 return 0; 305 } 306 307 static int 308 worker_do_tx_burst(void *arg) 309 { 310 struct rte_event ev[BATCH_SIZE]; 311 312 struct worker_data *data = (struct worker_data *)arg; 313 uint8_t dev = data->dev_id; 314 uint8_t port = data->port_id; 315 uint8_t lst_qid = cdata.num_stages - 1; 316 size_t fwd = 0, received = 0, tx = 0; 317 318 while (!fdata->done) { 319 uint16_t i; 320 const uint16_t nb_rx = rte_event_dequeue_burst(dev, port, 321 ev, BATCH_SIZE, 0); 322 323 if (nb_rx == 0) { 324 rte_pause(); 325 continue; 326 } 327 received += nb_rx; 328 329 for (i = 0; i < nb_rx; i++) { 330 const uint8_t cq_id = ev[i].queue_id % cdata.num_stages; 331 332 if (cq_id >= lst_qid) { 333 if (ev[i].sched_type == RTE_SCHED_TYPE_ATOMIC) { 334 worker_tx_pkt(dev, port, &ev[i]); 335 tx++; 336 ev[i].op = RTE_EVENT_OP_RELEASE; 337 continue; 338 } 339 ev[i].queue_id = (cq_id == lst_qid) ? 340 cdata.next_qid[ev[i].queue_id] : 341 ev[i].queue_id; 342 343 worker_fwd_event(&ev[i], RTE_SCHED_TYPE_ATOMIC); 344 } else { 345 ev[i].queue_id = cdata.next_qid[ev[i].queue_id]; 346 worker_fwd_event(&ev[i], cdata.queue_type); 347 } 348 work(); 349 } 350 worker_event_enqueue_burst(dev, port, ev, nb_rx); 351 352 fwd += nb_rx; 353 } 354 355 if (!cdata.quiet) 356 printf(" worker %u thread done. RX=%zu FWD=%zu TX=%zu\n", 357 rte_lcore_id(), received, fwd, tx); 358 359 return 0; 360 } 361 362 static int 363 worker_do_tx_burst_atq(void *arg) 364 { 365 struct rte_event ev[BATCH_SIZE]; 366 367 struct worker_data *data = (struct worker_data *)arg; 368 uint8_t dev = data->dev_id; 369 uint8_t port = data->port_id; 370 uint8_t lst_qid = cdata.num_stages - 1; 371 size_t fwd = 0, received = 0, tx = 0; 372 373 while (!fdata->done) { 374 uint16_t i; 375 376 const uint16_t nb_rx = rte_event_dequeue_burst(dev, port, 377 ev, BATCH_SIZE, 0); 378 379 if (nb_rx == 0) { 380 rte_pause(); 381 continue; 382 } 383 received += nb_rx; 384 385 for (i = 0; i < nb_rx; i++) { 386 const uint8_t cq_id = ev[i].sub_event_type % 387 cdata.num_stages; 388 389 if (cq_id == lst_qid) { 390 if (ev[i].sched_type == RTE_SCHED_TYPE_ATOMIC) { 391 worker_tx_pkt(dev, port, &ev[i]); 392 tx++; 393 ev[i].op = RTE_EVENT_OP_RELEASE; 394 continue; 395 } 396 397 worker_fwd_event(&ev[i], RTE_SCHED_TYPE_ATOMIC); 398 } else { 399 ev[i].sub_event_type++; 400 worker_fwd_event(&ev[i], cdata.queue_type); 401 } 402 work(); 403 } 404 405 worker_event_enqueue_burst(dev, port, ev, nb_rx); 406 fwd += nb_rx; 407 } 408 409 if (!cdata.quiet) 410 printf(" worker %u thread done. RX=%zu FWD=%zu TX=%zu\n", 411 rte_lcore_id(), received, fwd, tx); 412 413 return 0; 414 } 415 416 static int 417 setup_eventdev_worker_tx_enq(struct worker_data *worker_data) 418 { 419 uint8_t i; 420 const uint8_t atq = cdata.all_type_queues ? 1 : 0; 421 const uint8_t dev_id = 0; 422 const uint8_t nb_ports = cdata.num_workers; 423 uint8_t nb_slots = 0; 424 uint8_t nb_queues = rte_eth_dev_count_avail(); 425 426 /* 427 * In case where all type queues are not enabled, use queues equal to 428 * number of stages * eth_dev_count and one extra queue per pipeline 429 * for Tx. 430 */ 431 if (!atq) { 432 nb_queues *= cdata.num_stages; 433 nb_queues += rte_eth_dev_count_avail(); 434 } 435 436 struct rte_event_dev_config config = { 437 .nb_event_queues = nb_queues, 438 .nb_event_ports = nb_ports, 439 .nb_events_limit = 4096, 440 .nb_event_queue_flows = 1024, 441 .nb_event_port_dequeue_depth = 128, 442 .nb_event_port_enqueue_depth = 128, 443 }; 444 struct rte_event_port_conf wkr_p_conf = { 445 .dequeue_depth = cdata.worker_cq_depth, 446 .enqueue_depth = 64, 447 .new_event_threshold = 4096, 448 }; 449 struct rte_event_queue_conf wkr_q_conf = { 450 .schedule_type = cdata.queue_type, 451 .priority = RTE_EVENT_DEV_PRIORITY_NORMAL, 452 .nb_atomic_flows = 1024, 453 .nb_atomic_order_sequences = 1024, 454 }; 455 456 int ret, ndev = rte_event_dev_count(); 457 458 if (ndev < 1) { 459 printf("%d: No Eventdev Devices Found\n", __LINE__); 460 return -1; 461 } 462 463 464 struct rte_event_dev_info dev_info; 465 ret = rte_event_dev_info_get(dev_id, &dev_info); 466 printf("\tEventdev %d: %s\n", dev_id, dev_info.driver_name); 467 468 if (dev_info.max_num_events < config.nb_events_limit) 469 config.nb_events_limit = dev_info.max_num_events; 470 if (dev_info.max_event_port_dequeue_depth < 471 config.nb_event_port_dequeue_depth) 472 config.nb_event_port_dequeue_depth = 473 dev_info.max_event_port_dequeue_depth; 474 if (dev_info.max_event_port_enqueue_depth < 475 config.nb_event_port_enqueue_depth) 476 config.nb_event_port_enqueue_depth = 477 dev_info.max_event_port_enqueue_depth; 478 479 ret = rte_event_dev_configure(dev_id, &config); 480 if (ret < 0) { 481 printf("%d: Error configuring device\n", __LINE__); 482 return -1; 483 } 484 485 printf(" Stages:\n"); 486 for (i = 0; i < nb_queues; i++) { 487 488 if (atq) { 489 490 nb_slots = cdata.num_stages; 491 wkr_q_conf.event_queue_cfg = 492 RTE_EVENT_QUEUE_CFG_ALL_TYPES; 493 } else { 494 uint8_t slot; 495 496 nb_slots = cdata.num_stages + 1; 497 slot = i % nb_slots; 498 wkr_q_conf.schedule_type = slot == cdata.num_stages ? 499 RTE_SCHED_TYPE_ATOMIC : cdata.queue_type; 500 } 501 502 if (rte_event_queue_setup(dev_id, i, &wkr_q_conf) < 0) { 503 printf("%d: error creating qid %d\n", __LINE__, i); 504 return -1; 505 } 506 cdata.qid[i] = i; 507 cdata.next_qid[i] = i+1; 508 if (cdata.enable_queue_priorities) { 509 const uint32_t prio_delta = 510 (RTE_EVENT_DEV_PRIORITY_LOWEST) / 511 nb_slots; 512 513 /* higher priority for queues closer to tx */ 514 wkr_q_conf.priority = 515 RTE_EVENT_DEV_PRIORITY_LOWEST - prio_delta * 516 (i % nb_slots); 517 } 518 519 const char *type_str = "Atomic"; 520 switch (wkr_q_conf.schedule_type) { 521 case RTE_SCHED_TYPE_ORDERED: 522 type_str = "Ordered"; 523 break; 524 case RTE_SCHED_TYPE_PARALLEL: 525 type_str = "Parallel"; 526 break; 527 } 528 printf("\tStage %d, Type %s\tPriority = %d\n", i, type_str, 529 wkr_q_conf.priority); 530 } 531 532 printf("\n"); 533 if (wkr_p_conf.new_event_threshold > config.nb_events_limit) 534 wkr_p_conf.new_event_threshold = config.nb_events_limit; 535 if (wkr_p_conf.dequeue_depth > config.nb_event_port_dequeue_depth) 536 wkr_p_conf.dequeue_depth = config.nb_event_port_dequeue_depth; 537 if (wkr_p_conf.enqueue_depth > config.nb_event_port_enqueue_depth) 538 wkr_p_conf.enqueue_depth = config.nb_event_port_enqueue_depth; 539 540 /* set up one port per worker, linking to all stage queues */ 541 for (i = 0; i < cdata.num_workers; i++) { 542 struct worker_data *w = &worker_data[i]; 543 w->dev_id = dev_id; 544 if (rte_event_port_setup(dev_id, i, &wkr_p_conf) < 0) { 545 printf("Error setting up port %d\n", i); 546 return -1; 547 } 548 549 if (rte_event_port_link(dev_id, i, NULL, NULL, 0) 550 != nb_queues) { 551 printf("%d: error creating link for port %d\n", 552 __LINE__, i); 553 return -1; 554 } 555 w->port_id = i; 556 } 557 /* 558 * Reduce the load on ingress event queue by splitting the traffic 559 * across multiple event queues. 560 * for example, nb_stages = 2 and nb_ethdev = 2 then 561 * 562 * nb_queues = (2 * 2) + 2 = 6 (non atq) 563 * rx_stride = 3 564 * 565 * So, traffic is split across queue 0 and queue 3 since queue id for 566 * rx adapter is chosen <ethport_id> * <rx_stride> i.e in the above 567 * case eth port 0, 1 will inject packets into event queue 0, 3 568 * respectively. 569 * 570 * This forms two set of queue pipelines 0->1->2->tx and 3->4->5->tx. 571 */ 572 cdata.rx_stride = atq ? 1 : nb_slots; 573 ret = rte_event_dev_service_id_get(dev_id, 574 &fdata->evdev_service_id); 575 if (ret != -ESRCH && ret != 0) { 576 printf("Error getting the service ID\n"); 577 return -1; 578 } 579 rte_service_runstate_set(fdata->evdev_service_id, 1); 580 rte_service_set_runstate_mapped_check(fdata->evdev_service_id, 0); 581 582 if (rte_event_dev_start(dev_id) < 0) 583 rte_exit(EXIT_FAILURE, "Error starting eventdev"); 584 585 return dev_id; 586 } 587 588 589 struct rx_adptr_services { 590 uint16_t nb_rx_adptrs; 591 uint32_t *rx_adpt_arr; 592 }; 593 594 static int32_t 595 service_rx_adapter(void *arg) 596 { 597 int i; 598 struct rx_adptr_services *adptr_services = arg; 599 600 for (i = 0; i < adptr_services->nb_rx_adptrs; i++) 601 rte_service_run_iter_on_app_lcore( 602 adptr_services->rx_adpt_arr[i], 1); 603 return 0; 604 } 605 606 /* 607 * Initializes a given port using global settings and with the RX buffers 608 * coming from the mbuf_pool passed as a parameter. 609 */ 610 static inline int 611 port_init(uint8_t port, struct rte_mempool *mbuf_pool) 612 { 613 struct rte_eth_rxconf rx_conf; 614 static const struct rte_eth_conf port_conf_default = { 615 .rxmode = { 616 .mq_mode = ETH_MQ_RX_RSS, 617 .max_rx_pkt_len = RTE_ETHER_MAX_LEN, 618 }, 619 .rx_adv_conf = { 620 .rss_conf = { 621 .rss_hf = ETH_RSS_IP | 622 ETH_RSS_TCP | 623 ETH_RSS_UDP, 624 } 625 } 626 }; 627 const uint16_t rx_rings = 1, tx_rings = 1; 628 const uint16_t rx_ring_size = 512, tx_ring_size = 512; 629 struct rte_eth_conf port_conf = port_conf_default; 630 int retval; 631 uint16_t q; 632 struct rte_eth_dev_info dev_info; 633 struct rte_eth_txconf txconf; 634 635 if (!rte_eth_dev_is_valid_port(port)) 636 return -1; 637 638 retval = rte_eth_dev_info_get(port, &dev_info); 639 if (retval != 0) { 640 printf("Error during getting device (port %u) info: %s\n", 641 port, strerror(-retval)); 642 return retval; 643 } 644 645 if (dev_info.tx_offload_capa & DEV_TX_OFFLOAD_MBUF_FAST_FREE) 646 port_conf.txmode.offloads |= 647 DEV_TX_OFFLOAD_MBUF_FAST_FREE; 648 rx_conf = dev_info.default_rxconf; 649 rx_conf.offloads = port_conf.rxmode.offloads; 650 651 port_conf.rx_adv_conf.rss_conf.rss_hf &= 652 dev_info.flow_type_rss_offloads; 653 if (port_conf.rx_adv_conf.rss_conf.rss_hf != 654 port_conf_default.rx_adv_conf.rss_conf.rss_hf) { 655 printf("Port %u modified RSS hash function based on hardware support," 656 "requested:%#"PRIx64" configured:%#"PRIx64"\n", 657 port, 658 port_conf_default.rx_adv_conf.rss_conf.rss_hf, 659 port_conf.rx_adv_conf.rss_conf.rss_hf); 660 } 661 662 /* Configure the Ethernet device. */ 663 retval = rte_eth_dev_configure(port, rx_rings, tx_rings, &port_conf); 664 if (retval != 0) 665 return retval; 666 667 /* Allocate and set up 1 RX queue per Ethernet port. */ 668 for (q = 0; q < rx_rings; q++) { 669 retval = rte_eth_rx_queue_setup(port, q, rx_ring_size, 670 rte_eth_dev_socket_id(port), &rx_conf, 671 mbuf_pool); 672 if (retval < 0) 673 return retval; 674 } 675 676 txconf = dev_info.default_txconf; 677 txconf.offloads = port_conf_default.txmode.offloads; 678 /* Allocate and set up 1 TX queue per Ethernet port. */ 679 for (q = 0; q < tx_rings; q++) { 680 retval = rte_eth_tx_queue_setup(port, q, tx_ring_size, 681 rte_eth_dev_socket_id(port), &txconf); 682 if (retval < 0) 683 return retval; 684 } 685 686 /* Display the port MAC address. */ 687 struct rte_ether_addr addr; 688 retval = rte_eth_macaddr_get(port, &addr); 689 if (retval != 0) { 690 printf("Failed to get MAC address (port %u): %s\n", 691 port, rte_strerror(-retval)); 692 return retval; 693 } 694 695 printf("Port %u MAC: %02" PRIx8 " %02" PRIx8 " %02" PRIx8 696 " %02" PRIx8 " %02" PRIx8 " %02" PRIx8 "\n", 697 (unsigned int)port, 698 addr.addr_bytes[0], addr.addr_bytes[1], 699 addr.addr_bytes[2], addr.addr_bytes[3], 700 addr.addr_bytes[4], addr.addr_bytes[5]); 701 702 /* Enable RX in promiscuous mode for the Ethernet device. */ 703 retval = rte_eth_promiscuous_enable(port); 704 if (retval != 0) 705 return retval; 706 707 return 0; 708 } 709 710 static int 711 init_ports(uint16_t num_ports) 712 { 713 uint16_t portid; 714 715 if (!cdata.num_mbuf) 716 cdata.num_mbuf = 16384 * num_ports; 717 718 struct rte_mempool *mp = rte_pktmbuf_pool_create("packet_pool", 719 /* mbufs */ cdata.num_mbuf, 720 /* cache_size */ 512, 721 /* priv_size*/ 0, 722 /* data_room_size */ RTE_MBUF_DEFAULT_BUF_SIZE, 723 rte_socket_id()); 724 725 RTE_ETH_FOREACH_DEV(portid) 726 if (port_init(portid, mp) != 0) 727 rte_exit(EXIT_FAILURE, "Cannot init port %"PRIu16 "\n", 728 portid); 729 730 return 0; 731 } 732 733 static void 734 init_adapters(uint16_t nb_ports) 735 { 736 int i; 737 int ret; 738 uint8_t evdev_id = 0; 739 struct rx_adptr_services *adptr_services = NULL; 740 struct rte_event_dev_info dev_info; 741 742 ret = rte_event_dev_info_get(evdev_id, &dev_info); 743 adptr_services = rte_zmalloc(NULL, sizeof(struct rx_adptr_services), 0); 744 745 struct rte_event_port_conf adptr_p_conf = { 746 .dequeue_depth = cdata.worker_cq_depth, 747 .enqueue_depth = 64, 748 .new_event_threshold = 4096, 749 }; 750 751 init_ports(nb_ports); 752 if (adptr_p_conf.new_event_threshold > dev_info.max_num_events) 753 adptr_p_conf.new_event_threshold = dev_info.max_num_events; 754 if (adptr_p_conf.dequeue_depth > dev_info.max_event_port_dequeue_depth) 755 adptr_p_conf.dequeue_depth = 756 dev_info.max_event_port_dequeue_depth; 757 if (adptr_p_conf.enqueue_depth > dev_info.max_event_port_enqueue_depth) 758 adptr_p_conf.enqueue_depth = 759 dev_info.max_event_port_enqueue_depth; 760 761 struct rte_event_eth_rx_adapter_queue_conf queue_conf; 762 memset(&queue_conf, 0, sizeof(queue_conf)); 763 queue_conf.ev.sched_type = cdata.queue_type; 764 765 for (i = 0; i < nb_ports; i++) { 766 uint32_t cap; 767 uint32_t service_id; 768 769 ret = rte_event_eth_rx_adapter_create(i, evdev_id, 770 &adptr_p_conf); 771 if (ret) 772 rte_exit(EXIT_FAILURE, 773 "failed to create rx adapter[%d]", i); 774 775 ret = rte_event_eth_rx_adapter_caps_get(evdev_id, i, &cap); 776 if (ret) 777 rte_exit(EXIT_FAILURE, 778 "failed to get event rx adapter " 779 "capabilities"); 780 781 queue_conf.ev.queue_id = cdata.rx_stride ? 782 (i * cdata.rx_stride) 783 : (uint8_t)cdata.qid[0]; 784 785 ret = rte_event_eth_rx_adapter_queue_add(i, i, -1, &queue_conf); 786 if (ret) 787 rte_exit(EXIT_FAILURE, 788 "Failed to add queues to Rx adapter"); 789 790 /* Producer needs to be scheduled. */ 791 if (!(cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_INTERNAL_PORT)) { 792 ret = rte_event_eth_rx_adapter_service_id_get(i, 793 &service_id); 794 if (ret != -ESRCH && ret != 0) { 795 rte_exit(EXIT_FAILURE, 796 "Error getting the service ID for rx adptr\n"); 797 } 798 799 rte_service_runstate_set(service_id, 1); 800 rte_service_set_runstate_mapped_check(service_id, 0); 801 802 adptr_services->nb_rx_adptrs++; 803 adptr_services->rx_adpt_arr = rte_realloc( 804 adptr_services->rx_adpt_arr, 805 adptr_services->nb_rx_adptrs * 806 sizeof(uint32_t), 0); 807 adptr_services->rx_adpt_arr[ 808 adptr_services->nb_rx_adptrs - 1] = 809 service_id; 810 } 811 812 ret = rte_event_eth_rx_adapter_start(i); 813 if (ret) 814 rte_exit(EXIT_FAILURE, "Rx adapter[%d] start failed", 815 i); 816 } 817 818 /* We already know that Tx adapter has INTERNAL port cap*/ 819 ret = rte_event_eth_tx_adapter_create(cdata.tx_adapter_id, evdev_id, 820 &adptr_p_conf); 821 if (ret) 822 rte_exit(EXIT_FAILURE, "failed to create tx adapter[%d]", 823 cdata.tx_adapter_id); 824 825 for (i = 0; i < nb_ports; i++) { 826 ret = rte_event_eth_tx_adapter_queue_add(cdata.tx_adapter_id, i, 827 -1); 828 if (ret) 829 rte_exit(EXIT_FAILURE, 830 "Failed to add queues to Tx adapter"); 831 } 832 833 ret = rte_event_eth_tx_adapter_start(cdata.tx_adapter_id); 834 if (ret) 835 rte_exit(EXIT_FAILURE, "Tx adapter[%d] start failed", 836 cdata.tx_adapter_id); 837 838 if (adptr_services->nb_rx_adptrs) { 839 struct rte_service_spec service; 840 841 memset(&service, 0, sizeof(struct rte_service_spec)); 842 snprintf(service.name, sizeof(service.name), "rx_service"); 843 service.callback = service_rx_adapter; 844 service.callback_userdata = (void *)adptr_services; 845 846 int32_t ret = rte_service_component_register(&service, 847 &fdata->rxadptr_service_id); 848 if (ret) 849 rte_exit(EXIT_FAILURE, 850 "Rx adapter service register failed"); 851 852 rte_service_runstate_set(fdata->rxadptr_service_id, 1); 853 rte_service_component_runstate_set(fdata->rxadptr_service_id, 854 1); 855 rte_service_set_runstate_mapped_check(fdata->rxadptr_service_id, 856 0); 857 } else { 858 memset(fdata->rx_core, 0, sizeof(unsigned int) * MAX_NUM_CORE); 859 rte_free(adptr_services); 860 } 861 862 if (!adptr_services->nb_rx_adptrs && (dev_info.event_dev_cap & 863 RTE_EVENT_DEV_CAP_DISTRIBUTED_SCHED)) 864 fdata->cap.scheduler = NULL; 865 } 866 867 static void 868 worker_tx_enq_opt_check(void) 869 { 870 int i; 871 int ret; 872 uint32_t cap = 0; 873 uint8_t rx_needed = 0; 874 uint8_t sched_needed = 0; 875 struct rte_event_dev_info eventdev_info; 876 877 memset(&eventdev_info, 0, sizeof(struct rte_event_dev_info)); 878 rte_event_dev_info_get(0, &eventdev_info); 879 880 if (cdata.all_type_queues && !(eventdev_info.event_dev_cap & 881 RTE_EVENT_DEV_CAP_QUEUE_ALL_TYPES)) 882 rte_exit(EXIT_FAILURE, 883 "Event dev doesn't support all type queues\n"); 884 sched_needed = !(eventdev_info.event_dev_cap & 885 RTE_EVENT_DEV_CAP_DISTRIBUTED_SCHED); 886 887 RTE_ETH_FOREACH_DEV(i) { 888 ret = rte_event_eth_rx_adapter_caps_get(0, i, &cap); 889 if (ret) 890 rte_exit(EXIT_FAILURE, 891 "failed to get event rx adapter capabilities"); 892 rx_needed |= 893 !(cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_INTERNAL_PORT); 894 } 895 896 if (cdata.worker_lcore_mask == 0 || 897 (rx_needed && cdata.rx_lcore_mask == 0) || 898 (sched_needed && cdata.sched_lcore_mask == 0)) { 899 printf("Core part of pipeline was not assigned any cores. " 900 "This will stall the pipeline, please check core masks " 901 "(use -h for details on setting core masks):\n" 902 "\trx: %"PRIu64"\n\tsched: %"PRIu64 903 "\n\tworkers: %"PRIu64"\n", cdata.rx_lcore_mask, 904 cdata.sched_lcore_mask, cdata.worker_lcore_mask); 905 rte_exit(-1, "Fix core masks\n"); 906 } 907 908 if (!sched_needed) 909 memset(fdata->sched_core, 0, 910 sizeof(unsigned int) * MAX_NUM_CORE); 911 if (!rx_needed) 912 memset(fdata->rx_core, 0, 913 sizeof(unsigned int) * MAX_NUM_CORE); 914 915 memset(fdata->tx_core, 0, sizeof(unsigned int) * MAX_NUM_CORE); 916 } 917 918 static worker_loop 919 get_worker_loop_single_burst(uint8_t atq) 920 { 921 if (atq) 922 return worker_do_tx_single_burst_atq; 923 924 return worker_do_tx_single_burst; 925 } 926 927 static worker_loop 928 get_worker_loop_single_non_burst(uint8_t atq) 929 { 930 if (atq) 931 return worker_do_tx_single_atq; 932 933 return worker_do_tx_single; 934 } 935 936 static worker_loop 937 get_worker_loop_burst(uint8_t atq) 938 { 939 if (atq) 940 return worker_do_tx_burst_atq; 941 942 return worker_do_tx_burst; 943 } 944 945 static worker_loop 946 get_worker_loop_non_burst(uint8_t atq) 947 { 948 if (atq) 949 return worker_do_tx_atq; 950 951 return worker_do_tx; 952 } 953 954 static worker_loop 955 get_worker_single_stage(bool burst) 956 { 957 uint8_t atq = cdata.all_type_queues ? 1 : 0; 958 959 if (burst) 960 return get_worker_loop_single_burst(atq); 961 962 return get_worker_loop_single_non_burst(atq); 963 } 964 965 static worker_loop 966 get_worker_multi_stage(bool burst) 967 { 968 uint8_t atq = cdata.all_type_queues ? 1 : 0; 969 970 if (burst) 971 return get_worker_loop_burst(atq); 972 973 return get_worker_loop_non_burst(atq); 974 } 975 976 void 977 set_worker_tx_enq_setup_data(struct setup_data *caps, bool burst) 978 { 979 if (cdata.num_stages == 1) 980 caps->worker = get_worker_single_stage(burst); 981 else 982 caps->worker = get_worker_multi_stage(burst); 983 984 caps->check_opt = worker_tx_enq_opt_check; 985 caps->scheduler = schedule_devices; 986 caps->evdev_setup = setup_eventdev_worker_tx_enq; 987 caps->adptr_setup = init_adapters; 988 } 989