1 /* SPDX-License-Identifier: BSD-3-Clause 2 * Copyright(c) 2010-2017 Intel Corporation 3 */ 4 5 #include <stdint.h> 6 #include <inttypes.h> 7 #include <unistd.h> 8 #include <signal.h> 9 #include <getopt.h> 10 11 #include <rte_eal.h> 12 #include <rte_ethdev.h> 13 #include <rte_cycles.h> 14 #include <rte_malloc.h> 15 #include <rte_debug.h> 16 #include <rte_prefetch.h> 17 #include <rte_distributor.h> 18 #include <rte_pause.h> 19 20 #define RX_RING_SIZE 512 21 #define TX_RING_SIZE 512 22 #define NUM_MBUFS ((64*1024)-1) 23 #define MBUF_CACHE_SIZE 128 24 #define BURST_SIZE 64 25 #define SCHED_RX_RING_SZ 8192 26 #define SCHED_TX_RING_SZ 65536 27 #define BURST_SIZE_TX 32 28 29 #define RTE_LOGTYPE_DISTRAPP RTE_LOGTYPE_USER1 30 31 #define ANSI_COLOR_RED "\x1b[31m" 32 #define ANSI_COLOR_RESET "\x1b[0m" 33 34 /* mask of enabled ports */ 35 static uint32_t enabled_port_mask; 36 volatile uint8_t quit_signal; 37 volatile uint8_t quit_signal_rx; 38 volatile uint8_t quit_signal_dist; 39 volatile uint8_t quit_signal_work; 40 41 static volatile struct app_stats { 42 struct { 43 uint64_t rx_pkts; 44 uint64_t returned_pkts; 45 uint64_t enqueued_pkts; 46 uint64_t enqdrop_pkts; 47 } rx __rte_cache_aligned; 48 int pad1 __rte_cache_aligned; 49 50 struct { 51 uint64_t in_pkts; 52 uint64_t ret_pkts; 53 uint64_t sent_pkts; 54 uint64_t enqdrop_pkts; 55 } dist __rte_cache_aligned; 56 int pad2 __rte_cache_aligned; 57 58 struct { 59 uint64_t dequeue_pkts; 60 uint64_t tx_pkts; 61 uint64_t enqdrop_pkts; 62 } tx __rte_cache_aligned; 63 int pad3 __rte_cache_aligned; 64 65 uint64_t worker_pkts[64] __rte_cache_aligned; 66 67 int pad4 __rte_cache_aligned; 68 69 uint64_t worker_bursts[64][8] __rte_cache_aligned; 70 71 int pad5 __rte_cache_aligned; 72 73 uint64_t port_rx_pkts[64] __rte_cache_aligned; 74 uint64_t port_tx_pkts[64] __rte_cache_aligned; 75 } app_stats; 76 77 struct app_stats prev_app_stats; 78 79 static const struct rte_eth_conf port_conf_default = { 80 .rxmode = { 81 .mq_mode = ETH_MQ_RX_RSS, 82 .max_rx_pkt_len = ETHER_MAX_LEN, 83 .ignore_offload_bitfield = 1, 84 }, 85 .txmode = { 86 .mq_mode = ETH_MQ_TX_NONE, 87 }, 88 .rx_adv_conf = { 89 .rss_conf = { 90 .rss_hf = ETH_RSS_IP | ETH_RSS_UDP | 91 ETH_RSS_TCP | ETH_RSS_SCTP, 92 } 93 }, 94 }; 95 96 struct output_buffer { 97 unsigned count; 98 struct rte_mbuf *mbufs[BURST_SIZE]; 99 }; 100 101 static void print_stats(void); 102 103 /* 104 * Initialises a given port using global settings and with the rx buffers 105 * coming from the mbuf_pool passed as parameter 106 */ 107 static inline int 108 port_init(uint16_t port, struct rte_mempool *mbuf_pool) 109 { 110 struct rte_eth_conf port_conf = port_conf_default; 111 const uint16_t rxRings = 1, txRings = rte_lcore_count() - 1; 112 int retval; 113 uint16_t q; 114 uint16_t nb_rxd = RX_RING_SIZE; 115 uint16_t nb_txd = TX_RING_SIZE; 116 struct rte_eth_dev_info dev_info; 117 struct rte_eth_txconf txconf; 118 119 if (port >= rte_eth_dev_count()) 120 return -1; 121 122 rte_eth_dev_info_get(port, &dev_info); 123 if (dev_info.tx_offload_capa & DEV_TX_OFFLOAD_MBUF_FAST_FREE) 124 port_conf.txmode.offloads |= 125 DEV_TX_OFFLOAD_MBUF_FAST_FREE; 126 127 retval = rte_eth_dev_configure(port, rxRings, txRings, &port_conf); 128 if (retval != 0) 129 return retval; 130 131 retval = rte_eth_dev_adjust_nb_rx_tx_desc(port, &nb_rxd, &nb_txd); 132 if (retval != 0) 133 return retval; 134 135 for (q = 0; q < rxRings; q++) { 136 retval = rte_eth_rx_queue_setup(port, q, nb_rxd, 137 rte_eth_dev_socket_id(port), 138 NULL, mbuf_pool); 139 if (retval < 0) 140 return retval; 141 } 142 143 txconf = dev_info.default_txconf; 144 txconf.txq_flags = ETH_TXQ_FLAGS_IGNORE; 145 txconf.offloads = port_conf.txmode.offloads; 146 for (q = 0; q < txRings; q++) { 147 retval = rte_eth_tx_queue_setup(port, q, nb_txd, 148 rte_eth_dev_socket_id(port), 149 &txconf); 150 if (retval < 0) 151 return retval; 152 } 153 154 retval = rte_eth_dev_start(port); 155 if (retval < 0) 156 return retval; 157 158 struct rte_eth_link link; 159 rte_eth_link_get_nowait(port, &link); 160 while (!link.link_status) { 161 printf("Waiting for Link up on port %"PRIu16"\n", port); 162 sleep(1); 163 rte_eth_link_get_nowait(port, &link); 164 } 165 166 if (!link.link_status) { 167 printf("Link down on port %"PRIu16"\n", port); 168 return 0; 169 } 170 171 struct ether_addr addr; 172 rte_eth_macaddr_get(port, &addr); 173 printf("Port %u MAC: %02"PRIx8" %02"PRIx8" %02"PRIx8 174 " %02"PRIx8" %02"PRIx8" %02"PRIx8"\n", 175 port, 176 addr.addr_bytes[0], addr.addr_bytes[1], 177 addr.addr_bytes[2], addr.addr_bytes[3], 178 addr.addr_bytes[4], addr.addr_bytes[5]); 179 180 rte_eth_promiscuous_enable(port); 181 182 return 0; 183 } 184 185 struct lcore_params { 186 unsigned worker_id; 187 struct rte_distributor *d; 188 struct rte_ring *rx_dist_ring; 189 struct rte_ring *dist_tx_ring; 190 struct rte_mempool *mem_pool; 191 }; 192 193 static int 194 lcore_rx(struct lcore_params *p) 195 { 196 const uint16_t nb_ports = rte_eth_dev_count(); 197 const int socket_id = rte_socket_id(); 198 uint16_t port; 199 struct rte_mbuf *bufs[BURST_SIZE*2]; 200 201 for (port = 0; port < nb_ports; port++) { 202 /* skip ports that are not enabled */ 203 if ((enabled_port_mask & (1 << port)) == 0) 204 continue; 205 206 if (rte_eth_dev_socket_id(port) > 0 && 207 rte_eth_dev_socket_id(port) != socket_id) 208 printf("WARNING, port %u is on remote NUMA node to " 209 "RX thread.\n\tPerformance will not " 210 "be optimal.\n", port); 211 } 212 213 printf("\nCore %u doing packet RX.\n", rte_lcore_id()); 214 port = 0; 215 while (!quit_signal_rx) { 216 217 /* skip ports that are not enabled */ 218 if ((enabled_port_mask & (1 << port)) == 0) { 219 if (++port == nb_ports) 220 port = 0; 221 continue; 222 } 223 const uint16_t nb_rx = rte_eth_rx_burst(port, 0, bufs, 224 BURST_SIZE); 225 if (unlikely(nb_rx == 0)) { 226 if (++port == nb_ports) 227 port = 0; 228 continue; 229 } 230 app_stats.rx.rx_pkts += nb_rx; 231 232 /* 233 * You can run the distributor on the rx core with this code. Returned 234 * packets are then send straight to the tx core. 235 */ 236 #if 0 237 rte_distributor_process(d, bufs, nb_rx); 238 const uint16_t nb_ret = rte_distributor_returned_pktsd, 239 bufs, BURST_SIZE*2); 240 241 app_stats.rx.returned_pkts += nb_ret; 242 if (unlikely(nb_ret == 0)) { 243 if (++port == nb_ports) 244 port = 0; 245 continue; 246 } 247 248 struct rte_ring *tx_ring = p->dist_tx_ring; 249 uint16_t sent = rte_ring_enqueue_burst(tx_ring, 250 (void *)bufs, nb_ret, NULL); 251 #else 252 uint16_t nb_ret = nb_rx; 253 /* 254 * Swap the following two lines if you want the rx traffic 255 * to go directly to tx, no distribution. 256 */ 257 struct rte_ring *out_ring = p->rx_dist_ring; 258 /* struct rte_ring *out_ring = p->dist_tx_ring; */ 259 260 uint16_t sent = rte_ring_enqueue_burst(out_ring, 261 (void *)bufs, nb_ret, NULL); 262 #endif 263 264 app_stats.rx.enqueued_pkts += sent; 265 if (unlikely(sent < nb_ret)) { 266 app_stats.rx.enqdrop_pkts += nb_ret - sent; 267 RTE_LOG_DP(DEBUG, DISTRAPP, 268 "%s:Packet loss due to full ring\n", __func__); 269 while (sent < nb_ret) 270 rte_pktmbuf_free(bufs[sent++]); 271 } 272 if (++port == nb_ports) 273 port = 0; 274 } 275 /* set worker & tx threads quit flag */ 276 printf("\nCore %u exiting rx task.\n", rte_lcore_id()); 277 quit_signal = 1; 278 return 0; 279 } 280 281 static inline void 282 flush_one_port(struct output_buffer *outbuf, uint8_t outp) 283 { 284 unsigned int nb_tx = rte_eth_tx_burst(outp, 0, 285 outbuf->mbufs, outbuf->count); 286 app_stats.tx.tx_pkts += outbuf->count; 287 288 if (unlikely(nb_tx < outbuf->count)) { 289 app_stats.tx.enqdrop_pkts += outbuf->count - nb_tx; 290 do { 291 rte_pktmbuf_free(outbuf->mbufs[nb_tx]); 292 } while (++nb_tx < outbuf->count); 293 } 294 outbuf->count = 0; 295 } 296 297 static inline void 298 flush_all_ports(struct output_buffer *tx_buffers, uint16_t nb_ports) 299 { 300 uint16_t outp; 301 302 for (outp = 0; outp < nb_ports; outp++) { 303 /* skip ports that are not enabled */ 304 if ((enabled_port_mask & (1 << outp)) == 0) 305 continue; 306 307 if (tx_buffers[outp].count == 0) 308 continue; 309 310 flush_one_port(&tx_buffers[outp], outp); 311 } 312 } 313 314 315 316 static int 317 lcore_distributor(struct lcore_params *p) 318 { 319 struct rte_ring *in_r = p->rx_dist_ring; 320 struct rte_ring *out_r = p->dist_tx_ring; 321 struct rte_mbuf *bufs[BURST_SIZE * 4]; 322 struct rte_distributor *d = p->d; 323 324 printf("\nCore %u acting as distributor core.\n", rte_lcore_id()); 325 while (!quit_signal_dist) { 326 const uint16_t nb_rx = rte_ring_dequeue_burst(in_r, 327 (void *)bufs, BURST_SIZE*1, NULL); 328 if (nb_rx) { 329 app_stats.dist.in_pkts += nb_rx; 330 331 /* Distribute the packets */ 332 rte_distributor_process(d, bufs, nb_rx); 333 /* Handle Returns */ 334 const uint16_t nb_ret = 335 rte_distributor_returned_pkts(d, 336 bufs, BURST_SIZE*2); 337 338 if (unlikely(nb_ret == 0)) 339 continue; 340 app_stats.dist.ret_pkts += nb_ret; 341 342 uint16_t sent = rte_ring_enqueue_burst(out_r, 343 (void *)bufs, nb_ret, NULL); 344 app_stats.dist.sent_pkts += sent; 345 if (unlikely(sent < nb_ret)) { 346 app_stats.dist.enqdrop_pkts += nb_ret - sent; 347 RTE_LOG(DEBUG, DISTRAPP, 348 "%s:Packet loss due to full out ring\n", 349 __func__); 350 while (sent < nb_ret) 351 rte_pktmbuf_free(bufs[sent++]); 352 } 353 } 354 } 355 printf("\nCore %u exiting distributor task.\n", rte_lcore_id()); 356 quit_signal_work = 1; 357 358 rte_distributor_flush(d); 359 /* Unblock any returns so workers can exit */ 360 rte_distributor_clear_returns(d); 361 quit_signal_rx = 1; 362 return 0; 363 } 364 365 366 static int 367 lcore_tx(struct rte_ring *in_r) 368 { 369 static struct output_buffer tx_buffers[RTE_MAX_ETHPORTS]; 370 const uint16_t nb_ports = rte_eth_dev_count(); 371 const int socket_id = rte_socket_id(); 372 uint16_t port; 373 374 for (port = 0; port < nb_ports; port++) { 375 /* skip ports that are not enabled */ 376 if ((enabled_port_mask & (1 << port)) == 0) 377 continue; 378 379 if (rte_eth_dev_socket_id(port) > 0 && 380 rte_eth_dev_socket_id(port) != socket_id) 381 printf("WARNING, port %u is on remote NUMA node to " 382 "TX thread.\n\tPerformance will not " 383 "be optimal.\n", port); 384 } 385 386 printf("\nCore %u doing packet TX.\n", rte_lcore_id()); 387 while (!quit_signal) { 388 389 for (port = 0; port < nb_ports; port++) { 390 /* skip ports that are not enabled */ 391 if ((enabled_port_mask & (1 << port)) == 0) 392 continue; 393 394 struct rte_mbuf *bufs[BURST_SIZE_TX]; 395 const uint16_t nb_rx = rte_ring_dequeue_burst(in_r, 396 (void *)bufs, BURST_SIZE_TX, NULL); 397 app_stats.tx.dequeue_pkts += nb_rx; 398 399 /* if we get no traffic, flush anything we have */ 400 if (unlikely(nb_rx == 0)) { 401 flush_all_ports(tx_buffers, nb_ports); 402 continue; 403 } 404 405 /* for traffic we receive, queue it up for transmit */ 406 uint16_t i; 407 rte_prefetch_non_temporal((void *)bufs[0]); 408 rte_prefetch_non_temporal((void *)bufs[1]); 409 rte_prefetch_non_temporal((void *)bufs[2]); 410 for (i = 0; i < nb_rx; i++) { 411 struct output_buffer *outbuf; 412 uint8_t outp; 413 rte_prefetch_non_temporal((void *)bufs[i + 3]); 414 /* 415 * workers should update in_port to hold the 416 * output port value 417 */ 418 outp = bufs[i]->port; 419 /* skip ports that are not enabled */ 420 if ((enabled_port_mask & (1 << outp)) == 0) 421 continue; 422 423 outbuf = &tx_buffers[outp]; 424 outbuf->mbufs[outbuf->count++] = bufs[i]; 425 if (outbuf->count == BURST_SIZE_TX) 426 flush_one_port(outbuf, outp); 427 } 428 } 429 } 430 printf("\nCore %u exiting tx task.\n", rte_lcore_id()); 431 return 0; 432 } 433 434 static void 435 int_handler(int sig_num) 436 { 437 printf("Exiting on signal %d\n", sig_num); 438 /* set quit flag for rx thread to exit */ 439 quit_signal_dist = 1; 440 } 441 442 static void 443 print_stats(void) 444 { 445 struct rte_eth_stats eth_stats; 446 unsigned int i, j; 447 const unsigned int num_workers = rte_lcore_count() - 4; 448 449 for (i = 0; i < rte_eth_dev_count(); i++) { 450 rte_eth_stats_get(i, ð_stats); 451 app_stats.port_rx_pkts[i] = eth_stats.ipackets; 452 app_stats.port_tx_pkts[i] = eth_stats.opackets; 453 } 454 455 printf("\n\nRX Thread:\n"); 456 for (i = 0; i < rte_eth_dev_count(); i++) { 457 printf("Port %u Pktsin : %5.2f\n", i, 458 (app_stats.port_rx_pkts[i] - 459 prev_app_stats.port_rx_pkts[i])/1000000.0); 460 prev_app_stats.port_rx_pkts[i] = app_stats.port_rx_pkts[i]; 461 } 462 printf(" - Received: %5.2f\n", 463 (app_stats.rx.rx_pkts - 464 prev_app_stats.rx.rx_pkts)/1000000.0); 465 printf(" - Returned: %5.2f\n", 466 (app_stats.rx.returned_pkts - 467 prev_app_stats.rx.returned_pkts)/1000000.0); 468 printf(" - Enqueued: %5.2f\n", 469 (app_stats.rx.enqueued_pkts - 470 prev_app_stats.rx.enqueued_pkts)/1000000.0); 471 printf(" - Dropped: %s%5.2f%s\n", ANSI_COLOR_RED, 472 (app_stats.rx.enqdrop_pkts - 473 prev_app_stats.rx.enqdrop_pkts)/1000000.0, 474 ANSI_COLOR_RESET); 475 476 printf("Distributor thread:\n"); 477 printf(" - In: %5.2f\n", 478 (app_stats.dist.in_pkts - 479 prev_app_stats.dist.in_pkts)/1000000.0); 480 printf(" - Returned: %5.2f\n", 481 (app_stats.dist.ret_pkts - 482 prev_app_stats.dist.ret_pkts)/1000000.0); 483 printf(" - Sent: %5.2f\n", 484 (app_stats.dist.sent_pkts - 485 prev_app_stats.dist.sent_pkts)/1000000.0); 486 printf(" - Dropped %s%5.2f%s\n", ANSI_COLOR_RED, 487 (app_stats.dist.enqdrop_pkts - 488 prev_app_stats.dist.enqdrop_pkts)/1000000.0, 489 ANSI_COLOR_RESET); 490 491 printf("TX thread:\n"); 492 printf(" - Dequeued: %5.2f\n", 493 (app_stats.tx.dequeue_pkts - 494 prev_app_stats.tx.dequeue_pkts)/1000000.0); 495 for (i = 0; i < rte_eth_dev_count(); i++) { 496 printf("Port %u Pktsout: %5.2f\n", 497 i, (app_stats.port_tx_pkts[i] - 498 prev_app_stats.port_tx_pkts[i])/1000000.0); 499 prev_app_stats.port_tx_pkts[i] = app_stats.port_tx_pkts[i]; 500 } 501 printf(" - Transmitted: %5.2f\n", 502 (app_stats.tx.tx_pkts - 503 prev_app_stats.tx.tx_pkts)/1000000.0); 504 printf(" - Dropped: %s%5.2f%s\n", ANSI_COLOR_RED, 505 (app_stats.tx.enqdrop_pkts - 506 prev_app_stats.tx.enqdrop_pkts)/1000000.0, 507 ANSI_COLOR_RESET); 508 509 prev_app_stats.rx.rx_pkts = app_stats.rx.rx_pkts; 510 prev_app_stats.rx.returned_pkts = app_stats.rx.returned_pkts; 511 prev_app_stats.rx.enqueued_pkts = app_stats.rx.enqueued_pkts; 512 prev_app_stats.rx.enqdrop_pkts = app_stats.rx.enqdrop_pkts; 513 prev_app_stats.dist.in_pkts = app_stats.dist.in_pkts; 514 prev_app_stats.dist.ret_pkts = app_stats.dist.ret_pkts; 515 prev_app_stats.dist.sent_pkts = app_stats.dist.sent_pkts; 516 prev_app_stats.dist.enqdrop_pkts = app_stats.dist.enqdrop_pkts; 517 prev_app_stats.tx.dequeue_pkts = app_stats.tx.dequeue_pkts; 518 prev_app_stats.tx.tx_pkts = app_stats.tx.tx_pkts; 519 prev_app_stats.tx.enqdrop_pkts = app_stats.tx.enqdrop_pkts; 520 521 for (i = 0; i < num_workers; i++) { 522 printf("Worker %02u Pkts: %5.2f. Bursts(1-8): ", i, 523 (app_stats.worker_pkts[i] - 524 prev_app_stats.worker_pkts[i])/1000000.0); 525 for (j = 0; j < 8; j++) { 526 printf("%"PRIu64" ", app_stats.worker_bursts[i][j]); 527 app_stats.worker_bursts[i][j] = 0; 528 } 529 printf("\n"); 530 prev_app_stats.worker_pkts[i] = app_stats.worker_pkts[i]; 531 } 532 } 533 534 static int 535 lcore_worker(struct lcore_params *p) 536 { 537 struct rte_distributor *d = p->d; 538 const unsigned id = p->worker_id; 539 unsigned int num = 0; 540 unsigned int i; 541 542 /* 543 * for single port, xor_val will be zero so we won't modify the output 544 * port, otherwise we send traffic from 0 to 1, 2 to 3, and vice versa 545 */ 546 const unsigned xor_val = (rte_eth_dev_count() > 1); 547 struct rte_mbuf *buf[8] __rte_cache_aligned; 548 549 for (i = 0; i < 8; i++) 550 buf[i] = NULL; 551 552 app_stats.worker_pkts[p->worker_id] = 1; 553 554 printf("\nCore %u acting as worker core.\n", rte_lcore_id()); 555 while (!quit_signal_work) { 556 num = rte_distributor_get_pkt(d, id, buf, buf, num); 557 /* Do a little bit of work for each packet */ 558 for (i = 0; i < num; i++) { 559 uint64_t t = rte_rdtsc()+100; 560 561 while (rte_rdtsc() < t) 562 rte_pause(); 563 buf[i]->port ^= xor_val; 564 } 565 566 app_stats.worker_pkts[p->worker_id] += num; 567 if (num > 0) 568 app_stats.worker_bursts[p->worker_id][num-1]++; 569 } 570 return 0; 571 } 572 573 /* display usage */ 574 static void 575 print_usage(const char *prgname) 576 { 577 printf("%s [EAL options] -- -p PORTMASK\n" 578 " -p PORTMASK: hexadecimal bitmask of ports to configure\n", 579 prgname); 580 } 581 582 static int 583 parse_portmask(const char *portmask) 584 { 585 char *end = NULL; 586 unsigned long pm; 587 588 /* parse hexadecimal string */ 589 pm = strtoul(portmask, &end, 16); 590 if ((portmask[0] == '\0') || (end == NULL) || (*end != '\0')) 591 return -1; 592 593 if (pm == 0) 594 return -1; 595 596 return pm; 597 } 598 599 /* Parse the argument given in the command line of the application */ 600 static int 601 parse_args(int argc, char **argv) 602 { 603 int opt; 604 char **argvopt; 605 int option_index; 606 char *prgname = argv[0]; 607 static struct option lgopts[] = { 608 {NULL, 0, 0, 0} 609 }; 610 611 argvopt = argv; 612 613 while ((opt = getopt_long(argc, argvopt, "p:", 614 lgopts, &option_index)) != EOF) { 615 616 switch (opt) { 617 /* portmask */ 618 case 'p': 619 enabled_port_mask = parse_portmask(optarg); 620 if (enabled_port_mask == 0) { 621 printf("invalid portmask\n"); 622 print_usage(prgname); 623 return -1; 624 } 625 break; 626 627 default: 628 print_usage(prgname); 629 return -1; 630 } 631 } 632 633 if (optind <= 1) { 634 print_usage(prgname); 635 return -1; 636 } 637 638 argv[optind-1] = prgname; 639 640 optind = 1; /* reset getopt lib */ 641 return 0; 642 } 643 644 /* Main function, does initialization and calls the per-lcore functions */ 645 int 646 main(int argc, char *argv[]) 647 { 648 struct rte_mempool *mbuf_pool; 649 struct rte_distributor *d; 650 struct rte_ring *dist_tx_ring; 651 struct rte_ring *rx_dist_ring; 652 unsigned lcore_id, worker_id = 0; 653 unsigned nb_ports; 654 uint16_t portid; 655 uint16_t nb_ports_available; 656 uint64_t t, freq; 657 658 /* catch ctrl-c so we can print on exit */ 659 signal(SIGINT, int_handler); 660 661 /* init EAL */ 662 int ret = rte_eal_init(argc, argv); 663 if (ret < 0) 664 rte_exit(EXIT_FAILURE, "Error with EAL initialization\n"); 665 argc -= ret; 666 argv += ret; 667 668 /* parse application arguments (after the EAL ones) */ 669 ret = parse_args(argc, argv); 670 if (ret < 0) 671 rte_exit(EXIT_FAILURE, "Invalid distributor parameters\n"); 672 673 if (rte_lcore_count() < 5) 674 rte_exit(EXIT_FAILURE, "Error, This application needs at " 675 "least 5 logical cores to run:\n" 676 "1 lcore for stats (can be core 0)\n" 677 "1 lcore for packet RX\n" 678 "1 lcore for distribution\n" 679 "1 lcore for packet TX\n" 680 "and at least 1 lcore for worker threads\n"); 681 682 nb_ports = rte_eth_dev_count(); 683 if (nb_ports == 0) 684 rte_exit(EXIT_FAILURE, "Error: no ethernet ports detected\n"); 685 if (nb_ports != 1 && (nb_ports & 1)) 686 rte_exit(EXIT_FAILURE, "Error: number of ports must be even, except " 687 "when using a single port\n"); 688 689 mbuf_pool = rte_pktmbuf_pool_create("MBUF_POOL", 690 NUM_MBUFS * nb_ports, MBUF_CACHE_SIZE, 0, 691 RTE_MBUF_DEFAULT_BUF_SIZE, rte_socket_id()); 692 if (mbuf_pool == NULL) 693 rte_exit(EXIT_FAILURE, "Cannot create mbuf pool\n"); 694 nb_ports_available = nb_ports; 695 696 /* initialize all ports */ 697 for (portid = 0; portid < nb_ports; portid++) { 698 /* skip ports that are not enabled */ 699 if ((enabled_port_mask & (1 << portid)) == 0) { 700 printf("\nSkipping disabled port %d\n", portid); 701 nb_ports_available--; 702 continue; 703 } 704 /* init port */ 705 printf("Initializing port %u... done\n", portid); 706 707 if (port_init(portid, mbuf_pool) != 0) 708 rte_exit(EXIT_FAILURE, "Cannot initialize port %u\n", 709 portid); 710 } 711 712 if (!nb_ports_available) { 713 rte_exit(EXIT_FAILURE, 714 "All available ports are disabled. Please set portmask.\n"); 715 } 716 717 d = rte_distributor_create("PKT_DIST", rte_socket_id(), 718 rte_lcore_count() - 4, 719 RTE_DIST_ALG_BURST); 720 if (d == NULL) 721 rte_exit(EXIT_FAILURE, "Cannot create distributor\n"); 722 723 /* 724 * scheduler ring is read by the transmitter core, and written to 725 * by scheduler core 726 */ 727 dist_tx_ring = rte_ring_create("Output_ring", SCHED_TX_RING_SZ, 728 rte_socket_id(), RING_F_SC_DEQ | RING_F_SP_ENQ); 729 if (dist_tx_ring == NULL) 730 rte_exit(EXIT_FAILURE, "Cannot create output ring\n"); 731 732 rx_dist_ring = rte_ring_create("Input_ring", SCHED_RX_RING_SZ, 733 rte_socket_id(), RING_F_SC_DEQ | RING_F_SP_ENQ); 734 if (rx_dist_ring == NULL) 735 rte_exit(EXIT_FAILURE, "Cannot create output ring\n"); 736 737 RTE_LCORE_FOREACH_SLAVE(lcore_id) { 738 if (worker_id == rte_lcore_count() - 3) { 739 printf("Starting distributor on lcore_id %d\n", 740 lcore_id); 741 /* distributor core */ 742 struct lcore_params *p = 743 rte_malloc(NULL, sizeof(*p), 0); 744 if (!p) 745 rte_panic("malloc failure\n"); 746 *p = (struct lcore_params){worker_id, d, 747 rx_dist_ring, dist_tx_ring, mbuf_pool}; 748 rte_eal_remote_launch( 749 (lcore_function_t *)lcore_distributor, 750 p, lcore_id); 751 } else if (worker_id == rte_lcore_count() - 4) { 752 printf("Starting tx on worker_id %d, lcore_id %d\n", 753 worker_id, lcore_id); 754 /* tx core */ 755 rte_eal_remote_launch((lcore_function_t *)lcore_tx, 756 dist_tx_ring, lcore_id); 757 } else if (worker_id == rte_lcore_count() - 2) { 758 printf("Starting rx on worker_id %d, lcore_id %d\n", 759 worker_id, lcore_id); 760 /* rx core */ 761 struct lcore_params *p = 762 rte_malloc(NULL, sizeof(*p), 0); 763 if (!p) 764 rte_panic("malloc failure\n"); 765 *p = (struct lcore_params){worker_id, d, rx_dist_ring, 766 dist_tx_ring, mbuf_pool}; 767 rte_eal_remote_launch((lcore_function_t *)lcore_rx, 768 p, lcore_id); 769 } else { 770 printf("Starting worker on worker_id %d, lcore_id %d\n", 771 worker_id, lcore_id); 772 struct lcore_params *p = 773 rte_malloc(NULL, sizeof(*p), 0); 774 if (!p) 775 rte_panic("malloc failure\n"); 776 *p = (struct lcore_params){worker_id, d, rx_dist_ring, 777 dist_tx_ring, mbuf_pool}; 778 779 rte_eal_remote_launch((lcore_function_t *)lcore_worker, 780 p, lcore_id); 781 } 782 worker_id++; 783 } 784 785 freq = rte_get_timer_hz(); 786 t = rte_rdtsc() + freq; 787 while (!quit_signal_dist) { 788 if (t < rte_rdtsc()) { 789 print_stats(); 790 t = rte_rdtsc() + freq; 791 } 792 usleep(1000); 793 } 794 795 RTE_LCORE_FOREACH_SLAVE(lcore_id) { 796 if (rte_eal_wait_lcore(lcore_id) < 0) 797 return -1; 798 } 799 800 print_stats(); 801 return 0; 802 } 803