/* SPDX-License-Identifier: BSD-3-Clause
 * Copyright(c) 2010-2016 Intel Corporation
 */

#include <signal.h>
#include <getopt.h>

#include <rte_eal.h>
#include <rte_common.h>
#include <rte_errno.h>
#include <rte_ethdev.h>
#include <rte_lcore.h>
#include <rte_malloc.h>
#include <rte_mbuf.h>
#include <rte_mempool.h>
#include <rte_ring.h>
#include <rte_reorder.h>

#define RX_DESC_PER_QUEUE 1024
#define TX_DESC_PER_QUEUE 1024

#define MAX_PKTS_BURST 32
#define REORDER_BUFFER_SIZE 8192
#define MBUF_PER_POOL 65535
#define MBUF_POOL_CACHE_SIZE 250

#define RING_SIZE 16384

/* Macros for printing using RTE_LOG */
#define RTE_LOGTYPE_REORDERAPP RTE_LOGTYPE_USER1

unsigned int portmask;
unsigned int disable_reorder;
volatile uint8_t quit_signal;

static struct rte_mempool *mbuf_pool;

static struct rte_eth_conf port_conf_default;

struct worker_thread_args {
	struct rte_ring *ring_in;
	struct rte_ring *ring_out;
};

struct send_thread_args {
	struct rte_ring *ring_in;
	struct rte_reorder_buffer *buffer;
};

volatile struct app_stats {
	struct {
		uint64_t rx_pkts;
		uint64_t enqueue_pkts;
		uint64_t enqueue_failed_pkts;
	} rx __rte_cache_aligned;

	struct {
		uint64_t dequeue_pkts;
		uint64_t enqueue_pkts;
		uint64_t enqueue_failed_pkts;
	} wkr __rte_cache_aligned;

	struct {
		uint64_t dequeue_pkts;
		/* Too early pkts transmitted directly w/o reordering */
		uint64_t early_pkts_txtd_woro;
		/* Too early pkts failed from direct transmit */
		uint64_t early_pkts_tx_failed_woro;
		uint64_t ro_tx_pkts;
		uint64_t ro_tx_failed_pkts;
	} tx __rte_cache_aligned;
} app_stats;

/**
 * Get the last enabled lcore ID
 *
 * @return
 *   The last enabled lcore ID.
 */
static unsigned int
get_last_lcore_id(void)
{
	int i;

	for (i = RTE_MAX_LCORE - 1; i >= 0; i--)
		if (rte_lcore_is_enabled(i))
			return i;
	return 0;
}

/**
 * Get the previous enabled lcore ID
 *
 * @param id
 *   The current lcore ID
 * @return
 *   The previous enabled lcore ID or the current lcore
 *   ID if it is the first available core.
 */
static unsigned int
get_previous_lcore_id(unsigned int id)
{
	int i;

	for (i = id - 1; i >= 0; i--)
		if (rte_lcore_is_enabled(i))
			return i;
	return id;
}

static inline void
pktmbuf_free_bulk(struct rte_mbuf *mbuf_table[], unsigned n)
{
	unsigned int i;

	for (i = 0; i < n; i++)
		rte_pktmbuf_free(mbuf_table[i]);
}

/* display usage */
static void
print_usage(const char *prgname)
{
	printf("%s [EAL options] -- -p PORTMASK [--disable-reorder]\n"
			"  -p PORTMASK: hexadecimal bitmask of ports to configure\n"
			"  --disable-reorder: disable output packet reordering\n",
			prgname);
}

static int
parse_portmask(const char *portmask)
{
	unsigned long pm;
	char *end = NULL;

	/* parse hexadecimal string */
	pm = strtoul(portmask, &end, 16);
	if ((portmask[0] == '\0') || (end == NULL) || (*end != '\0'))
		return -1;

	if (pm == 0)
		return -1;

	return pm;
}

/* Parse the arguments given in the command line of the application */
static int
parse_args(int argc, char **argv)
{
	int opt, ret;
	int option_index;
	char **argvopt;
	char *prgname = argv[0];
	static struct option lgopts[] = {
		{"disable-reorder", 0, 0, 0},
		{NULL, 0, 0, 0}
	};

	argvopt = argv;

	while ((opt = getopt_long(argc, argvopt, "p:",
					lgopts, &option_index)) != EOF) {
		switch (opt) {
		/* portmask */
		case 'p':
			ret = parse_portmask(optarg);
			if (ret <= 0) {
				printf("invalid portmask\n");
				print_usage(prgname);
				return -1;
			}
			portmask = ret;
			break;
		/* long options */
		case 0:
			if (!strcmp(lgopts[option_index].name, "disable-reorder")) {
				printf("reorder disabled\n");
				disable_reorder = 1;
			}
			break;
		default:
			print_usage(prgname);
			return -1;
		}
	}
	if (optind <= 1) {
		print_usage(prgname);
		return -1;
	}

	argv[optind-1] = prgname;
	optind = 1; /* reset getopt lib */
	return 0;
}

/*
 * Tx buffer error callback
 */
static void
flush_tx_error_callback(struct rte_mbuf **unsent, uint16_t count,
		void *userdata __rte_unused) {

	/* free the mbufs which failed from transmit */
	app_stats.tx.ro_tx_failed_pkts += count;
	RTE_LOG_DP(DEBUG, REORDERAPP, "%s:Packet loss with tx_burst\n", __func__);
	pktmbuf_free_bulk(unsent, count);

}

static inline int
free_tx_buffers(struct rte_eth_dev_tx_buffer *tx_buffer[]) {
	uint16_t port_id;

	/* free the TX buffers of all enabled ports */
	RTE_ETH_FOREACH_DEV(port_id) {
		/* skip ports that are not enabled */
		if ((portmask & (1 << port_id)) == 0)
			continue;

		rte_free(tx_buffer[port_id]);
	}
	return 0;
}

static inline int
configure_tx_buffers(struct rte_eth_dev_tx_buffer *tx_buffer[])
{
	uint16_t port_id;
	int ret;

	/* initialize buffers for all ports */
	RTE_ETH_FOREACH_DEV(port_id) {
		/* skip ports that are not enabled */
		if ((portmask & (1 << port_id)) == 0)
			continue;

		/* Initialize TX buffers */
		tx_buffer[port_id] = rte_zmalloc_socket("tx_buffer",
				RTE_ETH_TX_BUFFER_SIZE(MAX_PKTS_BURST), 0,
				rte_eth_dev_socket_id(port_id));
		if (tx_buffer[port_id] == NULL)
			rte_exit(EXIT_FAILURE, "Cannot allocate buffer for tx on port %u\n",
					port_id);

		rte_eth_tx_buffer_init(tx_buffer[port_id], MAX_PKTS_BURST);

		ret = rte_eth_tx_buffer_set_err_callback(tx_buffer[port_id],
				flush_tx_error_callback, NULL);
		if (ret < 0)
			rte_exit(EXIT_FAILURE,
					"Cannot set error callback for tx buffer on port %u\n",
					port_id);
	}
	return 0;
}

static inline int
configure_eth_port(uint16_t port_id)
{
	struct ether_addr addr;
	const uint16_t rxRings = 1, txRings = 1;
	int ret;
	uint16_t q;
	uint16_t nb_rxd = RX_DESC_PER_QUEUE;
	uint16_t nb_txd = TX_DESC_PER_QUEUE;
	struct rte_eth_dev_info dev_info;
	struct rte_eth_txconf txconf;
	struct rte_eth_conf port_conf = port_conf_default;

	if (!rte_eth_dev_is_valid_port(port_id))
		return -1;

	rte_eth_dev_info_get(port_id, &dev_info);
	if (dev_info.tx_offload_capa & DEV_TX_OFFLOAD_MBUF_FAST_FREE)
		port_conf.txmode.offloads |=
			DEV_TX_OFFLOAD_MBUF_FAST_FREE;
	ret = rte_eth_dev_configure(port_id, rxRings, txRings, &port_conf);
	if (ret != 0)
		return ret;

	ret = rte_eth_dev_adjust_nb_rx_tx_desc(port_id, &nb_rxd, &nb_txd);
	if (ret != 0)
		return ret;

	for (q = 0; q < rxRings; q++) {
		ret = rte_eth_rx_queue_setup(port_id, q, nb_rxd,
				rte_eth_dev_socket_id(port_id), NULL,
				mbuf_pool);
		if (ret < 0)
			return ret;
	}

	txconf = dev_info.default_txconf;
	txconf.offloads = port_conf.txmode.offloads;
	for (q = 0; q < txRings; q++) {
		ret = rte_eth_tx_queue_setup(port_id, q, nb_txd,
				rte_eth_dev_socket_id(port_id), &txconf);
		if (ret < 0)
			return ret;
	}

	ret = rte_eth_dev_start(port_id);
	if (ret < 0)
		return ret;

	rte_eth_macaddr_get(port_id, &addr);
	printf("Port %u MAC: %02"PRIx8" %02"PRIx8" %02"PRIx8
			" %02"PRIx8" %02"PRIx8" %02"PRIx8"\n",
			port_id,
			addr.addr_bytes[0], addr.addr_bytes[1],
			addr.addr_bytes[2], addr.addr_bytes[3],
			addr.addr_bytes[4], addr.addr_bytes[5]);

	rte_eth_promiscuous_enable(port_id);

	return 0;
}

static void
print_stats(void)
{
	uint16_t i;
	struct rte_eth_stats eth_stats;

	printf("\nRX thread stats:\n");
	printf(" - Pkts rxd: %"PRIu64"\n",
			app_stats.rx.rx_pkts);
	printf(" - Pkts enqd to workers ring: %"PRIu64"\n",
			app_stats.rx.enqueue_pkts);

	printf("\nWorker thread stats:\n");
	printf(" - Pkts deqd from workers ring: %"PRIu64"\n",
			app_stats.wkr.dequeue_pkts);
	printf(" - Pkts enqd to tx ring: %"PRIu64"\n",
			app_stats.wkr.enqueue_pkts);
	printf(" - Pkts enq to tx failed: %"PRIu64"\n",
			app_stats.wkr.enqueue_failed_pkts);

	printf("\nTX stats:\n");
	printf(" - Pkts deqd from tx ring: %"PRIu64"\n",
			app_stats.tx.dequeue_pkts);
	printf(" - Ro Pkts transmitted: %"PRIu64"\n",
			app_stats.tx.ro_tx_pkts);
	printf(" - Ro Pkts tx failed: %"PRIu64"\n",
			app_stats.tx.ro_tx_failed_pkts);
	printf(" - Pkts transmitted w/o reorder: %"PRIu64"\n",
			app_stats.tx.early_pkts_txtd_woro);
	printf(" - Pkts tx failed w/o reorder: %"PRIu64"\n",
			app_stats.tx.early_pkts_tx_failed_woro);

	RTE_ETH_FOREACH_DEV(i) {
		rte_eth_stats_get(i, &eth_stats);
		printf("\nPort %u stats:\n", i);
		printf(" - Pkts in:   %"PRIu64"\n", eth_stats.ipackets);
		printf(" - Pkts out:  %"PRIu64"\n", eth_stats.opackets);
		printf(" - In Errs:   %"PRIu64"\n", eth_stats.ierrors);
		printf(" - Out Errs:  %"PRIu64"\n", eth_stats.oerrors);
		printf(" - Mbuf Errs: %"PRIu64"\n", eth_stats.rx_nombuf);
	}
}

static void
int_handler(int sig_num)
{
	printf("Exiting on signal %d\n", sig_num);
	quit_signal = 1;
}
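
/*
 * Pipeline overview (descriptive note): the RX thread (master lcore) tags
 * each received mbuf with a sequence number and enqueues it on the
 * rx_to_workers ring. Worker threads dequeue bursts, rewrite the output port
 * and enqueue them, possibly out of order, on workers_to_tx. A single TX
 * lcore then drains that ring and transmits: send_thread() restores the
 * arrival order via librte_reorder, while tx_thread() (used with
 * --disable-reorder) transmits packets as they come.
 */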

/**
 * This thread receives mbufs from the port and assigns each of them an
 * internal sequence number (stored in the mbuf) to keep track of their
 * order of arrival.
 * The mbufs are then passed to the worker threads via the rx_to_workers
 * ring.
 */
static int
rx_thread(struct rte_ring *ring_out)
{
	uint32_t seqn = 0;
	uint16_t i, ret = 0;
	uint16_t nb_rx_pkts;
	uint16_t port_id;
	struct rte_mbuf *pkts[MAX_PKTS_BURST];

	RTE_LOG(INFO, REORDERAPP, "%s() started on lcore %u\n", __func__,
			rte_lcore_id());

	while (!quit_signal) {

		RTE_ETH_FOREACH_DEV(port_id) {
			if ((portmask & (1 << port_id)) != 0) {

				/* receive packets */
				nb_rx_pkts = rte_eth_rx_burst(port_id, 0,
						pkts, MAX_PKTS_BURST);
				if (nb_rx_pkts == 0) {
					RTE_LOG_DP(DEBUG, REORDERAPP,
						"%s():Received zero packets\n", __func__);
					continue;
				}
				app_stats.rx.rx_pkts += nb_rx_pkts;

				/* mark sequence number */
				for (i = 0; i < nb_rx_pkts; )
					pkts[i++]->seqn = seqn++;

				/* enqueue to rx_to_workers ring */
				ret = rte_ring_enqueue_burst(ring_out,
						(void *)pkts, nb_rx_pkts, NULL);
				app_stats.rx.enqueue_pkts += ret;
				if (unlikely(ret < nb_rx_pkts)) {
					app_stats.rx.enqueue_failed_pkts +=
						(nb_rx_pkts-ret);
					pktmbuf_free_bulk(&pkts[ret], nb_rx_pkts - ret);
				}
			}
		}
	}
	return 0;
}

/**
 * This thread takes bursts of packets from the rx_to_workers ring,
 * changes the input port value to the output port value and feeds the
 * packets to the workers_to_tx ring.
 */
static int
worker_thread(void *args_ptr)
{
	const uint16_t nb_ports = rte_eth_dev_count_avail();
	uint16_t i, ret = 0;
	uint16_t burst_size = 0;
	struct worker_thread_args *args;
	struct rte_mbuf *burst_buffer[MAX_PKTS_BURST] = { NULL };
	struct rte_ring *ring_in, *ring_out;
	const unsigned xor_val = (nb_ports > 1);

	args = (struct worker_thread_args *) args_ptr;
	ring_in  = args->ring_in;
	ring_out = args->ring_out;

	RTE_LOG(INFO, REORDERAPP, "%s() started on lcore %u\n", __func__,
			rte_lcore_id());

	while (!quit_signal) {

		/* dequeue the mbufs from rx_to_workers ring */
		burst_size = rte_ring_dequeue_burst(ring_in,
				(void *)burst_buffer, MAX_PKTS_BURST, NULL);
		if (unlikely(burst_size == 0))
			continue;

		__sync_fetch_and_add(&app_stats.wkr.dequeue_pkts, burst_size);

		/* just do some operation on mbuf */
		for (i = 0; i < burst_size;)
			burst_buffer[i++]->port ^= xor_val;

		/* enqueue the modified mbufs to workers_to_tx ring */
		ret = rte_ring_enqueue_burst(ring_out, (void *)burst_buffer,
				burst_size, NULL);
		__sync_fetch_and_add(&app_stats.wkr.enqueue_pkts, ret);
		if (unlikely(ret < burst_size)) {
			/* Return the mbufs to their respective pool, dropping packets */
			__sync_fetch_and_add(&app_stats.wkr.enqueue_failed_pkts,
					(int)burst_size - ret);
			pktmbuf_free_bulk(&burst_buffer[ret], burst_size - ret);
		}
	}
	return 0;
}

/**
 * Dequeue mbufs from the workers_to_tx ring and reorder them before
 * transmitting.
 */
static int
send_thread(struct send_thread_args *args)
{
	int ret;
	unsigned int i, dret;
	uint16_t nb_dq_mbufs;
	uint8_t outp;
	unsigned sent;
	struct rte_mbuf *mbufs[MAX_PKTS_BURST];
	struct rte_mbuf *rombufs[MAX_PKTS_BURST] = {NULL};
	static struct rte_eth_dev_tx_buffer *tx_buffer[RTE_MAX_ETHPORTS];

	RTE_LOG(INFO, REORDERAPP, "%s() started on lcore %u\n", __func__, rte_lcore_id());

	configure_tx_buffers(tx_buffer);

	while (!quit_signal) {

		/* dequeue the mbufs from workers_to_tx ring */
		nb_dq_mbufs = rte_ring_dequeue_burst(args->ring_in,
				(void *)mbufs, MAX_PKTS_BURST, NULL);

		if (unlikely(nb_dq_mbufs == 0))
			continue;

		app_stats.tx.dequeue_pkts += nb_dq_mbufs;

		for (i = 0; i < nb_dq_mbufs; i++) {
			/* send dequeued mbufs for reordering */
			ret = rte_reorder_insert(args->buffer, mbufs[i]);

			if (ret == -1 && rte_errno == ERANGE) {
				/* Too early pkts should be transmitted out directly */
				RTE_LOG_DP(DEBUG, REORDERAPP,
						"%s():Cannot reorder early packet "
						"direct enqueuing to TX\n", __func__);
				outp = mbufs[i]->port;
				if ((portmask & (1 << outp)) == 0) {
					rte_pktmbuf_free(mbufs[i]);
					continue;
				}
				if (rte_eth_tx_burst(outp, 0, &mbufs[i], 1) != 1) {
					rte_pktmbuf_free(mbufs[i]);
					app_stats.tx.early_pkts_tx_failed_woro++;
				} else
					app_stats.tx.early_pkts_txtd_woro++;
			} else if (ret == -1 && rte_errno == ENOSPC) {
				/*
				 * Early pkts just outside of window should be dropped
				 */
				rte_pktmbuf_free(mbufs[i]);
			}
		}

		/*
		 * drain MAX_PKTS_BURST of reordered
		 * mbufs for transmit
		 */
		dret = rte_reorder_drain(args->buffer, rombufs, MAX_PKTS_BURST);
		for (i = 0; i < dret; i++) {

			struct rte_eth_dev_tx_buffer *outbuf;
			uint8_t outp1;

			outp1 = rombufs[i]->port;
			/* skip ports that are not enabled */
			if ((portmask & (1 << outp1)) == 0) {
				rte_pktmbuf_free(rombufs[i]);
				continue;
			}

			outbuf = tx_buffer[outp1];
			sent = rte_eth_tx_buffer(outp1, 0, outbuf, rombufs[i]);
			if (sent)
				app_stats.tx.ro_tx_pkts += sent;
		}
	}

	free_tx_buffers(tx_buffer);

	return 0;
}

/**
 * Dequeue mbufs from the workers_to_tx ring and transmit them
 */
static int
tx_thread(struct rte_ring *ring_in)
{
	uint32_t i, dqnum;
	uint8_t outp;
	unsigned sent;
	struct rte_mbuf *mbufs[MAX_PKTS_BURST];
	struct rte_eth_dev_tx_buffer *outbuf;
	static struct rte_eth_dev_tx_buffer *tx_buffer[RTE_MAX_ETHPORTS];

	RTE_LOG(INFO, REORDERAPP, "%s() started on lcore %u\n", __func__,
			rte_lcore_id());

	configure_tx_buffers(tx_buffer);

	while (!quit_signal) {

		/* dequeue the mbufs from workers_to_tx ring */
		dqnum = rte_ring_dequeue_burst(ring_in,
				(void *)mbufs, MAX_PKTS_BURST, NULL);

		if (unlikely(dqnum == 0))
			continue;

		app_stats.tx.dequeue_pkts += dqnum;

		for (i = 0; i < dqnum; i++) {
			outp = mbufs[i]->port;
			/* skip ports that are not enabled */
			if ((portmask & (1 << outp)) == 0) {
				rte_pktmbuf_free(mbufs[i]);
				continue;
			}

			outbuf = tx_buffer[outp];
			sent = rte_eth_tx_buffer(outp, 0, outbuf, mbufs[i]);
			if (sent)
				app_stats.tx.ro_tx_pkts += sent;
		}
	}

	return 0;
}
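
/**
 * Application entry point: initialise the EAL, parse the application
 * arguments, set up the ports, rings and (optionally) the reorder buffer,
 * then launch the threads described above.
 *
 * Illustrative invocation (the binary name and EAL options are examples
 * only; at least 3 lcores are required):
 *
 *   ./packet_ordering -l 0-3 -n 4 -- -p 0x3 [--disable-reorder]
 */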
int
main(int argc, char **argv)
{
	int ret;
	unsigned nb_ports;
	unsigned int lcore_id, last_lcore_id, master_lcore_id;
	uint16_t port_id;
	uint16_t nb_ports_available;
	struct worker_thread_args worker_args = {NULL, NULL};
	struct send_thread_args send_args = {NULL, NULL};
	struct rte_ring *rx_to_workers;
	struct rte_ring *workers_to_tx;

	/* catch ctrl-c so we can print on exit */
	signal(SIGINT, int_handler);

	/* Initialize EAL */
	ret = rte_eal_init(argc, argv);
	if (ret < 0)
		return -1;

	argc -= ret;
	argv += ret;

	/* Parse the application specific arguments */
	ret = parse_args(argc, argv);
	if (ret < 0)
		return -1;

	/* Check if we have enough cores */
	if (rte_lcore_count() < 3)
		rte_exit(EXIT_FAILURE, "Error: this application needs at "
				"least 3 logical cores to run:\n"
				"1 lcore for packet RX\n"
				"1 lcore for packet TX\n"
				"and at least 1 lcore for worker threads\n");

	nb_ports = rte_eth_dev_count_avail();
	if (nb_ports == 0)
		rte_exit(EXIT_FAILURE, "Error: no ethernet ports detected\n");
	if (nb_ports != 1 && (nb_ports & 1))
		rte_exit(EXIT_FAILURE, "Error: number of ports must be even, except "
				"when using a single port\n");

	mbuf_pool = rte_pktmbuf_pool_create("mbuf_pool", MBUF_PER_POOL,
			MBUF_POOL_CACHE_SIZE, 0, RTE_MBUF_DEFAULT_BUF_SIZE,
			rte_socket_id());
	if (mbuf_pool == NULL)
		rte_exit(EXIT_FAILURE, "%s\n", rte_strerror(rte_errno));

	nb_ports_available = nb_ports;

	/* initialize all ports */
	RTE_ETH_FOREACH_DEV(port_id) {
		/* skip ports that are not enabled */
		if ((portmask & (1 << port_id)) == 0) {
			printf("\nSkipping disabled port %d\n", port_id);
			nb_ports_available--;
			continue;
		}
		/* init port */
		printf("Initializing port %u... done\n", port_id);

		if (configure_eth_port(port_id) != 0)
			rte_exit(EXIT_FAILURE, "Cannot initialize port %"PRIu16"\n",
					port_id);
	}

	if (!nb_ports_available) {
		rte_exit(EXIT_FAILURE,
			"All available ports are disabled. Please set portmask.\n");
	}
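
	/*
	 * Ring flags reflect the thread topology: only the RX thread enqueues
	 * to rx_to_workers (hence RING_F_SP_ENQ) and only the TX/send thread
	 * dequeues from workers_to_tx (hence RING_F_SC_DEQ); the worker side
	 * of each ring remains multi-producer/multi-consumer.
	 */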
	/* Create rings for inter core communication */
	rx_to_workers = rte_ring_create("rx_to_workers", RING_SIZE, rte_socket_id(),
			RING_F_SP_ENQ);
	if (rx_to_workers == NULL)
		rte_exit(EXIT_FAILURE, "%s\n", rte_strerror(rte_errno));

	workers_to_tx = rte_ring_create("workers_to_tx", RING_SIZE, rte_socket_id(),
			RING_F_SC_DEQ);
	if (workers_to_tx == NULL)
		rte_exit(EXIT_FAILURE, "%s\n", rte_strerror(rte_errno));

	if (!disable_reorder) {
		send_args.buffer = rte_reorder_create("PKT_RO", rte_socket_id(),
				REORDER_BUFFER_SIZE);
		if (send_args.buffer == NULL)
			rte_exit(EXIT_FAILURE, "%s\n", rte_strerror(rte_errno));
	}

	last_lcore_id   = get_last_lcore_id();
	master_lcore_id = rte_get_master_lcore();

	worker_args.ring_in  = rx_to_workers;
	worker_args.ring_out = workers_to_tx;

	/* Start worker_thread() on all the available slave cores but the last 1 */
	for (lcore_id = 0; lcore_id <= get_previous_lcore_id(last_lcore_id); lcore_id++)
		if (rte_lcore_is_enabled(lcore_id) && lcore_id != master_lcore_id)
			rte_eal_remote_launch(worker_thread, (void *)&worker_args,
					lcore_id);

	if (disable_reorder) {
		/* Start tx_thread() on the last slave core */
		rte_eal_remote_launch((lcore_function_t *)tx_thread, workers_to_tx,
				last_lcore_id);
	} else {
		send_args.ring_in = workers_to_tx;
		/* Start send_thread() on the last slave core */
		rte_eal_remote_launch((lcore_function_t *)send_thread,
				(void *)&send_args, last_lcore_id);
	}

	/* Start rx_thread() on the master core */
	rx_thread(rx_to_workers);

	RTE_LCORE_FOREACH_SLAVE(lcore_id) {
		if (rte_eal_wait_lcore(lcore_id) < 0)
			return -1;
	}

	print_stats();
	return 0;
}