/*-
 *   BSD LICENSE
 *
 *   Copyright(c) 2010-2017 Intel Corporation. All rights reserved.
 *
 *   Redistribution and use in source and binary forms, with or without
 *   modification, are permitted provided that the following conditions
 *   are met:
 *
 *     * Redistributions of source code must retain the above copyright
 *       notice, this list of conditions and the following disclaimer.
 *     * Redistributions in binary form must reproduce the above copyright
 *       notice, this list of conditions and the following disclaimer in
 *       the documentation and/or other materials provided with the
 *       distribution.
 *     * Neither the name of Intel Corporation nor the names of its
 *       contributors may be used to endorse or promote products derived
 *       from this software without specific prior written permission.
 *
 *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */

#include <stdio.h>	/* printf */
#include <stdlib.h>	/* strtoul */
#include <stdint.h>
#include <inttypes.h>
#include <unistd.h>
#include <signal.h>
#include <getopt.h>

#include <rte_eal.h>
#include <rte_ethdev.h>
#include <rte_cycles.h>
#include <rte_malloc.h>
#include <rte_debug.h>
#include <rte_prefetch.h>
#include <rte_distributor.h>

#define RX_RING_SIZE 512
#define TX_RING_SIZE 512
#define NUM_MBUFS ((64*1024)-1)
#define MBUF_CACHE_SIZE 128
#define BURST_SIZE 64
#define SCHED_RX_RING_SZ 8192
#define SCHED_TX_RING_SZ 65536
#define BURST_SIZE_TX 32

#define RTE_LOGTYPE_DISTRAPP RTE_LOGTYPE_USER1

#define ANSI_COLOR_RED "\x1b[31m"
#define ANSI_COLOR_RESET "\x1b[0m"

/* mask of enabled ports */
static uint32_t enabled_port_mask;
volatile uint8_t quit_signal;
volatile uint8_t quit_signal_rx;
volatile uint8_t quit_signal_dist;
volatile uint8_t quit_signal_work;

static volatile struct app_stats {
	struct {
		uint64_t rx_pkts;
		uint64_t returned_pkts;
		uint64_t enqueued_pkts;
		uint64_t enqdrop_pkts;
	} rx __rte_cache_aligned;
	int pad1 __rte_cache_aligned;

	struct {
		uint64_t in_pkts;
		uint64_t ret_pkts;
		uint64_t sent_pkts;
		uint64_t enqdrop_pkts;
	} dist __rte_cache_aligned;
	int pad2 __rte_cache_aligned;

	struct {
		uint64_t dequeue_pkts;
		uint64_t tx_pkts;
		uint64_t enqdrop_pkts;
	} tx __rte_cache_aligned;
	int pad3 __rte_cache_aligned;

	uint64_t worker_pkts[64] __rte_cache_aligned;

	int pad4 __rte_cache_aligned;

	uint64_t worker_bursts[64][8] __rte_cache_aligned;

	int pad5 __rte_cache_aligned;

	uint64_t port_rx_pkts[64] __rte_cache_aligned;
	uint64_t port_tx_pkts[64] __rte_cache_aligned;
} app_stats;

struct app_stats prev_app_stats;
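/*
 * Pipeline overview (one lcore per stage, remaining lcores as workers):
 *
 *   NIC RX -> lcore_rx -> rx_dist_ring -> lcore_distributor
 *          -> workers (via rte_distributor) -> dist_tx_ring
 *          -> lcore_tx -> NIC TX
 *
 * The cache-aligned pads in app_stats above keep each stage's counters
 * on separate cache lines, so cores updating their own statistics do
 * not false-share with each other.
 */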
static const struct rte_eth_conf port_conf_default = {
	.rxmode = {
		.mq_mode = ETH_MQ_RX_RSS,
		.max_rx_pkt_len = ETHER_MAX_LEN,
	},
	.txmode = {
		.mq_mode = ETH_MQ_TX_NONE,
	},
	.rx_adv_conf = {
		.rss_conf = {
			.rss_hf = ETH_RSS_IP | ETH_RSS_UDP |
				ETH_RSS_TCP | ETH_RSS_SCTP,
		}
	},
};

struct output_buffer {
	unsigned count;
	struct rte_mbuf *mbufs[BURST_SIZE];
};

static void print_stats(void);

/*
 * Initialises a given port using global settings and with the rx buffers
 * coming from the mbuf_pool passed as parameter.
 */
static inline int
port_init(uint8_t port, struct rte_mempool *mbuf_pool)
{
	struct rte_eth_conf port_conf = port_conf_default;
	const uint16_t rxRings = 1, txRings = rte_lcore_count() - 1;
	int retval;
	uint16_t q;

	if (port >= rte_eth_dev_count())
		return -1;

	retval = rte_eth_dev_configure(port, rxRings, txRings, &port_conf);
	if (retval != 0)
		return retval;

	for (q = 0; q < rxRings; q++) {
		retval = rte_eth_rx_queue_setup(port, q, RX_RING_SIZE,
						rte_eth_dev_socket_id(port),
						NULL, mbuf_pool);
		if (retval < 0)
			return retval;
	}

	for (q = 0; q < txRings; q++) {
		retval = rte_eth_tx_queue_setup(port, q, TX_RING_SIZE,
						rte_eth_dev_socket_id(port),
						NULL);
		if (retval < 0)
			return retval;
	}

	retval = rte_eth_dev_start(port);
	if (retval < 0)
		return retval;

	/* Block until the link comes up; the loop only exits on link up. */
	struct rte_eth_link link;
	rte_eth_link_get_nowait(port, &link);
	while (!link.link_status) {
		printf("Waiting for Link up on port %"PRIu8"\n", port);
		sleep(1);
		rte_eth_link_get_nowait(port, &link);
	}

	struct ether_addr addr;
	rte_eth_macaddr_get(port, &addr);
	printf("Port %u MAC: %02"PRIx8" %02"PRIx8" %02"PRIx8
			" %02"PRIx8" %02"PRIx8" %02"PRIx8"\n",
			(unsigned)port,
			addr.addr_bytes[0], addr.addr_bytes[1],
			addr.addr_bytes[2], addr.addr_bytes[3],
			addr.addr_bytes[4], addr.addr_bytes[5]);

	rte_eth_promiscuous_enable(port);

	return 0;
}

struct lcore_params {
	unsigned worker_id;
	struct rte_distributor *d;
	struct rte_ring *rx_dist_ring;
	struct rte_ring *dist_tx_ring;
	struct rte_mempool *mem_pool;
};
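/*
 * RX thread: polls each enabled port in turn and pushes every received
 * burst into rx_dist_ring for the distributor core. Packets that the
 * ring cannot absorb are freed and counted in rx.enqdrop_pkts.
 */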
static int
lcore_rx(struct lcore_params *p)
{
	const uint8_t nb_ports = rte_eth_dev_count();
	const int socket_id = rte_socket_id();
	uint8_t port;
	struct rte_mbuf *bufs[BURST_SIZE*2];

	for (port = 0; port < nb_ports; port++) {
		/* skip ports that are not enabled */
		if ((enabled_port_mask & (1 << port)) == 0)
			continue;

		if (rte_eth_dev_socket_id(port) > 0 &&
				rte_eth_dev_socket_id(port) != socket_id)
			printf("WARNING, port %u is on a remote NUMA node to "
					"the RX thread.\n\tPerformance will not "
					"be optimal.\n", port);
	}

	printf("\nCore %u doing packet RX.\n", rte_lcore_id());
	port = 0;
	while (!quit_signal_rx) {

		/* skip ports that are not enabled */
		if ((enabled_port_mask & (1 << port)) == 0) {
			if (++port == nb_ports)
				port = 0;
			continue;
		}
		const uint16_t nb_rx = rte_eth_rx_burst(port, 0, bufs,
				BURST_SIZE);
		if (unlikely(nb_rx == 0)) {
			if (++port == nb_ports)
				port = 0;
			continue;
		}
		app_stats.rx.rx_pkts += nb_rx;

/*
 * You can run the distributor on the rx core with this code. Returned
 * packets are then sent straight to the tx core.
 */
#if 0
		rte_distributor_process(p->d, bufs, nb_rx);
		const uint16_t nb_ret = rte_distributor_returned_pkts(p->d,
				bufs, BURST_SIZE*2);

		app_stats.rx.returned_pkts += nb_ret;
		if (unlikely(nb_ret == 0)) {
			if (++port == nb_ports)
				port = 0;
			continue;
		}

		struct rte_ring *tx_ring = p->dist_tx_ring;
		uint16_t sent = rte_ring_enqueue_burst(tx_ring,
				(void *)bufs, nb_ret, NULL);
#else
		uint16_t nb_ret = nb_rx;
		/*
		 * Swap the following two lines if you want the rx traffic
		 * to go directly to tx, with no distribution.
		 */
		struct rte_ring *out_ring = p->rx_dist_ring;
		/* struct rte_ring *out_ring = p->dist_tx_ring; */

		uint16_t sent = rte_ring_enqueue_burst(out_ring,
				(void *)bufs, nb_ret, NULL);
#endif

		app_stats.rx.enqueued_pkts += sent;
		if (unlikely(sent < nb_ret)) {
			app_stats.rx.enqdrop_pkts += nb_ret - sent;
			RTE_LOG_DP(DEBUG, DISTRAPP,
				"%s:Packet loss due to full ring\n", __func__);
			while (sent < nb_ret)
				rte_pktmbuf_free(bufs[sent++]);
		}
		if (++port == nb_ports)
			port = 0;
	}
	/* set worker & tx threads quit flag */
	printf("\nCore %u exiting rx task.\n", rte_lcore_id());
	quit_signal = 1;
	return 0;
}

static inline void
flush_one_port(struct output_buffer *outbuf, uint8_t outp)
{
	unsigned int nb_tx = rte_eth_tx_burst(outp, 0,
			outbuf->mbufs, outbuf->count);
	app_stats.tx.tx_pkts += outbuf->count;

	if (unlikely(nb_tx < outbuf->count)) {
		app_stats.tx.enqdrop_pkts += outbuf->count - nb_tx;
		do {
			rte_pktmbuf_free(outbuf->mbufs[nb_tx]);
		} while (++nb_tx < outbuf->count);
	}
	outbuf->count = 0;
}

static inline void
flush_all_ports(struct output_buffer *tx_buffers, uint8_t nb_ports)
{
	uint8_t outp;

	for (outp = 0; outp < nb_ports; outp++) {
		/* skip ports that are not enabled */
		if ((enabled_port_mask & (1 << outp)) == 0)
			continue;

		if (tx_buffers[outp].count == 0)
			continue;

		flush_one_port(&tx_buffers[outp], outp);
	}
}

static int
lcore_distributor(struct lcore_params *p)
{
	struct rte_ring *in_r = p->rx_dist_ring;
	struct rte_ring *out_r = p->dist_tx_ring;
	struct rte_mbuf *bufs[BURST_SIZE * 4];
	struct rte_distributor *d = p->d;

	printf("\nCore %u acting as distributor core.\n", rte_lcore_id());
	while (!quit_signal_dist) {
		const uint16_t nb_rx = rte_ring_dequeue_burst(in_r,
				(void *)bufs, BURST_SIZE*1, NULL);
		if (nb_rx) {
			app_stats.dist.in_pkts += nb_rx;

			/* Distribute the packets */
			rte_distributor_process(d, bufs, nb_rx);
			/* Handle Returns */
			const uint16_t nb_ret =
				rte_distributor_returned_pkts(d,
					bufs, BURST_SIZE*2);

			if (unlikely(nb_ret == 0))
				continue;
			app_stats.dist.ret_pkts += nb_ret;

			uint16_t sent = rte_ring_enqueue_burst(out_r,
					(void *)bufs, nb_ret, NULL);
			app_stats.dist.sent_pkts += sent;
			if (unlikely(sent < nb_ret)) {
				app_stats.dist.enqdrop_pkts += nb_ret - sent;
				RTE_LOG(DEBUG, DISTRAPP,
					"%s:Packet loss due to full out ring\n",
					__func__);
				while (sent < nb_ret)
					rte_pktmbuf_free(bufs[sent++]);
			}
		}
	}
	printf("\nCore %u exiting distributor task.\n", rte_lcore_id());
	quit_signal_work = 1;

	rte_distributor_flush(d);
	/* Unblock any returns so workers can exit */
	rte_distributor_clear_returns(d);
	quit_signal_rx = 1;
	return 0;
}
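/*
 * TX thread: drains dist_tx_ring and buffers packets per output port,
 * flushing a port's buffer once it holds BURST_SIZE_TX packets, or
 * flushing all ports whenever a dequeue comes back empty so that
 * stragglers are not left stranded in a partly filled buffer.
 */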
static int
lcore_tx(struct rte_ring *in_r)
{
	static struct output_buffer tx_buffers[RTE_MAX_ETHPORTS];
	const uint8_t nb_ports = rte_eth_dev_count();
	const int socket_id = rte_socket_id();
	uint8_t port;

	for (port = 0; port < nb_ports; port++) {
		/* skip ports that are not enabled */
		if ((enabled_port_mask & (1 << port)) == 0)
			continue;

		if (rte_eth_dev_socket_id(port) > 0 &&
				rte_eth_dev_socket_id(port) != socket_id)
			printf("WARNING, port %u is on a remote NUMA node to "
					"the TX thread.\n\tPerformance will not "
					"be optimal.\n", port);
	}

	printf("\nCore %u doing packet TX.\n", rte_lcore_id());
	while (!quit_signal) {

		for (port = 0; port < nb_ports; port++) {
			/* skip ports that are not enabled */
			if ((enabled_port_mask & (1 << port)) == 0)
				continue;

			struct rte_mbuf *bufs[BURST_SIZE_TX];
			const uint16_t nb_rx = rte_ring_dequeue_burst(in_r,
					(void *)bufs, BURST_SIZE_TX, NULL);
			app_stats.tx.dequeue_pkts += nb_rx;

			/* if we get no traffic, flush anything we have */
			if (unlikely(nb_rx == 0)) {
				flush_all_ports(tx_buffers, nb_ports);
				continue;
			}

			/* for traffic we receive, queue it up for transmit */
			uint16_t i;
			/* prefetch a few mbufs ahead; stay within nb_rx */
			for (i = 0; i < 3 && i < nb_rx; i++)
				rte_prefetch_non_temporal((void *)bufs[i]);
			for (i = 0; i < nb_rx; i++) {
				struct output_buffer *outbuf;
				uint8_t outp;
				if (i + 3 < nb_rx)
					rte_prefetch_non_temporal(
							(void *)bufs[i + 3]);
				/*
				 * workers should update in_port to hold the
				 * output port value
				 */
				outp = bufs[i]->port;
				/* skip ports that are not enabled */
				if ((enabled_port_mask & (1 << outp)) == 0)
					continue;

				outbuf = &tx_buffers[outp];
				outbuf->mbufs[outbuf->count++] = bufs[i];
				if (outbuf->count == BURST_SIZE_TX)
					flush_one_port(outbuf, outp);
			}
		}
	}
	printf("\nCore %u exiting tx task.\n", rte_lcore_id());
	return 0;
}

static void
int_handler(int sig_num)
{
	printf("Exiting on signal %d\n", sig_num);
	/*
	 * Tell the distributor thread to quit first; it then cascades the
	 * shutdown to the worker, rx and tx threads.
	 */
	quit_signal_dist = 1;
}
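/*
 * Called roughly once per second from the main (stats) lcore. Every
 * figure printed is the delta since the previous call divided by 1e6,
 * i.e. millions of packets for that interval.
 */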
static void
print_stats(void)
{
	struct rte_eth_stats eth_stats;
	unsigned int i, j;
	const unsigned int num_workers = rte_lcore_count() - 4;

	for (i = 0; i < rte_eth_dev_count(); i++) {
		rte_eth_stats_get(i, &eth_stats);
		app_stats.port_rx_pkts[i] = eth_stats.ipackets;
		app_stats.port_tx_pkts[i] = eth_stats.opackets;
	}

	printf("\n\nRX Thread:\n");
	for (i = 0; i < rte_eth_dev_count(); i++) {
		printf("Port %u Pktsin : %5.2f\n", i,
				(app_stats.port_rx_pkts[i] -
				prev_app_stats.port_rx_pkts[i])/1000000.0);
		prev_app_stats.port_rx_pkts[i] = app_stats.port_rx_pkts[i];
	}
	printf(" - Received:    %5.2f\n",
			(app_stats.rx.rx_pkts -
			prev_app_stats.rx.rx_pkts)/1000000.0);
	printf(" - Returned:    %5.2f\n",
			(app_stats.rx.returned_pkts -
			prev_app_stats.rx.returned_pkts)/1000000.0);
	printf(" - Enqueued:    %5.2f\n",
			(app_stats.rx.enqueued_pkts -
			prev_app_stats.rx.enqueued_pkts)/1000000.0);
	printf(" - Dropped:     %s%5.2f%s\n", ANSI_COLOR_RED,
			(app_stats.rx.enqdrop_pkts -
			prev_app_stats.rx.enqdrop_pkts)/1000000.0,
			ANSI_COLOR_RESET);

	printf("Distributor thread:\n");
	printf(" - In:          %5.2f\n",
			(app_stats.dist.in_pkts -
			prev_app_stats.dist.in_pkts)/1000000.0);
	printf(" - Returned:    %5.2f\n",
			(app_stats.dist.ret_pkts -
			prev_app_stats.dist.ret_pkts)/1000000.0);
	printf(" - Sent:        %5.2f\n",
			(app_stats.dist.sent_pkts -
			prev_app_stats.dist.sent_pkts)/1000000.0);
	printf(" - Dropped:     %s%5.2f%s\n", ANSI_COLOR_RED,
			(app_stats.dist.enqdrop_pkts -
			prev_app_stats.dist.enqdrop_pkts)/1000000.0,
			ANSI_COLOR_RESET);

	printf("TX thread:\n");
	printf(" - Dequeued:    %5.2f\n",
			(app_stats.tx.dequeue_pkts -
			prev_app_stats.tx.dequeue_pkts)/1000000.0);
	for (i = 0; i < rte_eth_dev_count(); i++) {
		printf("Port %u Pktsout: %5.2f\n",
				i, (app_stats.port_tx_pkts[i] -
				prev_app_stats.port_tx_pkts[i])/1000000.0);
		prev_app_stats.port_tx_pkts[i] = app_stats.port_tx_pkts[i];
	}
	printf(" - Transmitted: %5.2f\n",
			(app_stats.tx.tx_pkts -
			prev_app_stats.tx.tx_pkts)/1000000.0);
	printf(" - Dropped:     %s%5.2f%s\n", ANSI_COLOR_RED,
			(app_stats.tx.enqdrop_pkts -
			prev_app_stats.tx.enqdrop_pkts)/1000000.0,
			ANSI_COLOR_RESET);

	prev_app_stats.rx.rx_pkts = app_stats.rx.rx_pkts;
	prev_app_stats.rx.returned_pkts = app_stats.rx.returned_pkts;
	prev_app_stats.rx.enqueued_pkts = app_stats.rx.enqueued_pkts;
	prev_app_stats.rx.enqdrop_pkts = app_stats.rx.enqdrop_pkts;
	prev_app_stats.dist.in_pkts = app_stats.dist.in_pkts;
	prev_app_stats.dist.ret_pkts = app_stats.dist.ret_pkts;
	prev_app_stats.dist.sent_pkts = app_stats.dist.sent_pkts;
	prev_app_stats.dist.enqdrop_pkts = app_stats.dist.enqdrop_pkts;
	prev_app_stats.tx.dequeue_pkts = app_stats.tx.dequeue_pkts;
	prev_app_stats.tx.tx_pkts = app_stats.tx.tx_pkts;
	prev_app_stats.tx.enqdrop_pkts = app_stats.tx.enqdrop_pkts;

	for (i = 0; i < num_workers; i++) {
		printf("Worker %02u Pkts: %5.2f. Bursts(1-8): ", i,
				(app_stats.worker_pkts[i] -
				prev_app_stats.worker_pkts[i])/1000000.0);
		for (j = 0; j < 8; j++) {
			printf("%"PRIu64" ", app_stats.worker_bursts[i][j]);
			app_stats.worker_bursts[i][j] = 0;
		}
		printf("\n");
		prev_app_stats.worker_pkts[i] = app_stats.worker_pkts[i];
	}
}

static int
lcore_worker(struct lcore_params *p)
{
	struct rte_distributor *d = p->d;
	const unsigned id = p->worker_id;
	unsigned int num = 0;
	unsigned int i;

	/*
	 * for a single port, xor_val will be zero so we won't modify the
	 * output port; otherwise we send traffic from 0 to 1, 2 to 3, and
	 * vice versa
	 */
	const unsigned xor_val = (rte_eth_dev_count() > 1);
	struct rte_mbuf *buf[8] __rte_cache_aligned;

	for (i = 0; i < 8; i++)
		buf[i] = NULL;

	app_stats.worker_pkts[p->worker_id] = 1;

	printf("\nCore %u acting as worker core.\n", rte_lcore_id());
	while (!quit_signal_work) {
		num = rte_distributor_get_pkt(d, id, buf, buf, num);
		/* Do a little bit of work for each packet */
		for (i = 0; i < num; i++) {
			uint64_t t = rte_rdtsc() + 100;

			while (rte_rdtsc() < t)
				rte_pause();
			buf[i]->port ^= xor_val;
		}

		app_stats.worker_pkts[p->worker_id] += num;
		if (num > 0)
			app_stats.worker_bursts[p->worker_id][num-1]++;
	}
	return 0;
}

/* display usage */
static void
print_usage(const char *prgname)
{
	printf("%s [EAL options] -- -p PORTMASK\n"
			"  -p PORTMASK: hexadecimal bitmask of ports to configure\n",
			prgname);
}
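/*
 * The portmask is parsed as hexadecimal: e.g. "-p 3" (binary 11)
 * enables ports 0 and 1, and "-p f" enables ports 0-3.
 */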
static int
parse_portmask(const char *portmask)
{
	char *end = NULL;
	unsigned long pm;

	/* parse hexadecimal string */
	pm = strtoul(portmask, &end, 16);
	if ((portmask[0] == '\0') || (end == NULL) || (*end != '\0'))
		return -1;

	if (pm == 0)
		return -1;

	return pm;
}

/* Parse the arguments given in the command line of the application */
static int
parse_args(int argc, char **argv)
{
	int opt;
	char **argvopt;
	int option_index;
	char *prgname = argv[0];
	static struct option lgopts[] = {
		{NULL, 0, 0, 0}
	};

	argvopt = argv;

	while ((opt = getopt_long(argc, argvopt, "p:",
			lgopts, &option_index)) != EOF) {

		switch (opt) {
		/* portmask */
		case 'p': {
			/*
			 * Check the return value in a signed variable:
			 * parse_portmask() returns -1 on error, which would
			 * never compare equal to 0 once stored in the
			 * unsigned enabled_port_mask.
			 */
			int pm = parse_portmask(optarg);
			if (pm < 0) {
				printf("invalid portmask\n");
				print_usage(prgname);
				return -1;
			}
			enabled_port_mask = pm;
			break;
		}

		default:
			print_usage(prgname);
			return -1;
		}
	}

	if (optind <= 1) {
		print_usage(prgname);
		return -1;
	}

	argv[optind-1] = prgname;

	optind = 1; /* reset getopt lib */
	return 0;
}
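/*
 * Example invocation (binary name and core list are illustrative;
 * adjust them for your system). At least 5 lcores are needed, e.g.:
 *
 *     ./build/distributor_app -l 1-8 -n 4 -- -p 3
 *
 * The main lcore prints statistics; of the remaining lcores one does
 * rx, one distribution, one tx, and the rest act as workers.
 */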
/* Main function, does initialization and calls the per-lcore functions */
int
main(int argc, char *argv[])
{
	struct rte_mempool *mbuf_pool;
	struct rte_distributor *d;
	struct rte_ring *dist_tx_ring;
	struct rte_ring *rx_dist_ring;
	unsigned lcore_id, worker_id = 0;
	unsigned nb_ports;
	uint8_t portid;
	uint8_t nb_ports_available;
	uint64_t t, freq;

	/* catch ctrl-c so we can print stats on exit */
	signal(SIGINT, int_handler);

	/* init EAL */
	int ret = rte_eal_init(argc, argv);
	if (ret < 0)
		rte_exit(EXIT_FAILURE, "Error with EAL initialization\n");
	argc -= ret;
	argv += ret;

	/* parse application arguments (after the EAL ones) */
	ret = parse_args(argc, argv);
	if (ret < 0)
		rte_exit(EXIT_FAILURE, "Invalid distributor parameters\n");

	if (rte_lcore_count() < 5)
		rte_exit(EXIT_FAILURE, "Error, this application needs at "
				"least 5 logical cores to run:\n"
				"1 lcore for stats (can be core 0)\n"
				"1 lcore for packet RX\n"
				"1 lcore for distribution\n"
				"1 lcore for packet TX\n"
				"and at least 1 lcore for worker threads\n");

	nb_ports = rte_eth_dev_count();
	if (nb_ports == 0)
		rte_exit(EXIT_FAILURE, "Error: no ethernet ports detected\n");
	if (nb_ports != 1 && (nb_ports & 1))
		rte_exit(EXIT_FAILURE, "Error: number of ports must be even, "
				"except when using a single port\n");

	mbuf_pool = rte_pktmbuf_pool_create("MBUF_POOL",
			NUM_MBUFS * nb_ports, MBUF_CACHE_SIZE, 0,
			RTE_MBUF_DEFAULT_BUF_SIZE, rte_socket_id());
	if (mbuf_pool == NULL)
		rte_exit(EXIT_FAILURE, "Cannot create mbuf pool\n");
	nb_ports_available = nb_ports;

	/* initialize all ports */
	for (portid = 0; portid < nb_ports; portid++) {
		/* skip ports that are not enabled */
		if ((enabled_port_mask & (1 << portid)) == 0) {
			printf("\nSkipping disabled port %d\n", portid);
			nb_ports_available--;
			continue;
		}
		/* init port */
		printf("Initializing port %u...\n", (unsigned) portid);

		if (port_init(portid, mbuf_pool) != 0)
			rte_exit(EXIT_FAILURE, "Cannot initialize port %"PRIu8"\n",
					portid);
	}

	if (!nb_ports_available) {
		rte_exit(EXIT_FAILURE,
				"All available ports are disabled. Please set portmask.\n");
	}

	d = rte_distributor_create("PKT_DIST", rte_socket_id(),
			rte_lcore_count() - 4,
			RTE_DIST_ALG_BURST);
	if (d == NULL)
		rte_exit(EXIT_FAILURE, "Cannot create distributor\n");

	/*
	 * The output ring is written to by the distributor core and read by
	 * the transmitter core; the input ring is written to by the rx core
	 * and read by the distributor core. Both are single-producer,
	 * single-consumer.
	 */
	dist_tx_ring = rte_ring_create("Output_ring", SCHED_TX_RING_SZ,
			rte_socket_id(), RING_F_SC_DEQ | RING_F_SP_ENQ);
	if (dist_tx_ring == NULL)
		rte_exit(EXIT_FAILURE, "Cannot create output ring\n");

	rx_dist_ring = rte_ring_create("Input_ring", SCHED_RX_RING_SZ,
			rte_socket_id(), RING_F_SC_DEQ | RING_F_SP_ENQ);
	if (rx_dist_ring == NULL)
		rte_exit(EXIT_FAILURE, "Cannot create input ring\n");

	RTE_LCORE_FOREACH_SLAVE(lcore_id) {
		if (worker_id == rte_lcore_count() - 3) {
			printf("Starting distributor on lcore_id %d\n",
					lcore_id);
			/* distributor core */
			struct lcore_params *p =
					rte_malloc(NULL, sizeof(*p), 0);
			if (!p)
				rte_panic("malloc failure\n");
			*p = (struct lcore_params){worker_id, d,
				rx_dist_ring, dist_tx_ring, mbuf_pool};
			rte_eal_remote_launch(
				(lcore_function_t *)lcore_distributor,
				p, lcore_id);
		} else if (worker_id == rte_lcore_count() - 4) {
			printf("Starting tx on worker_id %d, lcore_id %d\n",
					worker_id, lcore_id);
			/* tx core */
			rte_eal_remote_launch((lcore_function_t *)lcore_tx,
					dist_tx_ring, lcore_id);
		} else if (worker_id == rte_lcore_count() - 2) {
			printf("Starting rx on worker_id %d, lcore_id %d\n",
					worker_id, lcore_id);
			/* rx core */
			struct lcore_params *p =
					rte_malloc(NULL, sizeof(*p), 0);
			if (!p)
				rte_panic("malloc failure\n");
			*p = (struct lcore_params){worker_id, d, rx_dist_ring,
				dist_tx_ring, mbuf_pool};
			rte_eal_remote_launch((lcore_function_t *)lcore_rx,
					p, lcore_id);
		} else {
			printf("Starting worker on worker_id %d, lcore_id %d\n",
					worker_id, lcore_id);
			struct lcore_params *p =
					rte_malloc(NULL, sizeof(*p), 0);
			if (!p)
				rte_panic("malloc failure\n");
			*p = (struct lcore_params){worker_id, d, rx_dist_ring,
				dist_tx_ring, mbuf_pool};

			rte_eal_remote_launch((lcore_function_t *)lcore_worker,
					p, lcore_id);
		}
		worker_id++;
	}

	/* print stats roughly once per second until told to quit */
	freq = rte_get_timer_hz();
	t = rte_rdtsc() + freq;
	while (!quit_signal_dist) {
		if (t < rte_rdtsc()) {
			print_stats();
			t = rte_rdtsc() + freq;
		}
		usleep(1000);
	}

	RTE_LCORE_FOREACH_SLAVE(lcore_id) {
		if (rte_eal_wait_lcore(lcore_id) < 0)
			return -1;
	}

	print_stats();
	return 0;
}