/*
 * SPDX-License-Identifier: BSD-3-Clause
 * Copyright(c) 2010-2014 Intel Corporation
 * Copyright 2017 Cavium, Inc.
 */

#include <stdlib.h>

#include "pipeline_common.h"

static __rte_always_inline void
worker_fwd_event(struct rte_event *ev, uint8_t sched)
{
	ev->event_type = RTE_EVENT_TYPE_CPU;
	ev->op = RTE_EVENT_OP_FORWARD;
	ev->sched_type = sched;
}

static __rte_always_inline void
worker_event_enqueue(const uint8_t dev, const uint8_t port,
		struct rte_event *ev)
{
	while (!rte_event_enqueue_burst(dev, port, ev, 1) && !fdata->done)
		rte_pause();
}

static __rte_always_inline uint16_t
worker_event_enqueue_burst(const uint8_t dev, const uint8_t port,
		struct rte_event *ev, const uint16_t nb_rx)
{
	uint16_t enq;

	enq = rte_event_enqueue_burst(dev, port, ev, nb_rx);
	while (enq < nb_rx && !fdata->done)
		enq += rte_event_enqueue_burst(dev, port,
				ev + enq, nb_rx - enq);

	return enq;
}

static __rte_always_inline void
worker_tx_pkt(const uint8_t dev, const uint8_t port, struct rte_event *ev)
{
	exchange_mac(ev->mbuf);
	rte_event_eth_tx_adapter_txq_set(ev->mbuf, 0);
	while (!rte_event_eth_tx_adapter_enqueue(dev, port, ev, 1, 0) &&
			!fdata->done)
		rte_pause();
}

/* Single stage pipeline workers */

static int
worker_do_tx_single(void *arg)
{
	struct worker_data *data = (struct worker_data *)arg;
	const uint8_t dev = data->dev_id;
	const uint8_t port = data->port_id;
	size_t fwd = 0, received = 0, tx = 0;
	struct rte_event ev;

	while (!fdata->done) {

		if (!rte_event_dequeue_burst(dev, port, &ev, 1, 0)) {
			rte_pause();
			continue;
		}

		received++;

		if (ev.sched_type == RTE_SCHED_TYPE_ATOMIC) {
			worker_tx_pkt(dev, port, &ev);
			tx++;
		} else {
			work();
			ev.queue_id++;
			worker_fwd_event(&ev, RTE_SCHED_TYPE_ATOMIC);
			worker_event_enqueue(dev, port, &ev);
			fwd++;
		}
	}

	if (ev.u64) {
		ev.op = RTE_EVENT_OP_RELEASE;
		rte_event_enqueue_burst(dev, port, &ev, 1);
	}

	if (!cdata.quiet)
		printf(" worker %u thread done. RX=%zu FWD=%zu TX=%zu\n",
				rte_lcore_id(), received, fwd, tx);
	return 0;
}

static int
worker_do_tx_single_atq(void *arg)
{
	struct worker_data *data = (struct worker_data *)arg;
	const uint8_t dev = data->dev_id;
	const uint8_t port = data->port_id;
	size_t fwd = 0, received = 0, tx = 0;
	struct rte_event ev;

	while (!fdata->done) {

		if (!rte_event_dequeue_burst(dev, port, &ev, 1, 0)) {
			rte_pause();
			continue;
		}

		received++;

		if (ev.sched_type == RTE_SCHED_TYPE_ATOMIC) {
			worker_tx_pkt(dev, port, &ev);
			tx++;
		} else {
			work();
			worker_fwd_event(&ev, RTE_SCHED_TYPE_ATOMIC);
			worker_event_enqueue(dev, port, &ev);
			fwd++;
		}
	}

	if (ev.u64) {
		ev.op = RTE_EVENT_OP_RELEASE;
		rte_event_enqueue_burst(dev, port, &ev, 1);
	}

	if (!cdata.quiet)
		printf(" worker %u thread done. RX=%zu FWD=%zu TX=%zu\n",
				rte_lcore_id(), received, fwd, tx);
	return 0;
}
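
/*
 * Burst variant of the single-stage worker: dequeue up to BATCH_SIZE events,
 * transmit and release the ones that arrive atomic, forward the rest to the
 * next queue as atomic events, then enqueue the whole batch at once.
 */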
static int
worker_do_tx_single_burst(void *arg)
{
	struct rte_event ev[BATCH_SIZE + 1];

	struct worker_data *data = (struct worker_data *)arg;
	const uint8_t dev = data->dev_id;
	const uint8_t port = data->port_id;
	size_t fwd = 0, received = 0, tx = 0;
	uint16_t nb_tx = 0, nb_rx = 0, i;

	while (!fdata->done) {
		nb_rx = rte_event_dequeue_burst(dev, port, ev, BATCH_SIZE, 0);

		if (!nb_rx) {
			rte_pause();
			continue;
		}
		received += nb_rx;

		for (i = 0; i < nb_rx; i++) {
			rte_prefetch0(ev[i + 1].mbuf);
			if (ev[i].sched_type == RTE_SCHED_TYPE_ATOMIC) {

				worker_tx_pkt(dev, port, &ev[i]);
				ev[i].op = RTE_EVENT_OP_RELEASE;
				tx++;

			} else {
				ev[i].queue_id++;
				worker_fwd_event(&ev[i], RTE_SCHED_TYPE_ATOMIC);
			}
			work();
		}

		nb_tx = worker_event_enqueue_burst(dev, port, ev, nb_rx);
		fwd += nb_tx;
	}

	worker_cleanup(dev, port, ev, nb_tx, nb_rx);

	if (!cdata.quiet)
		printf(" worker %u thread done. RX=%zu FWD=%zu TX=%zu\n",
				rte_lcore_id(), received, fwd, tx);
	return 0;
}

static int
worker_do_tx_single_burst_atq(void *arg)
{
	struct rte_event ev[BATCH_SIZE + 1];

	struct worker_data *data = (struct worker_data *)arg;
	const uint8_t dev = data->dev_id;
	const uint8_t port = data->port_id;
	size_t fwd = 0, received = 0, tx = 0;
	uint16_t i, nb_rx = 0, nb_tx = 0;

	while (!fdata->done) {
		nb_rx = rte_event_dequeue_burst(dev, port, ev, BATCH_SIZE, 0);

		if (!nb_rx) {
			rte_pause();
			continue;
		}

		received += nb_rx;

		for (i = 0; i < nb_rx; i++) {
			rte_prefetch0(ev[i + 1].mbuf);
			if (ev[i].sched_type == RTE_SCHED_TYPE_ATOMIC) {

				worker_tx_pkt(dev, port, &ev[i]);
				ev[i].op = RTE_EVENT_OP_RELEASE;
				tx++;
			} else
				worker_fwd_event(&ev[i], RTE_SCHED_TYPE_ATOMIC);
			work();
		}

		nb_tx = worker_event_enqueue_burst(dev, port, ev, nb_rx);
		fwd += nb_tx;
	}

	worker_cleanup(dev, port, ev, nb_tx, nb_rx);

	if (!cdata.quiet)
		printf(" worker %u thread done. RX=%zu FWD=%zu TX=%zu\n",
				rte_lcore_id(), received, fwd, tx);
	return 0;
}

/* Multi stage Pipeline Workers */

static int
worker_do_tx(void *arg)
{
	struct rte_event ev;

	struct worker_data *data = (struct worker_data *)arg;
	const uint8_t dev = data->dev_id;
	const uint8_t port = data->port_id;
	const uint8_t lst_qid = cdata.num_stages - 1;
	size_t fwd = 0, received = 0, tx = 0;

	while (!fdata->done) {

		if (!rte_event_dequeue_burst(dev, port, &ev, 1, 0)) {
			rte_pause();
			continue;
		}

		received++;
		const uint8_t cq_id = ev.queue_id % cdata.num_stages;

		if (cq_id >= lst_qid) {
			if (ev.sched_type == RTE_SCHED_TYPE_ATOMIC) {
				worker_tx_pkt(dev, port, &ev);
				tx++;
				continue;
			}

			worker_fwd_event(&ev, RTE_SCHED_TYPE_ATOMIC);
			ev.queue_id = (cq_id == lst_qid) ?
				cdata.next_qid[ev.queue_id] : ev.queue_id;
		} else {
			ev.queue_id = cdata.next_qid[ev.queue_id];
			worker_fwd_event(&ev, cdata.queue_type);
		}
		work();

		worker_event_enqueue(dev, port, &ev);
		fwd++;
	}

	if (ev.u64) {
		ev.op = RTE_EVENT_OP_RELEASE;
		rte_event_enqueue_burst(dev, port, &ev, 1);
	}

	if (!cdata.quiet)
		printf(" worker %u thread done. RX=%zu FWD=%zu TX=%zu\n",
				rte_lcore_id(), received, fwd, tx);

	return 0;
}
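
/*
 * Multi-stage worker for all-type queues. Unlike worker_do_tx(), which derives
 * the current stage from the event queue id, this variant keeps the stage in
 * ev.sub_event_type. Events that reach the last stage as atomic are
 * transmitted; last-stage events that are not yet atomic are re-enqueued as
 * atomic, and earlier stages advance by incrementing sub_event_type.
 */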
static int
worker_do_tx_atq(void *arg)
{
	struct rte_event ev;

	struct worker_data *data = (struct worker_data *)arg;
	const uint8_t dev = data->dev_id;
	const uint8_t port = data->port_id;
	const uint8_t lst_qid = cdata.num_stages - 1;
	size_t fwd = 0, received = 0, tx = 0;

	while (!fdata->done) {

		if (!rte_event_dequeue_burst(dev, port, &ev, 1, 0)) {
			rte_pause();
			continue;
		}

		received++;
		const uint8_t cq_id = ev.sub_event_type % cdata.num_stages;

		if (cq_id == lst_qid) {
			if (ev.sched_type == RTE_SCHED_TYPE_ATOMIC) {
				worker_tx_pkt(dev, port, &ev);
				tx++;
				continue;
			}

			worker_fwd_event(&ev, RTE_SCHED_TYPE_ATOMIC);
		} else {
			ev.sub_event_type++;
			worker_fwd_event(&ev, cdata.queue_type);
		}
		work();

		worker_event_enqueue(dev, port, &ev);
		fwd++;
	}

	if (ev.u64) {
		ev.op = RTE_EVENT_OP_RELEASE;
		rte_event_enqueue_burst(dev, port, &ev, 1);
	}

	if (!cdata.quiet)
		printf(" worker %u thread done. RX=%zu FWD=%zu TX=%zu\n",
				rte_lcore_id(), received, fwd, tx);

	return 0;
}

static int
worker_do_tx_burst(void *arg)
{
	struct rte_event ev[BATCH_SIZE];

	struct worker_data *data = (struct worker_data *)arg;
	uint8_t dev = data->dev_id;
	uint8_t port = data->port_id;
	uint8_t lst_qid = cdata.num_stages - 1;
	size_t fwd = 0, received = 0, tx = 0;
	uint16_t i, nb_rx = 0, nb_tx = 0;

	while (!fdata->done) {
		nb_rx = rte_event_dequeue_burst(dev, port, ev, BATCH_SIZE, 0);

		if (nb_rx == 0) {
			rte_pause();
			continue;
		}
		received += nb_rx;

		for (i = 0; i < nb_rx; i++) {
			const uint8_t cq_id = ev[i].queue_id % cdata.num_stages;

			if (cq_id >= lst_qid) {
				if (ev[i].sched_type == RTE_SCHED_TYPE_ATOMIC) {
					worker_tx_pkt(dev, port, &ev[i]);
					tx++;
					ev[i].op = RTE_EVENT_OP_RELEASE;
					continue;
				}
				ev[i].queue_id = (cq_id == lst_qid) ?
					cdata.next_qid[ev[i].queue_id] :
					ev[i].queue_id;

				worker_fwd_event(&ev[i], RTE_SCHED_TYPE_ATOMIC);
			} else {
				ev[i].queue_id = cdata.next_qid[ev[i].queue_id];
				worker_fwd_event(&ev[i], cdata.queue_type);
			}
			work();
		}

		nb_tx = worker_event_enqueue_burst(dev, port, ev, nb_rx);
		fwd += nb_tx;
	}

	worker_cleanup(dev, port, ev, nb_tx, nb_rx);

	if (!cdata.quiet)
		printf(" worker %u thread done. RX=%zu FWD=%zu TX=%zu\n",
				rte_lcore_id(), received, fwd, tx);

	return 0;
}
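
/*
 * Burst variant of worker_do_tx_atq(): same stage handling, up to BATCH_SIZE
 * events per iteration.
 */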
static int
worker_do_tx_burst_atq(void *arg)
{
	struct rte_event ev[BATCH_SIZE];

	struct worker_data *data = (struct worker_data *)arg;
	uint8_t dev = data->dev_id;
	uint8_t port = data->port_id;
	uint8_t lst_qid = cdata.num_stages - 1;
	size_t fwd = 0, received = 0, tx = 0;
	uint16_t i, nb_rx = 0, nb_tx = 0;

	while (!fdata->done) {
		nb_rx = rte_event_dequeue_burst(dev, port, ev, BATCH_SIZE, 0);

		if (nb_rx == 0) {
			rte_pause();
			continue;
		}
		received += nb_rx;

		for (i = 0; i < nb_rx; i++) {
			const uint8_t cq_id = ev[i].sub_event_type %
				cdata.num_stages;

			if (cq_id == lst_qid) {
				if (ev[i].sched_type == RTE_SCHED_TYPE_ATOMIC) {
					worker_tx_pkt(dev, port, &ev[i]);
					tx++;
					ev[i].op = RTE_EVENT_OP_RELEASE;
					continue;
				}

				worker_fwd_event(&ev[i], RTE_SCHED_TYPE_ATOMIC);
			} else {
				ev[i].sub_event_type++;
				worker_fwd_event(&ev[i], cdata.queue_type);
			}
			work();
		}

		nb_tx = worker_event_enqueue_burst(dev, port, ev, nb_rx);
		fwd += nb_tx;
	}

	worker_cleanup(dev, port, ev, nb_tx, nb_rx);

	if (!cdata.quiet)
		printf(" worker %u thread done. RX=%zu FWD=%zu TX=%zu\n",
				rte_lcore_id(), received, fwd, tx);

	return 0;
}
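
/*
 * Set up the event device for the worker-tx-enqueue pipeline: configure the
 * device within its advertised limits, create the per-stage event queues
 * (plus a final atomic queue per pipeline in the non-ATQ case) and give each
 * worker a port linked to every queue. Returns the event device id on
 * success.
 */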
static int
setup_eventdev_worker_tx_enq(struct worker_data *worker_data)
{
	uint8_t i;
	const uint8_t atq = cdata.all_type_queues ? 1 : 0;
	const uint8_t dev_id = 0;
	const uint8_t nb_ports = cdata.num_workers;
	uint8_t nb_slots = 0;
	uint8_t nb_queues = rte_eth_dev_count_avail();

	/*
	 * In case where all type queues are not enabled, use queues equal to
	 * number of stages * eth_dev_count and one extra queue per pipeline
	 * for Tx.
	 */
	if (!atq) {
		nb_queues *= cdata.num_stages;
		nb_queues += rte_eth_dev_count_avail();
	}

	struct rte_event_dev_config config = {
			.nb_event_queues = nb_queues,
			.nb_event_ports = nb_ports,
			.nb_single_link_event_port_queues = 0,
			.nb_events_limit = 4096,
			.nb_event_queue_flows = 1024,
			.nb_event_port_dequeue_depth = 128,
			.nb_event_port_enqueue_depth = 128,
	};
	struct rte_event_port_conf wkr_p_conf = {
			.dequeue_depth = cdata.worker_cq_depth,
			.enqueue_depth = 64,
			.new_event_threshold = 4096,
			.event_port_cfg = RTE_EVENT_PORT_CFG_HINT_WORKER,
	};
	struct rte_event_queue_conf wkr_q_conf = {
			.schedule_type = cdata.queue_type,
			.priority = RTE_EVENT_DEV_PRIORITY_NORMAL,
			.nb_atomic_flows = 1024,
			.nb_atomic_order_sequences = 1024,
	};

	int ret, ndev = rte_event_dev_count();

	if (ndev < 1) {
		printf("%d: No Eventdev Devices Found\n", __LINE__);
		return -1;
	}

	struct rte_event_dev_info dev_info;
	ret = rte_event_dev_info_get(dev_id, &dev_info);
	printf("\tEventdev %d: %s\n", dev_id, dev_info.driver_name);

	if (dev_info.max_num_events < config.nb_events_limit)
		config.nb_events_limit = dev_info.max_num_events;
	if (dev_info.max_event_port_dequeue_depth <
			config.nb_event_port_dequeue_depth)
		config.nb_event_port_dequeue_depth =
				dev_info.max_event_port_dequeue_depth;
	if (dev_info.max_event_port_enqueue_depth <
			config.nb_event_port_enqueue_depth)
		config.nb_event_port_enqueue_depth =
				dev_info.max_event_port_enqueue_depth;

	ret = rte_event_dev_configure(dev_id, &config);
	if (ret < 0) {
		printf("%d: Error configuring device\n", __LINE__);
		return -1;
	}

	printf(" Stages:\n");
	for (i = 0; i < nb_queues; i++) {

		if (atq) {

			nb_slots = cdata.num_stages;
			wkr_q_conf.event_queue_cfg =
				RTE_EVENT_QUEUE_CFG_ALL_TYPES;
		} else {
			uint8_t slot;

			nb_slots = cdata.num_stages + 1;
			slot = i % nb_slots;
			wkr_q_conf.schedule_type = slot == cdata.num_stages ?
				RTE_SCHED_TYPE_ATOMIC : cdata.queue_type;
		}

		if (rte_event_queue_setup(dev_id, i, &wkr_q_conf) < 0) {
			printf("%d: error creating qid %d\n", __LINE__, i);
			return -1;
		}
		cdata.qid[i] = i;
		cdata.next_qid[i] = i+1;
		if (cdata.enable_queue_priorities) {
			const uint32_t prio_delta =
				(RTE_EVENT_DEV_PRIORITY_LOWEST) /
				nb_slots;

			/* higher priority for queues closer to tx */
			wkr_q_conf.priority =
				RTE_EVENT_DEV_PRIORITY_LOWEST - prio_delta *
				(i % nb_slots);
		}

		const char *type_str = "Atomic";
		switch (wkr_q_conf.schedule_type) {
		case RTE_SCHED_TYPE_ORDERED:
			type_str = "Ordered";
			break;
		case RTE_SCHED_TYPE_PARALLEL:
			type_str = "Parallel";
			break;
		}
		printf("\tStage %d, Type %s\tPriority = %d\n", i, type_str,
				wkr_q_conf.priority);
	}

	printf("\n");
	if (wkr_p_conf.new_event_threshold > config.nb_events_limit)
		wkr_p_conf.new_event_threshold = config.nb_events_limit;
	if (wkr_p_conf.dequeue_depth > config.nb_event_port_dequeue_depth)
		wkr_p_conf.dequeue_depth = config.nb_event_port_dequeue_depth;
	if (wkr_p_conf.enqueue_depth > config.nb_event_port_enqueue_depth)
		wkr_p_conf.enqueue_depth = config.nb_event_port_enqueue_depth;

	/* set up one port per worker, linking to all stage queues */
	for (i = 0; i < cdata.num_workers; i++) {
		struct worker_data *w = &worker_data[i];
		w->dev_id = dev_id;
		if (rte_event_port_setup(dev_id, i, &wkr_p_conf) < 0) {
			printf("Error setting up port %d\n", i);
			return -1;
		}

		if (rte_event_port_link(dev_id, i, NULL, NULL, 0)
				!= nb_queues) {
			printf("%d: error creating link for port %d\n",
					__LINE__, i);
			return -1;
		}
		w->port_id = i;
	}
	/*
	 * Reduce the load on the ingress event queue by splitting the traffic
	 * across multiple event queues. For example, with nb_stages = 2 and
	 * nb_ethdev = 2:
	 *
	 *	nb_queues = (2 * 2) + 2 = 6 (non atq)
	 *	rx_stride = 3
	 *
	 * Traffic is therefore split across queue 0 and queue 3, since the
	 * queue id for the rx adapter is chosen as <ethport_id> * <rx_stride>,
	 * i.e. in the above case eth ports 0 and 1 inject packets into event
	 * queues 0 and 3 respectively.
	 *
	 * This forms two sets of queue pipelines 0->1->2->tx and 3->4->5->tx.
	 */
	cdata.rx_stride = atq ? 1 : nb_slots;
	ret = rte_event_dev_service_id_get(dev_id,
				&fdata->evdev_service_id);
	if (ret != -ESRCH && ret != 0) {
		printf("Error getting the service ID\n");
		return -1;
	}
	rte_service_runstate_set(fdata->evdev_service_id, 1);
	rte_service_set_runstate_mapped_check(fdata->evdev_service_id, 0);

	if (rte_event_dev_start(dev_id) < 0)
		rte_exit(EXIT_FAILURE, "Error starting eventdev");

	return dev_id;
}
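
/*
 * Rx adapters that lack the INTERNAL_PORT capability must be driven by
 * software. Their service ids are collected in rx_adptr_services so that the
 * single service_rx_adapter() callback below can run all of them.
 */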
struct rx_adptr_services {
	uint16_t nb_rx_adptrs;
	uint32_t *rx_adpt_arr;
};

static int32_t
service_rx_adapter(void *arg)
{
	int i;
	struct rx_adptr_services *adptr_services = arg;

	for (i = 0; i < adptr_services->nb_rx_adptrs; i++)
		rte_service_run_iter_on_app_lcore(
				adptr_services->rx_adpt_arr[i], 1);
	return 0;
}

/*
 * Initializes a given port using global settings and with the RX buffers
 * coming from the mbuf_pool passed as a parameter.
 */
static inline int
port_init(uint8_t port, struct rte_mempool *mbuf_pool)
{
	struct rte_eth_rxconf rx_conf;
	static const struct rte_eth_conf port_conf_default = {
		.rxmode = {
			.mq_mode = RTE_ETH_MQ_RX_RSS,
		},
		.rx_adv_conf = {
			.rss_conf = {
				.rss_hf = RTE_ETH_RSS_IP |
					  RTE_ETH_RSS_TCP |
					  RTE_ETH_RSS_UDP,
			}
		}
	};
	const uint16_t rx_rings = 1, tx_rings = 1;
	const uint16_t rx_ring_size = 512, tx_ring_size = 512;
	struct rte_eth_conf port_conf = port_conf_default;
	int retval;
	uint16_t q;
	struct rte_eth_dev_info dev_info;
	struct rte_eth_txconf txconf;

	if (!rte_eth_dev_is_valid_port(port))
		return -1;

	retval = rte_eth_dev_info_get(port, &dev_info);
	if (retval != 0) {
		printf("Error during getting device (port %u) info: %s\n",
				port, strerror(-retval));
		return retval;
	}

	if (dev_info.tx_offload_capa & RTE_ETH_TX_OFFLOAD_MBUF_FAST_FREE)
		port_conf.txmode.offloads |=
			RTE_ETH_TX_OFFLOAD_MBUF_FAST_FREE;
	rx_conf = dev_info.default_rxconf;
	rx_conf.offloads = port_conf.rxmode.offloads;

	port_conf.rx_adv_conf.rss_conf.rss_hf &=
		dev_info.flow_type_rss_offloads;
	if (port_conf.rx_adv_conf.rss_conf.rss_hf !=
			port_conf_default.rx_adv_conf.rss_conf.rss_hf) {
		printf("Port %u modified RSS hash function based on hardware support,"
			"requested:%#"PRIx64" configured:%#"PRIx64"\n",
			port,
			port_conf_default.rx_adv_conf.rss_conf.rss_hf,
			port_conf.rx_adv_conf.rss_conf.rss_hf);
	}

	/* Configure the Ethernet device. */
	retval = rte_eth_dev_configure(port, rx_rings, tx_rings, &port_conf);
	if (retval != 0)
		return retval;

	/* Allocate and set up 1 RX queue per Ethernet port. */
	for (q = 0; q < rx_rings; q++) {
		retval = rte_eth_rx_queue_setup(port, q, rx_ring_size,
				rte_eth_dev_socket_id(port), &rx_conf,
				mbuf_pool);
		if (retval < 0)
			return retval;
	}

	txconf = dev_info.default_txconf;
	txconf.offloads = port_conf_default.txmode.offloads;
	/* Allocate and set up 1 TX queue per Ethernet port. */
	for (q = 0; q < tx_rings; q++) {
		retval = rte_eth_tx_queue_setup(port, q, tx_ring_size,
				rte_eth_dev_socket_id(port), &txconf);
		if (retval < 0)
			return retval;
	}

	/* Display the port MAC address. */
	struct rte_ether_addr addr;
	retval = rte_eth_macaddr_get(port, &addr);
	if (retval != 0) {
		printf("Failed to get MAC address (port %u): %s\n",
				port, rte_strerror(-retval));
		return retval;
	}

	printf("Port %u MAC: %02" PRIx8 " %02" PRIx8 " %02" PRIx8
			" %02" PRIx8 " %02" PRIx8 " %02" PRIx8 "\n",
			(unsigned int)port, RTE_ETHER_ADDR_BYTES(&addr));

	/* Enable RX in promiscuous mode for the Ethernet device. */
	retval = rte_eth_promiscuous_enable(port);
	if (retval != 0)
		return retval;

	return 0;
}
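
/* Create the packet mbuf pool and initialize every available ethdev port. */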
static int
init_ports(uint16_t num_ports)
{
	uint16_t portid;

	if (!cdata.num_mbuf)
		cdata.num_mbuf = 16384 * num_ports;

	struct rte_mempool *mp = rte_pktmbuf_pool_create("packet_pool",
			/* mbufs */ cdata.num_mbuf,
			/* cache_size */ 512,
			/* priv_size */ 0,
			/* data_room_size */ RTE_MBUF_DEFAULT_BUF_SIZE,
			rte_socket_id());

	RTE_ETH_FOREACH_DEV(portid)
		if (port_init(portid, mp) != 0)
			rte_exit(EXIT_FAILURE, "Cannot init port %"PRIu16 "\n",
					portid);

	return 0;
}
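
/*
 * Create and start one Rx adapter per ethdev port plus a single Tx adapter.
 * Rx adapters without an internal event port are grouped behind a single
 * registered rx_service (service_rx_adapter) so they can be scheduled by the
 * application.
 */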
static void
init_adapters(uint16_t nb_ports)
{
	int i;
	int ret;
	uint8_t evdev_id = 0;
	struct rx_adptr_services *adptr_services = NULL;
	struct rte_event_dev_info dev_info;

	ret = rte_event_dev_info_get(evdev_id, &dev_info);
	adptr_services = rte_zmalloc(NULL, sizeof(struct rx_adptr_services), 0);

	struct rte_event_port_conf adptr_p_conf = {
		.dequeue_depth = cdata.worker_cq_depth,
		.enqueue_depth = 64,
		.new_event_threshold = 4096,
		.event_port_cfg = RTE_EVENT_PORT_CFG_HINT_PRODUCER,
	};

	init_ports(nb_ports);
	if (adptr_p_conf.new_event_threshold > dev_info.max_num_events)
		adptr_p_conf.new_event_threshold = dev_info.max_num_events;
	if (adptr_p_conf.dequeue_depth > dev_info.max_event_port_dequeue_depth)
		adptr_p_conf.dequeue_depth =
			dev_info.max_event_port_dequeue_depth;
	if (adptr_p_conf.enqueue_depth > dev_info.max_event_port_enqueue_depth)
		adptr_p_conf.enqueue_depth =
			dev_info.max_event_port_enqueue_depth;

	struct rte_event_eth_rx_adapter_queue_conf queue_conf;
	memset(&queue_conf, 0, sizeof(queue_conf));
	queue_conf.ev.sched_type = cdata.queue_type;

	for (i = 0; i < nb_ports; i++) {
		uint32_t cap;
		uint32_t service_id;

		ret = rte_event_eth_rx_adapter_create(i, evdev_id,
				&adptr_p_conf);
		if (ret)
			rte_exit(EXIT_FAILURE,
					"failed to create rx adapter[%d]", i);

		ret = rte_event_eth_rx_adapter_caps_get(evdev_id, i, &cap);
		if (ret)
			rte_exit(EXIT_FAILURE,
					"failed to get event rx adapter "
					"capabilities");

		queue_conf.ev.queue_id = cdata.rx_stride ?
			(i * cdata.rx_stride)
			: (uint8_t)cdata.qid[0];

		ret = rte_event_eth_rx_adapter_queue_add(i, i, -1, &queue_conf);
		if (ret)
			rte_exit(EXIT_FAILURE,
					"Failed to add queues to Rx adapter");

		/* Producer needs to be scheduled. */
		if (!(cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_INTERNAL_PORT)) {
			ret = rte_event_eth_rx_adapter_service_id_get(i,
					&service_id);
			if (ret != -ESRCH && ret != 0) {
				rte_exit(EXIT_FAILURE,
				"Error getting the service ID for rx adptr\n");
			}

			rte_service_runstate_set(service_id, 1);
			rte_service_set_runstate_mapped_check(service_id, 0);

			adptr_services->nb_rx_adptrs++;
			adptr_services->rx_adpt_arr = rte_realloc(
					adptr_services->rx_adpt_arr,
					adptr_services->nb_rx_adptrs *
					sizeof(uint32_t), 0);
			adptr_services->rx_adpt_arr[
				adptr_services->nb_rx_adptrs - 1] =
				service_id;
		}

		ret = rte_event_eth_rx_adapter_start(i);
		if (ret)
			rte_exit(EXIT_FAILURE, "Rx adapter[%d] start failed",
					i);
	}

	/* We already know that Tx adapter has INTERNAL port cap */
	ret = rte_event_eth_tx_adapter_create(cdata.tx_adapter_id, evdev_id,
			&adptr_p_conf);
	if (ret)
		rte_exit(EXIT_FAILURE, "failed to create tx adapter[%d]",
				cdata.tx_adapter_id);

	for (i = 0; i < nb_ports; i++) {
		ret = rte_event_eth_tx_adapter_queue_add(cdata.tx_adapter_id, i,
				-1);
		if (ret)
			rte_exit(EXIT_FAILURE,
					"Failed to add queues to Tx adapter");
	}

	ret = rte_event_eth_tx_adapter_start(cdata.tx_adapter_id);
	if (ret)
		rte_exit(EXIT_FAILURE, "Tx adapter[%d] start failed",
				cdata.tx_adapter_id);

	if (adptr_services->nb_rx_adptrs) {
		struct rte_service_spec service;

		memset(&service, 0, sizeof(struct rte_service_spec));
		snprintf(service.name, sizeof(service.name), "rx_service");
		service.callback = service_rx_adapter;
		service.callback_userdata = (void *)adptr_services;

		int32_t ret = rte_service_component_register(&service,
				&fdata->rxadptr_service_id);
		if (ret)
			rte_exit(EXIT_FAILURE,
				"Rx adapter service register failed");

		rte_service_runstate_set(fdata->rxadptr_service_id, 1);
		rte_service_component_runstate_set(fdata->rxadptr_service_id,
				1);
		rte_service_set_runstate_mapped_check(fdata->rxadptr_service_id,
				0);
	} else {
		memset(fdata->rx_core, 0, sizeof(unsigned int) * MAX_NUM_CORE);

		/*
		 * No Rx service is needed; if the event device also schedules
		 * itself, drop the scheduler callback as well. Check the
		 * capability before freeing adptr_services so the structure
		 * is not read after it has been released.
		 */
		if (dev_info.event_dev_cap &
				RTE_EVENT_DEV_CAP_DISTRIBUTED_SCHED)
			fdata->cap.scheduler = NULL;

		rte_free(adptr_services);
	}
}
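
/*
 * Validate the core masks against device and adapter capabilities: dedicated
 * scheduler cores are only required when the event device lacks
 * DISTRIBUTED_SCHED, and dedicated Rx cores only when an Rx adapter lacks an
 * internal event port.
 */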
" 926 "This will stall the pipeline, please check core masks " 927 "(use -h for details on setting core masks):\n" 928 "\trx: %"PRIu64"\n\tsched: %"PRIu64 929 "\n\tworkers: %"PRIu64"\n", cdata.rx_lcore_mask, 930 cdata.sched_lcore_mask, cdata.worker_lcore_mask); 931 rte_exit(-1, "Fix core masks\n"); 932 } 933 934 if (!sched_needed) 935 memset(fdata->sched_core, 0, 936 sizeof(unsigned int) * MAX_NUM_CORE); 937 if (!rx_needed) 938 memset(fdata->rx_core, 0, 939 sizeof(unsigned int) * MAX_NUM_CORE); 940 941 memset(fdata->tx_core, 0, sizeof(unsigned int) * MAX_NUM_CORE); 942 } 943 944 static worker_loop 945 get_worker_loop_single_burst(uint8_t atq) 946 { 947 if (atq) 948 return worker_do_tx_single_burst_atq; 949 950 return worker_do_tx_single_burst; 951 } 952 953 static worker_loop 954 get_worker_loop_single_non_burst(uint8_t atq) 955 { 956 if (atq) 957 return worker_do_tx_single_atq; 958 959 return worker_do_tx_single; 960 } 961 962 static worker_loop 963 get_worker_loop_burst(uint8_t atq) 964 { 965 if (atq) 966 return worker_do_tx_burst_atq; 967 968 return worker_do_tx_burst; 969 } 970 971 static worker_loop 972 get_worker_loop_non_burst(uint8_t atq) 973 { 974 if (atq) 975 return worker_do_tx_atq; 976 977 return worker_do_tx; 978 } 979 980 static worker_loop 981 get_worker_single_stage(bool burst) 982 { 983 uint8_t atq = cdata.all_type_queues ? 1 : 0; 984 985 if (burst) 986 return get_worker_loop_single_burst(atq); 987 988 return get_worker_loop_single_non_burst(atq); 989 } 990 991 static worker_loop 992 get_worker_multi_stage(bool burst) 993 { 994 uint8_t atq = cdata.all_type_queues ? 1 : 0; 995 996 if (burst) 997 return get_worker_loop_burst(atq); 998 999 return get_worker_loop_non_burst(atq); 1000 } 1001 1002 void 1003 set_worker_tx_enq_setup_data(struct setup_data *caps, bool burst) 1004 { 1005 if (cdata.num_stages == 1) 1006 caps->worker = get_worker_single_stage(burst); 1007 else 1008 caps->worker = get_worker_multi_stage(burst); 1009 1010 caps->check_opt = worker_tx_enq_opt_check; 1011 caps->scheduler = schedule_devices; 1012 caps->evdev_setup = setup_eventdev_worker_tx_enq; 1013 caps->adptr_setup = init_adapters; 1014 } 1015