/*
 * SPDX-License-Identifier: BSD-3-Clause
 * Copyright(c) 2010-2014 Intel Corporation
 * Copyright 2017 Cavium, Inc.
 */

#include <stdlib.h>

#include "pipeline_common.h"

static __rte_always_inline void
worker_fwd_event(struct rte_event *ev, uint8_t sched)
{
	ev->event_type = RTE_EVENT_TYPE_CPU;
	ev->op = RTE_EVENT_OP_FORWARD;
	ev->sched_type = sched;
}

static __rte_always_inline void
worker_event_enqueue(const uint8_t dev, const uint8_t port,
		struct rte_event *ev)
{
	while (!rte_event_enqueue_burst(dev, port, ev, 1) && !fdata->done)
		rte_pause();
}

static __rte_always_inline uint16_t
worker_event_enqueue_burst(const uint8_t dev, const uint8_t port,
		struct rte_event *ev, const uint16_t nb_rx)
{
	uint16_t enq;

	enq = rte_event_enqueue_burst(dev, port, ev, nb_rx);
	while (enq < nb_rx && !fdata->done)
		enq += rte_event_enqueue_burst(dev, port,
						ev + enq, nb_rx - enq);

	return enq;
}

static __rte_always_inline void
worker_tx_pkt(const uint8_t dev, const uint8_t port, struct rte_event *ev)
{
	exchange_mac(ev->mbuf);
	rte_event_eth_tx_adapter_txq_set(ev->mbuf, 0);
	while (!rte_event_eth_tx_adapter_enqueue(dev, port, ev, 1, 0) &&
			!fdata->done)
		rte_pause();
}

/* Single stage pipeline workers */

static int
worker_do_tx_single(void *arg)
{
	struct worker_data *data = (struct worker_data *)arg;
	const uint8_t dev = data->dev_id;
	const uint8_t port = data->port_id;
	size_t fwd = 0, received = 0, tx = 0;
	struct rte_event ev;

	while (!fdata->done) {

		if (!rte_event_dequeue_burst(dev, port, &ev, 1, 0)) {
			rte_pause();
			continue;
		}

		received++;

		if (ev.sched_type == RTE_SCHED_TYPE_ATOMIC) {
			worker_tx_pkt(dev, port, &ev);
			tx++;
		} else {
			work();
			ev.queue_id++;
			worker_fwd_event(&ev, RTE_SCHED_TYPE_ATOMIC);
			worker_event_enqueue(dev, port, &ev);
			fwd++;
		}
	}

	if (ev.u64) {
		ev.op = RTE_EVENT_OP_RELEASE;
		rte_event_enqueue_burst(dev, port, &ev, 1);
	}

	if (!cdata.quiet)
		printf(" worker %u thread done. RX=%zu FWD=%zu TX=%zu\n",
				rte_lcore_id(), received, fwd, tx);
	return 0;
}

static int
worker_do_tx_single_atq(void *arg)
{
	struct worker_data *data = (struct worker_data *)arg;
	const uint8_t dev = data->dev_id;
	const uint8_t port = data->port_id;
	size_t fwd = 0, received = 0, tx = 0;
	struct rte_event ev;

	while (!fdata->done) {

		if (!rte_event_dequeue_burst(dev, port, &ev, 1, 0)) {
			rte_pause();
			continue;
		}

		received++;

		if (ev.sched_type == RTE_SCHED_TYPE_ATOMIC) {
			worker_tx_pkt(dev, port, &ev);
			tx++;
		} else {
			work();
			worker_fwd_event(&ev, RTE_SCHED_TYPE_ATOMIC);
			worker_event_enqueue(dev, port, &ev);
			fwd++;
		}
	}

	if (ev.u64) {
		ev.op = RTE_EVENT_OP_RELEASE;
		rte_event_enqueue_burst(dev, port, &ev, 1);
	}

	if (!cdata.quiet)
		printf(" worker %u thread done. RX=%zu FWD=%zu TX=%zu\n",
				rte_lcore_id(), received, fwd, tx);
	return 0;
}

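/*
 * Burst variants of the single stage worker: dequeue up to BATCH_SIZE
 * events at a time, transmit the atomic ones and mark them RELEASE,
 * forward the rest to the next queue as atomic, then enqueue the whole
 * batch in one burst.
 */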
static int
worker_do_tx_single_burst(void *arg)
{
	struct rte_event ev[BATCH_SIZE + 1];

	struct worker_data *data = (struct worker_data *)arg;
	const uint8_t dev = data->dev_id;
	const uint8_t port = data->port_id;
	size_t fwd = 0, received = 0, tx = 0;
	uint16_t nb_tx = 0, nb_rx = 0, i;

	while (!fdata->done) {
		nb_rx = rte_event_dequeue_burst(dev, port, ev, BATCH_SIZE, 0);

		if (!nb_rx) {
			rte_pause();
			continue;
		}
		received += nb_rx;

		for (i = 0; i < nb_rx; i++) {
			rte_prefetch0(ev[i + 1].mbuf);
			if (ev[i].sched_type == RTE_SCHED_TYPE_ATOMIC) {

				worker_tx_pkt(dev, port, &ev[i]);
				ev[i].op = RTE_EVENT_OP_RELEASE;
				tx++;

			} else {
				ev[i].queue_id++;
				worker_fwd_event(&ev[i], RTE_SCHED_TYPE_ATOMIC);
			}
			work();
		}

		nb_tx = worker_event_enqueue_burst(dev, port, ev, nb_rx);
		fwd += nb_tx;
	}

	worker_cleanup(dev, port, ev, nb_tx, nb_rx);

	if (!cdata.quiet)
		printf(" worker %u thread done. RX=%zu FWD=%zu TX=%zu\n",
				rte_lcore_id(), received, fwd, tx);
	return 0;
}

static int
worker_do_tx_single_burst_atq(void *arg)
{
	struct rte_event ev[BATCH_SIZE + 1];

	struct worker_data *data = (struct worker_data *)arg;
	const uint8_t dev = data->dev_id;
	const uint8_t port = data->port_id;
	size_t fwd = 0, received = 0, tx = 0;
	uint16_t i, nb_rx = 0, nb_tx = 0;

	while (!fdata->done) {
		nb_rx = rte_event_dequeue_burst(dev, port, ev, BATCH_SIZE, 0);

		if (!nb_rx) {
			rte_pause();
			continue;
		}

		received += nb_rx;

		for (i = 0; i < nb_rx; i++) {
			rte_prefetch0(ev[i + 1].mbuf);
			if (ev[i].sched_type == RTE_SCHED_TYPE_ATOMIC) {

				worker_tx_pkt(dev, port, &ev[i]);
				ev[i].op = RTE_EVENT_OP_RELEASE;
				tx++;
			} else
				worker_fwd_event(&ev[i], RTE_SCHED_TYPE_ATOMIC);
			work();
		}

		nb_tx = worker_event_enqueue_burst(dev, port, ev, nb_rx);
		fwd += nb_tx;
	}

	worker_cleanup(dev, port, ev, nb_tx, nb_rx);

	if (!cdata.quiet)
		printf(" worker %u thread done. RX=%zu FWD=%zu TX=%zu\n",
				rte_lcore_id(), received, fwd, tx);
	return 0;
}

/* Multi stage Pipeline Workers */

static int
worker_do_tx(void *arg)
{
	struct rte_event ev;

	struct worker_data *data = (struct worker_data *)arg;
	const uint8_t dev = data->dev_id;
	const uint8_t port = data->port_id;
	const uint8_t lst_qid = cdata.num_stages - 1;
	size_t fwd = 0, received = 0, tx = 0;


	while (!fdata->done) {

		if (!rte_event_dequeue_burst(dev, port, &ev, 1, 0)) {
			rte_pause();
			continue;
		}

		received++;
		const uint8_t cq_id = ev.queue_id % cdata.num_stages;

		if (cq_id >= lst_qid) {
			if (ev.sched_type == RTE_SCHED_TYPE_ATOMIC) {
				worker_tx_pkt(dev, port, &ev);
				tx++;
				continue;
			}

			worker_fwd_event(&ev, RTE_SCHED_TYPE_ATOMIC);
			ev.queue_id = (cq_id == lst_qid) ?
				cdata.next_qid[ev.queue_id] : ev.queue_id;
		} else {
			ev.queue_id = cdata.next_qid[ev.queue_id];
			worker_fwd_event(&ev, cdata.queue_type);
		}
		work();

		worker_event_enqueue(dev, port, &ev);
		fwd++;
	}

	if (ev.u64) {
		ev.op = RTE_EVENT_OP_RELEASE;
		rte_event_enqueue_burst(dev, port, &ev, 1);
	}

	if (!cdata.quiet)
		printf(" worker %u thread done. RX=%zu FWD=%zu TX=%zu\n",
				rte_lcore_id(), received, fwd, tx);

	return 0;
}

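/*
 * All-type-queue (ATQ) variant of the multi stage worker: every stage
 * shares a single event queue, so the current stage is tracked in
 * ev.sub_event_type instead of the queue id.
 */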
static int
worker_do_tx_atq(void *arg)
{
	struct rte_event ev;

	struct worker_data *data = (struct worker_data *)arg;
	const uint8_t dev = data->dev_id;
	const uint8_t port = data->port_id;
	const uint8_t lst_qid = cdata.num_stages - 1;
	size_t fwd = 0, received = 0, tx = 0;

	while (!fdata->done) {

		if (!rte_event_dequeue_burst(dev, port, &ev, 1, 0)) {
			rte_pause();
			continue;
		}

		received++;
		const uint8_t cq_id = ev.sub_event_type % cdata.num_stages;

		if (cq_id == lst_qid) {
			if (ev.sched_type == RTE_SCHED_TYPE_ATOMIC) {
				worker_tx_pkt(dev, port, &ev);
				tx++;
				continue;
			}

			worker_fwd_event(&ev, RTE_SCHED_TYPE_ATOMIC);
		} else {
			ev.sub_event_type++;
			worker_fwd_event(&ev, cdata.queue_type);
		}
		work();

		worker_event_enqueue(dev, port, &ev);
		fwd++;
	}

	if (ev.u64) {
		ev.op = RTE_EVENT_OP_RELEASE;
		rte_event_enqueue_burst(dev, port, &ev, 1);
	}

	if (!cdata.quiet)
		printf(" worker %u thread done. RX=%zu FWD=%zu TX=%zu\n",
				rte_lcore_id(), received, fwd, tx);

	return 0;
}

static int
worker_do_tx_burst(void *arg)
{
	struct rte_event ev[BATCH_SIZE];

	struct worker_data *data = (struct worker_data *)arg;
	uint8_t dev = data->dev_id;
	uint8_t port = data->port_id;
	uint8_t lst_qid = cdata.num_stages - 1;
	size_t fwd = 0, received = 0, tx = 0;
	uint16_t i, nb_rx = 0, nb_tx = 0;

	while (!fdata->done) {
		nb_rx = rte_event_dequeue_burst(dev, port, ev, BATCH_SIZE, 0);

		if (nb_rx == 0) {
			rte_pause();
			continue;
		}
		received += nb_rx;

		for (i = 0; i < nb_rx; i++) {
			const uint8_t cq_id = ev[i].queue_id % cdata.num_stages;

			if (cq_id >= lst_qid) {
				if (ev[i].sched_type == RTE_SCHED_TYPE_ATOMIC) {
					worker_tx_pkt(dev, port, &ev[i]);
					tx++;
					ev[i].op = RTE_EVENT_OP_RELEASE;
					continue;
				}
				ev[i].queue_id = (cq_id == lst_qid) ?
					cdata.next_qid[ev[i].queue_id] :
					ev[i].queue_id;

				worker_fwd_event(&ev[i], RTE_SCHED_TYPE_ATOMIC);
			} else {
				ev[i].queue_id = cdata.next_qid[ev[i].queue_id];
				worker_fwd_event(&ev[i], cdata.queue_type);
			}
			work();
		}

		nb_tx = worker_event_enqueue_burst(dev, port, ev, nb_rx);
		fwd += nb_tx;
	}

	worker_cleanup(dev, port, ev, nb_tx, nb_rx);

	if (!cdata.quiet)
		printf(" worker %u thread done. RX=%zu FWD=%zu TX=%zu\n",
				rte_lcore_id(), received, fwd, tx);

	return 0;
}

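/*
 * Burst + all-type-queue variant of the multi stage worker: dequeues up to
 * BATCH_SIZE events, advances the stage kept in ev.sub_event_type and
 * transmits events that have reached the last stage as atomic.
 */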
static int
worker_do_tx_burst_atq(void *arg)
{
	struct rte_event ev[BATCH_SIZE];

	struct worker_data *data = (struct worker_data *)arg;
	uint8_t dev = data->dev_id;
	uint8_t port = data->port_id;
	uint8_t lst_qid = cdata.num_stages - 1;
	size_t fwd = 0, received = 0, tx = 0;
	uint16_t i, nb_rx = 0, nb_tx = 0;

	while (!fdata->done) {
		nb_rx = rte_event_dequeue_burst(dev, port, ev, BATCH_SIZE, 0);

		if (nb_rx == 0) {
			rte_pause();
			continue;
		}
		received += nb_rx;

		for (i = 0; i < nb_rx; i++) {
			const uint8_t cq_id = ev[i].sub_event_type %
				cdata.num_stages;

			if (cq_id == lst_qid) {
				if (ev[i].sched_type == RTE_SCHED_TYPE_ATOMIC) {
					worker_tx_pkt(dev, port, &ev[i]);
					tx++;
					ev[i].op = RTE_EVENT_OP_RELEASE;
					continue;
				}

				worker_fwd_event(&ev[i], RTE_SCHED_TYPE_ATOMIC);
			} else {
				ev[i].sub_event_type++;
				worker_fwd_event(&ev[i], cdata.queue_type);
			}
			work();
		}

		nb_tx = worker_event_enqueue_burst(dev, port, ev, nb_rx);
		fwd += nb_tx;
	}

	worker_cleanup(dev, port, ev, nb_tx, nb_rx);

	if (!cdata.quiet)
		printf(" worker %u thread done. RX=%zu FWD=%zu TX=%zu\n",
				rte_lcore_id(), received, fwd, tx);

	return 0;
}

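/*
 * Configure the event device for worker-Tx-enqueue mode: create the stage
 * queues (or all-type queues), one port per worker linked to every queue,
 * and clamp the port and device limits to what the PMD reports.
 */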
static int
setup_eventdev_worker_tx_enq(struct worker_data *worker_data)
{
	uint8_t i;
	const uint8_t atq = cdata.all_type_queues ? 1 : 0;
	const uint8_t dev_id = 0;
	const uint8_t nb_ports = cdata.num_workers;
	uint8_t nb_slots = 0;
	uint8_t nb_queues = rte_eth_dev_count_avail();

	/*
	 * When all-type queues are not enabled, use one queue per stage per
	 * ethdev (num_stages * eth_dev_count) plus one extra Tx queue per
	 * pipeline.
	 */
	if (!atq) {
		nb_queues *= cdata.num_stages;
		nb_queues += rte_eth_dev_count_avail();
	}

	struct rte_event_dev_config config = {
			.nb_event_queues = nb_queues,
			.nb_event_ports = nb_ports,
			.nb_single_link_event_port_queues = 0,
			.nb_events_limit = 4096,
			.nb_event_queue_flows = 1024,
			.nb_event_port_dequeue_depth = 128,
			.nb_event_port_enqueue_depth = 128,
	};
	struct rte_event_port_conf wkr_p_conf = {
			.dequeue_depth = cdata.worker_cq_depth,
			.enqueue_depth = 64,
			.new_event_threshold = 4096,
			.event_port_cfg = RTE_EVENT_PORT_CFG_HINT_WORKER,
	};
	struct rte_event_queue_conf wkr_q_conf = {
			.schedule_type = cdata.queue_type,
			.priority = RTE_EVENT_DEV_PRIORITY_NORMAL,
			.nb_atomic_flows = 1024,
			.nb_atomic_order_sequences = 1024,
	};

	int ret, ndev = rte_event_dev_count();

	if (ndev < 1) {
		printf("%d: No Eventdev Devices Found\n", __LINE__);
		return -1;
	}


	struct rte_event_dev_info dev_info;
	ret = rte_event_dev_info_get(dev_id, &dev_info);
	printf("\tEventdev %d: %s\n", dev_id, dev_info.driver_name);

	if (dev_info.max_num_events < config.nb_events_limit)
		config.nb_events_limit = dev_info.max_num_events;
	if (dev_info.max_event_port_dequeue_depth <
			config.nb_event_port_dequeue_depth)
		config.nb_event_port_dequeue_depth =
				dev_info.max_event_port_dequeue_depth;
	if (dev_info.max_event_port_enqueue_depth <
			config.nb_event_port_enqueue_depth)
		config.nb_event_port_enqueue_depth =
				dev_info.max_event_port_enqueue_depth;

	if (dev_info.event_dev_cap & RTE_EVENT_DEV_CAP_EVENT_PRESCHEDULE)
		config.preschedule_type = RTE_EVENT_PRESCHEDULE;

	if (dev_info.event_dev_cap & RTE_EVENT_DEV_CAP_EVENT_PRESCHEDULE_ADAPTIVE)
		config.preschedule_type = RTE_EVENT_PRESCHEDULE_ADAPTIVE;

	ret = rte_event_dev_configure(dev_id, &config);
	if (ret < 0) {
		printf("%d: Error configuring device\n", __LINE__);
		return -1;
	}

	printf(" Stages:\n");
	for (i = 0; i < nb_queues; i++) {

		if (atq) {

			nb_slots = cdata.num_stages;
			wkr_q_conf.event_queue_cfg =
				RTE_EVENT_QUEUE_CFG_ALL_TYPES;
		} else {
			uint8_t slot;

			nb_slots = cdata.num_stages + 1;
			slot = i % nb_slots;
			wkr_q_conf.schedule_type = slot == cdata.num_stages ?
				RTE_SCHED_TYPE_ATOMIC : cdata.queue_type;
		}

		if (rte_event_queue_setup(dev_id, i, &wkr_q_conf) < 0) {
			printf("%d: error creating qid %d\n", __LINE__, i);
			return -1;
		}
		cdata.qid[i] = i;
		cdata.next_qid[i] = i+1;
		if (cdata.enable_queue_priorities) {
			const uint32_t prio_delta =
				(RTE_EVENT_DEV_PRIORITY_LOWEST) /
				nb_slots;

			/* higher priority for queues closer to tx */
			wkr_q_conf.priority =
				RTE_EVENT_DEV_PRIORITY_LOWEST - prio_delta *
				(i % nb_slots);
		}

		const char *type_str = "Atomic";
		switch (wkr_q_conf.schedule_type) {
		case RTE_SCHED_TYPE_ORDERED:
			type_str = "Ordered";
			break;
		case RTE_SCHED_TYPE_PARALLEL:
			type_str = "Parallel";
			break;
		}
		printf("\tStage %d, Type %s\tPriority = %d\n", i, type_str,
				wkr_q_conf.priority);
	}

	printf("\n");
	if (wkr_p_conf.new_event_threshold > config.nb_events_limit)
		wkr_p_conf.new_event_threshold = config.nb_events_limit;
	if (wkr_p_conf.dequeue_depth > config.nb_event_port_dequeue_depth)
		wkr_p_conf.dequeue_depth = config.nb_event_port_dequeue_depth;
	if (wkr_p_conf.enqueue_depth > config.nb_event_port_enqueue_depth)
		wkr_p_conf.enqueue_depth = config.nb_event_port_enqueue_depth;

	/* set up one port per worker, linking to all stage queues */
	for (i = 0; i < cdata.num_workers; i++) {
		struct worker_data *w = &worker_data[i];
		w->dev_id = dev_id;
		if (rte_event_port_setup(dev_id, i, &wkr_p_conf) < 0) {
			printf("Error setting up port %d\n", i);
			return -1;
		}

		if (rte_event_port_link(dev_id, i, NULL, NULL, 0)
				!= nb_queues) {
			printf("%d: error creating link for port %d\n",
					__LINE__, i);
			return -1;
		}
		w->port_id = i;
	}
	/*
	 * Reduce the load on the ingress event queue by splitting the
	 * traffic across multiple event queues.
	 * For example, with nb_stages = 2 and nb_ethdev = 2:
	 *
	 *	nb_queues = (2 * 2) + 2 = 6 (non atq)
	 *	rx_stride = 3
	 *
	 * The Rx adapter queue id is chosen as <ethport_id> * <rx_stride>,
	 * so eth ports 0 and 1 inject packets into event queues 0 and 3
	 * respectively, forming two queue pipelines 0->1->2->tx and
	 * 3->4->5->tx.
	 */
	cdata.rx_stride = atq ? 1 : nb_slots;
	ret = rte_event_dev_service_id_get(dev_id,
				&fdata->evdev_service_id);
	if (ret != -ESRCH && ret != 0) {
		printf("Error getting the service ID\n");
		return -1;
	}
	rte_service_runstate_set(fdata->evdev_service_id, 1);
	rte_service_set_runstate_mapped_check(fdata->evdev_service_id, 0);

	if (rte_event_dev_start(dev_id) < 0)
		rte_exit(EXIT_FAILURE, "Error starting eventdev");

	return dev_id;
}

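/*
 * List of Rx adapter service ids that have no internal event port and must
 * therefore be driven in software; service_rx_adapter() runs one iteration
 * of each from the calling service lcore.
 */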
struct rx_adptr_services {
	uint16_t nb_rx_adptrs;
	uint32_t *rx_adpt_arr;
};

static int32_t
service_rx_adapter(void *arg)
{
	int i;
	struct rx_adptr_services *adptr_services = arg;

	for (i = 0; i < adptr_services->nb_rx_adptrs; i++)
		rte_service_run_iter_on_app_lcore(
				adptr_services->rx_adpt_arr[i], 1);
	return 0;
}

/*
 * Initializes a given port using global settings and with the RX buffers
 * coming from the mbuf_pool passed as a parameter.
 */
static inline int
port_init(uint8_t port, struct rte_mempool *mbuf_pool)
{
	struct rte_eth_rxconf rx_conf;
	static const struct rte_eth_conf port_conf_default = {
		.rxmode = {
			.mq_mode = RTE_ETH_MQ_RX_RSS,
		},
		.rx_adv_conf = {
			.rss_conf = {
				.rss_hf = RTE_ETH_RSS_IP |
					  RTE_ETH_RSS_TCP |
					  RTE_ETH_RSS_UDP,
			}
		}
	};
	const uint16_t rx_rings = 1, tx_rings = 1;
	const uint16_t rx_ring_size = 512, tx_ring_size = 512;
	struct rte_eth_conf port_conf = port_conf_default;
	int retval;
	uint16_t q;
	struct rte_eth_dev_info dev_info;
	struct rte_eth_txconf txconf;

	if (!rte_eth_dev_is_valid_port(port))
		return -1;

	retval = rte_eth_dev_info_get(port, &dev_info);
	if (retval != 0) {
		printf("Error during getting device (port %u) info: %s\n",
				port, strerror(-retval));
		return retval;
	}

	if (dev_info.tx_offload_capa & RTE_ETH_TX_OFFLOAD_MBUF_FAST_FREE)
		port_conf.txmode.offloads |=
			RTE_ETH_TX_OFFLOAD_MBUF_FAST_FREE;
	rx_conf = dev_info.default_rxconf;
	rx_conf.offloads = port_conf.rxmode.offloads;

	port_conf.rx_adv_conf.rss_conf.rss_hf &=
		dev_info.flow_type_rss_offloads;
	if (port_conf.rx_adv_conf.rss_conf.rss_hf !=
			port_conf_default.rx_adv_conf.rss_conf.rss_hf) {
		printf("Port %u modified RSS hash function based on hardware support,"
			" requested:%#"PRIx64" configured:%#"PRIx64"\n",
			port,
			port_conf_default.rx_adv_conf.rss_conf.rss_hf,
			port_conf.rx_adv_conf.rss_conf.rss_hf);
	}

	/* Configure the Ethernet device. */
	retval = rte_eth_dev_configure(port, rx_rings, tx_rings, &port_conf);
	if (retval != 0)
		return retval;

	/* Allocate and set up 1 RX queue per Ethernet port. */
	for (q = 0; q < rx_rings; q++) {
		retval = rte_eth_rx_queue_setup(port, q, rx_ring_size,
				rte_eth_dev_socket_id(port), &rx_conf,
				mbuf_pool);
		if (retval < 0)
			return retval;
	}

	txconf = dev_info.default_txconf;
	txconf.offloads = port_conf_default.txmode.offloads;
	/* Allocate and set up 1 TX queue per Ethernet port. */
	for (q = 0; q < tx_rings; q++) {
		retval = rte_eth_tx_queue_setup(port, q, tx_ring_size,
				rte_eth_dev_socket_id(port), &txconf);
		if (retval < 0)
			return retval;
	}

	/* Display the port MAC address. */
	struct rte_ether_addr addr;
	retval = rte_eth_macaddr_get(port, &addr);
	if (retval != 0) {
		printf("Failed to get MAC address (port %u): %s\n",
				port, rte_strerror(-retval));
		return retval;
	}

	printf("Port %u MAC: %02" PRIx8 " %02" PRIx8 " %02" PRIx8
			" %02" PRIx8 " %02" PRIx8 " %02" PRIx8 "\n",
			(unsigned int)port, RTE_ETHER_ADDR_BYTES(&addr));

	/* Enable RX in promiscuous mode for the Ethernet device. */
	retval = rte_eth_promiscuous_enable(port);
	if (retval != 0)
		return retval;

	return 0;
}

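/*
 * Create the packet mbuf pool (16384 mbufs per port unless cdata.num_mbuf
 * is already set) and initialize every available ethdev with it.
 */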
static int
init_ports(uint16_t num_ports)
{
	uint16_t portid;

	if (!cdata.num_mbuf)
		cdata.num_mbuf = 16384 * num_ports;

	struct rte_mempool *mp = rte_pktmbuf_pool_create("packet_pool",
			/* mbufs */ cdata.num_mbuf,
			/* cache_size */ 512,
			/* priv_size*/ 0,
			/* data_room_size */ RTE_MBUF_DEFAULT_BUF_SIZE,
			rte_socket_id());

	RTE_ETH_FOREACH_DEV(portid)
		if (port_init(portid, mp) != 0)
			rte_exit(EXIT_FAILURE, "Cannot init port %"PRIu16 "\n",
					portid);

	return 0;
}

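/*
 * Create one Rx adapter per ethdev and a single Tx adapter, start them, and
 * register a service that drives any Rx adapter lacking an internal port.
 */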
static void
init_adapters(uint16_t nb_ports)
{
	int i;
	int ret;
	uint8_t evdev_id = 0;
	struct rx_adptr_services *adptr_services = NULL;
	struct rte_event_dev_info dev_info;

	ret = rte_event_dev_info_get(evdev_id, &dev_info);
	adptr_services = rte_zmalloc(NULL, sizeof(struct rx_adptr_services), 0);

	struct rte_event_port_conf adptr_p_conf = {
		.dequeue_depth = cdata.worker_cq_depth,
		.enqueue_depth = 64,
		.new_event_threshold = 4096,
		.event_port_cfg = RTE_EVENT_PORT_CFG_HINT_PRODUCER,
	};

	init_ports(nb_ports);
	if (adptr_p_conf.new_event_threshold > dev_info.max_num_events)
		adptr_p_conf.new_event_threshold = dev_info.max_num_events;
	if (adptr_p_conf.dequeue_depth > dev_info.max_event_port_dequeue_depth)
		adptr_p_conf.dequeue_depth =
			dev_info.max_event_port_dequeue_depth;
	if (adptr_p_conf.enqueue_depth > dev_info.max_event_port_enqueue_depth)
		adptr_p_conf.enqueue_depth =
			dev_info.max_event_port_enqueue_depth;

	struct rte_event_eth_rx_adapter_queue_conf queue_conf;
	memset(&queue_conf, 0, sizeof(queue_conf));
	queue_conf.ev.sched_type = cdata.queue_type;

	for (i = 0; i < nb_ports; i++) {
		uint32_t cap;
		uint32_t service_id;

		ret = rte_event_eth_rx_adapter_create(i, evdev_id,
				&adptr_p_conf);
		if (ret)
			rte_exit(EXIT_FAILURE,
					"failed to create rx adapter[%d]", i);

		ret = rte_event_eth_rx_adapter_caps_get(evdev_id, i, &cap);
		if (ret)
			rte_exit(EXIT_FAILURE,
					"failed to get event rx adapter "
					"capabilities");

		queue_conf.ev.queue_id = cdata.rx_stride ?
			(i * cdata.rx_stride)
			: (uint8_t)cdata.qid[0];

		ret = rte_event_eth_rx_adapter_queue_add(i, i, -1, &queue_conf);
		if (ret)
			rte_exit(EXIT_FAILURE,
					"Failed to add queues to Rx adapter");

		/* Producer needs to be scheduled. */
		if (!(cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_INTERNAL_PORT)) {
			ret = rte_event_eth_rx_adapter_service_id_get(i,
					&service_id);
			if (ret != -ESRCH && ret != 0) {
				rte_exit(EXIT_FAILURE,
				"Error getting the service ID for rx adptr\n");
			}

			rte_service_runstate_set(service_id, 1);
			rte_service_set_runstate_mapped_check(service_id, 0);

			adptr_services->nb_rx_adptrs++;
			adptr_services->rx_adpt_arr = rte_realloc(
					adptr_services->rx_adpt_arr,
					adptr_services->nb_rx_adptrs *
					sizeof(uint32_t), 0);
			adptr_services->rx_adpt_arr[
				adptr_services->nb_rx_adptrs - 1] =
				service_id;
		}

		ret = rte_event_eth_rx_adapter_start(i);
		if (ret)
			rte_exit(EXIT_FAILURE, "Rx adapter[%d] start failed",
					i);
	}

	/* We already know that Tx adapter has INTERNAL port cap */
	ret = rte_event_eth_tx_adapter_create(cdata.tx_adapter_id, evdev_id,
			&adptr_p_conf);
	if (ret)
		rte_exit(EXIT_FAILURE, "failed to create tx adapter[%d]",
				cdata.tx_adapter_id);

	for (i = 0; i < nb_ports; i++) {
		ret = rte_event_eth_tx_adapter_queue_add(cdata.tx_adapter_id, i,
				-1);
		if (ret)
			rte_exit(EXIT_FAILURE,
					"Failed to add queues to Tx adapter");
	}

	ret = rte_event_eth_tx_adapter_start(cdata.tx_adapter_id);
	if (ret)
		rte_exit(EXIT_FAILURE, "Tx adapter[%d] start failed",
				cdata.tx_adapter_id);

	if (adptr_services->nb_rx_adptrs) {
		struct rte_service_spec service;

		memset(&service, 0, sizeof(struct rte_service_spec));
		snprintf(service.name, sizeof(service.name), "rx_service");
		service.callback = service_rx_adapter;
		service.callback_userdata = (void *)adptr_services;

		int32_t ret = rte_service_component_register(&service,
				&fdata->rxadptr_service_id);
		if (ret)
			rte_exit(EXIT_FAILURE,
				"Rx adapter service register failed");

		rte_service_runstate_set(fdata->rxadptr_service_id, 1);
		rte_service_component_runstate_set(fdata->rxadptr_service_id,
				1);
		rte_service_set_runstate_mapped_check(fdata->rxadptr_service_id,
			0);
	} else {
		/*
		 * No software-driven Rx adapters are in use: clear the Rx
		 * core mask and, if the device schedules internally, drop
		 * the scheduler callback, then release the empty service
		 * list.
		 */
		memset(fdata->rx_core, 0, sizeof(unsigned int) * MAX_NUM_CORE);
		if (dev_info.event_dev_cap &
				RTE_EVENT_DEV_CAP_DISTRIBUTED_SCHED)
			fdata->cap.scheduler = NULL;
		rte_free(adptr_services);
	}
}

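/*
 * Validate the core masks against the capabilities of the event device and
 * the Rx adapters: Rx and scheduler cores are only mandatory when the
 * hardware cannot take over those roles.
 */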
static void
worker_tx_enq_opt_check(void)
{
	int i;
	int ret;
	uint32_t cap = 0;
	uint8_t rx_needed = 0;
	uint8_t sched_needed = 0;
	struct rte_event_dev_info eventdev_info;

	memset(&eventdev_info, 0, sizeof(struct rte_event_dev_info));
	rte_event_dev_info_get(0, &eventdev_info);

	if (cdata.all_type_queues && !(eventdev_info.event_dev_cap &
				RTE_EVENT_DEV_CAP_QUEUE_ALL_TYPES))
		rte_exit(EXIT_FAILURE,
				"Event dev doesn't support all type queues\n");
	sched_needed = !(eventdev_info.event_dev_cap &
		RTE_EVENT_DEV_CAP_DISTRIBUTED_SCHED);

	RTE_ETH_FOREACH_DEV(i) {
		ret = rte_event_eth_rx_adapter_caps_get(0, i, &cap);
		if (ret)
			rte_exit(EXIT_FAILURE,
				"failed to get event rx adapter capabilities");
		rx_needed |=
			!(cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_INTERNAL_PORT);
	}

	if (cdata.worker_lcore_mask == 0 ||
			(rx_needed && cdata.rx_lcore_mask == 0) ||
			(sched_needed && cdata.sched_lcore_mask == 0)) {
		printf("Core part of pipeline was not assigned any cores. "
			"This will stall the pipeline, please check core masks "
			"(use -h for details on setting core masks):\n"
			"\trx: %"PRIu64"\n\tsched: %"PRIu64
			"\n\tworkers: %"PRIu64"\n", cdata.rx_lcore_mask,
			cdata.sched_lcore_mask, cdata.worker_lcore_mask);
		rte_exit(-1, "Fix core masks\n");
	}

	if (!sched_needed)
		memset(fdata->sched_core, 0,
				sizeof(unsigned int) * MAX_NUM_CORE);
	if (!rx_needed)
		memset(fdata->rx_core, 0,
				sizeof(unsigned int) * MAX_NUM_CORE);

	memset(fdata->tx_core, 0, sizeof(unsigned int) * MAX_NUM_CORE);
}

static worker_loop
get_worker_loop_single_burst(uint8_t atq)
{
	if (atq)
		return worker_do_tx_single_burst_atq;

	return worker_do_tx_single_burst;
}

static worker_loop
get_worker_loop_single_non_burst(uint8_t atq)
{
	if (atq)
		return worker_do_tx_single_atq;

	return worker_do_tx_single;
}

static worker_loop
get_worker_loop_burst(uint8_t atq)
{
	if (atq)
		return worker_do_tx_burst_atq;

	return worker_do_tx_burst;
}

static worker_loop
get_worker_loop_non_burst(uint8_t atq)
{
	if (atq)
		return worker_do_tx_atq;

	return worker_do_tx;
}

static worker_loop
get_worker_single_stage(bool burst)
{
	uint8_t atq = cdata.all_type_queues ? 1 : 0;

	if (burst)
		return get_worker_loop_single_burst(atq);

	return get_worker_loop_single_non_burst(atq);
}

static worker_loop
get_worker_multi_stage(bool burst)
{
	uint8_t atq = cdata.all_type_queues ? 1 : 0;

	if (burst)
		return get_worker_loop_burst(atq);

	return get_worker_loop_non_burst(atq);
}

void
set_worker_tx_enq_setup_data(struct setup_data *caps, bool burst)
{
	if (cdata.num_stages == 1)
		caps->worker = get_worker_single_stage(burst);
	else
		caps->worker = get_worker_multi_stage(burst);

	caps->check_opt = worker_tx_enq_opt_check;
	caps->scheduler = schedule_devices;
	caps->evdev_setup = setup_eventdev_worker_tx_enq;
	caps->adptr_setup = init_adapters;
}