1 /* SPDX-License-Identifier: BSD-3-Clause 2 * Copyright(c) 2010-2016 Intel Corporation 3 */ 4 5 #include <stdlib.h> 6 #include <signal.h> 7 #include <getopt.h> 8 9 #include <rte_eal.h> 10 #include <rte_common.h> 11 #include <rte_errno.h> 12 #include <rte_ethdev.h> 13 #include <rte_lcore.h> 14 #include <rte_malloc.h> 15 #include <rte_mbuf.h> 16 #include <rte_mempool.h> 17 #include <rte_ring.h> 18 #include <rte_reorder.h> 19 20 #define RX_DESC_PER_QUEUE 1024 21 #define TX_DESC_PER_QUEUE 1024 22 23 #define MAX_PKTS_BURST 32 24 #define REORDER_BUFFER_SIZE 8192 25 #define MBUF_PER_POOL 65535 26 #define MBUF_POOL_CACHE_SIZE 250 27 28 #define RING_SIZE 16384 29 30 /* Macros for printing using RTE_LOG */ 31 #define RTE_LOGTYPE_REORDERAPP RTE_LOGTYPE_USER1 32 33 enum { 34 #define OPT_DISABLE_REORDER "disable-reorder" 35 OPT_DISABLE_REORDER_NUM = 256, 36 #define OPT_INSIGHT_WORKER "insight-worker" 37 OPT_INSIGHT_WORKER_NUM, 38 }; 39 40 unsigned int portmask; 41 unsigned int disable_reorder; 42 unsigned int insight_worker; 43 volatile uint8_t quit_signal; 44 45 static struct rte_mempool *mbuf_pool; 46 47 static struct rte_eth_conf port_conf_default; 48 49 struct worker_thread_args { 50 struct rte_ring *ring_in; 51 struct rte_ring *ring_out; 52 }; 53 54 struct send_thread_args { 55 struct rte_ring *ring_in; 56 struct rte_reorder_buffer *buffer; 57 }; 58 59 volatile struct app_stats { 60 struct { 61 uint64_t rx_pkts; 62 uint64_t enqueue_pkts; 63 uint64_t enqueue_failed_pkts; 64 } rx __rte_cache_aligned; 65 66 struct { 67 uint64_t dequeue_pkts; 68 uint64_t enqueue_pkts; 69 uint64_t enqueue_failed_pkts; 70 } wkr __rte_cache_aligned; 71 72 struct { 73 uint64_t dequeue_pkts; 74 /* Too early pkts transmitted directly w/o reordering */ 75 uint64_t early_pkts_txtd_woro; 76 /* Too early pkts failed from direct transmit */ 77 uint64_t early_pkts_tx_failed_woro; 78 uint64_t ro_tx_pkts; 79 uint64_t ro_tx_failed_pkts; 80 } tx __rte_cache_aligned; 81 } app_stats; 82 83 /* per worker lcore stats */ 84 struct wkr_stats_per { 85 uint64_t deq_pkts; 86 uint64_t enq_pkts; 87 uint64_t enq_failed_pkts; 88 } __rte_cache_aligned; 89 90 static struct wkr_stats_per wkr_stats[RTE_MAX_LCORE] = { {0} }; 91 /** 92 * Get the last enabled lcore ID 93 * 94 * @return 95 * The last enabled lcore ID. 96 */ 97 static unsigned int 98 get_last_lcore_id(void) 99 { 100 int i; 101 102 for (i = RTE_MAX_LCORE - 1; i >= 0; i--) 103 if (rte_lcore_is_enabled(i)) 104 return i; 105 return 0; 106 } 107 108 /** 109 * Get the previous enabled lcore ID 110 * @param id 111 * The current lcore ID 112 * @return 113 * The previous enabled lcore ID or the current lcore 114 * ID if it is the first available core. 115 */ 116 static unsigned int 117 get_previous_lcore_id(unsigned int id) 118 { 119 int i; 120 121 for (i = id - 1; i >= 0; i--) 122 if (rte_lcore_is_enabled(i)) 123 return i; 124 return id; 125 } 126 127 static inline void 128 pktmbuf_free_bulk(struct rte_mbuf *mbuf_table[], unsigned n) 129 { 130 unsigned int i; 131 132 for (i = 0; i < n; i++) 133 rte_pktmbuf_free(mbuf_table[i]); 134 } 135 136 /* display usage */ 137 static void 138 print_usage(const char *prgname) 139 { 140 printf("%s [EAL options] -- -p PORTMASK\n" 141 " -p PORTMASK: hexadecimal bitmask of ports to configure\n", 142 prgname); 143 } 144 145 static int 146 parse_portmask(const char *portmask) 147 { 148 unsigned long pm; 149 char *end = NULL; 150 151 /* parse hexadecimal string */ 152 pm = strtoul(portmask, &end, 16); 153 if ((portmask[0] == '\0') || (end == NULL) || (*end != '\0')) 154 return 0; 155 156 return pm; 157 } 158 159 /* Parse the argument given in the command line of the application */ 160 static int 161 parse_args(int argc, char **argv) 162 { 163 int opt; 164 int option_index; 165 char **argvopt; 166 char *prgname = argv[0]; 167 static struct option lgopts[] = { 168 {OPT_DISABLE_REORDER, 0, NULL, OPT_DISABLE_REORDER_NUM}, 169 {OPT_INSIGHT_WORKER, 0, NULL, OPT_INSIGHT_WORKER_NUM }, 170 {NULL, 0, 0, 0 } 171 }; 172 173 argvopt = argv; 174 175 while ((opt = getopt_long(argc, argvopt, "p:", 176 lgopts, &option_index)) != EOF) { 177 switch (opt) { 178 /* portmask */ 179 case 'p': 180 portmask = parse_portmask(optarg); 181 if (portmask == 0) { 182 printf("invalid portmask\n"); 183 print_usage(prgname); 184 return -1; 185 } 186 break; 187 188 /* long options */ 189 case OPT_DISABLE_REORDER_NUM: 190 printf("reorder disabled\n"); 191 disable_reorder = 1; 192 break; 193 194 case OPT_INSIGHT_WORKER_NUM: 195 printf("print all worker statistics\n"); 196 insight_worker = 1; 197 break; 198 199 default: 200 print_usage(prgname); 201 return -1; 202 } 203 } 204 if (optind <= 1) { 205 print_usage(prgname); 206 return -1; 207 } 208 209 argv[optind-1] = prgname; 210 optind = 1; /* reset getopt lib */ 211 return 0; 212 } 213 214 /* 215 * Tx buffer error callback 216 */ 217 static void 218 flush_tx_error_callback(struct rte_mbuf **unsent, uint16_t count, 219 void *userdata __rte_unused) { 220 221 /* free the mbufs which failed from transmit */ 222 app_stats.tx.ro_tx_failed_pkts += count; 223 RTE_LOG_DP(DEBUG, REORDERAPP, "%s:Packet loss with tx_burst\n", __func__); 224 pktmbuf_free_bulk(unsent, count); 225 226 } 227 228 static inline int 229 free_tx_buffers(struct rte_eth_dev_tx_buffer *tx_buffer[]) { 230 uint16_t port_id; 231 232 /* initialize buffers for all ports */ 233 RTE_ETH_FOREACH_DEV(port_id) { 234 /* skip ports that are not enabled */ 235 if ((portmask & (1 << port_id)) == 0) 236 continue; 237 238 rte_free(tx_buffer[port_id]); 239 } 240 return 0; 241 } 242 243 static inline int 244 configure_tx_buffers(struct rte_eth_dev_tx_buffer *tx_buffer[]) 245 { 246 uint16_t port_id; 247 int ret; 248 249 /* initialize buffers for all ports */ 250 RTE_ETH_FOREACH_DEV(port_id) { 251 /* skip ports that are not enabled */ 252 if ((portmask & (1 << port_id)) == 0) 253 continue; 254 255 /* Initialize TX buffers */ 256 tx_buffer[port_id] = rte_zmalloc_socket("tx_buffer", 257 RTE_ETH_TX_BUFFER_SIZE(MAX_PKTS_BURST), 0, 258 rte_eth_dev_socket_id(port_id)); 259 if (tx_buffer[port_id] == NULL) 260 rte_exit(EXIT_FAILURE, "Cannot allocate buffer for tx on port %u\n", 261 port_id); 262 263 rte_eth_tx_buffer_init(tx_buffer[port_id], MAX_PKTS_BURST); 264 265 ret = rte_eth_tx_buffer_set_err_callback(tx_buffer[port_id], 266 flush_tx_error_callback, NULL); 267 if (ret < 0) 268 rte_exit(EXIT_FAILURE, 269 "Cannot set error callback for tx buffer on port %u\n", 270 port_id); 271 } 272 return 0; 273 } 274 275 static inline int 276 configure_eth_port(uint16_t port_id) 277 { 278 struct rte_ether_addr addr; 279 const uint16_t rxRings = 1, txRings = 1; 280 int ret; 281 uint16_t q; 282 uint16_t nb_rxd = RX_DESC_PER_QUEUE; 283 uint16_t nb_txd = TX_DESC_PER_QUEUE; 284 struct rte_eth_dev_info dev_info; 285 struct rte_eth_txconf txconf; 286 struct rte_eth_conf port_conf = port_conf_default; 287 288 if (!rte_eth_dev_is_valid_port(port_id)) 289 return -1; 290 291 ret = rte_eth_dev_info_get(port_id, &dev_info); 292 if (ret != 0) { 293 printf("Error during getting device (port %u) info: %s\n", 294 port_id, strerror(-ret)); 295 return ret; 296 } 297 298 if (dev_info.tx_offload_capa & RTE_ETH_TX_OFFLOAD_MBUF_FAST_FREE) 299 port_conf.txmode.offloads |= 300 RTE_ETH_TX_OFFLOAD_MBUF_FAST_FREE; 301 ret = rte_eth_dev_configure(port_id, rxRings, txRings, &port_conf); 302 if (ret != 0) 303 return ret; 304 305 ret = rte_eth_dev_adjust_nb_rx_tx_desc(port_id, &nb_rxd, &nb_txd); 306 if (ret != 0) 307 return ret; 308 309 for (q = 0; q < rxRings; q++) { 310 ret = rte_eth_rx_queue_setup(port_id, q, nb_rxd, 311 rte_eth_dev_socket_id(port_id), NULL, 312 mbuf_pool); 313 if (ret < 0) 314 return ret; 315 } 316 317 txconf = dev_info.default_txconf; 318 txconf.offloads = port_conf.txmode.offloads; 319 for (q = 0; q < txRings; q++) { 320 ret = rte_eth_tx_queue_setup(port_id, q, nb_txd, 321 rte_eth_dev_socket_id(port_id), &txconf); 322 if (ret < 0) 323 return ret; 324 } 325 326 ret = rte_eth_dev_start(port_id); 327 if (ret < 0) 328 return ret; 329 330 ret = rte_eth_macaddr_get(port_id, &addr); 331 if (ret != 0) { 332 printf("Failed to get MAC address (port %u): %s\n", 333 port_id, rte_strerror(-ret)); 334 return ret; 335 } 336 337 printf("Port %u MAC: %02"PRIx8" %02"PRIx8" %02"PRIx8 338 " %02"PRIx8" %02"PRIx8" %02"PRIx8"\n", 339 port_id, RTE_ETHER_ADDR_BYTES(&addr)); 340 341 ret = rte_eth_promiscuous_enable(port_id); 342 if (ret != 0) 343 return ret; 344 345 return 0; 346 } 347 348 static void 349 print_stats(void) 350 { 351 uint16_t i; 352 struct rte_eth_stats eth_stats; 353 unsigned int lcore_id, last_lcore_id, main_lcore_id, end_w_lcore_id; 354 355 last_lcore_id = get_last_lcore_id(); 356 main_lcore_id = rte_get_main_lcore(); 357 end_w_lcore_id = get_previous_lcore_id(last_lcore_id); 358 359 printf("\nRX thread stats:\n"); 360 printf(" - Pkts rxd: %"PRIu64"\n", 361 app_stats.rx.rx_pkts); 362 printf(" - Pkts enqd to workers ring: %"PRIu64"\n", 363 app_stats.rx.enqueue_pkts); 364 365 for (lcore_id = 0; lcore_id <= end_w_lcore_id; lcore_id++) { 366 if (insight_worker 367 && rte_lcore_is_enabled(lcore_id) 368 && lcore_id != main_lcore_id) { 369 printf("\nWorker thread stats on core [%u]:\n", 370 lcore_id); 371 printf(" - Pkts deqd from workers ring: %"PRIu64"\n", 372 wkr_stats[lcore_id].deq_pkts); 373 printf(" - Pkts enqd to tx ring: %"PRIu64"\n", 374 wkr_stats[lcore_id].enq_pkts); 375 printf(" - Pkts enq to tx failed: %"PRIu64"\n", 376 wkr_stats[lcore_id].enq_failed_pkts); 377 } 378 379 app_stats.wkr.dequeue_pkts += wkr_stats[lcore_id].deq_pkts; 380 app_stats.wkr.enqueue_pkts += wkr_stats[lcore_id].enq_pkts; 381 app_stats.wkr.enqueue_failed_pkts += 382 wkr_stats[lcore_id].enq_failed_pkts; 383 } 384 385 printf("\nWorker thread stats:\n"); 386 printf(" - Pkts deqd from workers ring: %"PRIu64"\n", 387 app_stats.wkr.dequeue_pkts); 388 printf(" - Pkts enqd to tx ring: %"PRIu64"\n", 389 app_stats.wkr.enqueue_pkts); 390 printf(" - Pkts enq to tx failed: %"PRIu64"\n", 391 app_stats.wkr.enqueue_failed_pkts); 392 393 printf("\nTX stats:\n"); 394 printf(" - Pkts deqd from tx ring: %"PRIu64"\n", 395 app_stats.tx.dequeue_pkts); 396 printf(" - Ro Pkts transmitted: %"PRIu64"\n", 397 app_stats.tx.ro_tx_pkts); 398 printf(" - Ro Pkts tx failed: %"PRIu64"\n", 399 app_stats.tx.ro_tx_failed_pkts); 400 printf(" - Pkts transmitted w/o reorder: %"PRIu64"\n", 401 app_stats.tx.early_pkts_txtd_woro); 402 printf(" - Pkts tx failed w/o reorder: %"PRIu64"\n", 403 app_stats.tx.early_pkts_tx_failed_woro); 404 405 RTE_ETH_FOREACH_DEV(i) { 406 rte_eth_stats_get(i, ð_stats); 407 printf("\nPort %u stats:\n", i); 408 printf(" - Pkts in: %"PRIu64"\n", eth_stats.ipackets); 409 printf(" - Pkts out: %"PRIu64"\n", eth_stats.opackets); 410 printf(" - In Errs: %"PRIu64"\n", eth_stats.ierrors); 411 printf(" - Out Errs: %"PRIu64"\n", eth_stats.oerrors); 412 printf(" - Mbuf Errs: %"PRIu64"\n", eth_stats.rx_nombuf); 413 } 414 } 415 416 static void 417 int_handler(int sig_num) 418 { 419 printf("Exiting on signal %d\n", sig_num); 420 quit_signal = 1; 421 } 422 423 /** 424 * This thread receives mbufs from the port and affects them an internal 425 * sequence number to keep track of their order of arrival through an 426 * mbuf structure. 427 * The mbufs are then passed to the worker threads via the rx_to_workers 428 * ring. 429 */ 430 static int 431 rx_thread(struct rte_ring *ring_out) 432 { 433 uint32_t seqn = 0; 434 uint16_t i, ret = 0; 435 uint16_t nb_rx_pkts; 436 uint16_t port_id; 437 struct rte_mbuf *pkts[MAX_PKTS_BURST]; 438 439 RTE_LOG(INFO, REORDERAPP, "%s() started on lcore %u\n", __func__, 440 rte_lcore_id()); 441 442 while (!quit_signal) { 443 444 RTE_ETH_FOREACH_DEV(port_id) { 445 if ((portmask & (1 << port_id)) != 0) { 446 447 /* receive packets */ 448 nb_rx_pkts = rte_eth_rx_burst(port_id, 0, 449 pkts, MAX_PKTS_BURST); 450 if (nb_rx_pkts == 0) { 451 RTE_LOG_DP(DEBUG, REORDERAPP, 452 "%s():Received zero packets\n", __func__); 453 continue; 454 } 455 app_stats.rx.rx_pkts += nb_rx_pkts; 456 457 /* mark sequence number */ 458 for (i = 0; i < nb_rx_pkts; ) 459 *rte_reorder_seqn(pkts[i++]) = seqn++; 460 461 /* enqueue to rx_to_workers ring */ 462 ret = rte_ring_enqueue_burst(ring_out, 463 (void *)pkts, nb_rx_pkts, NULL); 464 app_stats.rx.enqueue_pkts += ret; 465 if (unlikely(ret < nb_rx_pkts)) { 466 app_stats.rx.enqueue_failed_pkts += 467 (nb_rx_pkts-ret); 468 pktmbuf_free_bulk(&pkts[ret], nb_rx_pkts - ret); 469 } 470 } 471 } 472 } 473 return 0; 474 } 475 476 /** 477 * This thread takes bursts of packets from the rx_to_workers ring and 478 * Changes the input port value to output port value. And feds it to 479 * workers_to_tx 480 */ 481 static int 482 worker_thread(void *args_ptr) 483 { 484 const uint16_t nb_ports = rte_eth_dev_count_avail(); 485 uint16_t i, ret = 0; 486 uint16_t burst_size = 0; 487 struct worker_thread_args *args; 488 struct rte_mbuf *burst_buffer[MAX_PKTS_BURST] = { NULL }; 489 struct rte_ring *ring_in, *ring_out; 490 const unsigned xor_val = (nb_ports > 1); 491 unsigned int core_id = rte_lcore_id(); 492 493 args = (struct worker_thread_args *) args_ptr; 494 ring_in = args->ring_in; 495 ring_out = args->ring_out; 496 497 RTE_LOG(INFO, REORDERAPP, "%s() started on lcore %u\n", __func__, 498 core_id); 499 500 while (!quit_signal) { 501 502 /* dequeue the mbufs from rx_to_workers ring */ 503 burst_size = rte_ring_dequeue_burst(ring_in, 504 (void *)burst_buffer, MAX_PKTS_BURST, NULL); 505 if (unlikely(burst_size == 0)) 506 continue; 507 508 wkr_stats[core_id].deq_pkts += burst_size; 509 510 /* just do some operation on mbuf */ 511 for (i = 0; i < burst_size;) 512 burst_buffer[i++]->port ^= xor_val; 513 514 /* enqueue the modified mbufs to workers_to_tx ring */ 515 ret = rte_ring_enqueue_burst(ring_out, (void *)burst_buffer, 516 burst_size, NULL); 517 wkr_stats[core_id].enq_pkts += ret; 518 if (unlikely(ret < burst_size)) { 519 /* Return the mbufs to their respective pool, dropping packets */ 520 wkr_stats[core_id].enq_failed_pkts += burst_size - ret; 521 pktmbuf_free_bulk(&burst_buffer[ret], burst_size - ret); 522 } 523 } 524 return 0; 525 } 526 527 /** 528 * Dequeue mbufs from the workers_to_tx ring and reorder them before 529 * transmitting. 530 */ 531 static int 532 send_thread(struct send_thread_args *args) 533 { 534 int ret; 535 unsigned int i, dret; 536 uint16_t nb_dq_mbufs; 537 uint8_t outp; 538 unsigned sent; 539 struct rte_mbuf *mbufs[MAX_PKTS_BURST]; 540 struct rte_mbuf *rombufs[MAX_PKTS_BURST] = {NULL}; 541 static struct rte_eth_dev_tx_buffer *tx_buffer[RTE_MAX_ETHPORTS]; 542 543 RTE_LOG(INFO, REORDERAPP, "%s() started on lcore %u\n", __func__, rte_lcore_id()); 544 545 configure_tx_buffers(tx_buffer); 546 547 while (!quit_signal) { 548 549 /* deque the mbufs from workers_to_tx ring */ 550 nb_dq_mbufs = rte_ring_dequeue_burst(args->ring_in, 551 (void *)mbufs, MAX_PKTS_BURST, NULL); 552 553 if (unlikely(nb_dq_mbufs == 0)) 554 continue; 555 556 app_stats.tx.dequeue_pkts += nb_dq_mbufs; 557 558 for (i = 0; i < nb_dq_mbufs; i++) { 559 /* send dequeued mbufs for reordering */ 560 ret = rte_reorder_insert(args->buffer, mbufs[i]); 561 562 if (ret == -1 && rte_errno == ERANGE) { 563 /* Too early pkts should be transmitted out directly */ 564 RTE_LOG_DP(DEBUG, REORDERAPP, 565 "%s():Cannot reorder early packet " 566 "direct enqueuing to TX\n", __func__); 567 outp = mbufs[i]->port; 568 if ((portmask & (1 << outp)) == 0) { 569 rte_pktmbuf_free(mbufs[i]); 570 continue; 571 } 572 if (rte_eth_tx_burst(outp, 0, (void *)mbufs[i], 1) != 1) { 573 rte_pktmbuf_free(mbufs[i]); 574 app_stats.tx.early_pkts_tx_failed_woro++; 575 } else 576 app_stats.tx.early_pkts_txtd_woro++; 577 } else if (ret == -1 && rte_errno == ENOSPC) { 578 /** 579 * Early pkts just outside of window should be dropped 580 */ 581 rte_pktmbuf_free(mbufs[i]); 582 } 583 } 584 585 /* 586 * drain MAX_PKTS_BURST of reordered 587 * mbufs for transmit 588 */ 589 dret = rte_reorder_drain(args->buffer, rombufs, MAX_PKTS_BURST); 590 for (i = 0; i < dret; i++) { 591 592 struct rte_eth_dev_tx_buffer *outbuf; 593 uint8_t outp1; 594 595 outp1 = rombufs[i]->port; 596 /* skip ports that are not enabled */ 597 if ((portmask & (1 << outp1)) == 0) { 598 rte_pktmbuf_free(rombufs[i]); 599 continue; 600 } 601 602 outbuf = tx_buffer[outp1]; 603 sent = rte_eth_tx_buffer(outp1, 0, outbuf, rombufs[i]); 604 if (sent) 605 app_stats.tx.ro_tx_pkts += sent; 606 } 607 } 608 609 free_tx_buffers(tx_buffer); 610 611 return 0; 612 } 613 614 /** 615 * Dequeue mbufs from the workers_to_tx ring and transmit them 616 */ 617 static int 618 tx_thread(struct rte_ring *ring_in) 619 { 620 uint32_t i, dqnum; 621 uint8_t outp; 622 unsigned sent; 623 struct rte_mbuf *mbufs[MAX_PKTS_BURST]; 624 struct rte_eth_dev_tx_buffer *outbuf; 625 static struct rte_eth_dev_tx_buffer *tx_buffer[RTE_MAX_ETHPORTS]; 626 627 RTE_LOG(INFO, REORDERAPP, "%s() started on lcore %u\n", __func__, 628 rte_lcore_id()); 629 630 configure_tx_buffers(tx_buffer); 631 632 while (!quit_signal) { 633 634 /* deque the mbufs from workers_to_tx ring */ 635 dqnum = rte_ring_dequeue_burst(ring_in, 636 (void *)mbufs, MAX_PKTS_BURST, NULL); 637 638 if (unlikely(dqnum == 0)) 639 continue; 640 641 app_stats.tx.dequeue_pkts += dqnum; 642 643 for (i = 0; i < dqnum; i++) { 644 outp = mbufs[i]->port; 645 /* skip ports that are not enabled */ 646 if ((portmask & (1 << outp)) == 0) { 647 rte_pktmbuf_free(mbufs[i]); 648 continue; 649 } 650 651 outbuf = tx_buffer[outp]; 652 sent = rte_eth_tx_buffer(outp, 0, outbuf, mbufs[i]); 653 if (sent) 654 app_stats.tx.ro_tx_pkts += sent; 655 } 656 } 657 658 return 0; 659 } 660 661 int 662 main(int argc, char **argv) 663 { 664 int ret; 665 unsigned nb_ports; 666 unsigned int lcore_id, last_lcore_id, main_lcore_id; 667 uint16_t port_id; 668 uint16_t nb_ports_available; 669 struct worker_thread_args worker_args = {NULL, NULL}; 670 struct send_thread_args send_args = {NULL, NULL}; 671 struct rte_ring *rx_to_workers; 672 struct rte_ring *workers_to_tx; 673 674 /* catch ctrl-c so we can print on exit */ 675 signal(SIGINT, int_handler); 676 677 /* Initialize EAL */ 678 ret = rte_eal_init(argc, argv); 679 if (ret < 0) 680 rte_exit(EXIT_FAILURE, "Invalid EAL arguments\n"); 681 682 argc -= ret; 683 argv += ret; 684 685 /* Parse the application specific arguments */ 686 ret = parse_args(argc, argv); 687 if (ret < 0) 688 rte_exit(EXIT_FAILURE, "Invalid packet_ordering arguments\n"); 689 690 /* Check if we have enough cores */ 691 if (rte_lcore_count() < 3) 692 rte_exit(EXIT_FAILURE, "Error, This application needs at " 693 "least 3 logical cores to run:\n" 694 "1 lcore for packet RX\n" 695 "1 lcore for packet TX\n" 696 "and at least 1 lcore for worker threads\n"); 697 698 nb_ports = rte_eth_dev_count_avail(); 699 if (nb_ports == 0) 700 rte_exit(EXIT_FAILURE, "Error: no ethernet ports detected\n"); 701 if (nb_ports != 1 && (nb_ports & 1)) 702 rte_exit(EXIT_FAILURE, "Error: number of ports must be even, except " 703 "when using a single port\n"); 704 705 mbuf_pool = rte_pktmbuf_pool_create("mbuf_pool", MBUF_PER_POOL, 706 MBUF_POOL_CACHE_SIZE, 0, RTE_MBUF_DEFAULT_BUF_SIZE, 707 rte_socket_id()); 708 if (mbuf_pool == NULL) 709 rte_exit(EXIT_FAILURE, "%s\n", rte_strerror(rte_errno)); 710 711 nb_ports_available = nb_ports; 712 713 /* initialize all ports */ 714 RTE_ETH_FOREACH_DEV(port_id) { 715 /* skip ports that are not enabled */ 716 if ((portmask & (1 << port_id)) == 0) { 717 printf("\nSkipping disabled port %d\n", port_id); 718 nb_ports_available--; 719 continue; 720 } 721 /* init port */ 722 printf("Initializing port %u... done\n", port_id); 723 724 if (configure_eth_port(port_id) != 0) 725 rte_exit(EXIT_FAILURE, "Cannot initialize port %"PRIu8"\n", 726 port_id); 727 } 728 729 if (!nb_ports_available) { 730 rte_exit(EXIT_FAILURE, 731 "All available ports are disabled. Please set portmask.\n"); 732 } 733 734 /* Create rings for inter core communication */ 735 rx_to_workers = rte_ring_create("rx_to_workers", RING_SIZE, rte_socket_id(), 736 RING_F_SP_ENQ); 737 if (rx_to_workers == NULL) 738 rte_exit(EXIT_FAILURE, "%s\n", rte_strerror(rte_errno)); 739 740 workers_to_tx = rte_ring_create("workers_to_tx", RING_SIZE, rte_socket_id(), 741 RING_F_SC_DEQ); 742 if (workers_to_tx == NULL) 743 rte_exit(EXIT_FAILURE, "%s\n", rte_strerror(rte_errno)); 744 745 if (!disable_reorder) { 746 send_args.buffer = rte_reorder_create("PKT_RO", rte_socket_id(), 747 REORDER_BUFFER_SIZE); 748 if (send_args.buffer == NULL) 749 rte_exit(EXIT_FAILURE, "%s\n", rte_strerror(rte_errno)); 750 } 751 752 last_lcore_id = get_last_lcore_id(); 753 main_lcore_id = rte_get_main_lcore(); 754 755 worker_args.ring_in = rx_to_workers; 756 worker_args.ring_out = workers_to_tx; 757 758 /* Start worker_thread() on all the available worker cores but the last 1 */ 759 for (lcore_id = 0; lcore_id <= get_previous_lcore_id(last_lcore_id); lcore_id++) 760 if (rte_lcore_is_enabled(lcore_id) && lcore_id != main_lcore_id) 761 rte_eal_remote_launch(worker_thread, (void *)&worker_args, 762 lcore_id); 763 764 if (disable_reorder) { 765 /* Start tx_thread() on the last worker core */ 766 rte_eal_remote_launch((lcore_function_t *)tx_thread, workers_to_tx, 767 last_lcore_id); 768 } else { 769 send_args.ring_in = workers_to_tx; 770 /* Start send_thread() on the last worker core */ 771 rte_eal_remote_launch((lcore_function_t *)send_thread, 772 (void *)&send_args, last_lcore_id); 773 } 774 775 /* Start rx_thread() on the main core */ 776 rx_thread(rx_to_workers); 777 778 RTE_LCORE_FOREACH_WORKER(lcore_id) { 779 if (rte_eal_wait_lcore(lcore_id) < 0) 780 return -1; 781 } 782 783 print_stats(); 784 785 /* clean up the EAL */ 786 rte_eal_cleanup(); 787 788 return 0; 789 } 790