1 /* SPDX-License-Identifier: BSD-3-Clause 2 * Copyright(c) 2010-2017 Intel Corporation 3 */ 4 5 #include <stdint.h> 6 #include <inttypes.h> 7 #include <unistd.h> 8 #include <signal.h> 9 #include <getopt.h> 10 11 #include <rte_eal.h> 12 #include <rte_ethdev.h> 13 #include <rte_cycles.h> 14 #include <rte_malloc.h> 15 #include <rte_debug.h> 16 #include <rte_prefetch.h> 17 #include <rte_distributor.h> 18 #include <rte_pause.h> 19 #include <rte_power.h> 20 21 #define RX_RING_SIZE 1024 22 #define TX_RING_SIZE 1024 23 #define NUM_MBUFS ((64*1024)-1) 24 #define MBUF_CACHE_SIZE 128 25 #define BURST_SIZE 64 26 #define SCHED_RX_RING_SZ 8192 27 #define SCHED_TX_RING_SZ 65536 28 #define BURST_SIZE_TX 32 29 30 #define RTE_LOGTYPE_DISTRAPP RTE_LOGTYPE_USER1 31 32 #define ANSI_COLOR_RED "\x1b[31m" 33 #define ANSI_COLOR_RESET "\x1b[0m" 34 35 /* mask of enabled ports */ 36 static uint32_t enabled_port_mask; 37 volatile uint8_t quit_signal; 38 volatile uint8_t quit_signal_rx; 39 volatile uint8_t quit_signal_dist; 40 volatile uint8_t quit_signal_work; 41 unsigned int power_lib_initialised; 42 43 static volatile struct app_stats { 44 struct { 45 uint64_t rx_pkts; 46 uint64_t returned_pkts; 47 uint64_t enqueued_pkts; 48 uint64_t enqdrop_pkts; 49 } rx __rte_cache_aligned; 50 int pad1 __rte_cache_aligned; 51 52 struct { 53 uint64_t in_pkts; 54 uint64_t ret_pkts; 55 uint64_t sent_pkts; 56 uint64_t enqdrop_pkts; 57 } dist __rte_cache_aligned; 58 int pad2 __rte_cache_aligned; 59 60 struct { 61 uint64_t dequeue_pkts; 62 uint64_t tx_pkts; 63 uint64_t enqdrop_pkts; 64 } tx __rte_cache_aligned; 65 int pad3 __rte_cache_aligned; 66 67 uint64_t worker_pkts[64] __rte_cache_aligned; 68 69 int pad4 __rte_cache_aligned; 70 71 uint64_t worker_bursts[64][8] __rte_cache_aligned; 72 73 int pad5 __rte_cache_aligned; 74 75 uint64_t port_rx_pkts[64] __rte_cache_aligned; 76 uint64_t port_tx_pkts[64] __rte_cache_aligned; 77 } app_stats; 78 79 struct app_stats prev_app_stats; 80 81 static const struct rte_eth_conf port_conf_default = { 82 .rxmode = { 83 .mq_mode = ETH_MQ_RX_RSS, 84 .max_rx_pkt_len = RTE_ETHER_MAX_LEN, 85 }, 86 .txmode = { 87 .mq_mode = ETH_MQ_TX_NONE, 88 }, 89 .rx_adv_conf = { 90 .rss_conf = { 91 .rss_hf = ETH_RSS_IP | ETH_RSS_UDP | 92 ETH_RSS_TCP | ETH_RSS_SCTP, 93 } 94 }, 95 }; 96 97 struct output_buffer { 98 unsigned count; 99 struct rte_mbuf *mbufs[BURST_SIZE]; 100 }; 101 102 static void print_stats(void); 103 104 /* 105 * Initialises a given port using global settings and with the rx buffers 106 * coming from the mbuf_pool passed as parameter 107 */ 108 static inline int 109 port_init(uint16_t port, struct rte_mempool *mbuf_pool) 110 { 111 struct rte_eth_conf port_conf = port_conf_default; 112 const uint16_t rxRings = 1, txRings = rte_lcore_count() - 1; 113 int retval; 114 uint16_t q; 115 uint16_t nb_rxd = RX_RING_SIZE; 116 uint16_t nb_txd = TX_RING_SIZE; 117 struct rte_eth_dev_info dev_info; 118 struct rte_eth_txconf txconf; 119 120 if (!rte_eth_dev_is_valid_port(port)) 121 return -1; 122 123 retval = rte_eth_dev_info_get(port, &dev_info); 124 if (retval != 0) { 125 printf("Error during getting device (port %u) info: %s\n", 126 port, strerror(-retval)); 127 return retval; 128 } 129 130 if (dev_info.tx_offload_capa & DEV_TX_OFFLOAD_MBUF_FAST_FREE) 131 port_conf.txmode.offloads |= 132 DEV_TX_OFFLOAD_MBUF_FAST_FREE; 133 134 port_conf.rx_adv_conf.rss_conf.rss_hf &= 135 dev_info.flow_type_rss_offloads; 136 if (port_conf.rx_adv_conf.rss_conf.rss_hf != 137 port_conf_default.rx_adv_conf.rss_conf.rss_hf) { 138 printf("Port %u modified RSS hash function based on hardware support," 139 "requested:%#"PRIx64" configured:%#"PRIx64"\n", 140 port, 141 port_conf_default.rx_adv_conf.rss_conf.rss_hf, 142 port_conf.rx_adv_conf.rss_conf.rss_hf); 143 } 144 145 retval = rte_eth_dev_configure(port, rxRings, txRings, &port_conf); 146 if (retval != 0) 147 return retval; 148 149 retval = rte_eth_dev_adjust_nb_rx_tx_desc(port, &nb_rxd, &nb_txd); 150 if (retval != 0) 151 return retval; 152 153 for (q = 0; q < rxRings; q++) { 154 retval = rte_eth_rx_queue_setup(port, q, nb_rxd, 155 rte_eth_dev_socket_id(port), 156 NULL, mbuf_pool); 157 if (retval < 0) 158 return retval; 159 } 160 161 txconf = dev_info.default_txconf; 162 txconf.offloads = port_conf.txmode.offloads; 163 for (q = 0; q < txRings; q++) { 164 retval = rte_eth_tx_queue_setup(port, q, nb_txd, 165 rte_eth_dev_socket_id(port), 166 &txconf); 167 if (retval < 0) 168 return retval; 169 } 170 171 retval = rte_eth_dev_start(port); 172 if (retval < 0) 173 return retval; 174 175 struct rte_eth_link link; 176 do { 177 retval = rte_eth_link_get_nowait(port, &link); 178 if (retval < 0) { 179 printf("Failed link get (port %u): %s\n", 180 port, rte_strerror(-retval)); 181 return retval; 182 } else if (link.link_status) 183 break; 184 185 printf("Waiting for Link up on port %"PRIu16"\n", port); 186 sleep(1); 187 } while (!link.link_status); 188 189 if (!link.link_status) { 190 printf("Link down on port %"PRIu16"\n", port); 191 return 0; 192 } 193 194 struct rte_ether_addr addr; 195 retval = rte_eth_macaddr_get(port, &addr); 196 if (retval < 0) { 197 printf("Failed to get MAC address (port %u): %s\n", 198 port, rte_strerror(-retval)); 199 return retval; 200 } 201 202 printf("Port %u MAC: %02"PRIx8" %02"PRIx8" %02"PRIx8 203 " %02"PRIx8" %02"PRIx8" %02"PRIx8"\n", 204 port, 205 addr.addr_bytes[0], addr.addr_bytes[1], 206 addr.addr_bytes[2], addr.addr_bytes[3], 207 addr.addr_bytes[4], addr.addr_bytes[5]); 208 209 retval = rte_eth_promiscuous_enable(port); 210 if (retval != 0) 211 return retval; 212 213 return 0; 214 } 215 216 struct lcore_params { 217 unsigned worker_id; 218 struct rte_distributor *d; 219 struct rte_ring *rx_dist_ring; 220 struct rte_ring *dist_tx_ring; 221 struct rte_mempool *mem_pool; 222 }; 223 224 static int 225 lcore_rx(struct lcore_params *p) 226 { 227 const uint16_t nb_ports = rte_eth_dev_count_avail(); 228 const int socket_id = rte_socket_id(); 229 uint16_t port; 230 struct rte_mbuf *bufs[BURST_SIZE*2]; 231 232 RTE_ETH_FOREACH_DEV(port) { 233 /* skip ports that are not enabled */ 234 if ((enabled_port_mask & (1 << port)) == 0) 235 continue; 236 237 if (rte_eth_dev_socket_id(port) > 0 && 238 rte_eth_dev_socket_id(port) != socket_id) 239 printf("WARNING, port %u is on remote NUMA node to " 240 "RX thread.\n\tPerformance will not " 241 "be optimal.\n", port); 242 } 243 244 printf("\nCore %u doing packet RX.\n", rte_lcore_id()); 245 port = 0; 246 while (!quit_signal_rx) { 247 248 /* skip ports that are not enabled */ 249 if ((enabled_port_mask & (1 << port)) == 0) { 250 if (++port == nb_ports) 251 port = 0; 252 continue; 253 } 254 const uint16_t nb_rx = rte_eth_rx_burst(port, 0, bufs, 255 BURST_SIZE); 256 if (unlikely(nb_rx == 0)) { 257 if (++port == nb_ports) 258 port = 0; 259 continue; 260 } 261 app_stats.rx.rx_pkts += nb_rx; 262 263 /* 264 * You can run the distributor on the rx core with this code. Returned 265 * packets are then send straight to the tx core. 266 */ 267 #if 0 268 rte_distributor_process(d, bufs, nb_rx); 269 const uint16_t nb_ret = rte_distributor_returned_pktsd, 270 bufs, BURST_SIZE*2); 271 272 app_stats.rx.returned_pkts += nb_ret; 273 if (unlikely(nb_ret == 0)) { 274 if (++port == nb_ports) 275 port = 0; 276 continue; 277 } 278 279 struct rte_ring *tx_ring = p->dist_tx_ring; 280 uint16_t sent = rte_ring_enqueue_burst(tx_ring, 281 (void *)bufs, nb_ret, NULL); 282 #else 283 uint16_t nb_ret = nb_rx; 284 /* 285 * Swap the following two lines if you want the rx traffic 286 * to go directly to tx, no distribution. 287 */ 288 struct rte_ring *out_ring = p->rx_dist_ring; 289 /* struct rte_ring *out_ring = p->dist_tx_ring; */ 290 291 uint16_t sent = rte_ring_enqueue_burst(out_ring, 292 (void *)bufs, nb_ret, NULL); 293 #endif 294 295 app_stats.rx.enqueued_pkts += sent; 296 if (unlikely(sent < nb_ret)) { 297 app_stats.rx.enqdrop_pkts += nb_ret - sent; 298 RTE_LOG_DP(DEBUG, DISTRAPP, 299 "%s:Packet loss due to full ring\n", __func__); 300 while (sent < nb_ret) 301 rte_pktmbuf_free(bufs[sent++]); 302 } 303 if (++port == nb_ports) 304 port = 0; 305 } 306 if (power_lib_initialised) 307 rte_power_exit(rte_lcore_id()); 308 /* set worker & tx threads quit flag */ 309 printf("\nCore %u exiting rx task.\n", rte_lcore_id()); 310 quit_signal = 1; 311 return 0; 312 } 313 314 static inline void 315 flush_one_port(struct output_buffer *outbuf, uint8_t outp) 316 { 317 unsigned int nb_tx = rte_eth_tx_burst(outp, 0, 318 outbuf->mbufs, outbuf->count); 319 app_stats.tx.tx_pkts += outbuf->count; 320 321 if (unlikely(nb_tx < outbuf->count)) { 322 app_stats.tx.enqdrop_pkts += outbuf->count - nb_tx; 323 do { 324 rte_pktmbuf_free(outbuf->mbufs[nb_tx]); 325 } while (++nb_tx < outbuf->count); 326 } 327 outbuf->count = 0; 328 } 329 330 static inline void 331 flush_all_ports(struct output_buffer *tx_buffers) 332 { 333 uint16_t outp; 334 335 RTE_ETH_FOREACH_DEV(outp) { 336 /* skip ports that are not enabled */ 337 if ((enabled_port_mask & (1 << outp)) == 0) 338 continue; 339 340 if (tx_buffers[outp].count == 0) 341 continue; 342 343 flush_one_port(&tx_buffers[outp], outp); 344 } 345 } 346 347 348 349 static int 350 lcore_distributor(struct lcore_params *p) 351 { 352 struct rte_ring *in_r = p->rx_dist_ring; 353 struct rte_ring *out_r = p->dist_tx_ring; 354 struct rte_mbuf *bufs[BURST_SIZE * 4]; 355 struct rte_distributor *d = p->d; 356 357 printf("\nCore %u acting as distributor core.\n", rte_lcore_id()); 358 while (!quit_signal_dist) { 359 const uint16_t nb_rx = rte_ring_dequeue_burst(in_r, 360 (void *)bufs, BURST_SIZE*1, NULL); 361 if (nb_rx) { 362 app_stats.dist.in_pkts += nb_rx; 363 364 /* Distribute the packets */ 365 rte_distributor_process(d, bufs, nb_rx); 366 /* Handle Returns */ 367 const uint16_t nb_ret = 368 rte_distributor_returned_pkts(d, 369 bufs, BURST_SIZE*2); 370 371 if (unlikely(nb_ret == 0)) 372 continue; 373 app_stats.dist.ret_pkts += nb_ret; 374 375 uint16_t sent = rte_ring_enqueue_burst(out_r, 376 (void *)bufs, nb_ret, NULL); 377 app_stats.dist.sent_pkts += sent; 378 if (unlikely(sent < nb_ret)) { 379 app_stats.dist.enqdrop_pkts += nb_ret - sent; 380 RTE_LOG(DEBUG, DISTRAPP, 381 "%s:Packet loss due to full out ring\n", 382 __func__); 383 while (sent < nb_ret) 384 rte_pktmbuf_free(bufs[sent++]); 385 } 386 } 387 } 388 printf("\nCore %u exiting distributor task.\n", rte_lcore_id()); 389 quit_signal_work = 1; 390 if (power_lib_initialised) 391 rte_power_exit(rte_lcore_id()); 392 rte_distributor_flush(d); 393 /* Unblock any returns so workers can exit */ 394 rte_distributor_clear_returns(d); 395 quit_signal_rx = 1; 396 return 0; 397 } 398 399 400 static int 401 lcore_tx(struct rte_ring *in_r) 402 { 403 static struct output_buffer tx_buffers[RTE_MAX_ETHPORTS]; 404 const int socket_id = rte_socket_id(); 405 uint16_t port; 406 407 RTE_ETH_FOREACH_DEV(port) { 408 /* skip ports that are not enabled */ 409 if ((enabled_port_mask & (1 << port)) == 0) 410 continue; 411 412 if (rte_eth_dev_socket_id(port) > 0 && 413 rte_eth_dev_socket_id(port) != socket_id) 414 printf("WARNING, port %u is on remote NUMA node to " 415 "TX thread.\n\tPerformance will not " 416 "be optimal.\n", port); 417 } 418 419 printf("\nCore %u doing packet TX.\n", rte_lcore_id()); 420 while (!quit_signal) { 421 422 RTE_ETH_FOREACH_DEV(port) { 423 /* skip ports that are not enabled */ 424 if ((enabled_port_mask & (1 << port)) == 0) 425 continue; 426 427 struct rte_mbuf *bufs[BURST_SIZE_TX]; 428 const uint16_t nb_rx = rte_ring_dequeue_burst(in_r, 429 (void *)bufs, BURST_SIZE_TX, NULL); 430 app_stats.tx.dequeue_pkts += nb_rx; 431 432 /* if we get no traffic, flush anything we have */ 433 if (unlikely(nb_rx == 0)) { 434 flush_all_ports(tx_buffers); 435 continue; 436 } 437 438 /* for traffic we receive, queue it up for transmit */ 439 uint16_t i; 440 rte_prefetch_non_temporal((void *)bufs[0]); 441 rte_prefetch_non_temporal((void *)bufs[1]); 442 rte_prefetch_non_temporal((void *)bufs[2]); 443 for (i = 0; i < nb_rx; i++) { 444 struct output_buffer *outbuf; 445 uint8_t outp; 446 rte_prefetch_non_temporal((void *)bufs[i + 3]); 447 /* 448 * workers should update in_port to hold the 449 * output port value 450 */ 451 outp = bufs[i]->port; 452 /* skip ports that are not enabled */ 453 if ((enabled_port_mask & (1 << outp)) == 0) 454 continue; 455 456 outbuf = &tx_buffers[outp]; 457 outbuf->mbufs[outbuf->count++] = bufs[i]; 458 if (outbuf->count == BURST_SIZE_TX) 459 flush_one_port(outbuf, outp); 460 } 461 } 462 } 463 if (power_lib_initialised) 464 rte_power_exit(rte_lcore_id()); 465 printf("\nCore %u exiting tx task.\n", rte_lcore_id()); 466 return 0; 467 } 468 469 static void 470 int_handler(int sig_num) 471 { 472 printf("Exiting on signal %d\n", sig_num); 473 /* set quit flag for rx thread to exit */ 474 quit_signal_dist = 1; 475 } 476 477 static void 478 print_stats(void) 479 { 480 struct rte_eth_stats eth_stats; 481 unsigned int i, j; 482 const unsigned int num_workers = rte_lcore_count() - 4; 483 484 RTE_ETH_FOREACH_DEV(i) { 485 rte_eth_stats_get(i, ð_stats); 486 app_stats.port_rx_pkts[i] = eth_stats.ipackets; 487 app_stats.port_tx_pkts[i] = eth_stats.opackets; 488 } 489 490 printf("\n\nRX Thread:\n"); 491 RTE_ETH_FOREACH_DEV(i) { 492 printf("Port %u Pktsin : %5.2f\n", i, 493 (app_stats.port_rx_pkts[i] - 494 prev_app_stats.port_rx_pkts[i])/1000000.0); 495 prev_app_stats.port_rx_pkts[i] = app_stats.port_rx_pkts[i]; 496 } 497 printf(" - Received: %5.2f\n", 498 (app_stats.rx.rx_pkts - 499 prev_app_stats.rx.rx_pkts)/1000000.0); 500 printf(" - Returned: %5.2f\n", 501 (app_stats.rx.returned_pkts - 502 prev_app_stats.rx.returned_pkts)/1000000.0); 503 printf(" - Enqueued: %5.2f\n", 504 (app_stats.rx.enqueued_pkts - 505 prev_app_stats.rx.enqueued_pkts)/1000000.0); 506 printf(" - Dropped: %s%5.2f%s\n", ANSI_COLOR_RED, 507 (app_stats.rx.enqdrop_pkts - 508 prev_app_stats.rx.enqdrop_pkts)/1000000.0, 509 ANSI_COLOR_RESET); 510 511 printf("Distributor thread:\n"); 512 printf(" - In: %5.2f\n", 513 (app_stats.dist.in_pkts - 514 prev_app_stats.dist.in_pkts)/1000000.0); 515 printf(" - Returned: %5.2f\n", 516 (app_stats.dist.ret_pkts - 517 prev_app_stats.dist.ret_pkts)/1000000.0); 518 printf(" - Sent: %5.2f\n", 519 (app_stats.dist.sent_pkts - 520 prev_app_stats.dist.sent_pkts)/1000000.0); 521 printf(" - Dropped %s%5.2f%s\n", ANSI_COLOR_RED, 522 (app_stats.dist.enqdrop_pkts - 523 prev_app_stats.dist.enqdrop_pkts)/1000000.0, 524 ANSI_COLOR_RESET); 525 526 printf("TX thread:\n"); 527 printf(" - Dequeued: %5.2f\n", 528 (app_stats.tx.dequeue_pkts - 529 prev_app_stats.tx.dequeue_pkts)/1000000.0); 530 RTE_ETH_FOREACH_DEV(i) { 531 printf("Port %u Pktsout: %5.2f\n", 532 i, (app_stats.port_tx_pkts[i] - 533 prev_app_stats.port_tx_pkts[i])/1000000.0); 534 prev_app_stats.port_tx_pkts[i] = app_stats.port_tx_pkts[i]; 535 } 536 printf(" - Transmitted: %5.2f\n", 537 (app_stats.tx.tx_pkts - 538 prev_app_stats.tx.tx_pkts)/1000000.0); 539 printf(" - Dropped: %s%5.2f%s\n", ANSI_COLOR_RED, 540 (app_stats.tx.enqdrop_pkts - 541 prev_app_stats.tx.enqdrop_pkts)/1000000.0, 542 ANSI_COLOR_RESET); 543 544 prev_app_stats.rx.rx_pkts = app_stats.rx.rx_pkts; 545 prev_app_stats.rx.returned_pkts = app_stats.rx.returned_pkts; 546 prev_app_stats.rx.enqueued_pkts = app_stats.rx.enqueued_pkts; 547 prev_app_stats.rx.enqdrop_pkts = app_stats.rx.enqdrop_pkts; 548 prev_app_stats.dist.in_pkts = app_stats.dist.in_pkts; 549 prev_app_stats.dist.ret_pkts = app_stats.dist.ret_pkts; 550 prev_app_stats.dist.sent_pkts = app_stats.dist.sent_pkts; 551 prev_app_stats.dist.enqdrop_pkts = app_stats.dist.enqdrop_pkts; 552 prev_app_stats.tx.dequeue_pkts = app_stats.tx.dequeue_pkts; 553 prev_app_stats.tx.tx_pkts = app_stats.tx.tx_pkts; 554 prev_app_stats.tx.enqdrop_pkts = app_stats.tx.enqdrop_pkts; 555 556 for (i = 0; i < num_workers; i++) { 557 printf("Worker %02u Pkts: %5.2f. Bursts(1-8): ", i, 558 (app_stats.worker_pkts[i] - 559 prev_app_stats.worker_pkts[i])/1000000.0); 560 for (j = 0; j < 8; j++) { 561 printf("%"PRIu64" ", app_stats.worker_bursts[i][j]); 562 app_stats.worker_bursts[i][j] = 0; 563 } 564 printf("\n"); 565 prev_app_stats.worker_pkts[i] = app_stats.worker_pkts[i]; 566 } 567 } 568 569 static int 570 lcore_worker(struct lcore_params *p) 571 { 572 struct rte_distributor *d = p->d; 573 const unsigned id = p->worker_id; 574 unsigned int num = 0; 575 unsigned int i; 576 577 /* 578 * for single port, xor_val will be zero so we won't modify the output 579 * port, otherwise we send traffic from 0 to 1, 2 to 3, and vice versa 580 */ 581 const unsigned xor_val = (rte_eth_dev_count_avail() > 1); 582 struct rte_mbuf *buf[8] __rte_cache_aligned; 583 584 for (i = 0; i < 8; i++) 585 buf[i] = NULL; 586 587 app_stats.worker_pkts[p->worker_id] = 1; 588 589 printf("\nCore %u acting as worker core.\n", rte_lcore_id()); 590 while (!quit_signal_work) { 591 num = rte_distributor_get_pkt(d, id, buf, buf, num); 592 /* Do a little bit of work for each packet */ 593 for (i = 0; i < num; i++) { 594 uint64_t t = rte_rdtsc()+100; 595 596 while (rte_rdtsc() < t) 597 rte_pause(); 598 buf[i]->port ^= xor_val; 599 } 600 601 app_stats.worker_pkts[p->worker_id] += num; 602 if (num > 0) 603 app_stats.worker_bursts[p->worker_id][num-1]++; 604 } 605 if (power_lib_initialised) 606 rte_power_exit(rte_lcore_id()); 607 rte_free(p); 608 return 0; 609 } 610 611 static int 612 init_power_library(void) 613 { 614 int ret = 0, lcore_id; 615 RTE_LCORE_FOREACH_SLAVE(lcore_id) { 616 /* init power management library */ 617 ret = rte_power_init(lcore_id); 618 if (ret) { 619 RTE_LOG(ERR, POWER, 620 "Library initialization failed on core %u\n", 621 lcore_id); 622 /* 623 * Return on first failure, we'll fall back 624 * to non-power operation 625 */ 626 return ret; 627 } 628 } 629 return ret; 630 } 631 632 /* display usage */ 633 static void 634 print_usage(const char *prgname) 635 { 636 printf("%s [EAL options] -- -p PORTMASK\n" 637 " -p PORTMASK: hexadecimal bitmask of ports to configure\n", 638 prgname); 639 } 640 641 static int 642 parse_portmask(const char *portmask) 643 { 644 char *end = NULL; 645 unsigned long pm; 646 647 /* parse hexadecimal string */ 648 pm = strtoul(portmask, &end, 16); 649 if ((portmask[0] == '\0') || (end == NULL) || (*end != '\0')) 650 return 0; 651 652 return pm; 653 } 654 655 /* Parse the argument given in the command line of the application */ 656 static int 657 parse_args(int argc, char **argv) 658 { 659 int opt; 660 char **argvopt; 661 int option_index; 662 char *prgname = argv[0]; 663 static struct option lgopts[] = { 664 {NULL, 0, 0, 0} 665 }; 666 667 argvopt = argv; 668 669 while ((opt = getopt_long(argc, argvopt, "p:", 670 lgopts, &option_index)) != EOF) { 671 672 switch (opt) { 673 /* portmask */ 674 case 'p': 675 enabled_port_mask = parse_portmask(optarg); 676 if (enabled_port_mask == 0) { 677 printf("invalid portmask\n"); 678 print_usage(prgname); 679 return -1; 680 } 681 break; 682 683 default: 684 print_usage(prgname); 685 return -1; 686 } 687 } 688 689 if (optind <= 1) { 690 print_usage(prgname); 691 return -1; 692 } 693 694 argv[optind-1] = prgname; 695 696 optind = 1; /* reset getopt lib */ 697 return 0; 698 } 699 700 /* Main function, does initialization and calls the per-lcore functions */ 701 int 702 main(int argc, char *argv[]) 703 { 704 struct rte_mempool *mbuf_pool; 705 struct rte_distributor *d; 706 struct rte_ring *dist_tx_ring; 707 struct rte_ring *rx_dist_ring; 708 struct rte_power_core_capabilities lcore_cap; 709 unsigned int lcore_id, worker_id = 0; 710 int distr_core_id = -1, rx_core_id = -1, tx_core_id = -1; 711 unsigned nb_ports; 712 uint16_t portid; 713 uint16_t nb_ports_available; 714 uint64_t t, freq; 715 716 /* catch ctrl-c so we can print on exit */ 717 signal(SIGINT, int_handler); 718 719 /* init EAL */ 720 int ret = rte_eal_init(argc, argv); 721 if (ret < 0) 722 rte_exit(EXIT_FAILURE, "Error with EAL initialization\n"); 723 argc -= ret; 724 argv += ret; 725 726 /* parse application arguments (after the EAL ones) */ 727 ret = parse_args(argc, argv); 728 if (ret < 0) 729 rte_exit(EXIT_FAILURE, "Invalid distributor parameters\n"); 730 731 if (rte_lcore_count() < 5) 732 rte_exit(EXIT_FAILURE, "Error, This application needs at " 733 "least 5 logical cores to run:\n" 734 "1 lcore for stats (can be core 0)\n" 735 "1 lcore for packet RX\n" 736 "1 lcore for distribution\n" 737 "1 lcore for packet TX\n" 738 "and at least 1 lcore for worker threads\n"); 739 740 if (init_power_library() == 0) 741 power_lib_initialised = 1; 742 743 nb_ports = rte_eth_dev_count_avail(); 744 if (nb_ports == 0) 745 rte_exit(EXIT_FAILURE, "Error: no ethernet ports detected\n"); 746 if (nb_ports != 1 && (nb_ports & 1)) 747 rte_exit(EXIT_FAILURE, "Error: number of ports must be even, except " 748 "when using a single port\n"); 749 750 mbuf_pool = rte_pktmbuf_pool_create("MBUF_POOL", 751 NUM_MBUFS * nb_ports, MBUF_CACHE_SIZE, 0, 752 RTE_MBUF_DEFAULT_BUF_SIZE, rte_socket_id()); 753 if (mbuf_pool == NULL) 754 rte_exit(EXIT_FAILURE, "Cannot create mbuf pool\n"); 755 nb_ports_available = nb_ports; 756 757 /* initialize all ports */ 758 RTE_ETH_FOREACH_DEV(portid) { 759 /* skip ports that are not enabled */ 760 if ((enabled_port_mask & (1 << portid)) == 0) { 761 printf("\nSkipping disabled port %d\n", portid); 762 nb_ports_available--; 763 continue; 764 } 765 /* init port */ 766 printf("Initializing port %u... done\n", portid); 767 768 if (port_init(portid, mbuf_pool) != 0) 769 rte_exit(EXIT_FAILURE, "Cannot initialize port %u\n", 770 portid); 771 } 772 773 if (!nb_ports_available) { 774 rte_exit(EXIT_FAILURE, 775 "All available ports are disabled. Please set portmask.\n"); 776 } 777 778 d = rte_distributor_create("PKT_DIST", rte_socket_id(), 779 rte_lcore_count() - 4, 780 RTE_DIST_ALG_BURST); 781 if (d == NULL) 782 rte_exit(EXIT_FAILURE, "Cannot create distributor\n"); 783 784 /* 785 * scheduler ring is read by the transmitter core, and written to 786 * by scheduler core 787 */ 788 dist_tx_ring = rte_ring_create("Output_ring", SCHED_TX_RING_SZ, 789 rte_socket_id(), RING_F_SC_DEQ | RING_F_SP_ENQ); 790 if (dist_tx_ring == NULL) 791 rte_exit(EXIT_FAILURE, "Cannot create output ring\n"); 792 793 rx_dist_ring = rte_ring_create("Input_ring", SCHED_RX_RING_SZ, 794 rte_socket_id(), RING_F_SC_DEQ | RING_F_SP_ENQ); 795 if (rx_dist_ring == NULL) 796 rte_exit(EXIT_FAILURE, "Cannot create output ring\n"); 797 798 if (power_lib_initialised) { 799 /* 800 * Here we'll pre-assign lcore ids to the rx, tx and 801 * distributor workloads if there's higher frequency 802 * on those cores e.g. if Turbo Boost is enabled. 803 * It's also worth mentioning that it will assign cores in a 804 * specific order, so that if there's less than three 805 * available, the higher frequency cores will go to the 806 * distributor first, then rx, then tx. 807 */ 808 RTE_LCORE_FOREACH_SLAVE(lcore_id) { 809 810 rte_power_get_capabilities(lcore_id, &lcore_cap); 811 812 if (lcore_cap.priority != 1) 813 continue; 814 815 if (distr_core_id < 0) { 816 distr_core_id = lcore_id; 817 printf("Distributor on priority core %d\n", 818 lcore_id); 819 continue; 820 } 821 if (rx_core_id < 0) { 822 rx_core_id = lcore_id; 823 printf("Rx on priority core %d\n", 824 lcore_id); 825 continue; 826 } 827 if (tx_core_id < 0) { 828 tx_core_id = lcore_id; 829 printf("Tx on priority core %d\n", 830 lcore_id); 831 continue; 832 } 833 } 834 } 835 836 /* 837 * If there's any of the key workloads left without an lcore_id 838 * after the high performing core assignment above, pre-assign 839 * them here. 840 */ 841 RTE_LCORE_FOREACH_SLAVE(lcore_id) { 842 if (lcore_id == (unsigned int)distr_core_id || 843 lcore_id == (unsigned int)rx_core_id || 844 lcore_id == (unsigned int)tx_core_id) 845 continue; 846 if (distr_core_id < 0) { 847 distr_core_id = lcore_id; 848 printf("Distributor on core %d\n", lcore_id); 849 continue; 850 } 851 if (rx_core_id < 0) { 852 rx_core_id = lcore_id; 853 printf("Rx on core %d\n", lcore_id); 854 continue; 855 } 856 if (tx_core_id < 0) { 857 tx_core_id = lcore_id; 858 printf("Tx on core %d\n", lcore_id); 859 continue; 860 } 861 } 862 863 printf(" tx id %d, dist id %d, rx id %d\n", 864 tx_core_id, 865 distr_core_id, 866 rx_core_id); 867 868 /* 869 * Kick off all the worker threads first, avoiding the pre-assigned 870 * lcore_ids for tx, rx and distributor workloads. 871 */ 872 RTE_LCORE_FOREACH_SLAVE(lcore_id) { 873 if (lcore_id == (unsigned int)distr_core_id || 874 lcore_id == (unsigned int)rx_core_id || 875 lcore_id == (unsigned int)tx_core_id) 876 continue; 877 printf("Starting thread %d as worker, lcore_id %d\n", 878 worker_id, lcore_id); 879 struct lcore_params *p = 880 rte_malloc(NULL, sizeof(*p), 0); 881 if (!p) 882 rte_panic("malloc failure\n"); 883 *p = (struct lcore_params){worker_id++, d, rx_dist_ring, 884 dist_tx_ring, mbuf_pool}; 885 886 rte_eal_remote_launch((lcore_function_t *)lcore_worker, 887 p, lcore_id); 888 } 889 890 /* Start tx core */ 891 rte_eal_remote_launch((lcore_function_t *)lcore_tx, 892 dist_tx_ring, tx_core_id); 893 894 /* Start distributor core */ 895 struct lcore_params *pd = 896 rte_malloc(NULL, sizeof(*pd), 0); 897 if (!pd) 898 rte_panic("malloc failure\n"); 899 *pd = (struct lcore_params){worker_id++, d, 900 rx_dist_ring, dist_tx_ring, mbuf_pool}; 901 rte_eal_remote_launch( 902 (lcore_function_t *)lcore_distributor, 903 pd, distr_core_id); 904 905 /* Start rx core */ 906 struct lcore_params *pr = 907 rte_malloc(NULL, sizeof(*pr), 0); 908 if (!pr) 909 rte_panic("malloc failure\n"); 910 *pr = (struct lcore_params){worker_id++, d, rx_dist_ring, 911 dist_tx_ring, mbuf_pool}; 912 rte_eal_remote_launch((lcore_function_t *)lcore_rx, 913 pr, rx_core_id); 914 915 freq = rte_get_timer_hz(); 916 t = rte_rdtsc() + freq; 917 while (!quit_signal_dist) { 918 if (t < rte_rdtsc()) { 919 print_stats(); 920 t = rte_rdtsc() + freq; 921 } 922 usleep(1000); 923 } 924 925 RTE_LCORE_FOREACH_SLAVE(lcore_id) { 926 if (rte_eal_wait_lcore(lcore_id) < 0) 927 return -1; 928 } 929 930 print_stats(); 931 932 rte_free(pd); 933 rte_free(pr); 934 935 return 0; 936 } 937