1 /* SPDX-License-Identifier: BSD-3-Clause 2 * Copyright(c) 2010-2017 Intel Corporation 3 */ 4 5 #include <stdint.h> 6 #include <stdlib.h> 7 #include <inttypes.h> 8 #include <unistd.h> 9 #include <signal.h> 10 #include <getopt.h> 11 12 #include <rte_eal.h> 13 #include <rte_ethdev.h> 14 #include <rte_cycles.h> 15 #include <rte_malloc.h> 16 #include <rte_debug.h> 17 #include <rte_prefetch.h> 18 #include <rte_distributor.h> 19 #include <rte_pause.h> 20 #include <rte_power_cpufreq.h> 21 22 #define RX_RING_SIZE 1024 23 #define TX_RING_SIZE 1024 24 #define NUM_MBUFS ((64*1024)-1) 25 #define MBUF_CACHE_SIZE 128 26 #define BURST_SIZE 64 27 #define SCHED_RX_RING_SZ 8192 28 #define SCHED_TX_RING_SZ 65536 29 #define BURST_SIZE_TX 32 30 31 #define RTE_LOGTYPE_DISTRAPP RTE_LOGTYPE_USER1 32 33 #define ANSI_COLOR_RED "\x1b[31m" 34 #define ANSI_COLOR_RESET "\x1b[0m" 35 36 /* mask of enabled ports */ 37 static uint32_t enabled_port_mask; 38 volatile uint8_t quit_signal; 39 volatile uint8_t quit_signal_rx; 40 volatile uint8_t quit_signal_dist; 41 volatile uint8_t quit_signal_work; 42 unsigned int power_lib_initialised; 43 bool enable_lcore_rx_distributor; 44 unsigned int num_workers; 45 46 static volatile struct app_stats { 47 alignas(RTE_CACHE_LINE_SIZE) struct { 48 uint64_t rx_pkts; 49 uint64_t returned_pkts; 50 uint64_t enqueued_pkts; 51 uint64_t enqdrop_pkts; 52 } rx; 53 alignas(RTE_CACHE_LINE_SIZE) int pad1; 54 55 alignas(RTE_CACHE_LINE_SIZE) struct { 56 uint64_t in_pkts; 57 uint64_t ret_pkts; 58 uint64_t sent_pkts; 59 uint64_t enqdrop_pkts; 60 } dist; 61 alignas(RTE_CACHE_LINE_SIZE) int pad2; 62 63 alignas(RTE_CACHE_LINE_SIZE) struct { 64 uint64_t dequeue_pkts; 65 uint64_t tx_pkts; 66 uint64_t enqdrop_pkts; 67 } tx; 68 alignas(RTE_CACHE_LINE_SIZE) int pad3; 69 70 alignas(RTE_CACHE_LINE_SIZE) uint64_t worker_pkts[64]; 71 72 alignas(RTE_CACHE_LINE_SIZE) int pad4; 73 74 alignas(RTE_CACHE_LINE_SIZE) uint64_t worker_bursts[64][8]; 75 76 alignas(RTE_CACHE_LINE_SIZE) int pad5; 77 78 alignas(RTE_CACHE_LINE_SIZE) uint64_t port_rx_pkts[64]; 79 alignas(RTE_CACHE_LINE_SIZE) uint64_t port_tx_pkts[64]; 80 } app_stats; 81 82 struct app_stats prev_app_stats; 83 84 static const struct rte_eth_conf port_conf_default = { 85 .rxmode = { 86 .mq_mode = RTE_ETH_MQ_RX_RSS, 87 }, 88 .txmode = { 89 .mq_mode = RTE_ETH_MQ_TX_NONE, 90 }, 91 .rx_adv_conf = { 92 .rss_conf = { 93 .rss_hf = RTE_ETH_RSS_IP | RTE_ETH_RSS_UDP | 94 RTE_ETH_RSS_TCP | RTE_ETH_RSS_SCTP, 95 } 96 }, 97 }; 98 99 struct output_buffer { 100 unsigned count; 101 struct rte_mbuf *mbufs[BURST_SIZE]; 102 }; 103 104 static void print_stats(void); 105 106 /* 107 * Initialises a given port using global settings and with the rx buffers 108 * coming from the mbuf_pool passed as parameter 109 */ 110 static inline int 111 port_init(uint16_t port, struct rte_mempool *mbuf_pool) 112 { 113 struct rte_eth_conf port_conf = port_conf_default; 114 const uint16_t rxRings = 1, txRings = 1; 115 int retval; 116 uint16_t q; 117 uint16_t nb_rxd = RX_RING_SIZE; 118 uint16_t nb_txd = TX_RING_SIZE; 119 struct rte_eth_dev_info dev_info; 120 struct rte_eth_txconf txconf; 121 122 if (!rte_eth_dev_is_valid_port(port)) 123 return -1; 124 125 retval = rte_eth_dev_info_get(port, &dev_info); 126 if (retval != 0) { 127 printf("Error during getting device (port %u) info: %s\n", 128 port, strerror(-retval)); 129 return retval; 130 } 131 132 if (dev_info.tx_offload_capa & RTE_ETH_TX_OFFLOAD_MBUF_FAST_FREE) 133 port_conf.txmode.offloads |= 134 RTE_ETH_TX_OFFLOAD_MBUF_FAST_FREE; 135 136 port_conf.rx_adv_conf.rss_conf.rss_hf &= 137 dev_info.flow_type_rss_offloads; 138 if (port_conf.rx_adv_conf.rss_conf.rss_hf != 139 port_conf_default.rx_adv_conf.rss_conf.rss_hf) { 140 printf("Port %u modified RSS hash function based on hardware support," 141 "requested:%#"PRIx64" configured:%#"PRIx64"\n", 142 port, 143 port_conf_default.rx_adv_conf.rss_conf.rss_hf, 144 port_conf.rx_adv_conf.rss_conf.rss_hf); 145 } 146 147 retval = rte_eth_dev_configure(port, rxRings, txRings, &port_conf); 148 if (retval != 0) 149 return retval; 150 151 retval = rte_eth_dev_adjust_nb_rx_tx_desc(port, &nb_rxd, &nb_txd); 152 if (retval != 0) 153 return retval; 154 155 for (q = 0; q < rxRings; q++) { 156 retval = rte_eth_rx_queue_setup(port, q, nb_rxd, 157 rte_eth_dev_socket_id(port), 158 NULL, mbuf_pool); 159 if (retval < 0) 160 return retval; 161 } 162 163 txconf = dev_info.default_txconf; 164 txconf.offloads = port_conf.txmode.offloads; 165 for (q = 0; q < txRings; q++) { 166 retval = rte_eth_tx_queue_setup(port, q, nb_txd, 167 rte_eth_dev_socket_id(port), 168 &txconf); 169 if (retval < 0) 170 return retval; 171 } 172 173 retval = rte_eth_dev_start(port); 174 if (retval < 0) 175 return retval; 176 177 struct rte_eth_link link; 178 do { 179 retval = rte_eth_link_get_nowait(port, &link); 180 if (retval < 0) { 181 printf("Failed link get (port %u): %s\n", 182 port, rte_strerror(-retval)); 183 return retval; 184 } else if (link.link_status) 185 break; 186 187 printf("Waiting for Link up on port %"PRIu16"\n", port); 188 sleep(1); 189 } while (!link.link_status); 190 191 if (!link.link_status) { 192 printf("Link down on port %"PRIu16"\n", port); 193 return 0; 194 } 195 196 struct rte_ether_addr addr; 197 retval = rte_eth_macaddr_get(port, &addr); 198 if (retval < 0) { 199 printf("Failed to get MAC address (port %u): %s\n", 200 port, rte_strerror(-retval)); 201 return retval; 202 } 203 204 printf("Port %u MAC: %02"PRIx8" %02"PRIx8" %02"PRIx8 205 " %02"PRIx8" %02"PRIx8" %02"PRIx8"\n", 206 port, RTE_ETHER_ADDR_BYTES(&addr)); 207 208 retval = rte_eth_promiscuous_enable(port); 209 if (retval != 0) 210 return retval; 211 212 return 0; 213 } 214 215 struct lcore_params { 216 unsigned worker_id; 217 struct rte_distributor *d; 218 struct rte_ring *rx_dist_ring; 219 struct rte_ring *dist_tx_ring; 220 struct rte_mempool *mem_pool; 221 }; 222 223 static int 224 lcore_rx(struct lcore_params *p) 225 { 226 const uint16_t nb_ports = rte_eth_dev_count_avail(); 227 const int socket_id = rte_socket_id(); 228 uint16_t port; 229 struct rte_mbuf *bufs[BURST_SIZE*2]; 230 231 RTE_ETH_FOREACH_DEV(port) { 232 /* skip ports that are not enabled */ 233 if ((enabled_port_mask & (1 << port)) == 0) 234 continue; 235 236 if (rte_eth_dev_socket_id(port) >= 0 && 237 rte_eth_dev_socket_id(port) != socket_id) 238 printf("WARNING, port %u is on remote NUMA node to " 239 "RX thread.\n\tPerformance will not " 240 "be optimal.\n", port); 241 } 242 243 printf("\nCore %u doing packet RX.\n", rte_lcore_id()); 244 port = 0; 245 while (!quit_signal_rx) { 246 247 /* skip ports that are not enabled */ 248 if ((enabled_port_mask & (1 << port)) == 0) { 249 if (++port == nb_ports) 250 port = 0; 251 continue; 252 } 253 const uint16_t nb_rx = rte_eth_rx_burst(port, 0, bufs, 254 BURST_SIZE); 255 if (unlikely(nb_rx == 0)) { 256 if (++port == nb_ports) 257 port = 0; 258 continue; 259 } 260 app_stats.rx.rx_pkts += nb_rx; 261 262 /* 263 * Swap the following two lines if you want the rx traffic 264 * to go directly to tx, no distribution. 265 */ 266 struct rte_ring *out_ring = p->rx_dist_ring; 267 /* struct rte_ring *out_ring = p->dist_tx_ring; */ 268 269 uint16_t sent = rte_ring_enqueue_burst(out_ring, 270 (void *)bufs, nb_rx, NULL); 271 272 app_stats.rx.enqueued_pkts += sent; 273 if (unlikely(sent < nb_rx)) { 274 app_stats.rx.enqdrop_pkts += nb_rx - sent; 275 RTE_LOG_DP(DEBUG, DISTRAPP, 276 "%s:Packet loss due to full ring\n", __func__); 277 while (sent < nb_rx) 278 rte_pktmbuf_free(bufs[sent++]); 279 } 280 if (++port == nb_ports) 281 port = 0; 282 } 283 if (power_lib_initialised) 284 rte_power_exit(rte_lcore_id()); 285 printf("\nCore %u exiting rx task.\n", rte_lcore_id()); 286 /* set distributor threads quit flag */ 287 quit_signal_dist = 1; 288 return 0; 289 } 290 291 static int 292 lcore_rx_and_distributor(struct lcore_params *p) 293 { 294 struct rte_distributor *d = p->d; 295 const uint16_t nb_ports = rte_eth_dev_count_avail(); 296 const int socket_id = rte_socket_id(); 297 uint16_t port; 298 struct rte_mbuf *bufs[BURST_SIZE*2]; 299 300 RTE_ETH_FOREACH_DEV(port) { 301 /* skip ports that are not enabled */ 302 if ((enabled_port_mask & (1 << port)) == 0) 303 continue; 304 305 if (rte_eth_dev_socket_id(port) > 0 && 306 rte_eth_dev_socket_id(port) != socket_id) 307 printf("WARNING, port %u is on remote NUMA node to " 308 "RX thread.\n\tPerformance will not " 309 "be optimal.\n", port); 310 } 311 312 printf("\nCore %u doing packet RX and Distributor.\n", rte_lcore_id()); 313 port = 0; 314 while (!quit_signal_rx) { 315 316 /* skip ports that are not enabled */ 317 if ((enabled_port_mask & (1 << port)) == 0) { 318 if (++port == nb_ports) 319 port = 0; 320 continue; 321 } 322 const uint16_t nb_rx = rte_eth_rx_burst(port, 0, bufs, 323 BURST_SIZE); 324 if (unlikely(nb_rx == 0)) { 325 if (++port == nb_ports) 326 port = 0; 327 continue; 328 } 329 app_stats.rx.rx_pkts += nb_rx; 330 331 /* 332 * Run the distributor on the rx core. Returned 333 * packets are then send straight to the tx core. 334 */ 335 rte_distributor_process(d, bufs, nb_rx); 336 const uint16_t nb_ret = rte_distributor_returned_pkts(d, 337 bufs, BURST_SIZE*2); 338 339 app_stats.rx.returned_pkts += nb_ret; 340 if (unlikely(nb_ret == 0)) { 341 if (++port == nb_ports) 342 port = 0; 343 continue; 344 } 345 346 struct rte_ring *tx_ring = p->dist_tx_ring; 347 uint16_t sent = rte_ring_enqueue_burst(tx_ring, 348 (void *)bufs, nb_ret, NULL); 349 350 app_stats.rx.enqueued_pkts += sent; 351 if (unlikely(sent < nb_ret)) { 352 app_stats.rx.enqdrop_pkts += nb_ret - sent; 353 RTE_LOG_DP(DEBUG, DISTRAPP, 354 "%s:Packet loss due to full ring\n", __func__); 355 while (sent < nb_ret) 356 rte_pktmbuf_free(bufs[sent++]); 357 } 358 if (++port == nb_ports) 359 port = 0; 360 } 361 if (power_lib_initialised) 362 rte_power_exit(rte_lcore_id()); 363 printf("\nCore %u exiting rx task.\n", rte_lcore_id()); 364 /* set tx threads quit flag */ 365 quit_signal = 1; 366 /* set worker threads quit flag */ 367 quit_signal_work = 1; 368 rte_distributor_flush(d); 369 /* Unblock any returns so workers can exit */ 370 rte_distributor_clear_returns(d); 371 return 0; 372 } 373 374 static inline void 375 flush_one_port(struct output_buffer *outbuf, uint8_t outp) 376 { 377 unsigned int nb_tx = rte_eth_tx_burst(outp, 0, 378 outbuf->mbufs, outbuf->count); 379 app_stats.tx.tx_pkts += outbuf->count; 380 381 if (unlikely(nb_tx < outbuf->count)) { 382 app_stats.tx.enqdrop_pkts += outbuf->count - nb_tx; 383 do { 384 rte_pktmbuf_free(outbuf->mbufs[nb_tx]); 385 } while (++nb_tx < outbuf->count); 386 } 387 outbuf->count = 0; 388 } 389 390 static inline void 391 flush_all_ports(struct output_buffer *tx_buffers) 392 { 393 uint16_t outp; 394 395 RTE_ETH_FOREACH_DEV(outp) { 396 /* skip ports that are not enabled */ 397 if ((enabled_port_mask & (1 << outp)) == 0) 398 continue; 399 400 if (tx_buffers[outp].count == 0) 401 continue; 402 403 flush_one_port(&tx_buffers[outp], outp); 404 } 405 } 406 407 408 409 static int 410 lcore_distributor(struct lcore_params *p) 411 { 412 struct rte_ring *in_r = p->rx_dist_ring; 413 struct rte_ring *out_r = p->dist_tx_ring; 414 struct rte_mbuf *bufs[BURST_SIZE * 4]; 415 struct rte_distributor *d = p->d; 416 417 printf("\nCore %u acting as distributor core.\n", rte_lcore_id()); 418 while (!quit_signal_dist) { 419 const uint16_t nb_rx = rte_ring_dequeue_burst(in_r, 420 (void *)bufs, BURST_SIZE*1, NULL); 421 if (nb_rx) { 422 app_stats.dist.in_pkts += nb_rx; 423 424 /* Distribute the packets */ 425 rte_distributor_process(d, bufs, nb_rx); 426 /* Handle Returns */ 427 const uint16_t nb_ret = 428 rte_distributor_returned_pkts(d, 429 bufs, BURST_SIZE*2); 430 431 if (unlikely(nb_ret == 0)) 432 continue; 433 app_stats.dist.ret_pkts += nb_ret; 434 435 uint16_t sent = rte_ring_enqueue_burst(out_r, 436 (void *)bufs, nb_ret, NULL); 437 app_stats.dist.sent_pkts += sent; 438 if (unlikely(sent < nb_ret)) { 439 app_stats.dist.enqdrop_pkts += nb_ret - sent; 440 RTE_LOG(DEBUG, DISTRAPP, 441 "%s:Packet loss due to full out ring\n", 442 __func__); 443 while (sent < nb_ret) 444 rte_pktmbuf_free(bufs[sent++]); 445 } 446 } 447 } 448 if (power_lib_initialised) 449 rte_power_exit(rte_lcore_id()); 450 printf("\nCore %u exiting distributor task.\n", rte_lcore_id()); 451 /* set tx threads quit flag */ 452 quit_signal = 1; 453 /* set worker threads quit flag */ 454 quit_signal_work = 1; 455 rte_distributor_flush(d); 456 /* Unblock any returns so workers can exit */ 457 rte_distributor_clear_returns(d); 458 return 0; 459 } 460 461 462 static int 463 lcore_tx(struct rte_ring *in_r) 464 { 465 static struct output_buffer tx_buffers[RTE_MAX_ETHPORTS]; 466 const int socket_id = rte_socket_id(); 467 uint16_t port; 468 469 RTE_ETH_FOREACH_DEV(port) { 470 /* skip ports that are not enabled */ 471 if ((enabled_port_mask & (1 << port)) == 0) 472 continue; 473 474 if (rte_eth_dev_socket_id(port) >= 0 && 475 rte_eth_dev_socket_id(port) != socket_id) 476 printf("WARNING, port %u is on remote NUMA node to " 477 "TX thread.\n\tPerformance will not " 478 "be optimal.\n", port); 479 } 480 481 printf("\nCore %u doing packet TX.\n", rte_lcore_id()); 482 while (!quit_signal) { 483 484 RTE_ETH_FOREACH_DEV(port) { 485 /* skip ports that are not enabled */ 486 if ((enabled_port_mask & (1 << port)) == 0) 487 continue; 488 489 struct rte_mbuf *bufs[BURST_SIZE_TX]; 490 const uint16_t nb_rx = rte_ring_dequeue_burst(in_r, 491 (void *)bufs, BURST_SIZE_TX, NULL); 492 app_stats.tx.dequeue_pkts += nb_rx; 493 494 /* if we get no traffic, flush anything we have */ 495 if (unlikely(nb_rx == 0)) { 496 flush_all_ports(tx_buffers); 497 continue; 498 } 499 500 /* for traffic we receive, queue it up for transmit */ 501 uint16_t i; 502 rte_prefetch_non_temporal((void *)bufs[0]); 503 rte_prefetch_non_temporal((void *)bufs[1]); 504 rte_prefetch_non_temporal((void *)bufs[2]); 505 for (i = 0; i < nb_rx; i++) { 506 struct output_buffer *outbuf; 507 uint8_t outp; 508 rte_prefetch_non_temporal((void *)bufs[i + 3]); 509 /* 510 * workers should update in_port to hold the 511 * output port value 512 */ 513 outp = bufs[i]->port; 514 /* skip ports that are not enabled */ 515 if ((enabled_port_mask & (1 << outp)) == 0) 516 continue; 517 518 outbuf = &tx_buffers[outp]; 519 outbuf->mbufs[outbuf->count++] = bufs[i]; 520 if (outbuf->count == BURST_SIZE_TX) 521 flush_one_port(outbuf, outp); 522 } 523 } 524 } 525 if (power_lib_initialised) 526 rte_power_exit(rte_lcore_id()); 527 printf("\nCore %u exiting tx task.\n", rte_lcore_id()); 528 return 0; 529 } 530 531 static void 532 int_handler(int sig_num) 533 { 534 printf("Exiting on signal %d\n", sig_num); 535 /* set quit flag for rx thread to exit */ 536 quit_signal_rx = 1; 537 } 538 539 static void 540 print_stats(void) 541 { 542 struct rte_eth_stats eth_stats; 543 unsigned int i, j; 544 545 RTE_ETH_FOREACH_DEV(i) { 546 rte_eth_stats_get(i, ð_stats); 547 app_stats.port_rx_pkts[i] = eth_stats.ipackets; 548 app_stats.port_tx_pkts[i] = eth_stats.opackets; 549 } 550 551 printf("\n\nRX Thread:\n"); 552 RTE_ETH_FOREACH_DEV(i) { 553 printf("Port %u Pktsin : %5.2f\n", i, 554 (app_stats.port_rx_pkts[i] - 555 prev_app_stats.port_rx_pkts[i])/1000000.0); 556 prev_app_stats.port_rx_pkts[i] = app_stats.port_rx_pkts[i]; 557 } 558 printf(" - Received: %5.2f\n", 559 (app_stats.rx.rx_pkts - 560 prev_app_stats.rx.rx_pkts)/1000000.0); 561 printf(" - Returned: %5.2f\n", 562 (app_stats.rx.returned_pkts - 563 prev_app_stats.rx.returned_pkts)/1000000.0); 564 printf(" - Enqueued: %5.2f\n", 565 (app_stats.rx.enqueued_pkts - 566 prev_app_stats.rx.enqueued_pkts)/1000000.0); 567 printf(" - Dropped: %s%5.2f%s\n", ANSI_COLOR_RED, 568 (app_stats.rx.enqdrop_pkts - 569 prev_app_stats.rx.enqdrop_pkts)/1000000.0, 570 ANSI_COLOR_RESET); 571 572 if (!enable_lcore_rx_distributor) { 573 printf("Distributor thread:\n"); 574 printf(" - In: %5.2f\n", 575 (app_stats.dist.in_pkts - 576 prev_app_stats.dist.in_pkts)/1000000.0); 577 printf(" - Returned: %5.2f\n", 578 (app_stats.dist.ret_pkts - 579 prev_app_stats.dist.ret_pkts)/1000000.0); 580 printf(" - Sent: %5.2f\n", 581 (app_stats.dist.sent_pkts - 582 prev_app_stats.dist.sent_pkts)/1000000.0); 583 printf(" - Dropped %s%5.2f%s\n", ANSI_COLOR_RED, 584 (app_stats.dist.enqdrop_pkts - 585 prev_app_stats.dist.enqdrop_pkts)/1000000.0, 586 ANSI_COLOR_RESET); 587 } 588 589 printf("TX thread:\n"); 590 printf(" - Dequeued: %5.2f\n", 591 (app_stats.tx.dequeue_pkts - 592 prev_app_stats.tx.dequeue_pkts)/1000000.0); 593 RTE_ETH_FOREACH_DEV(i) { 594 printf("Port %u Pktsout: %5.2f\n", 595 i, (app_stats.port_tx_pkts[i] - 596 prev_app_stats.port_tx_pkts[i])/1000000.0); 597 prev_app_stats.port_tx_pkts[i] = app_stats.port_tx_pkts[i]; 598 } 599 printf(" - Transmitted: %5.2f\n", 600 (app_stats.tx.tx_pkts - 601 prev_app_stats.tx.tx_pkts)/1000000.0); 602 printf(" - Dropped: %s%5.2f%s\n", ANSI_COLOR_RED, 603 (app_stats.tx.enqdrop_pkts - 604 prev_app_stats.tx.enqdrop_pkts)/1000000.0, 605 ANSI_COLOR_RESET); 606 607 prev_app_stats.rx.rx_pkts = app_stats.rx.rx_pkts; 608 prev_app_stats.rx.returned_pkts = app_stats.rx.returned_pkts; 609 prev_app_stats.rx.enqueued_pkts = app_stats.rx.enqueued_pkts; 610 prev_app_stats.rx.enqdrop_pkts = app_stats.rx.enqdrop_pkts; 611 prev_app_stats.dist.in_pkts = app_stats.dist.in_pkts; 612 prev_app_stats.dist.ret_pkts = app_stats.dist.ret_pkts; 613 prev_app_stats.dist.sent_pkts = app_stats.dist.sent_pkts; 614 prev_app_stats.dist.enqdrop_pkts = app_stats.dist.enqdrop_pkts; 615 prev_app_stats.tx.dequeue_pkts = app_stats.tx.dequeue_pkts; 616 prev_app_stats.tx.tx_pkts = app_stats.tx.tx_pkts; 617 prev_app_stats.tx.enqdrop_pkts = app_stats.tx.enqdrop_pkts; 618 619 for (i = 0; i < num_workers; i++) { 620 printf("Worker %02u Pkts: %5.2f. Bursts(1-8): ", i, 621 (app_stats.worker_pkts[i] - 622 prev_app_stats.worker_pkts[i])/1000000.0); 623 for (j = 0; j < 8; j++) { 624 printf("%"PRIu64" ", app_stats.worker_bursts[i][j]); 625 app_stats.worker_bursts[i][j] = 0; 626 } 627 printf("\n"); 628 prev_app_stats.worker_pkts[i] = app_stats.worker_pkts[i]; 629 } 630 } 631 632 static int 633 lcore_worker(struct lcore_params *p) 634 { 635 struct rte_distributor *d = p->d; 636 const unsigned id = p->worker_id; 637 unsigned int num = 0; 638 unsigned int i; 639 640 /* 641 * for single port, xor_val will be zero so we won't modify the output 642 * port, otherwise we send traffic from 0 to 1, 2 to 3, and vice versa 643 */ 644 const unsigned xor_val = (rte_eth_dev_count_avail() > 1); 645 alignas(RTE_CACHE_LINE_SIZE) struct rte_mbuf *buf[8]; 646 647 for (i = 0; i < 8; i++) 648 buf[i] = NULL; 649 650 app_stats.worker_pkts[p->worker_id] = 1; 651 652 printf("\nCore %u acting as worker core.\n", rte_lcore_id()); 653 while (!quit_signal_work) { 654 num = rte_distributor_get_pkt(d, id, buf, buf, num); 655 /* Do a little bit of work for each packet */ 656 for (i = 0; i < num; i++) { 657 uint64_t t = rte_rdtsc()+100; 658 659 while (rte_rdtsc() < t) 660 rte_pause(); 661 buf[i]->port ^= xor_val; 662 } 663 664 app_stats.worker_pkts[p->worker_id] += num; 665 if (num > 0) 666 app_stats.worker_bursts[p->worker_id][num-1]++; 667 } 668 if (power_lib_initialised) 669 rte_power_exit(rte_lcore_id()); 670 rte_free(p); 671 return 0; 672 } 673 674 static int 675 init_power_library(void) 676 { 677 int ret = 0, lcore_id; 678 RTE_LCORE_FOREACH_WORKER(lcore_id) { 679 /* init power management library */ 680 ret = rte_power_init(lcore_id); 681 if (ret) { 682 fprintf(stderr, 683 "Library initialization failed on core %u\n", 684 lcore_id); 685 /* 686 * Return on first failure, we'll fall back 687 * to non-power operation 688 */ 689 return ret; 690 } 691 } 692 return ret; 693 } 694 695 /* display usage */ 696 static void 697 print_usage(const char *prgname) 698 { 699 printf("%s [EAL options] -- -p PORTMASK [-c]\n" 700 " -p PORTMASK: hexadecimal bitmask of ports to configure\n" 701 " -c: Combines the RX core with the distribution core\n", 702 prgname); 703 } 704 705 static int 706 parse_portmask(const char *portmask) 707 { 708 char *end = NULL; 709 unsigned long pm; 710 711 /* parse hexadecimal string */ 712 pm = strtoul(portmask, &end, 16); 713 if ((portmask[0] == '\0') || (end == NULL) || (*end != '\0')) 714 return 0; 715 716 return pm; 717 } 718 719 /* Parse the argument given in the command line of the application */ 720 static int 721 parse_args(int argc, char **argv) 722 { 723 int opt; 724 char **argvopt; 725 int option_index; 726 char *prgname = argv[0]; 727 static struct option lgopts[] = { 728 {NULL, 0, 0, 0} 729 }; 730 731 argvopt = argv; 732 enable_lcore_rx_distributor = false; 733 while ((opt = getopt_long(argc, argvopt, "cp:", 734 lgopts, &option_index)) != EOF) { 735 736 switch (opt) { 737 /* portmask */ 738 case 'p': 739 enabled_port_mask = parse_portmask(optarg); 740 if (enabled_port_mask == 0) { 741 printf("invalid portmask\n"); 742 print_usage(prgname); 743 return -1; 744 } 745 break; 746 747 case 'c': 748 enable_lcore_rx_distributor = true; 749 break; 750 751 default: 752 print_usage(prgname); 753 return -1; 754 } 755 } 756 757 if (optind <= 1) { 758 print_usage(prgname); 759 return -1; 760 } 761 762 argv[optind-1] = prgname; 763 764 optind = 1; /* reset getopt lib */ 765 return 0; 766 } 767 768 /* Main function, does initialization and calls the per-lcore functions */ 769 int 770 main(int argc, char *argv[]) 771 { 772 struct rte_mempool *mbuf_pool; 773 struct rte_distributor *d; 774 struct rte_ring *dist_tx_ring; 775 struct rte_ring *rx_dist_ring; 776 struct rte_power_core_capabilities lcore_cap; 777 unsigned int lcore_id, worker_id = 0; 778 int distr_core_id = -1, rx_core_id = -1, tx_core_id = -1; 779 unsigned nb_ports; 780 unsigned int min_cores; 781 uint16_t portid; 782 uint16_t nb_ports_available; 783 uint64_t t, freq; 784 785 /* catch ctrl-c so we can print on exit */ 786 signal(SIGINT, int_handler); 787 788 /* init EAL */ 789 int ret = rte_eal_init(argc, argv); 790 if (ret < 0) 791 rte_exit(EXIT_FAILURE, "Error with EAL initialization\n"); 792 argc -= ret; 793 argv += ret; 794 795 /* parse application arguments (after the EAL ones) */ 796 ret = parse_args(argc, argv); 797 if (ret < 0) 798 rte_exit(EXIT_FAILURE, "Invalid distributor parameters\n"); 799 800 if (enable_lcore_rx_distributor) { 801 /* RX and distributor combined, 3 fixed function cores (stat, TX, at least 1 worker) */ 802 min_cores = 4; 803 num_workers = rte_lcore_count() - 3; 804 } else { 805 /* separate RX and distributor, 3 fixed function cores (stat, TX, at least 1 worker) */ 806 min_cores = 5; 807 num_workers = rte_lcore_count() - 4; 808 } 809 810 if (rte_lcore_count() < min_cores) 811 rte_exit(EXIT_FAILURE, "Error, This application needs at " 812 "least 4 logical cores to run:\n" 813 "1 lcore for stats (can be core 0)\n" 814 "1 or 2 lcore for packet RX and distribution\n" 815 "1 lcore for packet TX\n" 816 "and at least 1 lcore for worker threads\n"); 817 818 if (init_power_library() == 0) 819 power_lib_initialised = 1; 820 821 nb_ports = rte_eth_dev_count_avail(); 822 if (nb_ports == 0) 823 rte_exit(EXIT_FAILURE, "Error: no ethernet ports detected\n"); 824 if (nb_ports != 1 && (nb_ports & 1)) 825 rte_exit(EXIT_FAILURE, "Error: number of ports must be even, except " 826 "when using a single port\n"); 827 828 mbuf_pool = rte_pktmbuf_pool_create("MBUF_POOL", 829 NUM_MBUFS * nb_ports, MBUF_CACHE_SIZE, 0, 830 RTE_MBUF_DEFAULT_BUF_SIZE, rte_socket_id()); 831 if (mbuf_pool == NULL) 832 rte_exit(EXIT_FAILURE, "Cannot create mbuf pool\n"); 833 nb_ports_available = nb_ports; 834 835 /* initialize all ports */ 836 RTE_ETH_FOREACH_DEV(portid) { 837 /* skip ports that are not enabled */ 838 if ((enabled_port_mask & (1 << portid)) == 0) { 839 printf("\nSkipping disabled port %d\n", portid); 840 nb_ports_available--; 841 continue; 842 } 843 /* init port */ 844 printf("Initializing port %u... done\n", portid); 845 846 if (port_init(portid, mbuf_pool) != 0) 847 rte_exit(EXIT_FAILURE, "Cannot initialize port %u\n", 848 portid); 849 } 850 851 if (!nb_ports_available) { 852 rte_exit(EXIT_FAILURE, 853 "All available ports are disabled. Please set portmask.\n"); 854 } 855 856 d = rte_distributor_create("PKT_DIST", rte_socket_id(), 857 num_workers, 858 RTE_DIST_ALG_BURST); 859 if (d == NULL) 860 rte_exit(EXIT_FAILURE, "Cannot create distributor\n"); 861 862 /* 863 * scheduler ring is read by the transmitter core, and written to 864 * by scheduler core 865 */ 866 dist_tx_ring = rte_ring_create("Output_ring", SCHED_TX_RING_SZ, 867 rte_socket_id(), RING_F_SC_DEQ | RING_F_SP_ENQ); 868 if (dist_tx_ring == NULL) 869 rte_exit(EXIT_FAILURE, "Cannot create output ring\n"); 870 871 rx_dist_ring = rte_ring_create("Input_ring", SCHED_RX_RING_SZ, 872 rte_socket_id(), RING_F_SC_DEQ | RING_F_SP_ENQ); 873 if (rx_dist_ring == NULL) 874 rte_exit(EXIT_FAILURE, "Cannot create output ring\n"); 875 876 if (power_lib_initialised) { 877 /* 878 * Here we'll pre-assign lcore ids to the rx, tx and 879 * distributor workloads if there's higher frequency 880 * on those cores e.g. if Turbo Boost is enabled. 881 * It's also worth mentioning that it will assign cores in a 882 * specific order, so that if there's less than three 883 * available, the higher frequency cores will go to the 884 * distributor first, then rx, then tx. 885 */ 886 RTE_LCORE_FOREACH_WORKER(lcore_id) { 887 888 rte_power_get_capabilities(lcore_id, &lcore_cap); 889 890 if (lcore_cap.priority != 1) 891 continue; 892 893 if (distr_core_id < 0 && !enable_lcore_rx_distributor) { 894 distr_core_id = lcore_id; 895 printf("Distributor on priority core %d\n", 896 lcore_id); 897 continue; 898 } 899 if (rx_core_id < 0) { 900 rx_core_id = lcore_id; 901 printf("Rx on priority core %d\n", 902 lcore_id); 903 continue; 904 } 905 if (tx_core_id < 0) { 906 tx_core_id = lcore_id; 907 printf("Tx on priority core %d\n", 908 lcore_id); 909 continue; 910 } 911 } 912 } 913 914 /* 915 * If there's any of the key workloads left without an lcore_id 916 * after the high performing core assignment above, pre-assign 917 * them here. 918 */ 919 RTE_LCORE_FOREACH_WORKER(lcore_id) { 920 if (lcore_id == (unsigned int)distr_core_id || 921 lcore_id == (unsigned int)rx_core_id || 922 lcore_id == (unsigned int)tx_core_id) 923 continue; 924 if (distr_core_id < 0 && !enable_lcore_rx_distributor) { 925 distr_core_id = lcore_id; 926 printf("Distributor on core %d\n", lcore_id); 927 continue; 928 } 929 if (rx_core_id < 0) { 930 rx_core_id = lcore_id; 931 printf("Rx on core %d\n", lcore_id); 932 continue; 933 } 934 if (tx_core_id < 0) { 935 tx_core_id = lcore_id; 936 printf("Tx on core %d\n", lcore_id); 937 continue; 938 } 939 } 940 941 if (enable_lcore_rx_distributor) 942 printf(" tx id %d, rx id %d\n", 943 tx_core_id, 944 rx_core_id); 945 else 946 printf(" tx id %d, dist id %d, rx id %d\n", 947 tx_core_id, 948 distr_core_id, 949 rx_core_id); 950 951 /* 952 * Kick off all the worker threads first, avoiding the pre-assigned 953 * lcore_ids for tx, rx and distributor workloads. 954 */ 955 RTE_LCORE_FOREACH_WORKER(lcore_id) { 956 if (lcore_id == (unsigned int)distr_core_id || 957 lcore_id == (unsigned int)rx_core_id || 958 lcore_id == (unsigned int)tx_core_id) 959 continue; 960 printf("Starting thread %d as worker, lcore_id %d\n", 961 worker_id, lcore_id); 962 struct lcore_params *p = 963 rte_malloc(NULL, sizeof(*p), 0); 964 if (!p) 965 rte_panic("malloc failure\n"); 966 *p = (struct lcore_params){worker_id++, d, rx_dist_ring, 967 dist_tx_ring, mbuf_pool}; 968 969 rte_eal_remote_launch((lcore_function_t *)lcore_worker, 970 p, lcore_id); 971 } 972 973 /* Start tx core */ 974 rte_eal_remote_launch((lcore_function_t *)lcore_tx, 975 dist_tx_ring, tx_core_id); 976 977 /* Start distributor core */ 978 struct lcore_params *pd = NULL; 979 if (!enable_lcore_rx_distributor) { 980 pd = rte_malloc(NULL, sizeof(*pd), 0); 981 if (!pd) 982 rte_panic("malloc failure\n"); 983 *pd = (struct lcore_params){worker_id++, d, 984 rx_dist_ring, dist_tx_ring, mbuf_pool}; 985 rte_eal_remote_launch((lcore_function_t *)lcore_distributor, 986 pd, distr_core_id); 987 } 988 989 /* Start rx core */ 990 struct lcore_params *pr = 991 rte_malloc(NULL, sizeof(*pr), 0); 992 if (!pr) 993 rte_panic("malloc failure\n"); 994 *pr = (struct lcore_params){worker_id++, d, rx_dist_ring, 995 dist_tx_ring, mbuf_pool}; 996 if (enable_lcore_rx_distributor) 997 rte_eal_remote_launch((lcore_function_t *)lcore_rx_and_distributor, 998 pr, rx_core_id); 999 else 1000 rte_eal_remote_launch((lcore_function_t *)lcore_rx, 1001 pr, rx_core_id); 1002 1003 freq = rte_get_timer_hz(); 1004 t = rte_rdtsc() + freq; 1005 while (!quit_signal) { 1006 if (t < rte_rdtsc()) { 1007 print_stats(); 1008 t = rte_rdtsc() + freq; 1009 } 1010 usleep(1000); 1011 } 1012 1013 RTE_LCORE_FOREACH_WORKER(lcore_id) { 1014 if (rte_eal_wait_lcore(lcore_id) < 0) 1015 return -1; 1016 } 1017 1018 print_stats(); 1019 1020 rte_free(pd); 1021 rte_free(pr); 1022 1023 /* clean up the EAL */ 1024 rte_eal_cleanup(); 1025 1026 return 0; 1027 } 1028