1 /*- 2 * BSD LICENSE 3 * 4 * Copyright(c) 2010-2017 Intel Corporation. All rights reserved. 5 * 6 * Redistribution and use in source and binary forms, with or without 7 * modification, are permitted provided that the following conditions 8 * are met: 9 * 10 * * Redistributions of source code must retain the above copyright 11 * notice, this list of conditions and the following disclaimer. 12 * * Redistributions in binary form must reproduce the above copyright 13 * notice, this list of conditions and the following disclaimer in 14 * the documentation and/or other materials provided with the 15 * distribution. 16 * * Neither the name of Intel Corporation nor the names of its 17 * contributors may be used to endorse or promote products derived 18 * from this software without specific prior written permission. 19 * 20 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 21 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 22 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR 23 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT 24 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, 25 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT 26 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, 27 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY 28 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT 29 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE 30 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 31 */ 32 33 #include <stdint.h> 34 #include <inttypes.h> 35 #include <unistd.h> 36 #include <signal.h> 37 #include <getopt.h> 38 39 #include <rte_eal.h> 40 #include <rte_ethdev.h> 41 #include <rte_cycles.h> 42 #include <rte_malloc.h> 43 #include <rte_debug.h> 44 #include <rte_prefetch.h> 45 #include <rte_distributor.h> 46 #include <rte_pause.h> 47 48 #define RX_RING_SIZE 512 49 #define TX_RING_SIZE 512 50 #define NUM_MBUFS ((64*1024)-1) 51 #define MBUF_CACHE_SIZE 128 52 #define BURST_SIZE 64 53 #define SCHED_RX_RING_SZ 8192 54 #define SCHED_TX_RING_SZ 65536 55 #define BURST_SIZE_TX 32 56 57 #define RTE_LOGTYPE_DISTRAPP RTE_LOGTYPE_USER1 58 59 #define ANSI_COLOR_RED "\x1b[31m" 60 #define ANSI_COLOR_RESET "\x1b[0m" 61 62 /* mask of enabled ports */ 63 static uint32_t enabled_port_mask; 64 volatile uint8_t quit_signal; 65 volatile uint8_t quit_signal_rx; 66 volatile uint8_t quit_signal_dist; 67 volatile uint8_t quit_signal_work; 68 69 static volatile struct app_stats { 70 struct { 71 uint64_t rx_pkts; 72 uint64_t returned_pkts; 73 uint64_t enqueued_pkts; 74 uint64_t enqdrop_pkts; 75 } rx __rte_cache_aligned; 76 int pad1 __rte_cache_aligned; 77 78 struct { 79 uint64_t in_pkts; 80 uint64_t ret_pkts; 81 uint64_t sent_pkts; 82 uint64_t enqdrop_pkts; 83 } dist __rte_cache_aligned; 84 int pad2 __rte_cache_aligned; 85 86 struct { 87 uint64_t dequeue_pkts; 88 uint64_t tx_pkts; 89 uint64_t enqdrop_pkts; 90 } tx __rte_cache_aligned; 91 int pad3 __rte_cache_aligned; 92 93 uint64_t worker_pkts[64] __rte_cache_aligned; 94 95 int pad4 __rte_cache_aligned; 96 97 uint64_t worker_bursts[64][8] __rte_cache_aligned; 98 99 int pad5 __rte_cache_aligned; 100 101 uint64_t port_rx_pkts[64] __rte_cache_aligned; 102 uint64_t port_tx_pkts[64] __rte_cache_aligned; 103 } app_stats; 104 105 struct app_stats prev_app_stats; 106 107 static const struct rte_eth_conf port_conf_default = { 108 .rxmode = { 109 .mq_mode = ETH_MQ_RX_RSS, 110 .max_rx_pkt_len = ETHER_MAX_LEN, 111 }, 112 .txmode = { 113 .mq_mode = ETH_MQ_TX_NONE, 114 }, 115 .rx_adv_conf = { 116 .rss_conf = { 117 .rss_hf = ETH_RSS_IP | ETH_RSS_UDP | 118 ETH_RSS_TCP | ETH_RSS_SCTP, 119 } 120 }, 121 }; 122 123 struct output_buffer { 124 unsigned count; 125 struct rte_mbuf *mbufs[BURST_SIZE]; 126 }; 127 128 static void print_stats(void); 129 130 /* 131 * Initialises a given port using global settings and with the rx buffers 132 * coming from the mbuf_pool passed as parameter 133 */ 134 static inline int 135 port_init(uint8_t port, struct rte_mempool *mbuf_pool) 136 { 137 struct rte_eth_conf port_conf = port_conf_default; 138 const uint16_t rxRings = 1, txRings = rte_lcore_count() - 1; 139 int retval; 140 uint16_t q; 141 142 if (port >= rte_eth_dev_count()) 143 return -1; 144 145 retval = rte_eth_dev_configure(port, rxRings, txRings, &port_conf); 146 if (retval != 0) 147 return retval; 148 149 for (q = 0; q < rxRings; q++) { 150 retval = rte_eth_rx_queue_setup(port, q, RX_RING_SIZE, 151 rte_eth_dev_socket_id(port), 152 NULL, mbuf_pool); 153 if (retval < 0) 154 return retval; 155 } 156 157 for (q = 0; q < txRings; q++) { 158 retval = rte_eth_tx_queue_setup(port, q, TX_RING_SIZE, 159 rte_eth_dev_socket_id(port), 160 NULL); 161 if (retval < 0) 162 return retval; 163 } 164 165 retval = rte_eth_dev_start(port); 166 if (retval < 0) 167 return retval; 168 169 struct rte_eth_link link; 170 rte_eth_link_get_nowait(port, &link); 171 while (!link.link_status) { 172 printf("Waiting for Link up on port %"PRIu8"\n", port); 173 sleep(1); 174 rte_eth_link_get_nowait(port, &link); 175 } 176 177 if (!link.link_status) { 178 printf("Link down on port %"PRIu8"\n", port); 179 return 0; 180 } 181 182 struct ether_addr addr; 183 rte_eth_macaddr_get(port, &addr); 184 printf("Port %u MAC: %02"PRIx8" %02"PRIx8" %02"PRIx8 185 " %02"PRIx8" %02"PRIx8" %02"PRIx8"\n", 186 (unsigned)port, 187 addr.addr_bytes[0], addr.addr_bytes[1], 188 addr.addr_bytes[2], addr.addr_bytes[3], 189 addr.addr_bytes[4], addr.addr_bytes[5]); 190 191 rte_eth_promiscuous_enable(port); 192 193 return 0; 194 } 195 196 struct lcore_params { 197 unsigned worker_id; 198 struct rte_distributor *d; 199 struct rte_ring *rx_dist_ring; 200 struct rte_ring *dist_tx_ring; 201 struct rte_mempool *mem_pool; 202 }; 203 204 static int 205 lcore_rx(struct lcore_params *p) 206 { 207 const uint8_t nb_ports = rte_eth_dev_count(); 208 const int socket_id = rte_socket_id(); 209 uint8_t port; 210 struct rte_mbuf *bufs[BURST_SIZE*2]; 211 212 for (port = 0; port < nb_ports; port++) { 213 /* skip ports that are not enabled */ 214 if ((enabled_port_mask & (1 << port)) == 0) 215 continue; 216 217 if (rte_eth_dev_socket_id(port) > 0 && 218 rte_eth_dev_socket_id(port) != socket_id) 219 printf("WARNING, port %u is on remote NUMA node to " 220 "RX thread.\n\tPerformance will not " 221 "be optimal.\n", port); 222 } 223 224 printf("\nCore %u doing packet RX.\n", rte_lcore_id()); 225 port = 0; 226 while (!quit_signal_rx) { 227 228 /* skip ports that are not enabled */ 229 if ((enabled_port_mask & (1 << port)) == 0) { 230 if (++port == nb_ports) 231 port = 0; 232 continue; 233 } 234 const uint16_t nb_rx = rte_eth_rx_burst(port, 0, bufs, 235 BURST_SIZE); 236 if (unlikely(nb_rx == 0)) { 237 if (++port == nb_ports) 238 port = 0; 239 continue; 240 } 241 app_stats.rx.rx_pkts += nb_rx; 242 243 /* 244 * You can run the distributor on the rx core with this code. Returned 245 * packets are then send straight to the tx core. 246 */ 247 #if 0 248 rte_distributor_process(d, bufs, nb_rx); 249 const uint16_t nb_ret = rte_distributor_returned_pktsd, 250 bufs, BURST_SIZE*2); 251 252 app_stats.rx.returned_pkts += nb_ret; 253 if (unlikely(nb_ret == 0)) { 254 if (++port == nb_ports) 255 port = 0; 256 continue; 257 } 258 259 struct rte_ring *tx_ring = p->dist_tx_ring; 260 uint16_t sent = rte_ring_enqueue_burst(tx_ring, 261 (void *)bufs, nb_ret, NULL); 262 #else 263 uint16_t nb_ret = nb_rx; 264 /* 265 * Swap the following two lines if you want the rx traffic 266 * to go directly to tx, no distribution. 267 */ 268 struct rte_ring *out_ring = p->rx_dist_ring; 269 /* struct rte_ring *out_ring = p->dist_tx_ring; */ 270 271 uint16_t sent = rte_ring_enqueue_burst(out_ring, 272 (void *)bufs, nb_ret, NULL); 273 #endif 274 275 app_stats.rx.enqueued_pkts += sent; 276 if (unlikely(sent < nb_ret)) { 277 app_stats.rx.enqdrop_pkts += nb_ret - sent; 278 RTE_LOG_DP(DEBUG, DISTRAPP, 279 "%s:Packet loss due to full ring\n", __func__); 280 while (sent < nb_ret) 281 rte_pktmbuf_free(bufs[sent++]); 282 } 283 if (++port == nb_ports) 284 port = 0; 285 } 286 /* set worker & tx threads quit flag */ 287 printf("\nCore %u exiting rx task.\n", rte_lcore_id()); 288 quit_signal = 1; 289 return 0; 290 } 291 292 static inline void 293 flush_one_port(struct output_buffer *outbuf, uint8_t outp) 294 { 295 unsigned int nb_tx = rte_eth_tx_burst(outp, 0, 296 outbuf->mbufs, outbuf->count); 297 app_stats.tx.tx_pkts += outbuf->count; 298 299 if (unlikely(nb_tx < outbuf->count)) { 300 app_stats.tx.enqdrop_pkts += outbuf->count - nb_tx; 301 do { 302 rte_pktmbuf_free(outbuf->mbufs[nb_tx]); 303 } while (++nb_tx < outbuf->count); 304 } 305 outbuf->count = 0; 306 } 307 308 static inline void 309 flush_all_ports(struct output_buffer *tx_buffers, uint8_t nb_ports) 310 { 311 uint8_t outp; 312 313 for (outp = 0; outp < nb_ports; outp++) { 314 /* skip ports that are not enabled */ 315 if ((enabled_port_mask & (1 << outp)) == 0) 316 continue; 317 318 if (tx_buffers[outp].count == 0) 319 continue; 320 321 flush_one_port(&tx_buffers[outp], outp); 322 } 323 } 324 325 326 327 static int 328 lcore_distributor(struct lcore_params *p) 329 { 330 struct rte_ring *in_r = p->rx_dist_ring; 331 struct rte_ring *out_r = p->dist_tx_ring; 332 struct rte_mbuf *bufs[BURST_SIZE * 4]; 333 struct rte_distributor *d = p->d; 334 335 printf("\nCore %u acting as distributor core.\n", rte_lcore_id()); 336 while (!quit_signal_dist) { 337 const uint16_t nb_rx = rte_ring_dequeue_burst(in_r, 338 (void *)bufs, BURST_SIZE*1, NULL); 339 if (nb_rx) { 340 app_stats.dist.in_pkts += nb_rx; 341 342 /* Distribute the packets */ 343 rte_distributor_process(d, bufs, nb_rx); 344 /* Handle Returns */ 345 const uint16_t nb_ret = 346 rte_distributor_returned_pkts(d, 347 bufs, BURST_SIZE*2); 348 349 if (unlikely(nb_ret == 0)) 350 continue; 351 app_stats.dist.ret_pkts += nb_ret; 352 353 uint16_t sent = rte_ring_enqueue_burst(out_r, 354 (void *)bufs, nb_ret, NULL); 355 app_stats.dist.sent_pkts += sent; 356 if (unlikely(sent < nb_ret)) { 357 app_stats.dist.enqdrop_pkts += nb_ret - sent; 358 RTE_LOG(DEBUG, DISTRAPP, 359 "%s:Packet loss due to full out ring\n", 360 __func__); 361 while (sent < nb_ret) 362 rte_pktmbuf_free(bufs[sent++]); 363 } 364 } 365 } 366 printf("\nCore %u exiting distributor task.\n", rte_lcore_id()); 367 quit_signal_work = 1; 368 369 rte_distributor_flush(d); 370 /* Unblock any returns so workers can exit */ 371 rte_distributor_clear_returns(d); 372 quit_signal_rx = 1; 373 return 0; 374 } 375 376 377 static int 378 lcore_tx(struct rte_ring *in_r) 379 { 380 static struct output_buffer tx_buffers[RTE_MAX_ETHPORTS]; 381 const uint8_t nb_ports = rte_eth_dev_count(); 382 const int socket_id = rte_socket_id(); 383 uint8_t port; 384 385 for (port = 0; port < nb_ports; port++) { 386 /* skip ports that are not enabled */ 387 if ((enabled_port_mask & (1 << port)) == 0) 388 continue; 389 390 if (rte_eth_dev_socket_id(port) > 0 && 391 rte_eth_dev_socket_id(port) != socket_id) 392 printf("WARNING, port %u is on remote NUMA node to " 393 "TX thread.\n\tPerformance will not " 394 "be optimal.\n", port); 395 } 396 397 printf("\nCore %u doing packet TX.\n", rte_lcore_id()); 398 while (!quit_signal) { 399 400 for (port = 0; port < nb_ports; port++) { 401 /* skip ports that are not enabled */ 402 if ((enabled_port_mask & (1 << port)) == 0) 403 continue; 404 405 struct rte_mbuf *bufs[BURST_SIZE_TX]; 406 const uint16_t nb_rx = rte_ring_dequeue_burst(in_r, 407 (void *)bufs, BURST_SIZE_TX, NULL); 408 app_stats.tx.dequeue_pkts += nb_rx; 409 410 /* if we get no traffic, flush anything we have */ 411 if (unlikely(nb_rx == 0)) { 412 flush_all_ports(tx_buffers, nb_ports); 413 continue; 414 } 415 416 /* for traffic we receive, queue it up for transmit */ 417 uint16_t i; 418 rte_prefetch_non_temporal((void *)bufs[0]); 419 rte_prefetch_non_temporal((void *)bufs[1]); 420 rte_prefetch_non_temporal((void *)bufs[2]); 421 for (i = 0; i < nb_rx; i++) { 422 struct output_buffer *outbuf; 423 uint8_t outp; 424 rte_prefetch_non_temporal((void *)bufs[i + 3]); 425 /* 426 * workers should update in_port to hold the 427 * output port value 428 */ 429 outp = bufs[i]->port; 430 /* skip ports that are not enabled */ 431 if ((enabled_port_mask & (1 << outp)) == 0) 432 continue; 433 434 outbuf = &tx_buffers[outp]; 435 outbuf->mbufs[outbuf->count++] = bufs[i]; 436 if (outbuf->count == BURST_SIZE_TX) 437 flush_one_port(outbuf, outp); 438 } 439 } 440 } 441 printf("\nCore %u exiting tx task.\n", rte_lcore_id()); 442 return 0; 443 } 444 445 static void 446 int_handler(int sig_num) 447 { 448 printf("Exiting on signal %d\n", sig_num); 449 /* set quit flag for rx thread to exit */ 450 quit_signal_dist = 1; 451 } 452 453 static void 454 print_stats(void) 455 { 456 struct rte_eth_stats eth_stats; 457 unsigned int i, j; 458 const unsigned int num_workers = rte_lcore_count() - 4; 459 460 for (i = 0; i < rte_eth_dev_count(); i++) { 461 rte_eth_stats_get(i, ð_stats); 462 app_stats.port_rx_pkts[i] = eth_stats.ipackets; 463 app_stats.port_tx_pkts[i] = eth_stats.opackets; 464 } 465 466 printf("\n\nRX Thread:\n"); 467 for (i = 0; i < rte_eth_dev_count(); i++) { 468 printf("Port %u Pktsin : %5.2f\n", i, 469 (app_stats.port_rx_pkts[i] - 470 prev_app_stats.port_rx_pkts[i])/1000000.0); 471 prev_app_stats.port_rx_pkts[i] = app_stats.port_rx_pkts[i]; 472 } 473 printf(" - Received: %5.2f\n", 474 (app_stats.rx.rx_pkts - 475 prev_app_stats.rx.rx_pkts)/1000000.0); 476 printf(" - Returned: %5.2f\n", 477 (app_stats.rx.returned_pkts - 478 prev_app_stats.rx.returned_pkts)/1000000.0); 479 printf(" - Enqueued: %5.2f\n", 480 (app_stats.rx.enqueued_pkts - 481 prev_app_stats.rx.enqueued_pkts)/1000000.0); 482 printf(" - Dropped: %s%5.2f%s\n", ANSI_COLOR_RED, 483 (app_stats.rx.enqdrop_pkts - 484 prev_app_stats.rx.enqdrop_pkts)/1000000.0, 485 ANSI_COLOR_RESET); 486 487 printf("Distributor thread:\n"); 488 printf(" - In: %5.2f\n", 489 (app_stats.dist.in_pkts - 490 prev_app_stats.dist.in_pkts)/1000000.0); 491 printf(" - Returned: %5.2f\n", 492 (app_stats.dist.ret_pkts - 493 prev_app_stats.dist.ret_pkts)/1000000.0); 494 printf(" - Sent: %5.2f\n", 495 (app_stats.dist.sent_pkts - 496 prev_app_stats.dist.sent_pkts)/1000000.0); 497 printf(" - Dropped %s%5.2f%s\n", ANSI_COLOR_RED, 498 (app_stats.dist.enqdrop_pkts - 499 prev_app_stats.dist.enqdrop_pkts)/1000000.0, 500 ANSI_COLOR_RESET); 501 502 printf("TX thread:\n"); 503 printf(" - Dequeued: %5.2f\n", 504 (app_stats.tx.dequeue_pkts - 505 prev_app_stats.tx.dequeue_pkts)/1000000.0); 506 for (i = 0; i < rte_eth_dev_count(); i++) { 507 printf("Port %u Pktsout: %5.2f\n", 508 i, (app_stats.port_tx_pkts[i] - 509 prev_app_stats.port_tx_pkts[i])/1000000.0); 510 prev_app_stats.port_tx_pkts[i] = app_stats.port_tx_pkts[i]; 511 } 512 printf(" - Transmitted: %5.2f\n", 513 (app_stats.tx.tx_pkts - 514 prev_app_stats.tx.tx_pkts)/1000000.0); 515 printf(" - Dropped: %s%5.2f%s\n", ANSI_COLOR_RED, 516 (app_stats.tx.enqdrop_pkts - 517 prev_app_stats.tx.enqdrop_pkts)/1000000.0, 518 ANSI_COLOR_RESET); 519 520 prev_app_stats.rx.rx_pkts = app_stats.rx.rx_pkts; 521 prev_app_stats.rx.returned_pkts = app_stats.rx.returned_pkts; 522 prev_app_stats.rx.enqueued_pkts = app_stats.rx.enqueued_pkts; 523 prev_app_stats.rx.enqdrop_pkts = app_stats.rx.enqdrop_pkts; 524 prev_app_stats.dist.in_pkts = app_stats.dist.in_pkts; 525 prev_app_stats.dist.ret_pkts = app_stats.dist.ret_pkts; 526 prev_app_stats.dist.sent_pkts = app_stats.dist.sent_pkts; 527 prev_app_stats.dist.enqdrop_pkts = app_stats.dist.enqdrop_pkts; 528 prev_app_stats.tx.dequeue_pkts = app_stats.tx.dequeue_pkts; 529 prev_app_stats.tx.tx_pkts = app_stats.tx.tx_pkts; 530 prev_app_stats.tx.enqdrop_pkts = app_stats.tx.enqdrop_pkts; 531 532 for (i = 0; i < num_workers; i++) { 533 printf("Worker %02u Pkts: %5.2f. Bursts(1-8): ", i, 534 (app_stats.worker_pkts[i] - 535 prev_app_stats.worker_pkts[i])/1000000.0); 536 for (j = 0; j < 8; j++) { 537 printf("%"PRIu64" ", app_stats.worker_bursts[i][j]); 538 app_stats.worker_bursts[i][j] = 0; 539 } 540 printf("\n"); 541 prev_app_stats.worker_pkts[i] = app_stats.worker_pkts[i]; 542 } 543 } 544 545 static int 546 lcore_worker(struct lcore_params *p) 547 { 548 struct rte_distributor *d = p->d; 549 const unsigned id = p->worker_id; 550 unsigned int num = 0; 551 unsigned int i; 552 553 /* 554 * for single port, xor_val will be zero so we won't modify the output 555 * port, otherwise we send traffic from 0 to 1, 2 to 3, and vice versa 556 */ 557 const unsigned xor_val = (rte_eth_dev_count() > 1); 558 struct rte_mbuf *buf[8] __rte_cache_aligned; 559 560 for (i = 0; i < 8; i++) 561 buf[i] = NULL; 562 563 app_stats.worker_pkts[p->worker_id] = 1; 564 565 printf("\nCore %u acting as worker core.\n", rte_lcore_id()); 566 while (!quit_signal_work) { 567 num = rte_distributor_get_pkt(d, id, buf, buf, num); 568 /* Do a little bit of work for each packet */ 569 for (i = 0; i < num; i++) { 570 uint64_t t = rte_rdtsc()+100; 571 572 while (rte_rdtsc() < t) 573 rte_pause(); 574 buf[i]->port ^= xor_val; 575 } 576 577 app_stats.worker_pkts[p->worker_id] += num; 578 if (num > 0) 579 app_stats.worker_bursts[p->worker_id][num-1]++; 580 } 581 return 0; 582 } 583 584 /* display usage */ 585 static void 586 print_usage(const char *prgname) 587 { 588 printf("%s [EAL options] -- -p PORTMASK\n" 589 " -p PORTMASK: hexadecimal bitmask of ports to configure\n", 590 prgname); 591 } 592 593 static int 594 parse_portmask(const char *portmask) 595 { 596 char *end = NULL; 597 unsigned long pm; 598 599 /* parse hexadecimal string */ 600 pm = strtoul(portmask, &end, 16); 601 if ((portmask[0] == '\0') || (end == NULL) || (*end != '\0')) 602 return -1; 603 604 if (pm == 0) 605 return -1; 606 607 return pm; 608 } 609 610 /* Parse the argument given in the command line of the application */ 611 static int 612 parse_args(int argc, char **argv) 613 { 614 int opt; 615 char **argvopt; 616 int option_index; 617 char *prgname = argv[0]; 618 static struct option lgopts[] = { 619 {NULL, 0, 0, 0} 620 }; 621 622 argvopt = argv; 623 624 while ((opt = getopt_long(argc, argvopt, "p:", 625 lgopts, &option_index)) != EOF) { 626 627 switch (opt) { 628 /* portmask */ 629 case 'p': 630 enabled_port_mask = parse_portmask(optarg); 631 if (enabled_port_mask == 0) { 632 printf("invalid portmask\n"); 633 print_usage(prgname); 634 return -1; 635 } 636 break; 637 638 default: 639 print_usage(prgname); 640 return -1; 641 } 642 } 643 644 if (optind <= 1) { 645 print_usage(prgname); 646 return -1; 647 } 648 649 argv[optind-1] = prgname; 650 651 optind = 1; /* reset getopt lib */ 652 return 0; 653 } 654 655 /* Main function, does initialization and calls the per-lcore functions */ 656 int 657 main(int argc, char *argv[]) 658 { 659 struct rte_mempool *mbuf_pool; 660 struct rte_distributor *d; 661 struct rte_ring *dist_tx_ring; 662 struct rte_ring *rx_dist_ring; 663 unsigned lcore_id, worker_id = 0; 664 unsigned nb_ports; 665 uint8_t portid; 666 uint8_t nb_ports_available; 667 uint64_t t, freq; 668 669 /* catch ctrl-c so we can print on exit */ 670 signal(SIGINT, int_handler); 671 672 /* init EAL */ 673 int ret = rte_eal_init(argc, argv); 674 if (ret < 0) 675 rte_exit(EXIT_FAILURE, "Error with EAL initialization\n"); 676 argc -= ret; 677 argv += ret; 678 679 /* parse application arguments (after the EAL ones) */ 680 ret = parse_args(argc, argv); 681 if (ret < 0) 682 rte_exit(EXIT_FAILURE, "Invalid distributor parameters\n"); 683 684 if (rte_lcore_count() < 5) 685 rte_exit(EXIT_FAILURE, "Error, This application needs at " 686 "least 5 logical cores to run:\n" 687 "1 lcore for stats (can be core 0)\n" 688 "1 lcore for packet RX\n" 689 "1 lcore for distribution\n" 690 "1 lcore for packet TX\n" 691 "and at least 1 lcore for worker threads\n"); 692 693 nb_ports = rte_eth_dev_count(); 694 if (nb_ports == 0) 695 rte_exit(EXIT_FAILURE, "Error: no ethernet ports detected\n"); 696 if (nb_ports != 1 && (nb_ports & 1)) 697 rte_exit(EXIT_FAILURE, "Error: number of ports must be even, except " 698 "when using a single port\n"); 699 700 mbuf_pool = rte_pktmbuf_pool_create("MBUF_POOL", 701 NUM_MBUFS * nb_ports, MBUF_CACHE_SIZE, 0, 702 RTE_MBUF_DEFAULT_BUF_SIZE, rte_socket_id()); 703 if (mbuf_pool == NULL) 704 rte_exit(EXIT_FAILURE, "Cannot create mbuf pool\n"); 705 nb_ports_available = nb_ports; 706 707 /* initialize all ports */ 708 for (portid = 0; portid < nb_ports; portid++) { 709 /* skip ports that are not enabled */ 710 if ((enabled_port_mask & (1 << portid)) == 0) { 711 printf("\nSkipping disabled port %d\n", portid); 712 nb_ports_available--; 713 continue; 714 } 715 /* init port */ 716 printf("Initializing port %u... done\n", (unsigned) portid); 717 718 if (port_init(portid, mbuf_pool) != 0) 719 rte_exit(EXIT_FAILURE, "Cannot initialize port %"PRIu8"\n", 720 portid); 721 } 722 723 if (!nb_ports_available) { 724 rte_exit(EXIT_FAILURE, 725 "All available ports are disabled. Please set portmask.\n"); 726 } 727 728 d = rte_distributor_create("PKT_DIST", rte_socket_id(), 729 rte_lcore_count() - 4, 730 RTE_DIST_ALG_BURST); 731 if (d == NULL) 732 rte_exit(EXIT_FAILURE, "Cannot create distributor\n"); 733 734 /* 735 * scheduler ring is read by the transmitter core, and written to 736 * by scheduler core 737 */ 738 dist_tx_ring = rte_ring_create("Output_ring", SCHED_TX_RING_SZ, 739 rte_socket_id(), RING_F_SC_DEQ | RING_F_SP_ENQ); 740 if (dist_tx_ring == NULL) 741 rte_exit(EXIT_FAILURE, "Cannot create output ring\n"); 742 743 rx_dist_ring = rte_ring_create("Input_ring", SCHED_RX_RING_SZ, 744 rte_socket_id(), RING_F_SC_DEQ | RING_F_SP_ENQ); 745 if (rx_dist_ring == NULL) 746 rte_exit(EXIT_FAILURE, "Cannot create output ring\n"); 747 748 RTE_LCORE_FOREACH_SLAVE(lcore_id) { 749 if (worker_id == rte_lcore_count() - 3) { 750 printf("Starting distributor on lcore_id %d\n", 751 lcore_id); 752 /* distributor core */ 753 struct lcore_params *p = 754 rte_malloc(NULL, sizeof(*p), 0); 755 if (!p) 756 rte_panic("malloc failure\n"); 757 *p = (struct lcore_params){worker_id, d, 758 rx_dist_ring, dist_tx_ring, mbuf_pool}; 759 rte_eal_remote_launch( 760 (lcore_function_t *)lcore_distributor, 761 p, lcore_id); 762 } else if (worker_id == rte_lcore_count() - 4) { 763 printf("Starting tx on worker_id %d, lcore_id %d\n", 764 worker_id, lcore_id); 765 /* tx core */ 766 rte_eal_remote_launch((lcore_function_t *)lcore_tx, 767 dist_tx_ring, lcore_id); 768 } else if (worker_id == rte_lcore_count() - 2) { 769 printf("Starting rx on worker_id %d, lcore_id %d\n", 770 worker_id, lcore_id); 771 /* rx core */ 772 struct lcore_params *p = 773 rte_malloc(NULL, sizeof(*p), 0); 774 if (!p) 775 rte_panic("malloc failure\n"); 776 *p = (struct lcore_params){worker_id, d, rx_dist_ring, 777 dist_tx_ring, mbuf_pool}; 778 rte_eal_remote_launch((lcore_function_t *)lcore_rx, 779 p, lcore_id); 780 } else { 781 printf("Starting worker on worker_id %d, lcore_id %d\n", 782 worker_id, lcore_id); 783 struct lcore_params *p = 784 rte_malloc(NULL, sizeof(*p), 0); 785 if (!p) 786 rte_panic("malloc failure\n"); 787 *p = (struct lcore_params){worker_id, d, rx_dist_ring, 788 dist_tx_ring, mbuf_pool}; 789 790 rte_eal_remote_launch((lcore_function_t *)lcore_worker, 791 p, lcore_id); 792 } 793 worker_id++; 794 } 795 796 freq = rte_get_timer_hz(); 797 t = rte_rdtsc() + freq; 798 while (!quit_signal_dist) { 799 if (t < rte_rdtsc()) { 800 print_stats(); 801 t = rte_rdtsc() + freq; 802 } 803 usleep(1000); 804 } 805 806 RTE_LCORE_FOREACH_SLAVE(lcore_id) { 807 if (rte_eal_wait_lcore(lcore_id) < 0) 808 return -1; 809 } 810 811 print_stats(); 812 return 0; 813 } 814