1 /*- 2 * BSD LICENSE 3 * 4 * Copyright(c) 2010-2017 Intel Corporation. All rights reserved. 5 * 6 * Redistribution and use in source and binary forms, with or without 7 * modification, are permitted provided that the following conditions 8 * are met: 9 * 10 * * Redistributions of source code must retain the above copyright 11 * notice, this list of conditions and the following disclaimer. 12 * * Redistributions in binary form must reproduce the above copyright 13 * notice, this list of conditions and the following disclaimer in 14 * the documentation and/or other materials provided with the 15 * distribution. 16 * * Neither the name of Intel Corporation nor the names of its 17 * contributors may be used to endorse or promote products derived 18 * from this software without specific prior written permission. 19 * 20 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 21 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 22 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR 23 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT 24 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, 25 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT 26 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, 27 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY 28 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT 29 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE 30 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 31 */ 32 33 #include <stdint.h> 34 #include <inttypes.h> 35 #include <unistd.h> 36 #include <signal.h> 37 #include <getopt.h> 38 39 #include <rte_eal.h> 40 #include <rte_ethdev.h> 41 #include <rte_cycles.h> 42 #include <rte_malloc.h> 43 #include <rte_debug.h> 44 #include <rte_prefetch.h> 45 #include <rte_distributor.h> 46 #include <rte_pause.h> 47 48 #define RX_RING_SIZE 512 49 #define TX_RING_SIZE 512 50 #define NUM_MBUFS ((64*1024)-1) 51 #define MBUF_CACHE_SIZE 128 52 #define BURST_SIZE 64 53 #define SCHED_RX_RING_SZ 8192 54 #define SCHED_TX_RING_SZ 65536 55 #define BURST_SIZE_TX 32 56 57 #define RTE_LOGTYPE_DISTRAPP RTE_LOGTYPE_USER1 58 59 #define ANSI_COLOR_RED "\x1b[31m" 60 #define ANSI_COLOR_RESET "\x1b[0m" 61 62 /* mask of enabled ports */ 63 static uint32_t enabled_port_mask; 64 volatile uint8_t quit_signal; 65 volatile uint8_t quit_signal_rx; 66 volatile uint8_t quit_signal_dist; 67 volatile uint8_t quit_signal_work; 68 69 static volatile struct app_stats { 70 struct { 71 uint64_t rx_pkts; 72 uint64_t returned_pkts; 73 uint64_t enqueued_pkts; 74 uint64_t enqdrop_pkts; 75 } rx __rte_cache_aligned; 76 int pad1 __rte_cache_aligned; 77 78 struct { 79 uint64_t in_pkts; 80 uint64_t ret_pkts; 81 uint64_t sent_pkts; 82 uint64_t enqdrop_pkts; 83 } dist __rte_cache_aligned; 84 int pad2 __rte_cache_aligned; 85 86 struct { 87 uint64_t dequeue_pkts; 88 uint64_t tx_pkts; 89 uint64_t enqdrop_pkts; 90 } tx __rte_cache_aligned; 91 int pad3 __rte_cache_aligned; 92 93 uint64_t worker_pkts[64] __rte_cache_aligned; 94 95 int pad4 __rte_cache_aligned; 96 97 uint64_t worker_bursts[64][8] __rte_cache_aligned; 98 99 int pad5 __rte_cache_aligned; 100 101 uint64_t port_rx_pkts[64] __rte_cache_aligned; 102 uint64_t port_tx_pkts[64] __rte_cache_aligned; 103 } app_stats; 104 105 struct app_stats prev_app_stats; 106 107 static const struct rte_eth_conf port_conf_default = { 108 .rxmode = { 109 .mq_mode = ETH_MQ_RX_RSS, 110 .max_rx_pkt_len = ETHER_MAX_LEN, 111 }, 112 .txmode = { 113 .mq_mode = ETH_MQ_TX_NONE, 114 }, 115 .rx_adv_conf = { 116 .rss_conf = { 117 .rss_hf = ETH_RSS_IP | ETH_RSS_UDP | 118 ETH_RSS_TCP | ETH_RSS_SCTP, 119 } 120 }, 121 }; 122 123 struct output_buffer { 124 unsigned count; 125 struct rte_mbuf *mbufs[BURST_SIZE]; 126 }; 127 128 static void print_stats(void); 129 130 /* 131 * Initialises a given port using global settings and with the rx buffers 132 * coming from the mbuf_pool passed as parameter 133 */ 134 static inline int 135 port_init(uint16_t port, struct rte_mempool *mbuf_pool) 136 { 137 struct rte_eth_conf port_conf = port_conf_default; 138 const uint16_t rxRings = 1, txRings = rte_lcore_count() - 1; 139 int retval; 140 uint16_t q; 141 uint16_t nb_rxd = RX_RING_SIZE; 142 uint16_t nb_txd = TX_RING_SIZE; 143 144 if (port >= rte_eth_dev_count()) 145 return -1; 146 147 retval = rte_eth_dev_configure(port, rxRings, txRings, &port_conf); 148 if (retval != 0) 149 return retval; 150 151 retval = rte_eth_dev_adjust_nb_rx_tx_desc(port, &nb_rxd, &nb_txd); 152 if (retval != 0) 153 return retval; 154 155 for (q = 0; q < rxRings; q++) { 156 retval = rte_eth_rx_queue_setup(port, q, nb_rxd, 157 rte_eth_dev_socket_id(port), 158 NULL, mbuf_pool); 159 if (retval < 0) 160 return retval; 161 } 162 163 for (q = 0; q < txRings; q++) { 164 retval = rte_eth_tx_queue_setup(port, q, nb_txd, 165 rte_eth_dev_socket_id(port), 166 NULL); 167 if (retval < 0) 168 return retval; 169 } 170 171 retval = rte_eth_dev_start(port); 172 if (retval < 0) 173 return retval; 174 175 struct rte_eth_link link; 176 rte_eth_link_get_nowait(port, &link); 177 while (!link.link_status) { 178 printf("Waiting for Link up on port %"PRIu16"\n", port); 179 sleep(1); 180 rte_eth_link_get_nowait(port, &link); 181 } 182 183 if (!link.link_status) { 184 printf("Link down on port %"PRIu16"\n", port); 185 return 0; 186 } 187 188 struct ether_addr addr; 189 rte_eth_macaddr_get(port, &addr); 190 printf("Port %u MAC: %02"PRIx8" %02"PRIx8" %02"PRIx8 191 " %02"PRIx8" %02"PRIx8" %02"PRIx8"\n", 192 port, 193 addr.addr_bytes[0], addr.addr_bytes[1], 194 addr.addr_bytes[2], addr.addr_bytes[3], 195 addr.addr_bytes[4], addr.addr_bytes[5]); 196 197 rte_eth_promiscuous_enable(port); 198 199 return 0; 200 } 201 202 struct lcore_params { 203 unsigned worker_id; 204 struct rte_distributor *d; 205 struct rte_ring *rx_dist_ring; 206 struct rte_ring *dist_tx_ring; 207 struct rte_mempool *mem_pool; 208 }; 209 210 static int 211 lcore_rx(struct lcore_params *p) 212 { 213 const uint16_t nb_ports = rte_eth_dev_count(); 214 const int socket_id = rte_socket_id(); 215 uint8_t port; 216 struct rte_mbuf *bufs[BURST_SIZE*2]; 217 218 for (port = 0; port < nb_ports; port++) { 219 /* skip ports that are not enabled */ 220 if ((enabled_port_mask & (1 << port)) == 0) 221 continue; 222 223 if (rte_eth_dev_socket_id(port) > 0 && 224 rte_eth_dev_socket_id(port) != socket_id) 225 printf("WARNING, port %u is on remote NUMA node to " 226 "RX thread.\n\tPerformance will not " 227 "be optimal.\n", port); 228 } 229 230 printf("\nCore %u doing packet RX.\n", rte_lcore_id()); 231 port = 0; 232 while (!quit_signal_rx) { 233 234 /* skip ports that are not enabled */ 235 if ((enabled_port_mask & (1 << port)) == 0) { 236 if (++port == nb_ports) 237 port = 0; 238 continue; 239 } 240 const uint16_t nb_rx = rte_eth_rx_burst(port, 0, bufs, 241 BURST_SIZE); 242 if (unlikely(nb_rx == 0)) { 243 if (++port == nb_ports) 244 port = 0; 245 continue; 246 } 247 app_stats.rx.rx_pkts += nb_rx; 248 249 /* 250 * You can run the distributor on the rx core with this code. Returned 251 * packets are then send straight to the tx core. 252 */ 253 #if 0 254 rte_distributor_process(d, bufs, nb_rx); 255 const uint16_t nb_ret = rte_distributor_returned_pktsd, 256 bufs, BURST_SIZE*2); 257 258 app_stats.rx.returned_pkts += nb_ret; 259 if (unlikely(nb_ret == 0)) { 260 if (++port == nb_ports) 261 port = 0; 262 continue; 263 } 264 265 struct rte_ring *tx_ring = p->dist_tx_ring; 266 uint16_t sent = rte_ring_enqueue_burst(tx_ring, 267 (void *)bufs, nb_ret, NULL); 268 #else 269 uint16_t nb_ret = nb_rx; 270 /* 271 * Swap the following two lines if you want the rx traffic 272 * to go directly to tx, no distribution. 273 */ 274 struct rte_ring *out_ring = p->rx_dist_ring; 275 /* struct rte_ring *out_ring = p->dist_tx_ring; */ 276 277 uint16_t sent = rte_ring_enqueue_burst(out_ring, 278 (void *)bufs, nb_ret, NULL); 279 #endif 280 281 app_stats.rx.enqueued_pkts += sent; 282 if (unlikely(sent < nb_ret)) { 283 app_stats.rx.enqdrop_pkts += nb_ret - sent; 284 RTE_LOG_DP(DEBUG, DISTRAPP, 285 "%s:Packet loss due to full ring\n", __func__); 286 while (sent < nb_ret) 287 rte_pktmbuf_free(bufs[sent++]); 288 } 289 if (++port == nb_ports) 290 port = 0; 291 } 292 /* set worker & tx threads quit flag */ 293 printf("\nCore %u exiting rx task.\n", rte_lcore_id()); 294 quit_signal = 1; 295 return 0; 296 } 297 298 static inline void 299 flush_one_port(struct output_buffer *outbuf, uint8_t outp) 300 { 301 unsigned int nb_tx = rte_eth_tx_burst(outp, 0, 302 outbuf->mbufs, outbuf->count); 303 app_stats.tx.tx_pkts += outbuf->count; 304 305 if (unlikely(nb_tx < outbuf->count)) { 306 app_stats.tx.enqdrop_pkts += outbuf->count - nb_tx; 307 do { 308 rte_pktmbuf_free(outbuf->mbufs[nb_tx]); 309 } while (++nb_tx < outbuf->count); 310 } 311 outbuf->count = 0; 312 } 313 314 static inline void 315 flush_all_ports(struct output_buffer *tx_buffers, uint16_t nb_ports) 316 { 317 uint16_t outp; 318 319 for (outp = 0; outp < nb_ports; outp++) { 320 /* skip ports that are not enabled */ 321 if ((enabled_port_mask & (1 << outp)) == 0) 322 continue; 323 324 if (tx_buffers[outp].count == 0) 325 continue; 326 327 flush_one_port(&tx_buffers[outp], outp); 328 } 329 } 330 331 332 333 static int 334 lcore_distributor(struct lcore_params *p) 335 { 336 struct rte_ring *in_r = p->rx_dist_ring; 337 struct rte_ring *out_r = p->dist_tx_ring; 338 struct rte_mbuf *bufs[BURST_SIZE * 4]; 339 struct rte_distributor *d = p->d; 340 341 printf("\nCore %u acting as distributor core.\n", rte_lcore_id()); 342 while (!quit_signal_dist) { 343 const uint16_t nb_rx = rte_ring_dequeue_burst(in_r, 344 (void *)bufs, BURST_SIZE*1, NULL); 345 if (nb_rx) { 346 app_stats.dist.in_pkts += nb_rx; 347 348 /* Distribute the packets */ 349 rte_distributor_process(d, bufs, nb_rx); 350 /* Handle Returns */ 351 const uint16_t nb_ret = 352 rte_distributor_returned_pkts(d, 353 bufs, BURST_SIZE*2); 354 355 if (unlikely(nb_ret == 0)) 356 continue; 357 app_stats.dist.ret_pkts += nb_ret; 358 359 uint16_t sent = rte_ring_enqueue_burst(out_r, 360 (void *)bufs, nb_ret, NULL); 361 app_stats.dist.sent_pkts += sent; 362 if (unlikely(sent < nb_ret)) { 363 app_stats.dist.enqdrop_pkts += nb_ret - sent; 364 RTE_LOG(DEBUG, DISTRAPP, 365 "%s:Packet loss due to full out ring\n", 366 __func__); 367 while (sent < nb_ret) 368 rte_pktmbuf_free(bufs[sent++]); 369 } 370 } 371 } 372 printf("\nCore %u exiting distributor task.\n", rte_lcore_id()); 373 quit_signal_work = 1; 374 375 rte_distributor_flush(d); 376 /* Unblock any returns so workers can exit */ 377 rte_distributor_clear_returns(d); 378 quit_signal_rx = 1; 379 return 0; 380 } 381 382 383 static int 384 lcore_tx(struct rte_ring *in_r) 385 { 386 static struct output_buffer tx_buffers[RTE_MAX_ETHPORTS]; 387 const uint16_t nb_ports = rte_eth_dev_count(); 388 const int socket_id = rte_socket_id(); 389 uint16_t port; 390 391 for (port = 0; port < nb_ports; port++) { 392 /* skip ports that are not enabled */ 393 if ((enabled_port_mask & (1 << port)) == 0) 394 continue; 395 396 if (rte_eth_dev_socket_id(port) > 0 && 397 rte_eth_dev_socket_id(port) != socket_id) 398 printf("WARNING, port %u is on remote NUMA node to " 399 "TX thread.\n\tPerformance will not " 400 "be optimal.\n", port); 401 } 402 403 printf("\nCore %u doing packet TX.\n", rte_lcore_id()); 404 while (!quit_signal) { 405 406 for (port = 0; port < nb_ports; port++) { 407 /* skip ports that are not enabled */ 408 if ((enabled_port_mask & (1 << port)) == 0) 409 continue; 410 411 struct rte_mbuf *bufs[BURST_SIZE_TX]; 412 const uint16_t nb_rx = rte_ring_dequeue_burst(in_r, 413 (void *)bufs, BURST_SIZE_TX, NULL); 414 app_stats.tx.dequeue_pkts += nb_rx; 415 416 /* if we get no traffic, flush anything we have */ 417 if (unlikely(nb_rx == 0)) { 418 flush_all_ports(tx_buffers, nb_ports); 419 continue; 420 } 421 422 /* for traffic we receive, queue it up for transmit */ 423 uint16_t i; 424 rte_prefetch_non_temporal((void *)bufs[0]); 425 rte_prefetch_non_temporal((void *)bufs[1]); 426 rte_prefetch_non_temporal((void *)bufs[2]); 427 for (i = 0; i < nb_rx; i++) { 428 struct output_buffer *outbuf; 429 uint8_t outp; 430 rte_prefetch_non_temporal((void *)bufs[i + 3]); 431 /* 432 * workers should update in_port to hold the 433 * output port value 434 */ 435 outp = bufs[i]->port; 436 /* skip ports that are not enabled */ 437 if ((enabled_port_mask & (1 << outp)) == 0) 438 continue; 439 440 outbuf = &tx_buffers[outp]; 441 outbuf->mbufs[outbuf->count++] = bufs[i]; 442 if (outbuf->count == BURST_SIZE_TX) 443 flush_one_port(outbuf, outp); 444 } 445 } 446 } 447 printf("\nCore %u exiting tx task.\n", rte_lcore_id()); 448 return 0; 449 } 450 451 static void 452 int_handler(int sig_num) 453 { 454 printf("Exiting on signal %d\n", sig_num); 455 /* set quit flag for rx thread to exit */ 456 quit_signal_dist = 1; 457 } 458 459 static void 460 print_stats(void) 461 { 462 struct rte_eth_stats eth_stats; 463 unsigned int i, j; 464 const unsigned int num_workers = rte_lcore_count() - 4; 465 466 for (i = 0; i < rte_eth_dev_count(); i++) { 467 rte_eth_stats_get(i, ð_stats); 468 app_stats.port_rx_pkts[i] = eth_stats.ipackets; 469 app_stats.port_tx_pkts[i] = eth_stats.opackets; 470 } 471 472 printf("\n\nRX Thread:\n"); 473 for (i = 0; i < rte_eth_dev_count(); i++) { 474 printf("Port %u Pktsin : %5.2f\n", i, 475 (app_stats.port_rx_pkts[i] - 476 prev_app_stats.port_rx_pkts[i])/1000000.0); 477 prev_app_stats.port_rx_pkts[i] = app_stats.port_rx_pkts[i]; 478 } 479 printf(" - Received: %5.2f\n", 480 (app_stats.rx.rx_pkts - 481 prev_app_stats.rx.rx_pkts)/1000000.0); 482 printf(" - Returned: %5.2f\n", 483 (app_stats.rx.returned_pkts - 484 prev_app_stats.rx.returned_pkts)/1000000.0); 485 printf(" - Enqueued: %5.2f\n", 486 (app_stats.rx.enqueued_pkts - 487 prev_app_stats.rx.enqueued_pkts)/1000000.0); 488 printf(" - Dropped: %s%5.2f%s\n", ANSI_COLOR_RED, 489 (app_stats.rx.enqdrop_pkts - 490 prev_app_stats.rx.enqdrop_pkts)/1000000.0, 491 ANSI_COLOR_RESET); 492 493 printf("Distributor thread:\n"); 494 printf(" - In: %5.2f\n", 495 (app_stats.dist.in_pkts - 496 prev_app_stats.dist.in_pkts)/1000000.0); 497 printf(" - Returned: %5.2f\n", 498 (app_stats.dist.ret_pkts - 499 prev_app_stats.dist.ret_pkts)/1000000.0); 500 printf(" - Sent: %5.2f\n", 501 (app_stats.dist.sent_pkts - 502 prev_app_stats.dist.sent_pkts)/1000000.0); 503 printf(" - Dropped %s%5.2f%s\n", ANSI_COLOR_RED, 504 (app_stats.dist.enqdrop_pkts - 505 prev_app_stats.dist.enqdrop_pkts)/1000000.0, 506 ANSI_COLOR_RESET); 507 508 printf("TX thread:\n"); 509 printf(" - Dequeued: %5.2f\n", 510 (app_stats.tx.dequeue_pkts - 511 prev_app_stats.tx.dequeue_pkts)/1000000.0); 512 for (i = 0; i < rte_eth_dev_count(); i++) { 513 printf("Port %u Pktsout: %5.2f\n", 514 i, (app_stats.port_tx_pkts[i] - 515 prev_app_stats.port_tx_pkts[i])/1000000.0); 516 prev_app_stats.port_tx_pkts[i] = app_stats.port_tx_pkts[i]; 517 } 518 printf(" - Transmitted: %5.2f\n", 519 (app_stats.tx.tx_pkts - 520 prev_app_stats.tx.tx_pkts)/1000000.0); 521 printf(" - Dropped: %s%5.2f%s\n", ANSI_COLOR_RED, 522 (app_stats.tx.enqdrop_pkts - 523 prev_app_stats.tx.enqdrop_pkts)/1000000.0, 524 ANSI_COLOR_RESET); 525 526 prev_app_stats.rx.rx_pkts = app_stats.rx.rx_pkts; 527 prev_app_stats.rx.returned_pkts = app_stats.rx.returned_pkts; 528 prev_app_stats.rx.enqueued_pkts = app_stats.rx.enqueued_pkts; 529 prev_app_stats.rx.enqdrop_pkts = app_stats.rx.enqdrop_pkts; 530 prev_app_stats.dist.in_pkts = app_stats.dist.in_pkts; 531 prev_app_stats.dist.ret_pkts = app_stats.dist.ret_pkts; 532 prev_app_stats.dist.sent_pkts = app_stats.dist.sent_pkts; 533 prev_app_stats.dist.enqdrop_pkts = app_stats.dist.enqdrop_pkts; 534 prev_app_stats.tx.dequeue_pkts = app_stats.tx.dequeue_pkts; 535 prev_app_stats.tx.tx_pkts = app_stats.tx.tx_pkts; 536 prev_app_stats.tx.enqdrop_pkts = app_stats.tx.enqdrop_pkts; 537 538 for (i = 0; i < num_workers; i++) { 539 printf("Worker %02u Pkts: %5.2f. Bursts(1-8): ", i, 540 (app_stats.worker_pkts[i] - 541 prev_app_stats.worker_pkts[i])/1000000.0); 542 for (j = 0; j < 8; j++) { 543 printf("%"PRIu64" ", app_stats.worker_bursts[i][j]); 544 app_stats.worker_bursts[i][j] = 0; 545 } 546 printf("\n"); 547 prev_app_stats.worker_pkts[i] = app_stats.worker_pkts[i]; 548 } 549 } 550 551 static int 552 lcore_worker(struct lcore_params *p) 553 { 554 struct rte_distributor *d = p->d; 555 const unsigned id = p->worker_id; 556 unsigned int num = 0; 557 unsigned int i; 558 559 /* 560 * for single port, xor_val will be zero so we won't modify the output 561 * port, otherwise we send traffic from 0 to 1, 2 to 3, and vice versa 562 */ 563 const unsigned xor_val = (rte_eth_dev_count() > 1); 564 struct rte_mbuf *buf[8] __rte_cache_aligned; 565 566 for (i = 0; i < 8; i++) 567 buf[i] = NULL; 568 569 app_stats.worker_pkts[p->worker_id] = 1; 570 571 printf("\nCore %u acting as worker core.\n", rte_lcore_id()); 572 while (!quit_signal_work) { 573 num = rte_distributor_get_pkt(d, id, buf, buf, num); 574 /* Do a little bit of work for each packet */ 575 for (i = 0; i < num; i++) { 576 uint64_t t = rte_rdtsc()+100; 577 578 while (rte_rdtsc() < t) 579 rte_pause(); 580 buf[i]->port ^= xor_val; 581 } 582 583 app_stats.worker_pkts[p->worker_id] += num; 584 if (num > 0) 585 app_stats.worker_bursts[p->worker_id][num-1]++; 586 } 587 return 0; 588 } 589 590 /* display usage */ 591 static void 592 print_usage(const char *prgname) 593 { 594 printf("%s [EAL options] -- -p PORTMASK\n" 595 " -p PORTMASK: hexadecimal bitmask of ports to configure\n", 596 prgname); 597 } 598 599 static int 600 parse_portmask(const char *portmask) 601 { 602 char *end = NULL; 603 unsigned long pm; 604 605 /* parse hexadecimal string */ 606 pm = strtoul(portmask, &end, 16); 607 if ((portmask[0] == '\0') || (end == NULL) || (*end != '\0')) 608 return -1; 609 610 if (pm == 0) 611 return -1; 612 613 return pm; 614 } 615 616 /* Parse the argument given in the command line of the application */ 617 static int 618 parse_args(int argc, char **argv) 619 { 620 int opt; 621 char **argvopt; 622 int option_index; 623 char *prgname = argv[0]; 624 static struct option lgopts[] = { 625 {NULL, 0, 0, 0} 626 }; 627 628 argvopt = argv; 629 630 while ((opt = getopt_long(argc, argvopt, "p:", 631 lgopts, &option_index)) != EOF) { 632 633 switch (opt) { 634 /* portmask */ 635 case 'p': 636 enabled_port_mask = parse_portmask(optarg); 637 if (enabled_port_mask == 0) { 638 printf("invalid portmask\n"); 639 print_usage(prgname); 640 return -1; 641 } 642 break; 643 644 default: 645 print_usage(prgname); 646 return -1; 647 } 648 } 649 650 if (optind <= 1) { 651 print_usage(prgname); 652 return -1; 653 } 654 655 argv[optind-1] = prgname; 656 657 optind = 1; /* reset getopt lib */ 658 return 0; 659 } 660 661 /* Main function, does initialization and calls the per-lcore functions */ 662 int 663 main(int argc, char *argv[]) 664 { 665 struct rte_mempool *mbuf_pool; 666 struct rte_distributor *d; 667 struct rte_ring *dist_tx_ring; 668 struct rte_ring *rx_dist_ring; 669 unsigned lcore_id, worker_id = 0; 670 unsigned nb_ports; 671 uint16_t portid; 672 uint16_t nb_ports_available; 673 uint64_t t, freq; 674 675 /* catch ctrl-c so we can print on exit */ 676 signal(SIGINT, int_handler); 677 678 /* init EAL */ 679 int ret = rte_eal_init(argc, argv); 680 if (ret < 0) 681 rte_exit(EXIT_FAILURE, "Error with EAL initialization\n"); 682 argc -= ret; 683 argv += ret; 684 685 /* parse application arguments (after the EAL ones) */ 686 ret = parse_args(argc, argv); 687 if (ret < 0) 688 rte_exit(EXIT_FAILURE, "Invalid distributor parameters\n"); 689 690 if (rte_lcore_count() < 5) 691 rte_exit(EXIT_FAILURE, "Error, This application needs at " 692 "least 5 logical cores to run:\n" 693 "1 lcore for stats (can be core 0)\n" 694 "1 lcore for packet RX\n" 695 "1 lcore for distribution\n" 696 "1 lcore for packet TX\n" 697 "and at least 1 lcore for worker threads\n"); 698 699 nb_ports = rte_eth_dev_count(); 700 if (nb_ports == 0) 701 rte_exit(EXIT_FAILURE, "Error: no ethernet ports detected\n"); 702 if (nb_ports != 1 && (nb_ports & 1)) 703 rte_exit(EXIT_FAILURE, "Error: number of ports must be even, except " 704 "when using a single port\n"); 705 706 mbuf_pool = rte_pktmbuf_pool_create("MBUF_POOL", 707 NUM_MBUFS * nb_ports, MBUF_CACHE_SIZE, 0, 708 RTE_MBUF_DEFAULT_BUF_SIZE, rte_socket_id()); 709 if (mbuf_pool == NULL) 710 rte_exit(EXIT_FAILURE, "Cannot create mbuf pool\n"); 711 nb_ports_available = nb_ports; 712 713 /* initialize all ports */ 714 for (portid = 0; portid < nb_ports; portid++) { 715 /* skip ports that are not enabled */ 716 if ((enabled_port_mask & (1 << portid)) == 0) { 717 printf("\nSkipping disabled port %d\n", portid); 718 nb_ports_available--; 719 continue; 720 } 721 /* init port */ 722 printf("Initializing port %u... done\n", portid); 723 724 if (port_init(portid, mbuf_pool) != 0) 725 rte_exit(EXIT_FAILURE, "Cannot initialize port %u\n", 726 portid); 727 } 728 729 if (!nb_ports_available) { 730 rte_exit(EXIT_FAILURE, 731 "All available ports are disabled. Please set portmask.\n"); 732 } 733 734 d = rte_distributor_create("PKT_DIST", rte_socket_id(), 735 rte_lcore_count() - 4, 736 RTE_DIST_ALG_BURST); 737 if (d == NULL) 738 rte_exit(EXIT_FAILURE, "Cannot create distributor\n"); 739 740 /* 741 * scheduler ring is read by the transmitter core, and written to 742 * by scheduler core 743 */ 744 dist_tx_ring = rte_ring_create("Output_ring", SCHED_TX_RING_SZ, 745 rte_socket_id(), RING_F_SC_DEQ | RING_F_SP_ENQ); 746 if (dist_tx_ring == NULL) 747 rte_exit(EXIT_FAILURE, "Cannot create output ring\n"); 748 749 rx_dist_ring = rte_ring_create("Input_ring", SCHED_RX_RING_SZ, 750 rte_socket_id(), RING_F_SC_DEQ | RING_F_SP_ENQ); 751 if (rx_dist_ring == NULL) 752 rte_exit(EXIT_FAILURE, "Cannot create output ring\n"); 753 754 RTE_LCORE_FOREACH_SLAVE(lcore_id) { 755 if (worker_id == rte_lcore_count() - 3) { 756 printf("Starting distributor on lcore_id %d\n", 757 lcore_id); 758 /* distributor core */ 759 struct lcore_params *p = 760 rte_malloc(NULL, sizeof(*p), 0); 761 if (!p) 762 rte_panic("malloc failure\n"); 763 *p = (struct lcore_params){worker_id, d, 764 rx_dist_ring, dist_tx_ring, mbuf_pool}; 765 rte_eal_remote_launch( 766 (lcore_function_t *)lcore_distributor, 767 p, lcore_id); 768 } else if (worker_id == rte_lcore_count() - 4) { 769 printf("Starting tx on worker_id %d, lcore_id %d\n", 770 worker_id, lcore_id); 771 /* tx core */ 772 rte_eal_remote_launch((lcore_function_t *)lcore_tx, 773 dist_tx_ring, lcore_id); 774 } else if (worker_id == rte_lcore_count() - 2) { 775 printf("Starting rx on worker_id %d, lcore_id %d\n", 776 worker_id, lcore_id); 777 /* rx core */ 778 struct lcore_params *p = 779 rte_malloc(NULL, sizeof(*p), 0); 780 if (!p) 781 rte_panic("malloc failure\n"); 782 *p = (struct lcore_params){worker_id, d, rx_dist_ring, 783 dist_tx_ring, mbuf_pool}; 784 rte_eal_remote_launch((lcore_function_t *)lcore_rx, 785 p, lcore_id); 786 } else { 787 printf("Starting worker on worker_id %d, lcore_id %d\n", 788 worker_id, lcore_id); 789 struct lcore_params *p = 790 rte_malloc(NULL, sizeof(*p), 0); 791 if (!p) 792 rte_panic("malloc failure\n"); 793 *p = (struct lcore_params){worker_id, d, rx_dist_ring, 794 dist_tx_ring, mbuf_pool}; 795 796 rte_eal_remote_launch((lcore_function_t *)lcore_worker, 797 p, lcore_id); 798 } 799 worker_id++; 800 } 801 802 freq = rte_get_timer_hz(); 803 t = rte_rdtsc() + freq; 804 while (!quit_signal_dist) { 805 if (t < rte_rdtsc()) { 806 print_stats(); 807 t = rte_rdtsc() + freq; 808 } 809 usleep(1000); 810 } 811 812 RTE_LCORE_FOREACH_SLAVE(lcore_id) { 813 if (rte_eal_wait_lcore(lcore_id) < 0) 814 return -1; 815 } 816 817 print_stats(); 818 return 0; 819 } 820