/* SPDX-License-Identifier: BSD-3-Clause
 * Copyright(c) 2010-2017 Intel Corporation
 */

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <stdint.h>
#include <inttypes.h>
#include <unistd.h>
#include <signal.h>
#include <getopt.h>

#include <rte_eal.h>
#include <rte_ethdev.h>
#include <rte_cycles.h>
#include <rte_malloc.h>
#include <rte_debug.h>
#include <rte_prefetch.h>
#include <rte_distributor.h>
#include <rte_pause.h>
#include <rte_power.h>

#define RX_RING_SIZE 1024
#define TX_RING_SIZE 1024
#define NUM_MBUFS ((64*1024)-1)
#define MBUF_CACHE_SIZE 128
#define BURST_SIZE 64
#define SCHED_RX_RING_SZ 8192
#define SCHED_TX_RING_SZ 65536
#define BURST_SIZE_TX 32

#define RTE_LOGTYPE_DISTRAPP RTE_LOGTYPE_USER1

#define ANSI_COLOR_RED "\x1b[31m"
#define ANSI_COLOR_RESET "\x1b[0m"

/* mask of enabled ports */
static uint32_t enabled_port_mask;
volatile uint8_t quit_signal;
volatile uint8_t quit_signal_rx;
volatile uint8_t quit_signal_dist;
volatile uint8_t quit_signal_work;
unsigned int power_lib_initialised;

static volatile struct app_stats {
	struct {
		uint64_t rx_pkts;
		uint64_t returned_pkts;
		uint64_t enqueued_pkts;
		uint64_t enqdrop_pkts;
	} rx __rte_cache_aligned;
	int pad1 __rte_cache_aligned;

	struct {
		uint64_t in_pkts;
		uint64_t ret_pkts;
		uint64_t sent_pkts;
		uint64_t enqdrop_pkts;
	} dist __rte_cache_aligned;
	int pad2 __rte_cache_aligned;

	struct {
		uint64_t dequeue_pkts;
		uint64_t tx_pkts;
		uint64_t enqdrop_pkts;
	} tx __rte_cache_aligned;
	int pad3 __rte_cache_aligned;

	uint64_t worker_pkts[64] __rte_cache_aligned;

	int pad4 __rte_cache_aligned;

	uint64_t worker_bursts[64][8] __rte_cache_aligned;

	int pad5 __rte_cache_aligned;

	uint64_t port_rx_pkts[64] __rte_cache_aligned;
	uint64_t port_tx_pkts[64] __rte_cache_aligned;
} app_stats;

struct app_stats prev_app_stats;

static const struct rte_eth_conf port_conf_default = {
	.rxmode = {
		.mq_mode = ETH_MQ_RX_RSS,
		.max_rx_pkt_len = RTE_ETHER_MAX_LEN,
	},
	.txmode = {
		.mq_mode = ETH_MQ_TX_NONE,
	},
	.rx_adv_conf = {
		.rss_conf = {
			.rss_hf = ETH_RSS_IP | ETH_RSS_UDP |
				ETH_RSS_TCP | ETH_RSS_SCTP,
		}
	},
};

struct output_buffer {
	unsigned count;
	struct rte_mbuf *mbufs[BURST_SIZE];
};

static void print_stats(void);

/*
 * Initialises a given port using global settings and with the rx buffers
 * coming from the mbuf_pool passed as parameter
 */
static inline int
port_init(uint16_t port, struct rte_mempool *mbuf_pool)
{
	struct rte_eth_conf port_conf = port_conf_default;
	const uint16_t rxRings = 1, txRings = rte_lcore_count() - 1;
	int retval;
	uint16_t q;
	uint16_t nb_rxd = RX_RING_SIZE;
	uint16_t nb_txd = TX_RING_SIZE;
	struct rte_eth_dev_info dev_info;
	struct rte_eth_txconf txconf;

	if (!rte_eth_dev_is_valid_port(port))
		return -1;

	retval = rte_eth_dev_info_get(port, &dev_info);
	if (retval != 0) {
		printf("Error during getting device (port %u) info: %s\n",
				port, strerror(-retval));
		return retval;
	}

	if (dev_info.tx_offload_capa & DEV_TX_OFFLOAD_MBUF_FAST_FREE)
		port_conf.txmode.offloads |=
			DEV_TX_OFFLOAD_MBUF_FAST_FREE;

	port_conf.rx_adv_conf.rss_conf.rss_hf &=
		dev_info.flow_type_rss_offloads;
	if (port_conf.rx_adv_conf.rss_conf.rss_hf !=
			port_conf_default.rx_adv_conf.rss_conf.rss_hf) {
		printf("Port %u modified RSS hash function based on hardware support, "
			"requested:%#"PRIx64" configured:%#"PRIx64"\n",
			port,
			port_conf_default.rx_adv_conf.rss_conf.rss_hf,
			port_conf.rx_adv_conf.rss_conf.rss_hf);
	}

	retval = rte_eth_dev_configure(port, rxRings, txRings, &port_conf);
	if (retval != 0)
		return retval;

	retval = rte_eth_dev_adjust_nb_rx_tx_desc(port, &nb_rxd, &nb_txd);
	if (retval != 0)
		return retval;

	for (q = 0; q < rxRings; q++) {
		retval = rte_eth_rx_queue_setup(port, q, nb_rxd,
						rte_eth_dev_socket_id(port),
						NULL, mbuf_pool);
		if (retval < 0)
			return retval;
	}

	txconf = dev_info.default_txconf;
	txconf.offloads = port_conf.txmode.offloads;
	for (q = 0; q < txRings; q++) {
		retval = rte_eth_tx_queue_setup(port, q, nb_txd,
						rte_eth_dev_socket_id(port),
						&txconf);
		if (retval < 0)
			return retval;
	}

	retval = rte_eth_dev_start(port);
	if (retval < 0)
		return retval;

	struct rte_eth_link link;
	do {
		retval = rte_eth_link_get_nowait(port, &link);
		if (retval < 0) {
			printf("Failed link get (port %u): %s\n",
				port, rte_strerror(-retval));
			return retval;
		} else if (link.link_status)
			break;

		printf("Waiting for Link up on port %"PRIu16"\n", port);
		sleep(1);
	} while (!link.link_status);

	if (!link.link_status) {
		printf("Link down on port %"PRIu16"\n", port);
		return 0;
	}

	struct rte_ether_addr addr;
	retval = rte_eth_macaddr_get(port, &addr);
	if (retval < 0) {
		printf("Failed to get MAC address (port %u): %s\n",
			port, rte_strerror(-retval));
		return retval;
	}

	printf("Port %u MAC: %02"PRIx8" %02"PRIx8" %02"PRIx8
			" %02"PRIx8" %02"PRIx8" %02"PRIx8"\n",
			port, RTE_ETHER_ADDR_BYTES(&addr));

	retval = rte_eth_promiscuous_enable(port);
	if (retval != 0)
		return retval;

	return 0;
}

struct lcore_params {
	unsigned worker_id;
	struct rte_distributor *d;
	struct rte_ring *rx_dist_ring;
	struct rte_ring *dist_tx_ring;
	struct rte_mempool *mem_pool;
};

static int
lcore_rx(struct lcore_params *p)
{
	const uint16_t nb_ports = rte_eth_dev_count_avail();
	const int socket_id = rte_socket_id();
	uint16_t port;
	struct rte_mbuf *bufs[BURST_SIZE*2];

	RTE_ETH_FOREACH_DEV(port) {
		/* skip ports that are not enabled */
		if ((enabled_port_mask & (1 << port)) == 0)
			continue;

		if (rte_eth_dev_socket_id(port) > 0 &&
				rte_eth_dev_socket_id(port) != socket_id)
			printf("WARNING, port %u is on remote NUMA node to "
					"RX thread.\n\tPerformance will not "
					"be optimal.\n", port);
	}

	printf("\nCore %u doing packet RX.\n", rte_lcore_id());
	port = 0;
	while (!quit_signal_rx) {

		/* skip ports that are not enabled */
		if ((enabled_port_mask & (1 << port)) == 0) {
			if (++port == nb_ports)
				port = 0;
			continue;
		}
		const uint16_t nb_rx = rte_eth_rx_burst(port, 0, bufs,
				BURST_SIZE);
		if (unlikely(nb_rx == 0)) {
			if (++port == nb_ports)
				port = 0;
			continue;
		}
		app_stats.rx.rx_pkts += nb_rx;

		/*
		 * You can run the distributor on the rx core with this code.
		 * Returned packets are then sent straight to the tx core.
		 */
#if 0
		rte_distributor_process(p->d, bufs, nb_rx);
		const uint16_t nb_ret = rte_distributor_returned_pkts(p->d,
				bufs, BURST_SIZE*2);

		app_stats.rx.returned_pkts += nb_ret;
		if (unlikely(nb_ret == 0)) {
			if (++port == nb_ports)
				port = 0;
			continue;
		}

		struct rte_ring *tx_ring = p->dist_tx_ring;
		uint16_t sent = rte_ring_enqueue_burst(tx_ring,
				(void *)bufs, nb_ret, NULL);
#else
		uint16_t nb_ret = nb_rx;
		/*
		 * Swap the following two lines if you want the rx traffic
		 * to go directly to tx, no distribution.
		 */
		struct rte_ring *out_ring = p->rx_dist_ring;
		/* struct rte_ring *out_ring = p->dist_tx_ring; */

		uint16_t sent = rte_ring_enqueue_burst(out_ring,
				(void *)bufs, nb_ret, NULL);
#endif

		app_stats.rx.enqueued_pkts += sent;
		if (unlikely(sent < nb_ret)) {
			app_stats.rx.enqdrop_pkts += nb_ret - sent;
			RTE_LOG_DP(DEBUG, DISTRAPP,
				"%s:Packet loss due to full ring\n", __func__);
			while (sent < nb_ret)
				rte_pktmbuf_free(bufs[sent++]);
		}
		if (++port == nb_ports)
			port = 0;
	}
	if (power_lib_initialised)
		rte_power_exit(rte_lcore_id());
	/* set tx thread quit flag */
	printf("\nCore %u exiting rx task.\n", rte_lcore_id());
	quit_signal = 1;
	return 0;
}

static inline void
flush_one_port(struct output_buffer *outbuf, uint8_t outp)
{
	unsigned int nb_tx = rte_eth_tx_burst(outp, 0,
			outbuf->mbufs, outbuf->count);
	app_stats.tx.tx_pkts += outbuf->count;

	if (unlikely(nb_tx < outbuf->count)) {
		app_stats.tx.enqdrop_pkts += outbuf->count - nb_tx;
		do {
			rte_pktmbuf_free(outbuf->mbufs[nb_tx]);
		} while (++nb_tx < outbuf->count);
	}
	outbuf->count = 0;
}

static inline void
flush_all_ports(struct output_buffer *tx_buffers)
{
	uint16_t outp;

	RTE_ETH_FOREACH_DEV(outp) {
		/* skip ports that are not enabled */
		if ((enabled_port_mask & (1 << outp)) == 0)
			continue;

		if (tx_buffers[outp].count == 0)
			continue;

		flush_one_port(&tx_buffers[outp], outp);
	}
}

static int
lcore_distributor(struct lcore_params *p)
{
	struct rte_ring *in_r = p->rx_dist_ring;
	struct rte_ring *out_r = p->dist_tx_ring;
	struct rte_mbuf *bufs[BURST_SIZE * 4];
	struct rte_distributor *d = p->d;

	printf("\nCore %u acting as distributor core.\n", rte_lcore_id());
	while (!quit_signal_dist) {
		const uint16_t nb_rx = rte_ring_dequeue_burst(in_r,
				(void *)bufs, BURST_SIZE*1, NULL);
		if (nb_rx) {
			app_stats.dist.in_pkts += nb_rx;

			/* Distribute the packets */
			rte_distributor_process(d, bufs, nb_rx);
			/* Handle Returns */
			const uint16_t nb_ret =
				rte_distributor_returned_pkts(d,
					bufs, BURST_SIZE*2);

			if (unlikely(nb_ret == 0))
				continue;
			app_stats.dist.ret_pkts += nb_ret;

			uint16_t sent = rte_ring_enqueue_burst(out_r,
					(void *)bufs, nb_ret, NULL);
			app_stats.dist.sent_pkts += sent;
			if (unlikely(sent < nb_ret)) {
				app_stats.dist.enqdrop_pkts += nb_ret - sent;
				RTE_LOG(DEBUG, DISTRAPP,
					"%s:Packet loss due to full out ring\n",
					__func__);
				while (sent < nb_ret)
					rte_pktmbuf_free(bufs[sent++]);
			}
		}
	}
	printf("\nCore %u exiting distributor task.\n", rte_lcore_id());
	quit_signal_work = 1;
	if (power_lib_initialised)
		rte_power_exit(rte_lcore_id());
	rte_distributor_flush(d);
	/* Unblock any returns so workers can exit */
	rte_distributor_clear_returns(d);
	quit_signal_rx = 1;
	return 0;
}

static int
lcore_tx(struct rte_ring *in_r)
{
	static struct output_buffer tx_buffers[RTE_MAX_ETHPORTS];
	const int socket_id = rte_socket_id();
	uint16_t port;

	RTE_ETH_FOREACH_DEV(port) {
		/* skip ports that are not enabled */
		if ((enabled_port_mask & (1 << port)) == 0)
			continue;

		if (rte_eth_dev_socket_id(port) > 0 &&
				rte_eth_dev_socket_id(port) != socket_id)
			printf("WARNING, port %u is on remote NUMA node to "
					"TX thread.\n\tPerformance will not "
					"be optimal.\n", port);
	}

	printf("\nCore %u doing packet TX.\n", rte_lcore_id());
	while (!quit_signal) {

		RTE_ETH_FOREACH_DEV(port) {
			/* skip ports that are not enabled */
			if ((enabled_port_mask & (1 << port)) == 0)
				continue;

			struct rte_mbuf *bufs[BURST_SIZE_TX];
			const uint16_t nb_rx = rte_ring_dequeue_burst(in_r,
					(void *)bufs, BURST_SIZE_TX, NULL);
			app_stats.tx.dequeue_pkts += nb_rx;

			/* if we get no traffic, flush anything we have */
			if (unlikely(nb_rx == 0)) {
				flush_all_ports(tx_buffers);
				continue;
			}

			/* for traffic we receive, queue it up for transmit */
			uint16_t i;
			rte_prefetch_non_temporal((void *)bufs[0]);
			rte_prefetch_non_temporal((void *)bufs[1]);
			rte_prefetch_non_temporal((void *)bufs[2]);
			for (i = 0; i < nb_rx; i++) {
				struct output_buffer *outbuf;
				uint8_t outp;
				rte_prefetch_non_temporal((void *)bufs[i + 3]);
				/*
				 * workers should update in_port to hold the
				 * output port value
				 */
				outp = bufs[i]->port;
				/* skip ports that are not enabled */
				if ((enabled_port_mask & (1 << outp)) == 0)
					continue;

				outbuf = &tx_buffers[outp];
				outbuf->mbufs[outbuf->count++] = bufs[i];
				if (outbuf->count == BURST_SIZE_TX)
					flush_one_port(outbuf, outp);
			}
		}
	}
	if (power_lib_initialised)
		rte_power_exit(rte_lcore_id());
	printf("\nCore %u exiting tx task.\n", rte_lcore_id());
	return 0;
}

static void
int_handler(int sig_num)
{
	printf("Exiting on signal %d\n", sig_num);
	/*
	 * set quit flag for distributor thread to exit; the remaining
	 * lcores then shut down in sequence
	 */
	quit_signal_dist = 1;
}

static void
print_stats(void)
{
	struct rte_eth_stats eth_stats;
	unsigned int i, j;
	const unsigned int num_workers = rte_lcore_count() - 4;

	RTE_ETH_FOREACH_DEV(i) {
		rte_eth_stats_get(i, &eth_stats);
		app_stats.port_rx_pkts[i] = eth_stats.ipackets;
		app_stats.port_tx_pkts[i] = eth_stats.opackets;
	}

	printf("\n\nRX Thread:\n");
	RTE_ETH_FOREACH_DEV(i) {
		printf("Port %u Pktsin : %5.2f\n", i,
				(app_stats.port_rx_pkts[i] -
				prev_app_stats.port_rx_pkts[i])/1000000.0);
		prev_app_stats.port_rx_pkts[i] = app_stats.port_rx_pkts[i];
	}
	printf(" - Received:    %5.2f\n",
			(app_stats.rx.rx_pkts -
			prev_app_stats.rx.rx_pkts)/1000000.0);
	printf(" - Returned:    %5.2f\n",
			(app_stats.rx.returned_pkts -
			prev_app_stats.rx.returned_pkts)/1000000.0);
	printf(" - Enqueued:    %5.2f\n",
			(app_stats.rx.enqueued_pkts -
			prev_app_stats.rx.enqueued_pkts)/1000000.0);
	printf(" - Dropped:     %s%5.2f%s\n", ANSI_COLOR_RED,
			(app_stats.rx.enqdrop_pkts -
			prev_app_stats.rx.enqdrop_pkts)/1000000.0,
			ANSI_COLOR_RESET);

	printf("Distributor thread:\n");
	printf(" - In:          %5.2f\n",
			(app_stats.dist.in_pkts -
			prev_app_stats.dist.in_pkts)/1000000.0);
	printf(" - Returned:    %5.2f\n",
			(app_stats.dist.ret_pkts -
			prev_app_stats.dist.ret_pkts)/1000000.0);
	printf(" - Sent:        %5.2f\n",
			(app_stats.dist.sent_pkts -
			prev_app_stats.dist.sent_pkts)/1000000.0);
	printf(" - Dropped:     %s%5.2f%s\n", ANSI_COLOR_RED,
			(app_stats.dist.enqdrop_pkts -
			prev_app_stats.dist.enqdrop_pkts)/1000000.0,
			ANSI_COLOR_RESET);

	printf("TX thread:\n");
	printf(" - Dequeued:    %5.2f\n",
			(app_stats.tx.dequeue_pkts -
			prev_app_stats.tx.dequeue_pkts)/1000000.0);
	RTE_ETH_FOREACH_DEV(i) {
		printf("Port %u Pktsout: %5.2f\n",
				i, (app_stats.port_tx_pkts[i] -
				prev_app_stats.port_tx_pkts[i])/1000000.0);
		prev_app_stats.port_tx_pkts[i] = app_stats.port_tx_pkts[i];
	}
	printf(" - Transmitted: %5.2f\n",
			(app_stats.tx.tx_pkts -
			prev_app_stats.tx.tx_pkts)/1000000.0);
	printf(" - Dropped:     %s%5.2f%s\n", ANSI_COLOR_RED,
			(app_stats.tx.enqdrop_pkts -
			prev_app_stats.tx.enqdrop_pkts)/1000000.0,
			ANSI_COLOR_RESET);

	prev_app_stats.rx.rx_pkts = app_stats.rx.rx_pkts;
	prev_app_stats.rx.returned_pkts = app_stats.rx.returned_pkts;
	prev_app_stats.rx.enqueued_pkts = app_stats.rx.enqueued_pkts;
	prev_app_stats.rx.enqdrop_pkts = app_stats.rx.enqdrop_pkts;
	prev_app_stats.dist.in_pkts = app_stats.dist.in_pkts;
	prev_app_stats.dist.ret_pkts = app_stats.dist.ret_pkts;
	prev_app_stats.dist.sent_pkts = app_stats.dist.sent_pkts;
	prev_app_stats.dist.enqdrop_pkts = app_stats.dist.enqdrop_pkts;
	prev_app_stats.tx.dequeue_pkts = app_stats.tx.dequeue_pkts;
	prev_app_stats.tx.tx_pkts = app_stats.tx.tx_pkts;
	prev_app_stats.tx.enqdrop_pkts = app_stats.tx.enqdrop_pkts;

	for (i = 0; i < num_workers; i++) {
		printf("Worker %02u Pkts: %5.2f. Bursts(1-8): ", i,
				(app_stats.worker_pkts[i] -
				prev_app_stats.worker_pkts[i])/1000000.0);
		for (j = 0; j < 8; j++) {
			printf("%"PRIu64" ", app_stats.worker_bursts[i][j]);
			app_stats.worker_bursts[i][j] = 0;
		}
		printf("\n");
		prev_app_stats.worker_pkts[i] = app_stats.worker_pkts[i];
	}
}

static int
lcore_worker(struct lcore_params *p)
{
	struct rte_distributor *d = p->d;
	const unsigned id = p->worker_id;
	unsigned int num = 0;
	unsigned int i;

	/*
	 * for single port, xor_val will be zero so we won't modify the output
	 * port, otherwise we send traffic from 0 to 1, 2 to 3, and vice versa
	 */
	const unsigned xor_val = (rte_eth_dev_count_avail() > 1);
	struct rte_mbuf *buf[8] __rte_cache_aligned;

	for (i = 0; i < 8; i++)
		buf[i] = NULL;

	app_stats.worker_pkts[p->worker_id] = 1;

	printf("\nCore %u acting as worker core.\n", rte_lcore_id());
	while (!quit_signal_work) {
		num = rte_distributor_get_pkt(d, id, buf, buf, num);
		/* Do a little bit of work for each packet */
		for (i = 0; i < num; i++) {
			uint64_t t = rte_rdtsc()+100;

			while (rte_rdtsc() < t)
				rte_pause();
			buf[i]->port ^= xor_val;
		}

		app_stats.worker_pkts[p->worker_id] += num;
		if (num > 0)
			app_stats.worker_bursts[p->worker_id][num-1]++;
	}
	if (power_lib_initialised)
		rte_power_exit(rte_lcore_id());
	rte_free(p);
	return 0;
}

static int
init_power_library(void)
{
	int ret = 0, lcore_id;
	RTE_LCORE_FOREACH_WORKER(lcore_id) {
		/* init power management library */
		ret = rte_power_init(lcore_id);
		if (ret) {
			RTE_LOG(ERR, POWER,
				"Library initialization failed on core %u\n",
				lcore_id);
			/*
			 * Return on first failure, we'll fall back
			 * to non-power operation
			 */
			return ret;
		}
	}
	return ret;
}

/* display usage */
static void
print_usage(const char *prgname)
{
	printf("%s [EAL options] -- -p PORTMASK\n"
			"  -p PORTMASK: hexadecimal bitmask of ports to configure\n",
			prgname);
}

static int
parse_portmask(const char *portmask)
{
	char *end = NULL;
	unsigned long pm;

	/* parse hexadecimal string */
	pm = strtoul(portmask, &end, 16);
	if ((portmask[0] == '\0') || (end == NULL) || (*end != '\0'))
		return 0;

	return pm;
}

/* Parse the argument given in the command line of the application */
static int
parse_args(int argc, char **argv)
{
	int opt;
	char **argvopt;
	int option_index;
	char *prgname = argv[0];
	static struct option lgopts[] = {
		{NULL, 0, 0, 0}
	};

	argvopt = argv;

	while ((opt = getopt_long(argc, argvopt, "p:",
			lgopts, &option_index)) != EOF) {

		switch (opt) {
		/* portmask */
		case 'p':
			enabled_port_mask = parse_portmask(optarg);
			if (enabled_port_mask == 0) {
				printf("invalid portmask\n");
				print_usage(prgname);
				return -1;
			}
			break;

		default:
			print_usage(prgname);
			return -1;
		}
	}

	if (optind <= 1) {
		print_usage(prgname);
		return -1;
	}

	argv[optind-1] = prgname;

	optind = 1; /* reset getopt lib */
	return 0;
}

/* Main function, does initialization and calls the per-lcore functions */
int
main(int argc, char *argv[])
{
	struct rte_mempool *mbuf_pool;
	struct rte_distributor *d;
	struct rte_ring *dist_tx_ring;
	struct rte_ring *rx_dist_ring;
	struct rte_power_core_capabilities lcore_cap;
	unsigned int lcore_id, worker_id = 0;
	int distr_core_id = -1, rx_core_id = -1, tx_core_id = -1;
	unsigned nb_ports;
	uint16_t portid;
	uint16_t nb_ports_available;
	uint64_t t, freq;

	/* catch ctrl-c so we can print on exit */
	signal(SIGINT, int_handler);

	/* init EAL */
	int ret = rte_eal_init(argc, argv);
	if (ret < 0)
		rte_exit(EXIT_FAILURE, "Error with EAL initialization\n");
	argc -= ret;
	argv += ret;

	/* parse application arguments (after the EAL ones) */
	ret = parse_args(argc, argv);
	if (ret < 0)
		rte_exit(EXIT_FAILURE, "Invalid distributor parameters\n");

	if (rte_lcore_count() < 5)
		rte_exit(EXIT_FAILURE, "Error, This application needs at "
				"least 5 logical cores to run:\n"
				"1 lcore for stats (can be core 0)\n"
				"1 lcore for packet RX\n"
				"1 lcore for distribution\n"
				"1 lcore for packet TX\n"
				"and at least 1 lcore for worker threads\n");

	if (init_power_library() == 0)
		power_lib_initialised = 1;

	nb_ports = rte_eth_dev_count_avail();
	if (nb_ports == 0)
		rte_exit(EXIT_FAILURE, "Error: no ethernet ports detected\n");
	if (nb_ports != 1 && (nb_ports & 1))
		rte_exit(EXIT_FAILURE, "Error: number of ports must be even, except "
				"when using a single port\n");

	mbuf_pool = rte_pktmbuf_pool_create("MBUF_POOL",
		NUM_MBUFS * nb_ports, MBUF_CACHE_SIZE, 0,
		RTE_MBUF_DEFAULT_BUF_SIZE, rte_socket_id());
	if (mbuf_pool == NULL)
		rte_exit(EXIT_FAILURE, "Cannot create mbuf pool\n");
	nb_ports_available = nb_ports;

	/* initialize all ports */
	RTE_ETH_FOREACH_DEV(portid) {
		/* skip ports that are not enabled */
		if ((enabled_port_mask & (1 << portid)) == 0) {
			printf("\nSkipping disabled port %d\n", portid);
			nb_ports_available--;
			continue;
		}
		/* init port */
		printf("Initializing port %u... done\n", portid);

		if (port_init(portid, mbuf_pool) != 0)
			rte_exit(EXIT_FAILURE, "Cannot initialize port %u\n",
					portid);
	}

	if (!nb_ports_available) {
		rte_exit(EXIT_FAILURE,
				"All available ports are disabled. Please set portmask.\n");
	}

	d = rte_distributor_create("PKT_DIST", rte_socket_id(),
			rte_lcore_count() - 4,
			RTE_DIST_ALG_BURST);
	if (d == NULL)
		rte_exit(EXIT_FAILURE, "Cannot create distributor\n");

	/*
	 * scheduler ring is read by the transmitter core, and written to
	 * by scheduler core
	 */
	dist_tx_ring = rte_ring_create("Output_ring", SCHED_TX_RING_SZ,
			rte_socket_id(), RING_F_SC_DEQ | RING_F_SP_ENQ);
	if (dist_tx_ring == NULL)
		rte_exit(EXIT_FAILURE, "Cannot create output ring\n");

	rx_dist_ring = rte_ring_create("Input_ring", SCHED_RX_RING_SZ,
			rte_socket_id(), RING_F_SC_DEQ | RING_F_SP_ENQ);
	if (rx_dist_ring == NULL)
		rte_exit(EXIT_FAILURE, "Cannot create input ring\n");

	if (power_lib_initialised) {
		/*
		 * Here we'll pre-assign lcore ids to the rx, tx and
		 * distributor workloads if there's higher frequency
		 * on those cores e.g. if Turbo Boost is enabled.
		 * It's also worth mentioning that it will assign cores in a
		 * specific order, so that if there's less than three
		 * available, the higher frequency cores will go to the
		 * distributor first, then rx, then tx.
		 */
		RTE_LCORE_FOREACH_WORKER(lcore_id) {

			rte_power_get_capabilities(lcore_id, &lcore_cap);

			if (lcore_cap.priority != 1)
				continue;

			if (distr_core_id < 0) {
				distr_core_id = lcore_id;
				printf("Distributor on priority core %d\n",
					lcore_id);
				continue;
			}
			if (rx_core_id < 0) {
				rx_core_id = lcore_id;
				printf("Rx on priority core %d\n",
					lcore_id);
				continue;
			}
			if (tx_core_id < 0) {
				tx_core_id = lcore_id;
				printf("Tx on priority core %d\n",
					lcore_id);
				continue;
			}
		}
	}

	/*
	 * If there's any of the key workloads left without an lcore_id
	 * after the high performing core assignment above, pre-assign
	 * them here.
	 */
	RTE_LCORE_FOREACH_WORKER(lcore_id) {
		if (lcore_id == (unsigned int)distr_core_id ||
				lcore_id == (unsigned int)rx_core_id ||
				lcore_id == (unsigned int)tx_core_id)
			continue;
		if (distr_core_id < 0) {
			distr_core_id = lcore_id;
			printf("Distributor on core %d\n", lcore_id);
			continue;
		}
		if (rx_core_id < 0) {
			rx_core_id = lcore_id;
			printf("Rx on core %d\n", lcore_id);
			continue;
		}
		if (tx_core_id < 0) {
			tx_core_id = lcore_id;
			printf("Tx on core %d\n", lcore_id);
			continue;
		}
	}

	printf(" tx id %d, dist id %d, rx id %d\n",
			tx_core_id,
			distr_core_id,
			rx_core_id);

	/*
	 * Kick off all the worker threads first, avoiding the pre-assigned
	 * lcore_ids for tx, rx and distributor workloads.
	 */
	RTE_LCORE_FOREACH_WORKER(lcore_id) {
		if (lcore_id == (unsigned int)distr_core_id ||
				lcore_id == (unsigned int)rx_core_id ||
				lcore_id == (unsigned int)tx_core_id)
			continue;
		printf("Starting thread %d as worker, lcore_id %d\n",
				worker_id, lcore_id);
		struct lcore_params *p =
				rte_malloc(NULL, sizeof(*p), 0);
		if (!p)
			rte_panic("malloc failure\n");
		*p = (struct lcore_params){worker_id++, d, rx_dist_ring,
			dist_tx_ring, mbuf_pool};

		rte_eal_remote_launch((lcore_function_t *)lcore_worker,
				p, lcore_id);
	}

	/* Start tx core */
	rte_eal_remote_launch((lcore_function_t *)lcore_tx,
			dist_tx_ring, tx_core_id);

	/* Start distributor core */
	struct lcore_params *pd =
		rte_malloc(NULL, sizeof(*pd), 0);
	if (!pd)
		rte_panic("malloc failure\n");
	*pd = (struct lcore_params){worker_id++, d,
		rx_dist_ring, dist_tx_ring, mbuf_pool};
	rte_eal_remote_launch(
			(lcore_function_t *)lcore_distributor,
			pd, distr_core_id);

	/* Start rx core */
	struct lcore_params *pr =
		rte_malloc(NULL, sizeof(*pr), 0);
	if (!pr)
		rte_panic("malloc failure\n");
	*pr = (struct lcore_params){worker_id++, d, rx_dist_ring,
		dist_tx_ring, mbuf_pool};
	rte_eal_remote_launch((lcore_function_t *)lcore_rx,
			pr, rx_core_id);

	freq = rte_get_timer_hz();
	t = rte_rdtsc() + freq;
	while (!quit_signal_dist) {
		if (t < rte_rdtsc()) {
			print_stats();
			t = rte_rdtsc() + freq;
		}
		usleep(1000);
	}

	RTE_LCORE_FOREACH_WORKER(lcore_id) {
		if (rte_eal_wait_lcore(lcore_id) < 0)
			return -1;
	}

	print_stats();

	rte_free(pd);
	rte_free(pr);

	/* clean up the EAL */
	rte_eal_cleanup();

	return 0;
}
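
/*
 * Usage sketch (illustrative only, not part of the upstream sources): the
 * binary name and core layout below are assumptions and depend on how the
 * example was built and on the local machine. The application needs at
 * least 5 lcores (stats, rx, distributor, tx and one or more workers) and
 * takes the port bitmask after the EAL options, e.g.:
 *
 *   ./dpdk-distributor -l 0-8 -n 4 -- -p 0x3
 *
 * "-l 0-8" and "-n 4" are standard EAL core-list/memory-channel options;
 * "-p 0x3" enables ports 0 and 1 via the portmask parsed in parse_args().
 */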