1 /* 2 * SPDX-License-Identifier: BSD-3-Clause 3 * Copyright(c) 2010-2014 Intel Corporation 4 * Copyright 2017 Cavium, Inc. 5 */ 6 7 #include "pipeline_common.h" 8 9 static __rte_always_inline void 10 worker_fwd_event(struct rte_event *ev, uint8_t sched) 11 { 12 ev->event_type = RTE_EVENT_TYPE_CPU; 13 ev->op = RTE_EVENT_OP_FORWARD; 14 ev->sched_type = sched; 15 } 16 17 static __rte_always_inline void 18 worker_event_enqueue(const uint8_t dev, const uint8_t port, 19 struct rte_event *ev) 20 { 21 while (rte_event_enqueue_burst(dev, port, ev, 1) != 1) 22 rte_pause(); 23 } 24 25 static __rte_always_inline void 26 worker_event_enqueue_burst(const uint8_t dev, const uint8_t port, 27 struct rte_event *ev, const uint16_t nb_rx) 28 { 29 uint16_t enq; 30 31 enq = rte_event_enqueue_burst(dev, port, ev, nb_rx); 32 while (enq < nb_rx) { 33 enq += rte_event_enqueue_burst(dev, port, 34 ev + enq, nb_rx - enq); 35 } 36 } 37 38 static __rte_always_inline void 39 worker_tx_pkt(const uint8_t dev, const uint8_t port, struct rte_event *ev) 40 { 41 exchange_mac(ev->mbuf); 42 rte_event_eth_tx_adapter_txq_set(ev->mbuf, 0); 43 while (!rte_event_eth_tx_adapter_enqueue(dev, port, ev, 1, 0)) 44 rte_pause(); 45 } 46 47 /* Single stage pipeline workers */ 48 49 static int 50 worker_do_tx_single(void *arg) 51 { 52 struct worker_data *data = (struct worker_data *)arg; 53 const uint8_t dev = data->dev_id; 54 const uint8_t port = data->port_id; 55 size_t fwd = 0, received = 0, tx = 0; 56 struct rte_event ev; 57 58 while (!fdata->done) { 59 60 if (!rte_event_dequeue_burst(dev, port, &ev, 1, 0)) { 61 rte_pause(); 62 continue; 63 } 64 65 received++; 66 67 if (ev.sched_type == RTE_SCHED_TYPE_ATOMIC) { 68 worker_tx_pkt(dev, port, &ev); 69 tx++; 70 } else { 71 work(); 72 ev.queue_id++; 73 worker_fwd_event(&ev, RTE_SCHED_TYPE_ATOMIC); 74 worker_event_enqueue(dev, port, &ev); 75 fwd++; 76 } 77 } 78 79 if (!cdata.quiet) 80 printf(" worker %u thread done. RX=%zu FWD=%zu TX=%zu\n", 81 rte_lcore_id(), received, fwd, tx); 82 return 0; 83 } 84 85 static int 86 worker_do_tx_single_atq(void *arg) 87 { 88 struct worker_data *data = (struct worker_data *)arg; 89 const uint8_t dev = data->dev_id; 90 const uint8_t port = data->port_id; 91 size_t fwd = 0, received = 0, tx = 0; 92 struct rte_event ev; 93 94 while (!fdata->done) { 95 96 if (!rte_event_dequeue_burst(dev, port, &ev, 1, 0)) { 97 rte_pause(); 98 continue; 99 } 100 101 received++; 102 103 if (ev.sched_type == RTE_SCHED_TYPE_ATOMIC) { 104 worker_tx_pkt(dev, port, &ev); 105 tx++; 106 } else { 107 work(); 108 worker_fwd_event(&ev, RTE_SCHED_TYPE_ATOMIC); 109 worker_event_enqueue(dev, port, &ev); 110 fwd++; 111 } 112 } 113 114 if (!cdata.quiet) 115 printf(" worker %u thread done. RX=%zu FWD=%zu TX=%zu\n", 116 rte_lcore_id(), received, fwd, tx); 117 return 0; 118 } 119 120 static int 121 worker_do_tx_single_burst(void *arg) 122 { 123 struct rte_event ev[BATCH_SIZE + 1]; 124 125 struct worker_data *data = (struct worker_data *)arg; 126 const uint8_t dev = data->dev_id; 127 const uint8_t port = data->port_id; 128 size_t fwd = 0, received = 0, tx = 0; 129 130 while (!fdata->done) { 131 uint16_t i; 132 uint16_t nb_rx = rte_event_dequeue_burst(dev, port, ev, 133 BATCH_SIZE, 0); 134 135 if (!nb_rx) { 136 rte_pause(); 137 continue; 138 } 139 received += nb_rx; 140 141 for (i = 0; i < nb_rx; i++) { 142 rte_prefetch0(ev[i + 1].mbuf); 143 if (ev[i].sched_type == RTE_SCHED_TYPE_ATOMIC) { 144 145 worker_tx_pkt(dev, port, &ev[i]); 146 ev[i].op = RTE_EVENT_OP_RELEASE; 147 tx++; 148 149 } else { 150 ev[i].queue_id++; 151 worker_fwd_event(&ev[i], RTE_SCHED_TYPE_ATOMIC); 152 } 153 work(); 154 } 155 156 worker_event_enqueue_burst(dev, port, ev, nb_rx); 157 fwd += nb_rx; 158 } 159 160 if (!cdata.quiet) 161 printf(" worker %u thread done. RX=%zu FWD=%zu TX=%zu\n", 162 rte_lcore_id(), received, fwd, tx); 163 return 0; 164 } 165 166 static int 167 worker_do_tx_single_burst_atq(void *arg) 168 { 169 struct rte_event ev[BATCH_SIZE + 1]; 170 171 struct worker_data *data = (struct worker_data *)arg; 172 const uint8_t dev = data->dev_id; 173 const uint8_t port = data->port_id; 174 size_t fwd = 0, received = 0, tx = 0; 175 176 while (!fdata->done) { 177 uint16_t i; 178 uint16_t nb_rx = rte_event_dequeue_burst(dev, port, ev, 179 BATCH_SIZE, 0); 180 181 if (!nb_rx) { 182 rte_pause(); 183 continue; 184 } 185 186 received += nb_rx; 187 188 for (i = 0; i < nb_rx; i++) { 189 rte_prefetch0(ev[i + 1].mbuf); 190 if (ev[i].sched_type == RTE_SCHED_TYPE_ATOMIC) { 191 192 worker_tx_pkt(dev, port, &ev[i]); 193 ev[i].op = RTE_EVENT_OP_RELEASE; 194 tx++; 195 } else 196 worker_fwd_event(&ev[i], RTE_SCHED_TYPE_ATOMIC); 197 work(); 198 } 199 200 worker_event_enqueue_burst(dev, port, ev, nb_rx); 201 fwd += nb_rx; 202 } 203 204 if (!cdata.quiet) 205 printf(" worker %u thread done. RX=%zu FWD=%zu TX=%zu\n", 206 rte_lcore_id(), received, fwd, tx); 207 return 0; 208 } 209 210 /* Multi stage Pipeline Workers */ 211 212 static int 213 worker_do_tx(void *arg) 214 { 215 struct rte_event ev; 216 217 struct worker_data *data = (struct worker_data *)arg; 218 const uint8_t dev = data->dev_id; 219 const uint8_t port = data->port_id; 220 const uint8_t lst_qid = cdata.num_stages - 1; 221 size_t fwd = 0, received = 0, tx = 0; 222 223 224 while (!fdata->done) { 225 226 if (!rte_event_dequeue_burst(dev, port, &ev, 1, 0)) { 227 rte_pause(); 228 continue; 229 } 230 231 received++; 232 const uint8_t cq_id = ev.queue_id % cdata.num_stages; 233 234 if (cq_id >= lst_qid) { 235 if (ev.sched_type == RTE_SCHED_TYPE_ATOMIC) { 236 worker_tx_pkt(dev, port, &ev); 237 tx++; 238 continue; 239 } 240 241 worker_fwd_event(&ev, RTE_SCHED_TYPE_ATOMIC); 242 ev.queue_id = (cq_id == lst_qid) ? 243 cdata.next_qid[ev.queue_id] : ev.queue_id; 244 } else { 245 ev.queue_id = cdata.next_qid[ev.queue_id]; 246 worker_fwd_event(&ev, cdata.queue_type); 247 } 248 work(); 249 250 worker_event_enqueue(dev, port, &ev); 251 fwd++; 252 } 253 254 if (!cdata.quiet) 255 printf(" worker %u thread done. RX=%zu FWD=%zu TX=%zu\n", 256 rte_lcore_id(), received, fwd, tx); 257 258 return 0; 259 } 260 261 static int 262 worker_do_tx_atq(void *arg) 263 { 264 struct rte_event ev; 265 266 struct worker_data *data = (struct worker_data *)arg; 267 const uint8_t dev = data->dev_id; 268 const uint8_t port = data->port_id; 269 const uint8_t lst_qid = cdata.num_stages - 1; 270 size_t fwd = 0, received = 0, tx = 0; 271 272 while (!fdata->done) { 273 274 if (!rte_event_dequeue_burst(dev, port, &ev, 1, 0)) { 275 rte_pause(); 276 continue; 277 } 278 279 received++; 280 const uint8_t cq_id = ev.sub_event_type % cdata.num_stages; 281 282 if (cq_id == lst_qid) { 283 if (ev.sched_type == RTE_SCHED_TYPE_ATOMIC) { 284 worker_tx_pkt(dev, port, &ev); 285 tx++; 286 continue; 287 } 288 289 worker_fwd_event(&ev, RTE_SCHED_TYPE_ATOMIC); 290 } else { 291 ev.sub_event_type++; 292 worker_fwd_event(&ev, cdata.queue_type); 293 } 294 work(); 295 296 worker_event_enqueue(dev, port, &ev); 297 fwd++; 298 } 299 300 if (!cdata.quiet) 301 printf(" worker %u thread done. RX=%zu FWD=%zu TX=%zu\n", 302 rte_lcore_id(), received, fwd, tx); 303 304 return 0; 305 } 306 307 static int 308 worker_do_tx_burst(void *arg) 309 { 310 struct rte_event ev[BATCH_SIZE]; 311 312 struct worker_data *data = (struct worker_data *)arg; 313 uint8_t dev = data->dev_id; 314 uint8_t port = data->port_id; 315 uint8_t lst_qid = cdata.num_stages - 1; 316 size_t fwd = 0, received = 0, tx = 0; 317 318 while (!fdata->done) { 319 uint16_t i; 320 const uint16_t nb_rx = rte_event_dequeue_burst(dev, port, 321 ev, BATCH_SIZE, 0); 322 323 if (nb_rx == 0) { 324 rte_pause(); 325 continue; 326 } 327 received += nb_rx; 328 329 for (i = 0; i < nb_rx; i++) { 330 const uint8_t cq_id = ev[i].queue_id % cdata.num_stages; 331 332 if (cq_id >= lst_qid) { 333 if (ev[i].sched_type == RTE_SCHED_TYPE_ATOMIC) { 334 worker_tx_pkt(dev, port, &ev[i]); 335 tx++; 336 ev[i].op = RTE_EVENT_OP_RELEASE; 337 continue; 338 } 339 ev[i].queue_id = (cq_id == lst_qid) ? 340 cdata.next_qid[ev[i].queue_id] : 341 ev[i].queue_id; 342 343 worker_fwd_event(&ev[i], RTE_SCHED_TYPE_ATOMIC); 344 } else { 345 ev[i].queue_id = cdata.next_qid[ev[i].queue_id]; 346 worker_fwd_event(&ev[i], cdata.queue_type); 347 } 348 work(); 349 } 350 worker_event_enqueue_burst(dev, port, ev, nb_rx); 351 352 fwd += nb_rx; 353 } 354 355 if (!cdata.quiet) 356 printf(" worker %u thread done. RX=%zu FWD=%zu TX=%zu\n", 357 rte_lcore_id(), received, fwd, tx); 358 359 return 0; 360 } 361 362 static int 363 worker_do_tx_burst_atq(void *arg) 364 { 365 struct rte_event ev[BATCH_SIZE]; 366 367 struct worker_data *data = (struct worker_data *)arg; 368 uint8_t dev = data->dev_id; 369 uint8_t port = data->port_id; 370 uint8_t lst_qid = cdata.num_stages - 1; 371 size_t fwd = 0, received = 0, tx = 0; 372 373 while (!fdata->done) { 374 uint16_t i; 375 376 const uint16_t nb_rx = rte_event_dequeue_burst(dev, port, 377 ev, BATCH_SIZE, 0); 378 379 if (nb_rx == 0) { 380 rte_pause(); 381 continue; 382 } 383 received += nb_rx; 384 385 for (i = 0; i < nb_rx; i++) { 386 const uint8_t cq_id = ev[i].sub_event_type % 387 cdata.num_stages; 388 389 if (cq_id == lst_qid) { 390 if (ev[i].sched_type == RTE_SCHED_TYPE_ATOMIC) { 391 worker_tx_pkt(dev, port, &ev[i]); 392 tx++; 393 ev[i].op = RTE_EVENT_OP_RELEASE; 394 continue; 395 } 396 397 worker_fwd_event(&ev[i], RTE_SCHED_TYPE_ATOMIC); 398 } else { 399 ev[i].sub_event_type++; 400 worker_fwd_event(&ev[i], cdata.queue_type); 401 } 402 work(); 403 } 404 405 worker_event_enqueue_burst(dev, port, ev, nb_rx); 406 fwd += nb_rx; 407 } 408 409 if (!cdata.quiet) 410 printf(" worker %u thread done. RX=%zu FWD=%zu TX=%zu\n", 411 rte_lcore_id(), received, fwd, tx); 412 413 return 0; 414 } 415 416 static int 417 setup_eventdev_worker_tx_enq(struct worker_data *worker_data) 418 { 419 uint8_t i; 420 const uint8_t atq = cdata.all_type_queues ? 1 : 0; 421 const uint8_t dev_id = 0; 422 const uint8_t nb_ports = cdata.num_workers; 423 uint8_t nb_slots = 0; 424 uint8_t nb_queues = rte_eth_dev_count_avail(); 425 426 /* 427 * In case where all type queues are not enabled, use queues equal to 428 * number of stages * eth_dev_count and one extra queue per pipeline 429 * for Tx. 430 */ 431 if (!atq) { 432 nb_queues *= cdata.num_stages; 433 nb_queues += rte_eth_dev_count_avail(); 434 } 435 436 struct rte_event_dev_config config = { 437 .nb_event_queues = nb_queues, 438 .nb_event_ports = nb_ports, 439 .nb_single_link_event_port_queues = 0, 440 .nb_events_limit = 4096, 441 .nb_event_queue_flows = 1024, 442 .nb_event_port_dequeue_depth = 128, 443 .nb_event_port_enqueue_depth = 128, 444 }; 445 struct rte_event_port_conf wkr_p_conf = { 446 .dequeue_depth = cdata.worker_cq_depth, 447 .enqueue_depth = 64, 448 .new_event_threshold = 4096, 449 }; 450 struct rte_event_queue_conf wkr_q_conf = { 451 .schedule_type = cdata.queue_type, 452 .priority = RTE_EVENT_DEV_PRIORITY_NORMAL, 453 .nb_atomic_flows = 1024, 454 .nb_atomic_order_sequences = 1024, 455 }; 456 457 int ret, ndev = rte_event_dev_count(); 458 459 if (ndev < 1) { 460 printf("%d: No Eventdev Devices Found\n", __LINE__); 461 return -1; 462 } 463 464 465 struct rte_event_dev_info dev_info; 466 ret = rte_event_dev_info_get(dev_id, &dev_info); 467 printf("\tEventdev %d: %s\n", dev_id, dev_info.driver_name); 468 469 if (dev_info.max_num_events < config.nb_events_limit) 470 config.nb_events_limit = dev_info.max_num_events; 471 if (dev_info.max_event_port_dequeue_depth < 472 config.nb_event_port_dequeue_depth) 473 config.nb_event_port_dequeue_depth = 474 dev_info.max_event_port_dequeue_depth; 475 if (dev_info.max_event_port_enqueue_depth < 476 config.nb_event_port_enqueue_depth) 477 config.nb_event_port_enqueue_depth = 478 dev_info.max_event_port_enqueue_depth; 479 480 ret = rte_event_dev_configure(dev_id, &config); 481 if (ret < 0) { 482 printf("%d: Error configuring device\n", __LINE__); 483 return -1; 484 } 485 486 printf(" Stages:\n"); 487 for (i = 0; i < nb_queues; i++) { 488 489 if (atq) { 490 491 nb_slots = cdata.num_stages; 492 wkr_q_conf.event_queue_cfg = 493 RTE_EVENT_QUEUE_CFG_ALL_TYPES; 494 } else { 495 uint8_t slot; 496 497 nb_slots = cdata.num_stages + 1; 498 slot = i % nb_slots; 499 wkr_q_conf.schedule_type = slot == cdata.num_stages ? 500 RTE_SCHED_TYPE_ATOMIC : cdata.queue_type; 501 } 502 503 if (rte_event_queue_setup(dev_id, i, &wkr_q_conf) < 0) { 504 printf("%d: error creating qid %d\n", __LINE__, i); 505 return -1; 506 } 507 cdata.qid[i] = i; 508 cdata.next_qid[i] = i+1; 509 if (cdata.enable_queue_priorities) { 510 const uint32_t prio_delta = 511 (RTE_EVENT_DEV_PRIORITY_LOWEST) / 512 nb_slots; 513 514 /* higher priority for queues closer to tx */ 515 wkr_q_conf.priority = 516 RTE_EVENT_DEV_PRIORITY_LOWEST - prio_delta * 517 (i % nb_slots); 518 } 519 520 const char *type_str = "Atomic"; 521 switch (wkr_q_conf.schedule_type) { 522 case RTE_SCHED_TYPE_ORDERED: 523 type_str = "Ordered"; 524 break; 525 case RTE_SCHED_TYPE_PARALLEL: 526 type_str = "Parallel"; 527 break; 528 } 529 printf("\tStage %d, Type %s\tPriority = %d\n", i, type_str, 530 wkr_q_conf.priority); 531 } 532 533 printf("\n"); 534 if (wkr_p_conf.new_event_threshold > config.nb_events_limit) 535 wkr_p_conf.new_event_threshold = config.nb_events_limit; 536 if (wkr_p_conf.dequeue_depth > config.nb_event_port_dequeue_depth) 537 wkr_p_conf.dequeue_depth = config.nb_event_port_dequeue_depth; 538 if (wkr_p_conf.enqueue_depth > config.nb_event_port_enqueue_depth) 539 wkr_p_conf.enqueue_depth = config.nb_event_port_enqueue_depth; 540 541 /* set up one port per worker, linking to all stage queues */ 542 for (i = 0; i < cdata.num_workers; i++) { 543 struct worker_data *w = &worker_data[i]; 544 w->dev_id = dev_id; 545 if (rte_event_port_setup(dev_id, i, &wkr_p_conf) < 0) { 546 printf("Error setting up port %d\n", i); 547 return -1; 548 } 549 550 if (rte_event_port_link(dev_id, i, NULL, NULL, 0) 551 != nb_queues) { 552 printf("%d: error creating link for port %d\n", 553 __LINE__, i); 554 return -1; 555 } 556 w->port_id = i; 557 } 558 /* 559 * Reduce the load on ingress event queue by splitting the traffic 560 * across multiple event queues. 561 * for example, nb_stages = 2 and nb_ethdev = 2 then 562 * 563 * nb_queues = (2 * 2) + 2 = 6 (non atq) 564 * rx_stride = 3 565 * 566 * So, traffic is split across queue 0 and queue 3 since queue id for 567 * rx adapter is chosen <ethport_id> * <rx_stride> i.e in the above 568 * case eth port 0, 1 will inject packets into event queue 0, 3 569 * respectively. 570 * 571 * This forms two set of queue pipelines 0->1->2->tx and 3->4->5->tx. 572 */ 573 cdata.rx_stride = atq ? 1 : nb_slots; 574 ret = rte_event_dev_service_id_get(dev_id, 575 &fdata->evdev_service_id); 576 if (ret != -ESRCH && ret != 0) { 577 printf("Error getting the service ID\n"); 578 return -1; 579 } 580 rte_service_runstate_set(fdata->evdev_service_id, 1); 581 rte_service_set_runstate_mapped_check(fdata->evdev_service_id, 0); 582 583 if (rte_event_dev_start(dev_id) < 0) 584 rte_exit(EXIT_FAILURE, "Error starting eventdev"); 585 586 return dev_id; 587 } 588 589 590 struct rx_adptr_services { 591 uint16_t nb_rx_adptrs; 592 uint32_t *rx_adpt_arr; 593 }; 594 595 static int32_t 596 service_rx_adapter(void *arg) 597 { 598 int i; 599 struct rx_adptr_services *adptr_services = arg; 600 601 for (i = 0; i < adptr_services->nb_rx_adptrs; i++) 602 rte_service_run_iter_on_app_lcore( 603 adptr_services->rx_adpt_arr[i], 1); 604 return 0; 605 } 606 607 /* 608 * Initializes a given port using global settings and with the RX buffers 609 * coming from the mbuf_pool passed as a parameter. 610 */ 611 static inline int 612 port_init(uint8_t port, struct rte_mempool *mbuf_pool) 613 { 614 struct rte_eth_rxconf rx_conf; 615 static const struct rte_eth_conf port_conf_default = { 616 .rxmode = { 617 .mq_mode = ETH_MQ_RX_RSS, 618 .max_rx_pkt_len = RTE_ETHER_MAX_LEN, 619 }, 620 .rx_adv_conf = { 621 .rss_conf = { 622 .rss_hf = ETH_RSS_IP | 623 ETH_RSS_TCP | 624 ETH_RSS_UDP, 625 } 626 } 627 }; 628 const uint16_t rx_rings = 1, tx_rings = 1; 629 const uint16_t rx_ring_size = 512, tx_ring_size = 512; 630 struct rte_eth_conf port_conf = port_conf_default; 631 int retval; 632 uint16_t q; 633 struct rte_eth_dev_info dev_info; 634 struct rte_eth_txconf txconf; 635 636 if (!rte_eth_dev_is_valid_port(port)) 637 return -1; 638 639 retval = rte_eth_dev_info_get(port, &dev_info); 640 if (retval != 0) { 641 printf("Error during getting device (port %u) info: %s\n", 642 port, strerror(-retval)); 643 return retval; 644 } 645 646 if (dev_info.tx_offload_capa & DEV_TX_OFFLOAD_MBUF_FAST_FREE) 647 port_conf.txmode.offloads |= 648 DEV_TX_OFFLOAD_MBUF_FAST_FREE; 649 rx_conf = dev_info.default_rxconf; 650 rx_conf.offloads = port_conf.rxmode.offloads; 651 652 port_conf.rx_adv_conf.rss_conf.rss_hf &= 653 dev_info.flow_type_rss_offloads; 654 if (port_conf.rx_adv_conf.rss_conf.rss_hf != 655 port_conf_default.rx_adv_conf.rss_conf.rss_hf) { 656 printf("Port %u modified RSS hash function based on hardware support," 657 "requested:%#"PRIx64" configured:%#"PRIx64"\n", 658 port, 659 port_conf_default.rx_adv_conf.rss_conf.rss_hf, 660 port_conf.rx_adv_conf.rss_conf.rss_hf); 661 } 662 663 /* Configure the Ethernet device. */ 664 retval = rte_eth_dev_configure(port, rx_rings, tx_rings, &port_conf); 665 if (retval != 0) 666 return retval; 667 668 /* Allocate and set up 1 RX queue per Ethernet port. */ 669 for (q = 0; q < rx_rings; q++) { 670 retval = rte_eth_rx_queue_setup(port, q, rx_ring_size, 671 rte_eth_dev_socket_id(port), &rx_conf, 672 mbuf_pool); 673 if (retval < 0) 674 return retval; 675 } 676 677 txconf = dev_info.default_txconf; 678 txconf.offloads = port_conf_default.txmode.offloads; 679 /* Allocate and set up 1 TX queue per Ethernet port. */ 680 for (q = 0; q < tx_rings; q++) { 681 retval = rte_eth_tx_queue_setup(port, q, tx_ring_size, 682 rte_eth_dev_socket_id(port), &txconf); 683 if (retval < 0) 684 return retval; 685 } 686 687 /* Display the port MAC address. */ 688 struct rte_ether_addr addr; 689 retval = rte_eth_macaddr_get(port, &addr); 690 if (retval != 0) { 691 printf("Failed to get MAC address (port %u): %s\n", 692 port, rte_strerror(-retval)); 693 return retval; 694 } 695 696 printf("Port %u MAC: %02" PRIx8 " %02" PRIx8 " %02" PRIx8 697 " %02" PRIx8 " %02" PRIx8 " %02" PRIx8 "\n", 698 (unsigned int)port, 699 addr.addr_bytes[0], addr.addr_bytes[1], 700 addr.addr_bytes[2], addr.addr_bytes[3], 701 addr.addr_bytes[4], addr.addr_bytes[5]); 702 703 /* Enable RX in promiscuous mode for the Ethernet device. */ 704 retval = rte_eth_promiscuous_enable(port); 705 if (retval != 0) 706 return retval; 707 708 return 0; 709 } 710 711 static int 712 init_ports(uint16_t num_ports) 713 { 714 uint16_t portid; 715 716 if (!cdata.num_mbuf) 717 cdata.num_mbuf = 16384 * num_ports; 718 719 struct rte_mempool *mp = rte_pktmbuf_pool_create("packet_pool", 720 /* mbufs */ cdata.num_mbuf, 721 /* cache_size */ 512, 722 /* priv_size*/ 0, 723 /* data_room_size */ RTE_MBUF_DEFAULT_BUF_SIZE, 724 rte_socket_id()); 725 726 RTE_ETH_FOREACH_DEV(portid) 727 if (port_init(portid, mp) != 0) 728 rte_exit(EXIT_FAILURE, "Cannot init port %"PRIu16 "\n", 729 portid); 730 731 return 0; 732 } 733 734 static void 735 init_adapters(uint16_t nb_ports) 736 { 737 int i; 738 int ret; 739 uint8_t evdev_id = 0; 740 struct rx_adptr_services *adptr_services = NULL; 741 struct rte_event_dev_info dev_info; 742 743 ret = rte_event_dev_info_get(evdev_id, &dev_info); 744 adptr_services = rte_zmalloc(NULL, sizeof(struct rx_adptr_services), 0); 745 746 struct rte_event_port_conf adptr_p_conf = { 747 .dequeue_depth = cdata.worker_cq_depth, 748 .enqueue_depth = 64, 749 .new_event_threshold = 4096, 750 }; 751 752 init_ports(nb_ports); 753 if (adptr_p_conf.new_event_threshold > dev_info.max_num_events) 754 adptr_p_conf.new_event_threshold = dev_info.max_num_events; 755 if (adptr_p_conf.dequeue_depth > dev_info.max_event_port_dequeue_depth) 756 adptr_p_conf.dequeue_depth = 757 dev_info.max_event_port_dequeue_depth; 758 if (adptr_p_conf.enqueue_depth > dev_info.max_event_port_enqueue_depth) 759 adptr_p_conf.enqueue_depth = 760 dev_info.max_event_port_enqueue_depth; 761 762 struct rte_event_eth_rx_adapter_queue_conf queue_conf; 763 memset(&queue_conf, 0, sizeof(queue_conf)); 764 queue_conf.ev.sched_type = cdata.queue_type; 765 766 for (i = 0; i < nb_ports; i++) { 767 uint32_t cap; 768 uint32_t service_id; 769 770 ret = rte_event_eth_rx_adapter_create(i, evdev_id, 771 &adptr_p_conf); 772 if (ret) 773 rte_exit(EXIT_FAILURE, 774 "failed to create rx adapter[%d]", i); 775 776 ret = rte_event_eth_rx_adapter_caps_get(evdev_id, i, &cap); 777 if (ret) 778 rte_exit(EXIT_FAILURE, 779 "failed to get event rx adapter " 780 "capabilities"); 781 782 queue_conf.ev.queue_id = cdata.rx_stride ? 783 (i * cdata.rx_stride) 784 : (uint8_t)cdata.qid[0]; 785 786 ret = rte_event_eth_rx_adapter_queue_add(i, i, -1, &queue_conf); 787 if (ret) 788 rte_exit(EXIT_FAILURE, 789 "Failed to add queues to Rx adapter"); 790 791 /* Producer needs to be scheduled. */ 792 if (!(cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_INTERNAL_PORT)) { 793 ret = rte_event_eth_rx_adapter_service_id_get(i, 794 &service_id); 795 if (ret != -ESRCH && ret != 0) { 796 rte_exit(EXIT_FAILURE, 797 "Error getting the service ID for rx adptr\n"); 798 } 799 800 rte_service_runstate_set(service_id, 1); 801 rte_service_set_runstate_mapped_check(service_id, 0); 802 803 adptr_services->nb_rx_adptrs++; 804 adptr_services->rx_adpt_arr = rte_realloc( 805 adptr_services->rx_adpt_arr, 806 adptr_services->nb_rx_adptrs * 807 sizeof(uint32_t), 0); 808 adptr_services->rx_adpt_arr[ 809 adptr_services->nb_rx_adptrs - 1] = 810 service_id; 811 } 812 813 ret = rte_event_eth_rx_adapter_start(i); 814 if (ret) 815 rte_exit(EXIT_FAILURE, "Rx adapter[%d] start failed", 816 i); 817 } 818 819 /* We already know that Tx adapter has INTERNAL port cap*/ 820 ret = rte_event_eth_tx_adapter_create(cdata.tx_adapter_id, evdev_id, 821 &adptr_p_conf); 822 if (ret) 823 rte_exit(EXIT_FAILURE, "failed to create tx adapter[%d]", 824 cdata.tx_adapter_id); 825 826 for (i = 0; i < nb_ports; i++) { 827 ret = rte_event_eth_tx_adapter_queue_add(cdata.tx_adapter_id, i, 828 -1); 829 if (ret) 830 rte_exit(EXIT_FAILURE, 831 "Failed to add queues to Tx adapter"); 832 } 833 834 ret = rte_event_eth_tx_adapter_start(cdata.tx_adapter_id); 835 if (ret) 836 rte_exit(EXIT_FAILURE, "Tx adapter[%d] start failed", 837 cdata.tx_adapter_id); 838 839 if (adptr_services->nb_rx_adptrs) { 840 struct rte_service_spec service; 841 842 memset(&service, 0, sizeof(struct rte_service_spec)); 843 snprintf(service.name, sizeof(service.name), "rx_service"); 844 service.callback = service_rx_adapter; 845 service.callback_userdata = (void *)adptr_services; 846 847 int32_t ret = rte_service_component_register(&service, 848 &fdata->rxadptr_service_id); 849 if (ret) 850 rte_exit(EXIT_FAILURE, 851 "Rx adapter service register failed"); 852 853 rte_service_runstate_set(fdata->rxadptr_service_id, 1); 854 rte_service_component_runstate_set(fdata->rxadptr_service_id, 855 1); 856 rte_service_set_runstate_mapped_check(fdata->rxadptr_service_id, 857 0); 858 } else { 859 memset(fdata->rx_core, 0, sizeof(unsigned int) * MAX_NUM_CORE); 860 rte_free(adptr_services); 861 } 862 863 if (!adptr_services->nb_rx_adptrs && (dev_info.event_dev_cap & 864 RTE_EVENT_DEV_CAP_DISTRIBUTED_SCHED)) 865 fdata->cap.scheduler = NULL; 866 } 867 868 static void 869 worker_tx_enq_opt_check(void) 870 { 871 int i; 872 int ret; 873 uint32_t cap = 0; 874 uint8_t rx_needed = 0; 875 uint8_t sched_needed = 0; 876 struct rte_event_dev_info eventdev_info; 877 878 memset(&eventdev_info, 0, sizeof(struct rte_event_dev_info)); 879 rte_event_dev_info_get(0, &eventdev_info); 880 881 if (cdata.all_type_queues && !(eventdev_info.event_dev_cap & 882 RTE_EVENT_DEV_CAP_QUEUE_ALL_TYPES)) 883 rte_exit(EXIT_FAILURE, 884 "Event dev doesn't support all type queues\n"); 885 sched_needed = !(eventdev_info.event_dev_cap & 886 RTE_EVENT_DEV_CAP_DISTRIBUTED_SCHED); 887 888 RTE_ETH_FOREACH_DEV(i) { 889 ret = rte_event_eth_rx_adapter_caps_get(0, i, &cap); 890 if (ret) 891 rte_exit(EXIT_FAILURE, 892 "failed to get event rx adapter capabilities"); 893 rx_needed |= 894 !(cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_INTERNAL_PORT); 895 } 896 897 if (cdata.worker_lcore_mask == 0 || 898 (rx_needed && cdata.rx_lcore_mask == 0) || 899 (sched_needed && cdata.sched_lcore_mask == 0)) { 900 printf("Core part of pipeline was not assigned any cores. " 901 "This will stall the pipeline, please check core masks " 902 "(use -h for details on setting core masks):\n" 903 "\trx: %"PRIu64"\n\tsched: %"PRIu64 904 "\n\tworkers: %"PRIu64"\n", cdata.rx_lcore_mask, 905 cdata.sched_lcore_mask, cdata.worker_lcore_mask); 906 rte_exit(-1, "Fix core masks\n"); 907 } 908 909 if (!sched_needed) 910 memset(fdata->sched_core, 0, 911 sizeof(unsigned int) * MAX_NUM_CORE); 912 if (!rx_needed) 913 memset(fdata->rx_core, 0, 914 sizeof(unsigned int) * MAX_NUM_CORE); 915 916 memset(fdata->tx_core, 0, sizeof(unsigned int) * MAX_NUM_CORE); 917 } 918 919 static worker_loop 920 get_worker_loop_single_burst(uint8_t atq) 921 { 922 if (atq) 923 return worker_do_tx_single_burst_atq; 924 925 return worker_do_tx_single_burst; 926 } 927 928 static worker_loop 929 get_worker_loop_single_non_burst(uint8_t atq) 930 { 931 if (atq) 932 return worker_do_tx_single_atq; 933 934 return worker_do_tx_single; 935 } 936 937 static worker_loop 938 get_worker_loop_burst(uint8_t atq) 939 { 940 if (atq) 941 return worker_do_tx_burst_atq; 942 943 return worker_do_tx_burst; 944 } 945 946 static worker_loop 947 get_worker_loop_non_burst(uint8_t atq) 948 { 949 if (atq) 950 return worker_do_tx_atq; 951 952 return worker_do_tx; 953 } 954 955 static worker_loop 956 get_worker_single_stage(bool burst) 957 { 958 uint8_t atq = cdata.all_type_queues ? 1 : 0; 959 960 if (burst) 961 return get_worker_loop_single_burst(atq); 962 963 return get_worker_loop_single_non_burst(atq); 964 } 965 966 static worker_loop 967 get_worker_multi_stage(bool burst) 968 { 969 uint8_t atq = cdata.all_type_queues ? 1 : 0; 970 971 if (burst) 972 return get_worker_loop_burst(atq); 973 974 return get_worker_loop_non_burst(atq); 975 } 976 977 void 978 set_worker_tx_enq_setup_data(struct setup_data *caps, bool burst) 979 { 980 if (cdata.num_stages == 1) 981 caps->worker = get_worker_single_stage(burst); 982 else 983 caps->worker = get_worker_multi_stage(burst); 984 985 caps->check_opt = worker_tx_enq_opt_check; 986 caps->scheduler = schedule_devices; 987 caps->evdev_setup = setup_eventdev_worker_tx_enq; 988 caps->adptr_setup = init_adapters; 989 } 990