/* SPDX-License-Identifier: BSD-3-Clause
 * Copyright(c) 2010-2017 Intel Corporation
 */

#include <stdint.h>
#include <inttypes.h>
#include <unistd.h>
#include <signal.h>
#include <getopt.h>

#include <rte_eal.h>
#include <rte_ethdev.h>
#include <rte_cycles.h>
#include <rte_malloc.h>
#include <rte_debug.h>
#include <rte_prefetch.h>
#include <rte_distributor.h>
#include <rte_pause.h>
#include <rte_power.h>

#define RX_RING_SIZE 1024
#define TX_RING_SIZE 1024
#define NUM_MBUFS ((64*1024)-1)
#define MBUF_CACHE_SIZE 128
#define BURST_SIZE 64
#define SCHED_RX_RING_SZ 8192
#define SCHED_TX_RING_SZ 65536
#define BURST_SIZE_TX 32

#define RTE_LOGTYPE_DISTRAPP RTE_LOGTYPE_USER1

#define ANSI_COLOR_RED "\x1b[31m"
#define ANSI_COLOR_RESET "\x1b[0m"

/* mask of enabled ports */
static uint32_t enabled_port_mask;
volatile uint8_t quit_signal;
volatile uint8_t quit_signal_rx;
volatile uint8_t quit_signal_dist;
volatile uint8_t quit_signal_work;
unsigned int power_lib_initialised;

static volatile struct app_stats {
        struct {
                uint64_t rx_pkts;
                uint64_t returned_pkts;
                uint64_t enqueued_pkts;
                uint64_t enqdrop_pkts;
        } rx __rte_cache_aligned;
        int pad1 __rte_cache_aligned;

        struct {
                uint64_t in_pkts;
                uint64_t ret_pkts;
                uint64_t sent_pkts;
                uint64_t enqdrop_pkts;
        } dist __rte_cache_aligned;
        int pad2 __rte_cache_aligned;

        struct {
                uint64_t dequeue_pkts;
                uint64_t tx_pkts;
                uint64_t enqdrop_pkts;
        } tx __rte_cache_aligned;
        int pad3 __rte_cache_aligned;

        uint64_t worker_pkts[64] __rte_cache_aligned;

        int pad4 __rte_cache_aligned;

        uint64_t worker_bursts[64][8] __rte_cache_aligned;

        int pad5 __rte_cache_aligned;

        uint64_t port_rx_pkts[64] __rte_cache_aligned;
        uint64_t port_tx_pkts[64] __rte_cache_aligned;
} app_stats;

struct app_stats prev_app_stats;
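
/*
 * Each pipeline stage (rx, distributor, tx, workers) updates its own
 * block of counters above. The blocks are cache-aligned and separated by
 * padding so that cores incrementing different counters never share a
 * cache line (no false sharing).
 */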

static const struct rte_eth_conf port_conf_default = {
        .rxmode = {
                .mq_mode = ETH_MQ_RX_RSS,
                .max_rx_pkt_len = ETHER_MAX_LEN,
        },
        .txmode = {
                .mq_mode = ETH_MQ_TX_NONE,
        },
        .rx_adv_conf = {
                .rss_conf = {
                        .rss_hf = ETH_RSS_IP | ETH_RSS_UDP |
                                ETH_RSS_TCP | ETH_RSS_SCTP,
                }
        },
};

struct output_buffer {
        unsigned count;
        struct rte_mbuf *mbufs[BURST_SIZE];
};

static void print_stats(void);

/*
 * Initialises a given port using global settings and with the rx buffers
 * coming from the mbuf_pool passed as parameter
 */
static inline int
port_init(uint16_t port, struct rte_mempool *mbuf_pool)
{
        struct rte_eth_conf port_conf = port_conf_default;
        const uint16_t rxRings = 1, txRings = rte_lcore_count() - 1;
        int retval;
        uint16_t q;
        uint16_t nb_rxd = RX_RING_SIZE;
        uint16_t nb_txd = TX_RING_SIZE;
        struct rte_eth_dev_info dev_info;
        struct rte_eth_txconf txconf;

        if (!rte_eth_dev_is_valid_port(port))
                return -1;

        rte_eth_dev_info_get(port, &dev_info);
        if (dev_info.tx_offload_capa & DEV_TX_OFFLOAD_MBUF_FAST_FREE)
                port_conf.txmode.offloads |=
                        DEV_TX_OFFLOAD_MBUF_FAST_FREE;

        port_conf.rx_adv_conf.rss_conf.rss_hf &=
                dev_info.flow_type_rss_offloads;
        if (port_conf.rx_adv_conf.rss_conf.rss_hf !=
                        port_conf_default.rx_adv_conf.rss_conf.rss_hf) {
                printf("Port %u modified RSS hash function based on hardware support,"
                        "requested:%#"PRIx64" configured:%#"PRIx64"\n",
                        port,
                        port_conf_default.rx_adv_conf.rss_conf.rss_hf,
                        port_conf.rx_adv_conf.rss_conf.rss_hf);
        }

        retval = rte_eth_dev_configure(port, rxRings, txRings, &port_conf);
        if (retval != 0)
                return retval;

        retval = rte_eth_dev_adjust_nb_rx_tx_desc(port, &nb_rxd, &nb_txd);
        if (retval != 0)
                return retval;

        for (q = 0; q < rxRings; q++) {
                retval = rte_eth_rx_queue_setup(port, q, nb_rxd,
                                rte_eth_dev_socket_id(port),
                                NULL, mbuf_pool);
                if (retval < 0)
                        return retval;
        }

        txconf = dev_info.default_txconf;
        txconf.offloads = port_conf.txmode.offloads;
        for (q = 0; q < txRings; q++) {
                retval = rte_eth_tx_queue_setup(port, q, nb_txd,
                                rte_eth_dev_socket_id(port),
                                &txconf);
                if (retval < 0)
                        return retval;
        }

        retval = rte_eth_dev_start(port);
        if (retval < 0)
                return retval;

        struct rte_eth_link link;
        rte_eth_link_get_nowait(port, &link);
        while (!link.link_status) {
                printf("Waiting for Link up on port %"PRIu16"\n", port);
                sleep(1);
                rte_eth_link_get_nowait(port, &link);
        }

        if (!link.link_status) {
                printf("Link down on port %"PRIu16"\n", port);
                return 0;
        }

        struct ether_addr addr;
        rte_eth_macaddr_get(port, &addr);
        printf("Port %u MAC: %02"PRIx8" %02"PRIx8" %02"PRIx8
                        " %02"PRIx8" %02"PRIx8" %02"PRIx8"\n",
                        port,
                        addr.addr_bytes[0], addr.addr_bytes[1],
                        addr.addr_bytes[2], addr.addr_bytes[3],
                        addr.addr_bytes[4], addr.addr_bytes[5]);

        rte_eth_promiscuous_enable(port);

        return 0;
}

struct lcore_params {
        unsigned worker_id;
        struct rte_distributor *d;
        struct rte_ring *rx_dist_ring;
        struct rte_ring *dist_tx_ring;
        struct rte_mempool *mem_pool;
};
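
/*
 * main() allocates a separate lcore_params instance for every worker and
 * for the rx and distributor cores. Each worker frees its own copy when
 * it exits (see lcore_worker()); the rx and distributor copies are freed
 * in main() once all lcores have been joined. The tx core is handed the
 * dist_tx_ring pointer directly instead of an lcore_params.
 */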

static int
lcore_rx(struct lcore_params *p)
{
        const uint16_t nb_ports = rte_eth_dev_count_avail();
        const int socket_id = rte_socket_id();
        uint16_t port;
        struct rte_mbuf *bufs[BURST_SIZE*2];

        RTE_ETH_FOREACH_DEV(port) {
                /* skip ports that are not enabled */
                if ((enabled_port_mask & (1 << port)) == 0)
                        continue;

                if (rte_eth_dev_socket_id(port) > 0 &&
                                rte_eth_dev_socket_id(port) != socket_id)
                        printf("WARNING, port %u is on remote NUMA node to "
                                        "RX thread.\n\tPerformance will not "
                                        "be optimal.\n", port);
        }

        printf("\nCore %u doing packet RX.\n", rte_lcore_id());
        port = 0;
        while (!quit_signal_rx) {

                /* skip ports that are not enabled */
                if ((enabled_port_mask & (1 << port)) == 0) {
                        if (++port == nb_ports)
                                port = 0;
                        continue;
                }
                const uint16_t nb_rx = rte_eth_rx_burst(port, 0, bufs,
                                BURST_SIZE);
                if (unlikely(nb_rx == 0)) {
                        if (++port == nb_ports)
                                port = 0;
                        continue;
                }
                app_stats.rx.rx_pkts += nb_rx;

/*
 * You can run the distributor on the rx core with this code. Returned
 * packets are then sent straight to the tx core.
 */
#if 0
                rte_distributor_process(p->d, bufs, nb_rx);
                const uint16_t nb_ret = rte_distributor_returned_pkts(p->d,
                                bufs, BURST_SIZE*2);

                app_stats.rx.returned_pkts += nb_ret;
                if (unlikely(nb_ret == 0)) {
                        if (++port == nb_ports)
                                port = 0;
                        continue;
                }

                struct rte_ring *tx_ring = p->dist_tx_ring;
                uint16_t sent = rte_ring_enqueue_burst(tx_ring,
                                (void *)bufs, nb_ret, NULL);
#else
                uint16_t nb_ret = nb_rx;
                /*
                 * Swap the following two lines if you want the rx traffic
                 * to go directly to tx, no distribution.
                 */
                struct rte_ring *out_ring = p->rx_dist_ring;
                /* struct rte_ring *out_ring = p->dist_tx_ring; */

                uint16_t sent = rte_ring_enqueue_burst(out_ring,
                                (void *)bufs, nb_ret, NULL);
#endif

                app_stats.rx.enqueued_pkts += sent;
                if (unlikely(sent < nb_ret)) {
                        app_stats.rx.enqdrop_pkts += nb_ret - sent;
                        RTE_LOG_DP(DEBUG, DISTRAPP,
                                "%s:Packet loss due to full ring\n", __func__);
                        while (sent < nb_ret)
                                rte_pktmbuf_free(bufs[sent++]);
                }
                if (++port == nb_ports)
                        port = 0;
        }
        if (power_lib_initialised)
                rte_power_exit(rte_lcore_id());
        /* set tx thread quit flag */
        printf("\nCore %u exiting rx task.\n", rte_lcore_id());
        quit_signal = 1;
        return 0;
}
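
/*
 * Backpressure note: when the downstream ring is full the RX core does
 * not block. Packets that could not be enqueued are freed immediately
 * and accounted in rx.enqdrop_pkts, so no mbufs are leaked on overload.
 */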

static inline void
flush_one_port(struct output_buffer *outbuf, uint8_t outp)
{
        unsigned int nb_tx = rte_eth_tx_burst(outp, 0,
                        outbuf->mbufs, outbuf->count);
        app_stats.tx.tx_pkts += outbuf->count;

        if (unlikely(nb_tx < outbuf->count)) {
                app_stats.tx.enqdrop_pkts += outbuf->count - nb_tx;
                do {
                        rte_pktmbuf_free(outbuf->mbufs[nb_tx]);
                } while (++nb_tx < outbuf->count);
        }
        outbuf->count = 0;
}

static inline void
flush_all_ports(struct output_buffer *tx_buffers)
{
        uint16_t outp;

        RTE_ETH_FOREACH_DEV(outp) {
                /* skip ports that are not enabled */
                if ((enabled_port_mask & (1 << outp)) == 0)
                        continue;

                if (tx_buffers[outp].count == 0)
                        continue;

                flush_one_port(&tx_buffers[outp], outp);
        }
}

static int
lcore_distributor(struct lcore_params *p)
{
        struct rte_ring *in_r = p->rx_dist_ring;
        struct rte_ring *out_r = p->dist_tx_ring;
        struct rte_mbuf *bufs[BURST_SIZE * 4];
        struct rte_distributor *d = p->d;

        printf("\nCore %u acting as distributor core.\n", rte_lcore_id());
        while (!quit_signal_dist) {
                const uint16_t nb_rx = rte_ring_dequeue_burst(in_r,
                                (void *)bufs, BURST_SIZE*1, NULL);
                if (nb_rx) {
                        app_stats.dist.in_pkts += nb_rx;

                        /* Distribute the packets */
                        rte_distributor_process(d, bufs, nb_rx);
                        /* Handle Returns */
                        const uint16_t nb_ret =
                                rte_distributor_returned_pkts(d,
                                        bufs, BURST_SIZE*2);

                        if (unlikely(nb_ret == 0))
                                continue;
                        app_stats.dist.ret_pkts += nb_ret;

                        uint16_t sent = rte_ring_enqueue_burst(out_r,
                                        (void *)bufs, nb_ret, NULL);
                        app_stats.dist.sent_pkts += sent;
                        if (unlikely(sent < nb_ret)) {
                                app_stats.dist.enqdrop_pkts += nb_ret - sent;
                                RTE_LOG(DEBUG, DISTRAPP,
                                        "%s:Packet loss due to full out ring\n",
                                        __func__);
                                while (sent < nb_ret)
                                        rte_pktmbuf_free(bufs[sent++]);
                        }
                }
        }
        printf("\nCore %u exiting distributor task.\n", rte_lcore_id());
        quit_signal_work = 1;
        if (power_lib_initialised)
                rte_power_exit(rte_lcore_id());
        rte_distributor_flush(d);
        /* Unblock any returns so workers can exit */
        rte_distributor_clear_returns(d);
        quit_signal_rx = 1;
        return 0;
}

static int
lcore_tx(struct rte_ring *in_r)
{
        static struct output_buffer tx_buffers[RTE_MAX_ETHPORTS];
        const int socket_id = rte_socket_id();
        uint16_t port;

        RTE_ETH_FOREACH_DEV(port) {
                /* skip ports that are not enabled */
                if ((enabled_port_mask & (1 << port)) == 0)
                        continue;

                if (rte_eth_dev_socket_id(port) > 0 &&
                                rte_eth_dev_socket_id(port) != socket_id)
                        printf("WARNING, port %u is on remote NUMA node to "
                                        "TX thread.\n\tPerformance will not "
                                        "be optimal.\n", port);
        }

        printf("\nCore %u doing packet TX.\n", rte_lcore_id());
        while (!quit_signal) {

                RTE_ETH_FOREACH_DEV(port) {
                        /* skip ports that are not enabled */
                        if ((enabled_port_mask & (1 << port)) == 0)
                                continue;

                        struct rte_mbuf *bufs[BURST_SIZE_TX];
                        const uint16_t nb_rx = rte_ring_dequeue_burst(in_r,
                                        (void *)bufs, BURST_SIZE_TX, NULL);
                        app_stats.tx.dequeue_pkts += nb_rx;

                        /* if we get no traffic, flush anything we have */
                        if (unlikely(nb_rx == 0)) {
                                flush_all_ports(tx_buffers);
                                continue;
                        }

                        /* for traffic we receive, queue it up for transmit */
                        uint16_t i;
                        rte_prefetch_non_temporal((void *)bufs[0]);
                        rte_prefetch_non_temporal((void *)bufs[1]);
                        rte_prefetch_non_temporal((void *)bufs[2]);
                        for (i = 0; i < nb_rx; i++) {
                                struct output_buffer *outbuf;
                                uint8_t outp;
                                rte_prefetch_non_temporal((void *)bufs[i + 3]);
                                /*
                                 * workers should update in_port to hold the
                                 * output port value
                                 */
                                outp = bufs[i]->port;
                                /* skip ports that are not enabled */
                                if ((enabled_port_mask & (1 << outp)) == 0)
                                        continue;

                                outbuf = &tx_buffers[outp];
                                outbuf->mbufs[outbuf->count++] = bufs[i];
                                if (outbuf->count == BURST_SIZE_TX)
                                        flush_one_port(outbuf, outp);
                        }
                }
        }
        if (power_lib_initialised)
                rte_power_exit(rte_lcore_id());
        printf("\nCore %u exiting tx task.\n", rte_lcore_id());
        return 0;
}

static void
int_handler(int sig_num)
{
        printf("Exiting on signal %d\n", sig_num);
        /* set quit flag for distributor thread to exit */
        quit_signal_dist = 1;
}
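
/*
 * Shutdown sequence: SIGINT sets quit_signal_dist, which stops the
 * distributor loop. The distributor then sets quit_signal_work (workers
 * exit), flushes the distributor and clears outstanding returns so any
 * blocked workers wake up, and finally sets quit_signal_rx. The rx core
 * exits next and sets quit_signal, which stops the tx core.
 */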

static void
print_stats(void)
{
        struct rte_eth_stats eth_stats;
        unsigned int i, j;
        const unsigned int num_workers = rte_lcore_count() - 4;

        RTE_ETH_FOREACH_DEV(i) {
                rte_eth_stats_get(i, &eth_stats);
                app_stats.port_rx_pkts[i] = eth_stats.ipackets;
                app_stats.port_tx_pkts[i] = eth_stats.opackets;
        }

        printf("\n\nRX Thread:\n");
        RTE_ETH_FOREACH_DEV(i) {
                printf("Port %u Pktsin : %5.2f\n", i,
                                (app_stats.port_rx_pkts[i] -
                                prev_app_stats.port_rx_pkts[i])/1000000.0);
                prev_app_stats.port_rx_pkts[i] = app_stats.port_rx_pkts[i];
        }
        printf(" - Received: %5.2f\n",
                        (app_stats.rx.rx_pkts -
                        prev_app_stats.rx.rx_pkts)/1000000.0);
        printf(" - Returned: %5.2f\n",
                        (app_stats.rx.returned_pkts -
                        prev_app_stats.rx.returned_pkts)/1000000.0);
        printf(" - Enqueued: %5.2f\n",
                        (app_stats.rx.enqueued_pkts -
                        prev_app_stats.rx.enqueued_pkts)/1000000.0);
        printf(" - Dropped: %s%5.2f%s\n", ANSI_COLOR_RED,
                        (app_stats.rx.enqdrop_pkts -
                        prev_app_stats.rx.enqdrop_pkts)/1000000.0,
                        ANSI_COLOR_RESET);

        printf("Distributor thread:\n");
        printf(" - In: %5.2f\n",
                        (app_stats.dist.in_pkts -
                        prev_app_stats.dist.in_pkts)/1000000.0);
        printf(" - Returned: %5.2f\n",
                        (app_stats.dist.ret_pkts -
                        prev_app_stats.dist.ret_pkts)/1000000.0);
        printf(" - Sent: %5.2f\n",
                        (app_stats.dist.sent_pkts -
                        prev_app_stats.dist.sent_pkts)/1000000.0);
        printf(" - Dropped %s%5.2f%s\n", ANSI_COLOR_RED,
                        (app_stats.dist.enqdrop_pkts -
                        prev_app_stats.dist.enqdrop_pkts)/1000000.0,
                        ANSI_COLOR_RESET);

        printf("TX thread:\n");
        printf(" - Dequeued: %5.2f\n",
                        (app_stats.tx.dequeue_pkts -
                        prev_app_stats.tx.dequeue_pkts)/1000000.0);
        RTE_ETH_FOREACH_DEV(i) {
                printf("Port %u Pktsout: %5.2f\n",
                                i, (app_stats.port_tx_pkts[i] -
                                prev_app_stats.port_tx_pkts[i])/1000000.0);
                prev_app_stats.port_tx_pkts[i] = app_stats.port_tx_pkts[i];
        }
        printf(" - Transmitted: %5.2f\n",
                        (app_stats.tx.tx_pkts -
                        prev_app_stats.tx.tx_pkts)/1000000.0);
        printf(" - Dropped: %s%5.2f%s\n", ANSI_COLOR_RED,
                        (app_stats.tx.enqdrop_pkts -
                        prev_app_stats.tx.enqdrop_pkts)/1000000.0,
                        ANSI_COLOR_RESET);

        prev_app_stats.rx.rx_pkts = app_stats.rx.rx_pkts;
        prev_app_stats.rx.returned_pkts = app_stats.rx.returned_pkts;
        prev_app_stats.rx.enqueued_pkts = app_stats.rx.enqueued_pkts;
        prev_app_stats.rx.enqdrop_pkts = app_stats.rx.enqdrop_pkts;
        prev_app_stats.dist.in_pkts = app_stats.dist.in_pkts;
        prev_app_stats.dist.ret_pkts = app_stats.dist.ret_pkts;
        prev_app_stats.dist.sent_pkts = app_stats.dist.sent_pkts;
        prev_app_stats.dist.enqdrop_pkts = app_stats.dist.enqdrop_pkts;
        prev_app_stats.tx.dequeue_pkts = app_stats.tx.dequeue_pkts;
        prev_app_stats.tx.tx_pkts = app_stats.tx.tx_pkts;
        prev_app_stats.tx.enqdrop_pkts = app_stats.tx.enqdrop_pkts;

        for (i = 0; i < num_workers; i++) {
                printf("Worker %02u Pkts: %5.2f. Bursts(1-8): ", i,
                                (app_stats.worker_pkts[i] -
                                prev_app_stats.worker_pkts[i])/1000000.0);
                for (j = 0; j < 8; j++) {
                        printf("%"PRIu64" ", app_stats.worker_bursts[i][j]);
                        app_stats.worker_bursts[i][j] = 0;
                }
                printf("\n");
                prev_app_stats.worker_pkts[i] = app_stats.worker_pkts[i];
        }
}

static int
lcore_worker(struct lcore_params *p)
{
        struct rte_distributor *d = p->d;
        const unsigned id = p->worker_id;
        unsigned int num = 0;
        unsigned int i;

        /*
         * for single port, xor_val will be zero so we won't modify the output
         * port, otherwise we send traffic from 0 to 1, 2 to 3, and vice versa
         */
        const unsigned xor_val = (rte_eth_dev_count_avail() > 1);
        struct rte_mbuf *buf[8] __rte_cache_aligned;

        for (i = 0; i < 8; i++)
                buf[i] = NULL;

        app_stats.worker_pkts[p->worker_id] = 1;

        printf("\nCore %u acting as worker core.\n", rte_lcore_id());
        while (!quit_signal_work) {
                num = rte_distributor_get_pkt(d, id, buf, buf, num);
                /* Do a little bit of work for each packet */
                for (i = 0; i < num; i++) {
                        uint64_t t = rte_rdtsc()+100;

                        while (rte_rdtsc() < t)
                                rte_pause();
                        buf[i]->port ^= xor_val;
                }

                app_stats.worker_pkts[p->worker_id] += num;
                if (num > 0)
                        app_stats.worker_bursts[p->worker_id][num-1]++;
        }
        if (power_lib_initialised)
                rte_power_exit(rte_lcore_id());
        rte_free(p);
        return 0;
}
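
/*
 * Worker note: rte_distributor_get_pkt() hands the previously processed
 * burst (buf/num) back to the distributor and waits for a new burst to
 * arrive. The rdtsc busy-wait above stands in for roughly 100 cycles of
 * real per-packet work, and XOR-ing buf[i]->port with xor_val pairs
 * ports 0<->1, 2<->3, etc. so each packet leaves on its peer port.
 */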

static int
init_power_library(void)
{
        int ret = 0, lcore_id;
        RTE_LCORE_FOREACH_SLAVE(lcore_id) {
                /* init power management library */
                ret = rte_power_init(lcore_id);
                if (ret) {
                        RTE_LOG(ERR, POWER,
                                "Library initialization failed on core %u\n",
                                lcore_id);
                        /*
                         * Return on first failure; we'll fall back
                         * to non-power operation
                         */
                        return ret;
                }
        }
        return ret;
}

/* display usage */
static void
print_usage(const char *prgname)
{
        printf("%s [EAL options] -- -p PORTMASK\n"
                        " -p PORTMASK: hexadecimal bitmask of ports to configure\n",
                        prgname);
}

static int
parse_portmask(const char *portmask)
{
        char *end = NULL;
        unsigned long pm;

        /* parse hexadecimal string */
        pm = strtoul(portmask, &end, 16);
        if ((portmask[0] == '\0') || (end == NULL) || (*end != '\0'))
                return -1;

        if (pm == 0)
                return -1;

        return pm;
}

/* Parse the argument given in the command line of the application */
static int
parse_args(int argc, char **argv)
{
        int opt;
        int pm;
        char **argvopt;
        int option_index;
        char *prgname = argv[0];
        static struct option lgopts[] = {
                {NULL, 0, 0, 0}
        };

        argvopt = argv;

        while ((opt = getopt_long(argc, argvopt, "p:",
                        lgopts, &option_index)) != EOF) {

                switch (opt) {
                /* portmask */
                case 'p':
                        pm = parse_portmask(optarg);
                        if (pm < 0) {
                                printf("invalid portmask\n");
                                print_usage(prgname);
                                return -1;
                        }
                        enabled_port_mask = pm;
                        break;

                default:
                        print_usage(prgname);
                        return -1;
                }
        }

        if (optind <= 1) {
                print_usage(prgname);
                return -1;
        }

        argv[optind-1] = prgname;

        optind = 1; /* reset getopt lib */
        return 0;
}
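
/*
 * Typical invocation (binary name, core list and portmask below are
 * illustrative only):
 *
 *     ./distributor -l 1-8 -n 4 -- -p 0x3
 *
 * The EAL core list must supply at least 5 lcores (stats, rx,
 * distributor, tx and one or more workers); "-p 0x3" enables ports 0
 * and 1.
 */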

/* Main function, does initialization and calls the per-lcore functions */
int
main(int argc, char *argv[])
{
        struct rte_mempool *mbuf_pool;
        struct rte_distributor *d;
        struct rte_ring *dist_tx_ring;
        struct rte_ring *rx_dist_ring;
        struct rte_power_core_capabilities lcore_cap;
        unsigned int lcore_id, worker_id = 0;
        int distr_core_id = -1, rx_core_id = -1, tx_core_id = -1;
        unsigned nb_ports;
        uint16_t portid;
        uint16_t nb_ports_available;
        uint64_t t, freq;

        /* catch ctrl-c so we can print on exit */
        signal(SIGINT, int_handler);

        /* init EAL */
        int ret = rte_eal_init(argc, argv);
        if (ret < 0)
                rte_exit(EXIT_FAILURE, "Error with EAL initialization\n");
        argc -= ret;
        argv += ret;

        /* parse application arguments (after the EAL ones) */
        ret = parse_args(argc, argv);
        if (ret < 0)
                rte_exit(EXIT_FAILURE, "Invalid distributor parameters\n");

        if (rte_lcore_count() < 5)
                rte_exit(EXIT_FAILURE, "Error, This application needs at "
                                "least 5 logical cores to run:\n"
                                "1 lcore for stats (can be core 0)\n"
                                "1 lcore for packet RX\n"
                                "1 lcore for distribution\n"
                                "1 lcore for packet TX\n"
                                "and at least 1 lcore for worker threads\n");

        if (init_power_library() == 0)
                power_lib_initialised = 1;

        nb_ports = rte_eth_dev_count_avail();
        if (nb_ports == 0)
                rte_exit(EXIT_FAILURE, "Error: no ethernet ports detected\n");
        if (nb_ports != 1 && (nb_ports & 1))
                rte_exit(EXIT_FAILURE, "Error: number of ports must be even, except "
                                "when using a single port\n");

        mbuf_pool = rte_pktmbuf_pool_create("MBUF_POOL",
                NUM_MBUFS * nb_ports, MBUF_CACHE_SIZE, 0,
                RTE_MBUF_DEFAULT_BUF_SIZE, rte_socket_id());
        if (mbuf_pool == NULL)
                rte_exit(EXIT_FAILURE, "Cannot create mbuf pool\n");
        nb_ports_available = nb_ports;

        /* initialize all ports */
        RTE_ETH_FOREACH_DEV(portid) {
                /* skip ports that are not enabled */
                if ((enabled_port_mask & (1 << portid)) == 0) {
                        printf("\nSkipping disabled port %d\n", portid);
                        nb_ports_available--;
                        continue;
                }
                /* init port */
                printf("Initializing port %u... done\n", portid);

                if (port_init(portid, mbuf_pool) != 0)
                        rte_exit(EXIT_FAILURE, "Cannot initialize port %u\n",
                                        portid);
        }

        if (!nb_ports_available) {
                rte_exit(EXIT_FAILURE,
                                "All available ports are disabled. Please set portmask.\n");
        }

        d = rte_distributor_create("PKT_DIST", rte_socket_id(),
                        rte_lcore_count() - 4,
                        RTE_DIST_ALG_BURST);
        if (d == NULL)
                rte_exit(EXIT_FAILURE, "Cannot create distributor\n");

        /*
         * scheduler ring is read by the transmitter core, and written to
         * by scheduler core
         */
        dist_tx_ring = rte_ring_create("Output_ring", SCHED_TX_RING_SZ,
                        rte_socket_id(), RING_F_SC_DEQ | RING_F_SP_ENQ);
        if (dist_tx_ring == NULL)
                rte_exit(EXIT_FAILURE, "Cannot create output ring\n");

        rx_dist_ring = rte_ring_create("Input_ring", SCHED_RX_RING_SZ,
                        rte_socket_id(), RING_F_SC_DEQ | RING_F_SP_ENQ);
        if (rx_dist_ring == NULL)
                rte_exit(EXIT_FAILURE, "Cannot create input ring\n");
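
        /*
         * Both rings are single-producer/single-consumer
         * (RING_F_SP_ENQ | RING_F_SC_DEQ): the input ring is written only
         * by the rx core and read only by the distributor core, and the
         * output ring is written only by the distributor core and read
         * only by the tx core.
         */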

        if (power_lib_initialised) {
                /*
                 * Here we'll pre-assign lcore ids to the rx, tx and
                 * distributor workloads if there's higher frequency
                 * on those cores e.g. if Turbo Boost is enabled.
                 * It's also worth mentioning that it will assign cores in a
                 * specific order, so that if fewer than three are
                 * available, the higher-frequency cores will go to the
                 * distributor first, then rx, then tx.
                 */
                RTE_LCORE_FOREACH_SLAVE(lcore_id) {

                        rte_power_get_capabilities(lcore_id, &lcore_cap);

                        if (lcore_cap.priority != 1)
                                continue;

                        if (distr_core_id < 0) {
                                distr_core_id = lcore_id;
                                printf("Distributor on priority core %d\n",
                                        lcore_id);
                                continue;
                        }
                        if (rx_core_id < 0) {
                                rx_core_id = lcore_id;
                                printf("Rx on priority core %d\n",
                                        lcore_id);
                                continue;
                        }
                        if (tx_core_id < 0) {
                                tx_core_id = lcore_id;
                                printf("Tx on priority core %d\n",
                                        lcore_id);
                                continue;
                        }
                }
        }

        /*
         * If any of the key workloads are still without an lcore_id
         * after the high-performance core assignment above, pre-assign
         * them here.
         */
        RTE_LCORE_FOREACH_SLAVE(lcore_id) {
                if (lcore_id == (unsigned int)distr_core_id ||
                                lcore_id == (unsigned int)rx_core_id ||
                                lcore_id == (unsigned int)tx_core_id)
                        continue;
                if (distr_core_id < 0) {
                        distr_core_id = lcore_id;
                        printf("Distributor on core %d\n", lcore_id);
                        continue;
                }
                if (rx_core_id < 0) {
                        rx_core_id = lcore_id;
                        printf("Rx on core %d\n", lcore_id);
                        continue;
                }
                if (tx_core_id < 0) {
                        tx_core_id = lcore_id;
                        printf("Tx on core %d\n", lcore_id);
                        continue;
                }
        }

        printf(" tx id %d, dist id %d, rx id %d\n",
                        tx_core_id,
                        distr_core_id,
                        rx_core_id);

        /*
         * Kick off all the worker threads first, avoiding the pre-assigned
         * lcore_ids for tx, rx and distributor workloads.
         */
        RTE_LCORE_FOREACH_SLAVE(lcore_id) {
                if (lcore_id == (unsigned int)distr_core_id ||
                                lcore_id == (unsigned int)rx_core_id ||
                                lcore_id == (unsigned int)tx_core_id)
                        continue;
                printf("Starting thread %d as worker, lcore_id %d\n",
                                worker_id, lcore_id);
                struct lcore_params *p =
                        rte_malloc(NULL, sizeof(*p), 0);
                if (!p)
                        rte_panic("malloc failure\n");
                *p = (struct lcore_params){worker_id++, d, rx_dist_ring,
                        dist_tx_ring, mbuf_pool};

                rte_eal_remote_launch((lcore_function_t *)lcore_worker,
                                p, lcore_id);
        }

        /* Start tx core */
        rte_eal_remote_launch((lcore_function_t *)lcore_tx,
                        dist_tx_ring, tx_core_id);

        /* Start distributor core */
        struct lcore_params *pd =
                rte_malloc(NULL, sizeof(*pd), 0);
        if (!pd)
                rte_panic("malloc failure\n");
        *pd = (struct lcore_params){worker_id++, d,
                rx_dist_ring, dist_tx_ring, mbuf_pool};
        rte_eal_remote_launch(
                        (lcore_function_t *)lcore_distributor,
                        pd, distr_core_id);

        /* Start rx core */
        struct lcore_params *pr =
                rte_malloc(NULL, sizeof(*pr), 0);
        if (!pr)
                rte_panic("malloc failure\n");
        *pr = (struct lcore_params){worker_id++, d, rx_dist_ring,
                dist_tx_ring, mbuf_pool};
        rte_eal_remote_launch((lcore_function_t *)lcore_rx,
                        pr, rx_core_id);

        freq = rte_get_timer_hz();
        t = rte_rdtsc() + freq;
        while (!quit_signal_dist) {
                if (t < rte_rdtsc()) {
                        print_stats();
                        t = rte_rdtsc() + freq;
                }
                usleep(1000);
        }

        RTE_LCORE_FOREACH_SLAVE(lcore_id) {
                if (rte_eal_wait_lcore(lcore_id) < 0)
                        return -1;
        }

        print_stats();

        rte_free(pd);
        rte_free(pr);

        return 0;
}