1 /* SPDX-License-Identifier: BSD-3-Clause 2 * Copyright(c) 2010-2017 Intel Corporation 3 */ 4 5 #include <stdint.h> 6 #include <inttypes.h> 7 #include <unistd.h> 8 #include <signal.h> 9 #include <getopt.h> 10 11 #include <rte_eal.h> 12 #include <rte_ethdev.h> 13 #include <rte_cycles.h> 14 #include <rte_malloc.h> 15 #include <rte_debug.h> 16 #include <rte_prefetch.h> 17 #include <rte_distributor.h> 18 #include <rte_pause.h> 19 #include <rte_power.h> 20 21 #define RX_RING_SIZE 1024 22 #define TX_RING_SIZE 1024 23 #define NUM_MBUFS ((64*1024)-1) 24 #define MBUF_CACHE_SIZE 128 25 #define BURST_SIZE 64 26 #define SCHED_RX_RING_SZ 8192 27 #define SCHED_TX_RING_SZ 65536 28 #define BURST_SIZE_TX 32 29 30 #define RTE_LOGTYPE_DISTRAPP RTE_LOGTYPE_USER1 31 32 #define ANSI_COLOR_RED "\x1b[31m" 33 #define ANSI_COLOR_RESET "\x1b[0m" 34 35 /* mask of enabled ports */ 36 static uint32_t enabled_port_mask; 37 volatile uint8_t quit_signal; 38 volatile uint8_t quit_signal_rx; 39 volatile uint8_t quit_signal_dist; 40 volatile uint8_t quit_signal_work; 41 unsigned int power_lib_initialised; 42 43 static volatile struct app_stats { 44 struct { 45 uint64_t rx_pkts; 46 uint64_t returned_pkts; 47 uint64_t enqueued_pkts; 48 uint64_t enqdrop_pkts; 49 } rx __rte_cache_aligned; 50 int pad1 __rte_cache_aligned; 51 52 struct { 53 uint64_t in_pkts; 54 uint64_t ret_pkts; 55 uint64_t sent_pkts; 56 uint64_t enqdrop_pkts; 57 } dist __rte_cache_aligned; 58 int pad2 __rte_cache_aligned; 59 60 struct { 61 uint64_t dequeue_pkts; 62 uint64_t tx_pkts; 63 uint64_t enqdrop_pkts; 64 } tx __rte_cache_aligned; 65 int pad3 __rte_cache_aligned; 66 67 uint64_t worker_pkts[64] __rte_cache_aligned; 68 69 int pad4 __rte_cache_aligned; 70 71 uint64_t worker_bursts[64][8] __rte_cache_aligned; 72 73 int pad5 __rte_cache_aligned; 74 75 uint64_t port_rx_pkts[64] __rte_cache_aligned; 76 uint64_t port_tx_pkts[64] __rte_cache_aligned; 77 } app_stats; 78 79 struct app_stats prev_app_stats; 80 81 static const struct rte_eth_conf port_conf_default = { 82 .rxmode = { 83 .mq_mode = ETH_MQ_RX_RSS, 84 }, 85 .txmode = { 86 .mq_mode = ETH_MQ_TX_NONE, 87 }, 88 .rx_adv_conf = { 89 .rss_conf = { 90 .rss_hf = ETH_RSS_IP | ETH_RSS_UDP | 91 ETH_RSS_TCP | ETH_RSS_SCTP, 92 } 93 }, 94 }; 95 96 struct output_buffer { 97 unsigned count; 98 struct rte_mbuf *mbufs[BURST_SIZE]; 99 }; 100 101 static void print_stats(void); 102 103 /* 104 * Initialises a given port using global settings and with the rx buffers 105 * coming from the mbuf_pool passed as parameter 106 */ 107 static inline int 108 port_init(uint16_t port, struct rte_mempool *mbuf_pool) 109 { 110 struct rte_eth_conf port_conf = port_conf_default; 111 const uint16_t rxRings = 1, txRings = rte_lcore_count() - 1; 112 int retval; 113 uint16_t q; 114 uint16_t nb_rxd = RX_RING_SIZE; 115 uint16_t nb_txd = TX_RING_SIZE; 116 struct rte_eth_dev_info dev_info; 117 struct rte_eth_txconf txconf; 118 119 if (!rte_eth_dev_is_valid_port(port)) 120 return -1; 121 122 retval = rte_eth_dev_info_get(port, &dev_info); 123 if (retval != 0) { 124 printf("Error during getting device (port %u) info: %s\n", 125 port, strerror(-retval)); 126 return retval; 127 } 128 129 if (dev_info.tx_offload_capa & DEV_TX_OFFLOAD_MBUF_FAST_FREE) 130 port_conf.txmode.offloads |= 131 DEV_TX_OFFLOAD_MBUF_FAST_FREE; 132 133 port_conf.rx_adv_conf.rss_conf.rss_hf &= 134 dev_info.flow_type_rss_offloads; 135 if (port_conf.rx_adv_conf.rss_conf.rss_hf != 136 port_conf_default.rx_adv_conf.rss_conf.rss_hf) { 137 printf("Port %u modified RSS hash function based on hardware support," 138 "requested:%#"PRIx64" configured:%#"PRIx64"\n", 139 port, 140 port_conf_default.rx_adv_conf.rss_conf.rss_hf, 141 port_conf.rx_adv_conf.rss_conf.rss_hf); 142 } 143 144 retval = rte_eth_dev_configure(port, rxRings, txRings, &port_conf); 145 if (retval != 0) 146 return retval; 147 148 retval = rte_eth_dev_adjust_nb_rx_tx_desc(port, &nb_rxd, &nb_txd); 149 if (retval != 0) 150 return retval; 151 152 for (q = 0; q < rxRings; q++) { 153 retval = rte_eth_rx_queue_setup(port, q, nb_rxd, 154 rte_eth_dev_socket_id(port), 155 NULL, mbuf_pool); 156 if (retval < 0) 157 return retval; 158 } 159 160 txconf = dev_info.default_txconf; 161 txconf.offloads = port_conf.txmode.offloads; 162 for (q = 0; q < txRings; q++) { 163 retval = rte_eth_tx_queue_setup(port, q, nb_txd, 164 rte_eth_dev_socket_id(port), 165 &txconf); 166 if (retval < 0) 167 return retval; 168 } 169 170 retval = rte_eth_dev_start(port); 171 if (retval < 0) 172 return retval; 173 174 struct rte_eth_link link; 175 do { 176 retval = rte_eth_link_get_nowait(port, &link); 177 if (retval < 0) { 178 printf("Failed link get (port %u): %s\n", 179 port, rte_strerror(-retval)); 180 return retval; 181 } else if (link.link_status) 182 break; 183 184 printf("Waiting for Link up on port %"PRIu16"\n", port); 185 sleep(1); 186 } while (!link.link_status); 187 188 if (!link.link_status) { 189 printf("Link down on port %"PRIu16"\n", port); 190 return 0; 191 } 192 193 struct rte_ether_addr addr; 194 retval = rte_eth_macaddr_get(port, &addr); 195 if (retval < 0) { 196 printf("Failed to get MAC address (port %u): %s\n", 197 port, rte_strerror(-retval)); 198 return retval; 199 } 200 201 printf("Port %u MAC: %02"PRIx8" %02"PRIx8" %02"PRIx8 202 " %02"PRIx8" %02"PRIx8" %02"PRIx8"\n", 203 port, RTE_ETHER_ADDR_BYTES(&addr)); 204 205 retval = rte_eth_promiscuous_enable(port); 206 if (retval != 0) 207 return retval; 208 209 return 0; 210 } 211 212 struct lcore_params { 213 unsigned worker_id; 214 struct rte_distributor *d; 215 struct rte_ring *rx_dist_ring; 216 struct rte_ring *dist_tx_ring; 217 struct rte_mempool *mem_pool; 218 }; 219 220 static int 221 lcore_rx(struct lcore_params *p) 222 { 223 const uint16_t nb_ports = rte_eth_dev_count_avail(); 224 const int socket_id = rte_socket_id(); 225 uint16_t port; 226 struct rte_mbuf *bufs[BURST_SIZE*2]; 227 228 RTE_ETH_FOREACH_DEV(port) { 229 /* skip ports that are not enabled */ 230 if ((enabled_port_mask & (1 << port)) == 0) 231 continue; 232 233 if (rte_eth_dev_socket_id(port) > 0 && 234 rte_eth_dev_socket_id(port) != socket_id) 235 printf("WARNING, port %u is on remote NUMA node to " 236 "RX thread.\n\tPerformance will not " 237 "be optimal.\n", port); 238 } 239 240 printf("\nCore %u doing packet RX.\n", rte_lcore_id()); 241 port = 0; 242 while (!quit_signal_rx) { 243 244 /* skip ports that are not enabled */ 245 if ((enabled_port_mask & (1 << port)) == 0) { 246 if (++port == nb_ports) 247 port = 0; 248 continue; 249 } 250 const uint16_t nb_rx = rte_eth_rx_burst(port, 0, bufs, 251 BURST_SIZE); 252 if (unlikely(nb_rx == 0)) { 253 if (++port == nb_ports) 254 port = 0; 255 continue; 256 } 257 app_stats.rx.rx_pkts += nb_rx; 258 259 /* 260 * You can run the distributor on the rx core with this code. Returned 261 * packets are then send straight to the tx core. 262 */ 263 #if 0 264 rte_distributor_process(d, bufs, nb_rx); 265 const uint16_t nb_ret = rte_distributor_returned_pktsd, 266 bufs, BURST_SIZE*2); 267 268 app_stats.rx.returned_pkts += nb_ret; 269 if (unlikely(nb_ret == 0)) { 270 if (++port == nb_ports) 271 port = 0; 272 continue; 273 } 274 275 struct rte_ring *tx_ring = p->dist_tx_ring; 276 uint16_t sent = rte_ring_enqueue_burst(tx_ring, 277 (void *)bufs, nb_ret, NULL); 278 #else 279 uint16_t nb_ret = nb_rx; 280 /* 281 * Swap the following two lines if you want the rx traffic 282 * to go directly to tx, no distribution. 283 */ 284 struct rte_ring *out_ring = p->rx_dist_ring; 285 /* struct rte_ring *out_ring = p->dist_tx_ring; */ 286 287 uint16_t sent = rte_ring_enqueue_burst(out_ring, 288 (void *)bufs, nb_ret, NULL); 289 #endif 290 291 app_stats.rx.enqueued_pkts += sent; 292 if (unlikely(sent < nb_ret)) { 293 app_stats.rx.enqdrop_pkts += nb_ret - sent; 294 RTE_LOG_DP(DEBUG, DISTRAPP, 295 "%s:Packet loss due to full ring\n", __func__); 296 while (sent < nb_ret) 297 rte_pktmbuf_free(bufs[sent++]); 298 } 299 if (++port == nb_ports) 300 port = 0; 301 } 302 if (power_lib_initialised) 303 rte_power_exit(rte_lcore_id()); 304 /* set worker & tx threads quit flag */ 305 printf("\nCore %u exiting rx task.\n", rte_lcore_id()); 306 quit_signal = 1; 307 return 0; 308 } 309 310 static inline void 311 flush_one_port(struct output_buffer *outbuf, uint8_t outp) 312 { 313 unsigned int nb_tx = rte_eth_tx_burst(outp, 0, 314 outbuf->mbufs, outbuf->count); 315 app_stats.tx.tx_pkts += outbuf->count; 316 317 if (unlikely(nb_tx < outbuf->count)) { 318 app_stats.tx.enqdrop_pkts += outbuf->count - nb_tx; 319 do { 320 rte_pktmbuf_free(outbuf->mbufs[nb_tx]); 321 } while (++nb_tx < outbuf->count); 322 } 323 outbuf->count = 0; 324 } 325 326 static inline void 327 flush_all_ports(struct output_buffer *tx_buffers) 328 { 329 uint16_t outp; 330 331 RTE_ETH_FOREACH_DEV(outp) { 332 /* skip ports that are not enabled */ 333 if ((enabled_port_mask & (1 << outp)) == 0) 334 continue; 335 336 if (tx_buffers[outp].count == 0) 337 continue; 338 339 flush_one_port(&tx_buffers[outp], outp); 340 } 341 } 342 343 344 345 static int 346 lcore_distributor(struct lcore_params *p) 347 { 348 struct rte_ring *in_r = p->rx_dist_ring; 349 struct rte_ring *out_r = p->dist_tx_ring; 350 struct rte_mbuf *bufs[BURST_SIZE * 4]; 351 struct rte_distributor *d = p->d; 352 353 printf("\nCore %u acting as distributor core.\n", rte_lcore_id()); 354 while (!quit_signal_dist) { 355 const uint16_t nb_rx = rte_ring_dequeue_burst(in_r, 356 (void *)bufs, BURST_SIZE*1, NULL); 357 if (nb_rx) { 358 app_stats.dist.in_pkts += nb_rx; 359 360 /* Distribute the packets */ 361 rte_distributor_process(d, bufs, nb_rx); 362 /* Handle Returns */ 363 const uint16_t nb_ret = 364 rte_distributor_returned_pkts(d, 365 bufs, BURST_SIZE*2); 366 367 if (unlikely(nb_ret == 0)) 368 continue; 369 app_stats.dist.ret_pkts += nb_ret; 370 371 uint16_t sent = rte_ring_enqueue_burst(out_r, 372 (void *)bufs, nb_ret, NULL); 373 app_stats.dist.sent_pkts += sent; 374 if (unlikely(sent < nb_ret)) { 375 app_stats.dist.enqdrop_pkts += nb_ret - sent; 376 RTE_LOG(DEBUG, DISTRAPP, 377 "%s:Packet loss due to full out ring\n", 378 __func__); 379 while (sent < nb_ret) 380 rte_pktmbuf_free(bufs[sent++]); 381 } 382 } 383 } 384 printf("\nCore %u exiting distributor task.\n", rte_lcore_id()); 385 quit_signal_work = 1; 386 if (power_lib_initialised) 387 rte_power_exit(rte_lcore_id()); 388 rte_distributor_flush(d); 389 /* Unblock any returns so workers can exit */ 390 rte_distributor_clear_returns(d); 391 quit_signal_rx = 1; 392 return 0; 393 } 394 395 396 static int 397 lcore_tx(struct rte_ring *in_r) 398 { 399 static struct output_buffer tx_buffers[RTE_MAX_ETHPORTS]; 400 const int socket_id = rte_socket_id(); 401 uint16_t port; 402 403 RTE_ETH_FOREACH_DEV(port) { 404 /* skip ports that are not enabled */ 405 if ((enabled_port_mask & (1 << port)) == 0) 406 continue; 407 408 if (rte_eth_dev_socket_id(port) > 0 && 409 rte_eth_dev_socket_id(port) != socket_id) 410 printf("WARNING, port %u is on remote NUMA node to " 411 "TX thread.\n\tPerformance will not " 412 "be optimal.\n", port); 413 } 414 415 printf("\nCore %u doing packet TX.\n", rte_lcore_id()); 416 while (!quit_signal) { 417 418 RTE_ETH_FOREACH_DEV(port) { 419 /* skip ports that are not enabled */ 420 if ((enabled_port_mask & (1 << port)) == 0) 421 continue; 422 423 struct rte_mbuf *bufs[BURST_SIZE_TX]; 424 const uint16_t nb_rx = rte_ring_dequeue_burst(in_r, 425 (void *)bufs, BURST_SIZE_TX, NULL); 426 app_stats.tx.dequeue_pkts += nb_rx; 427 428 /* if we get no traffic, flush anything we have */ 429 if (unlikely(nb_rx == 0)) { 430 flush_all_ports(tx_buffers); 431 continue; 432 } 433 434 /* for traffic we receive, queue it up for transmit */ 435 uint16_t i; 436 rte_prefetch_non_temporal((void *)bufs[0]); 437 rte_prefetch_non_temporal((void *)bufs[1]); 438 rte_prefetch_non_temporal((void *)bufs[2]); 439 for (i = 0; i < nb_rx; i++) { 440 struct output_buffer *outbuf; 441 uint8_t outp; 442 rte_prefetch_non_temporal((void *)bufs[i + 3]); 443 /* 444 * workers should update in_port to hold the 445 * output port value 446 */ 447 outp = bufs[i]->port; 448 /* skip ports that are not enabled */ 449 if ((enabled_port_mask & (1 << outp)) == 0) 450 continue; 451 452 outbuf = &tx_buffers[outp]; 453 outbuf->mbufs[outbuf->count++] = bufs[i]; 454 if (outbuf->count == BURST_SIZE_TX) 455 flush_one_port(outbuf, outp); 456 } 457 } 458 } 459 if (power_lib_initialised) 460 rte_power_exit(rte_lcore_id()); 461 printf("\nCore %u exiting tx task.\n", rte_lcore_id()); 462 return 0; 463 } 464 465 static void 466 int_handler(int sig_num) 467 { 468 printf("Exiting on signal %d\n", sig_num); 469 /* set quit flag for rx thread to exit */ 470 quit_signal_dist = 1; 471 } 472 473 static void 474 print_stats(void) 475 { 476 struct rte_eth_stats eth_stats; 477 unsigned int i, j; 478 const unsigned int num_workers = rte_lcore_count() - 4; 479 480 RTE_ETH_FOREACH_DEV(i) { 481 rte_eth_stats_get(i, ð_stats); 482 app_stats.port_rx_pkts[i] = eth_stats.ipackets; 483 app_stats.port_tx_pkts[i] = eth_stats.opackets; 484 } 485 486 printf("\n\nRX Thread:\n"); 487 RTE_ETH_FOREACH_DEV(i) { 488 printf("Port %u Pktsin : %5.2f\n", i, 489 (app_stats.port_rx_pkts[i] - 490 prev_app_stats.port_rx_pkts[i])/1000000.0); 491 prev_app_stats.port_rx_pkts[i] = app_stats.port_rx_pkts[i]; 492 } 493 printf(" - Received: %5.2f\n", 494 (app_stats.rx.rx_pkts - 495 prev_app_stats.rx.rx_pkts)/1000000.0); 496 printf(" - Returned: %5.2f\n", 497 (app_stats.rx.returned_pkts - 498 prev_app_stats.rx.returned_pkts)/1000000.0); 499 printf(" - Enqueued: %5.2f\n", 500 (app_stats.rx.enqueued_pkts - 501 prev_app_stats.rx.enqueued_pkts)/1000000.0); 502 printf(" - Dropped: %s%5.2f%s\n", ANSI_COLOR_RED, 503 (app_stats.rx.enqdrop_pkts - 504 prev_app_stats.rx.enqdrop_pkts)/1000000.0, 505 ANSI_COLOR_RESET); 506 507 printf("Distributor thread:\n"); 508 printf(" - In: %5.2f\n", 509 (app_stats.dist.in_pkts - 510 prev_app_stats.dist.in_pkts)/1000000.0); 511 printf(" - Returned: %5.2f\n", 512 (app_stats.dist.ret_pkts - 513 prev_app_stats.dist.ret_pkts)/1000000.0); 514 printf(" - Sent: %5.2f\n", 515 (app_stats.dist.sent_pkts - 516 prev_app_stats.dist.sent_pkts)/1000000.0); 517 printf(" - Dropped %s%5.2f%s\n", ANSI_COLOR_RED, 518 (app_stats.dist.enqdrop_pkts - 519 prev_app_stats.dist.enqdrop_pkts)/1000000.0, 520 ANSI_COLOR_RESET); 521 522 printf("TX thread:\n"); 523 printf(" - Dequeued: %5.2f\n", 524 (app_stats.tx.dequeue_pkts - 525 prev_app_stats.tx.dequeue_pkts)/1000000.0); 526 RTE_ETH_FOREACH_DEV(i) { 527 printf("Port %u Pktsout: %5.2f\n", 528 i, (app_stats.port_tx_pkts[i] - 529 prev_app_stats.port_tx_pkts[i])/1000000.0); 530 prev_app_stats.port_tx_pkts[i] = app_stats.port_tx_pkts[i]; 531 } 532 printf(" - Transmitted: %5.2f\n", 533 (app_stats.tx.tx_pkts - 534 prev_app_stats.tx.tx_pkts)/1000000.0); 535 printf(" - Dropped: %s%5.2f%s\n", ANSI_COLOR_RED, 536 (app_stats.tx.enqdrop_pkts - 537 prev_app_stats.tx.enqdrop_pkts)/1000000.0, 538 ANSI_COLOR_RESET); 539 540 prev_app_stats.rx.rx_pkts = app_stats.rx.rx_pkts; 541 prev_app_stats.rx.returned_pkts = app_stats.rx.returned_pkts; 542 prev_app_stats.rx.enqueued_pkts = app_stats.rx.enqueued_pkts; 543 prev_app_stats.rx.enqdrop_pkts = app_stats.rx.enqdrop_pkts; 544 prev_app_stats.dist.in_pkts = app_stats.dist.in_pkts; 545 prev_app_stats.dist.ret_pkts = app_stats.dist.ret_pkts; 546 prev_app_stats.dist.sent_pkts = app_stats.dist.sent_pkts; 547 prev_app_stats.dist.enqdrop_pkts = app_stats.dist.enqdrop_pkts; 548 prev_app_stats.tx.dequeue_pkts = app_stats.tx.dequeue_pkts; 549 prev_app_stats.tx.tx_pkts = app_stats.tx.tx_pkts; 550 prev_app_stats.tx.enqdrop_pkts = app_stats.tx.enqdrop_pkts; 551 552 for (i = 0; i < num_workers; i++) { 553 printf("Worker %02u Pkts: %5.2f. Bursts(1-8): ", i, 554 (app_stats.worker_pkts[i] - 555 prev_app_stats.worker_pkts[i])/1000000.0); 556 for (j = 0; j < 8; j++) { 557 printf("%"PRIu64" ", app_stats.worker_bursts[i][j]); 558 app_stats.worker_bursts[i][j] = 0; 559 } 560 printf("\n"); 561 prev_app_stats.worker_pkts[i] = app_stats.worker_pkts[i]; 562 } 563 } 564 565 static int 566 lcore_worker(struct lcore_params *p) 567 { 568 struct rte_distributor *d = p->d; 569 const unsigned id = p->worker_id; 570 unsigned int num = 0; 571 unsigned int i; 572 573 /* 574 * for single port, xor_val will be zero so we won't modify the output 575 * port, otherwise we send traffic from 0 to 1, 2 to 3, and vice versa 576 */ 577 const unsigned xor_val = (rte_eth_dev_count_avail() > 1); 578 struct rte_mbuf *buf[8] __rte_cache_aligned; 579 580 for (i = 0; i < 8; i++) 581 buf[i] = NULL; 582 583 app_stats.worker_pkts[p->worker_id] = 1; 584 585 printf("\nCore %u acting as worker core.\n", rte_lcore_id()); 586 while (!quit_signal_work) { 587 num = rte_distributor_get_pkt(d, id, buf, buf, num); 588 /* Do a little bit of work for each packet */ 589 for (i = 0; i < num; i++) { 590 uint64_t t = rte_rdtsc()+100; 591 592 while (rte_rdtsc() < t) 593 rte_pause(); 594 buf[i]->port ^= xor_val; 595 } 596 597 app_stats.worker_pkts[p->worker_id] += num; 598 if (num > 0) 599 app_stats.worker_bursts[p->worker_id][num-1]++; 600 } 601 if (power_lib_initialised) 602 rte_power_exit(rte_lcore_id()); 603 rte_free(p); 604 return 0; 605 } 606 607 static int 608 init_power_library(void) 609 { 610 int ret = 0, lcore_id; 611 RTE_LCORE_FOREACH_WORKER(lcore_id) { 612 /* init power management library */ 613 ret = rte_power_init(lcore_id); 614 if (ret) { 615 RTE_LOG(ERR, POWER, 616 "Library initialization failed on core %u\n", 617 lcore_id); 618 /* 619 * Return on first failure, we'll fall back 620 * to non-power operation 621 */ 622 return ret; 623 } 624 } 625 return ret; 626 } 627 628 /* display usage */ 629 static void 630 print_usage(const char *prgname) 631 { 632 printf("%s [EAL options] -- -p PORTMASK\n" 633 " -p PORTMASK: hexadecimal bitmask of ports to configure\n", 634 prgname); 635 } 636 637 static int 638 parse_portmask(const char *portmask) 639 { 640 char *end = NULL; 641 unsigned long pm; 642 643 /* parse hexadecimal string */ 644 pm = strtoul(portmask, &end, 16); 645 if ((portmask[0] == '\0') || (end == NULL) || (*end != '\0')) 646 return 0; 647 648 return pm; 649 } 650 651 /* Parse the argument given in the command line of the application */ 652 static int 653 parse_args(int argc, char **argv) 654 { 655 int opt; 656 char **argvopt; 657 int option_index; 658 char *prgname = argv[0]; 659 static struct option lgopts[] = { 660 {NULL, 0, 0, 0} 661 }; 662 663 argvopt = argv; 664 665 while ((opt = getopt_long(argc, argvopt, "p:", 666 lgopts, &option_index)) != EOF) { 667 668 switch (opt) { 669 /* portmask */ 670 case 'p': 671 enabled_port_mask = parse_portmask(optarg); 672 if (enabled_port_mask == 0) { 673 printf("invalid portmask\n"); 674 print_usage(prgname); 675 return -1; 676 } 677 break; 678 679 default: 680 print_usage(prgname); 681 return -1; 682 } 683 } 684 685 if (optind <= 1) { 686 print_usage(prgname); 687 return -1; 688 } 689 690 argv[optind-1] = prgname; 691 692 optind = 1; /* reset getopt lib */ 693 return 0; 694 } 695 696 /* Main function, does initialization and calls the per-lcore functions */ 697 int 698 main(int argc, char *argv[]) 699 { 700 struct rte_mempool *mbuf_pool; 701 struct rte_distributor *d; 702 struct rte_ring *dist_tx_ring; 703 struct rte_ring *rx_dist_ring; 704 struct rte_power_core_capabilities lcore_cap; 705 unsigned int lcore_id, worker_id = 0; 706 int distr_core_id = -1, rx_core_id = -1, tx_core_id = -1; 707 unsigned nb_ports; 708 uint16_t portid; 709 uint16_t nb_ports_available; 710 uint64_t t, freq; 711 712 /* catch ctrl-c so we can print on exit */ 713 signal(SIGINT, int_handler); 714 715 /* init EAL */ 716 int ret = rte_eal_init(argc, argv); 717 if (ret < 0) 718 rte_exit(EXIT_FAILURE, "Error with EAL initialization\n"); 719 argc -= ret; 720 argv += ret; 721 722 /* parse application arguments (after the EAL ones) */ 723 ret = parse_args(argc, argv); 724 if (ret < 0) 725 rte_exit(EXIT_FAILURE, "Invalid distributor parameters\n"); 726 727 if (rte_lcore_count() < 5) 728 rte_exit(EXIT_FAILURE, "Error, This application needs at " 729 "least 5 logical cores to run:\n" 730 "1 lcore for stats (can be core 0)\n" 731 "1 lcore for packet RX\n" 732 "1 lcore for distribution\n" 733 "1 lcore for packet TX\n" 734 "and at least 1 lcore for worker threads\n"); 735 736 if (init_power_library() == 0) 737 power_lib_initialised = 1; 738 739 nb_ports = rte_eth_dev_count_avail(); 740 if (nb_ports == 0) 741 rte_exit(EXIT_FAILURE, "Error: no ethernet ports detected\n"); 742 if (nb_ports != 1 && (nb_ports & 1)) 743 rte_exit(EXIT_FAILURE, "Error: number of ports must be even, except " 744 "when using a single port\n"); 745 746 mbuf_pool = rte_pktmbuf_pool_create("MBUF_POOL", 747 NUM_MBUFS * nb_ports, MBUF_CACHE_SIZE, 0, 748 RTE_MBUF_DEFAULT_BUF_SIZE, rte_socket_id()); 749 if (mbuf_pool == NULL) 750 rte_exit(EXIT_FAILURE, "Cannot create mbuf pool\n"); 751 nb_ports_available = nb_ports; 752 753 /* initialize all ports */ 754 RTE_ETH_FOREACH_DEV(portid) { 755 /* skip ports that are not enabled */ 756 if ((enabled_port_mask & (1 << portid)) == 0) { 757 printf("\nSkipping disabled port %d\n", portid); 758 nb_ports_available--; 759 continue; 760 } 761 /* init port */ 762 printf("Initializing port %u... done\n", portid); 763 764 if (port_init(portid, mbuf_pool) != 0) 765 rte_exit(EXIT_FAILURE, "Cannot initialize port %u\n", 766 portid); 767 } 768 769 if (!nb_ports_available) { 770 rte_exit(EXIT_FAILURE, 771 "All available ports are disabled. Please set portmask.\n"); 772 } 773 774 d = rte_distributor_create("PKT_DIST", rte_socket_id(), 775 rte_lcore_count() - 4, 776 RTE_DIST_ALG_BURST); 777 if (d == NULL) 778 rte_exit(EXIT_FAILURE, "Cannot create distributor\n"); 779 780 /* 781 * scheduler ring is read by the transmitter core, and written to 782 * by scheduler core 783 */ 784 dist_tx_ring = rte_ring_create("Output_ring", SCHED_TX_RING_SZ, 785 rte_socket_id(), RING_F_SC_DEQ | RING_F_SP_ENQ); 786 if (dist_tx_ring == NULL) 787 rte_exit(EXIT_FAILURE, "Cannot create output ring\n"); 788 789 rx_dist_ring = rte_ring_create("Input_ring", SCHED_RX_RING_SZ, 790 rte_socket_id(), RING_F_SC_DEQ | RING_F_SP_ENQ); 791 if (rx_dist_ring == NULL) 792 rte_exit(EXIT_FAILURE, "Cannot create output ring\n"); 793 794 if (power_lib_initialised) { 795 /* 796 * Here we'll pre-assign lcore ids to the rx, tx and 797 * distributor workloads if there's higher frequency 798 * on those cores e.g. if Turbo Boost is enabled. 799 * It's also worth mentioning that it will assign cores in a 800 * specific order, so that if there's less than three 801 * available, the higher frequency cores will go to the 802 * distributor first, then rx, then tx. 803 */ 804 RTE_LCORE_FOREACH_WORKER(lcore_id) { 805 806 rte_power_get_capabilities(lcore_id, &lcore_cap); 807 808 if (lcore_cap.priority != 1) 809 continue; 810 811 if (distr_core_id < 0) { 812 distr_core_id = lcore_id; 813 printf("Distributor on priority core %d\n", 814 lcore_id); 815 continue; 816 } 817 if (rx_core_id < 0) { 818 rx_core_id = lcore_id; 819 printf("Rx on priority core %d\n", 820 lcore_id); 821 continue; 822 } 823 if (tx_core_id < 0) { 824 tx_core_id = lcore_id; 825 printf("Tx on priority core %d\n", 826 lcore_id); 827 continue; 828 } 829 } 830 } 831 832 /* 833 * If there's any of the key workloads left without an lcore_id 834 * after the high performing core assignment above, pre-assign 835 * them here. 836 */ 837 RTE_LCORE_FOREACH_WORKER(lcore_id) { 838 if (lcore_id == (unsigned int)distr_core_id || 839 lcore_id == (unsigned int)rx_core_id || 840 lcore_id == (unsigned int)tx_core_id) 841 continue; 842 if (distr_core_id < 0) { 843 distr_core_id = lcore_id; 844 printf("Distributor on core %d\n", lcore_id); 845 continue; 846 } 847 if (rx_core_id < 0) { 848 rx_core_id = lcore_id; 849 printf("Rx on core %d\n", lcore_id); 850 continue; 851 } 852 if (tx_core_id < 0) { 853 tx_core_id = lcore_id; 854 printf("Tx on core %d\n", lcore_id); 855 continue; 856 } 857 } 858 859 printf(" tx id %d, dist id %d, rx id %d\n", 860 tx_core_id, 861 distr_core_id, 862 rx_core_id); 863 864 /* 865 * Kick off all the worker threads first, avoiding the pre-assigned 866 * lcore_ids for tx, rx and distributor workloads. 867 */ 868 RTE_LCORE_FOREACH_WORKER(lcore_id) { 869 if (lcore_id == (unsigned int)distr_core_id || 870 lcore_id == (unsigned int)rx_core_id || 871 lcore_id == (unsigned int)tx_core_id) 872 continue; 873 printf("Starting thread %d as worker, lcore_id %d\n", 874 worker_id, lcore_id); 875 struct lcore_params *p = 876 rte_malloc(NULL, sizeof(*p), 0); 877 if (!p) 878 rte_panic("malloc failure\n"); 879 *p = (struct lcore_params){worker_id++, d, rx_dist_ring, 880 dist_tx_ring, mbuf_pool}; 881 882 rte_eal_remote_launch((lcore_function_t *)lcore_worker, 883 p, lcore_id); 884 } 885 886 /* Start tx core */ 887 rte_eal_remote_launch((lcore_function_t *)lcore_tx, 888 dist_tx_ring, tx_core_id); 889 890 /* Start distributor core */ 891 struct lcore_params *pd = 892 rte_malloc(NULL, sizeof(*pd), 0); 893 if (!pd) 894 rte_panic("malloc failure\n"); 895 *pd = (struct lcore_params){worker_id++, d, 896 rx_dist_ring, dist_tx_ring, mbuf_pool}; 897 rte_eal_remote_launch( 898 (lcore_function_t *)lcore_distributor, 899 pd, distr_core_id); 900 901 /* Start rx core */ 902 struct lcore_params *pr = 903 rte_malloc(NULL, sizeof(*pr), 0); 904 if (!pr) 905 rte_panic("malloc failure\n"); 906 *pr = (struct lcore_params){worker_id++, d, rx_dist_ring, 907 dist_tx_ring, mbuf_pool}; 908 rte_eal_remote_launch((lcore_function_t *)lcore_rx, 909 pr, rx_core_id); 910 911 freq = rte_get_timer_hz(); 912 t = rte_rdtsc() + freq; 913 while (!quit_signal_dist) { 914 if (t < rte_rdtsc()) { 915 print_stats(); 916 t = rte_rdtsc() + freq; 917 } 918 usleep(1000); 919 } 920 921 RTE_LCORE_FOREACH_WORKER(lcore_id) { 922 if (rte_eal_wait_lcore(lcore_id) < 0) 923 return -1; 924 } 925 926 print_stats(); 927 928 rte_free(pd); 929 rte_free(pr); 930 931 /* clean up the EAL */ 932 rte_eal_cleanup(); 933 934 return 0; 935 } 936