/* SPDX-License-Identifier: BSD-3-Clause
 * Copyright(c) 2010-2016 Intel Corporation
 */

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <stdint.h>
#include <inttypes.h>
#include <signal.h>
#include <getopt.h>

#include <rte_eal.h>
#include <rte_common.h>
#include <rte_errno.h>
#include <rte_ethdev.h>
#include <rte_lcore.h>
#include <rte_malloc.h>
#include <rte_mbuf.h>
#include <rte_mempool.h>
#include <rte_ring.h>
#include <rte_reorder.h>

#define RX_DESC_PER_QUEUE 128
#define TX_DESC_PER_QUEUE 512

#define MAX_PKTS_BURST 32
#define REORDER_BUFFER_SIZE 8192
#define MBUF_PER_POOL 65535
#define MBUF_POOL_CACHE_SIZE 250

#define RING_SIZE 16384

/* Macros for printing using RTE_LOG */
#define RTE_LOGTYPE_REORDERAPP RTE_LOGTYPE_USER1

unsigned int portmask;
unsigned int disable_reorder;
volatile uint8_t quit_signal;

static struct rte_mempool *mbuf_pool;

static struct rte_eth_conf port_conf_default;

struct worker_thread_args {
	struct rte_ring *ring_in;
	struct rte_ring *ring_out;
};

struct send_thread_args {
	struct rte_ring *ring_in;
	struct rte_reorder_buffer *buffer;
};

volatile struct app_stats {
	struct {
		uint64_t rx_pkts;
		uint64_t enqueue_pkts;
		uint64_t enqueue_failed_pkts;
	} rx __rte_cache_aligned;

	struct {
		uint64_t dequeue_pkts;
		uint64_t enqueue_pkts;
		uint64_t enqueue_failed_pkts;
	} wkr __rte_cache_aligned;

	struct {
		uint64_t dequeue_pkts;
		/* Too early pkts transmitted directly w/o reordering */
		uint64_t early_pkts_txtd_woro;
		/* Too early pkts failed from direct transmit */
		uint64_t early_pkts_tx_failed_woro;
		uint64_t ro_tx_pkts;
		uint64_t ro_tx_failed_pkts;
	} tx __rte_cache_aligned;
} app_stats;

/**
 * Get the last enabled lcore ID
 *
 * @return
 *   The last enabled lcore ID.
 */
static unsigned int
get_last_lcore_id(void)
{
	int i;

	for (i = RTE_MAX_LCORE - 1; i >= 0; i--)
		if (rte_lcore_is_enabled(i))
			return i;
	return 0;
}

/**
 * Get the previous enabled lcore ID
 *
 * @param id
 *   The current lcore ID
 * @return
 *   The previous enabled lcore ID or the current lcore
 *   ID if it is the first available core.
 */
static unsigned int
get_previous_lcore_id(unsigned int id)
{
	int i;

	for (i = id - 1; i >= 0; i--)
		if (rte_lcore_is_enabled(i))
			return i;
	return id;
}
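/*
 * Illustrative example of the two helpers above: with lcores 0, 2 and 5
 * enabled, get_last_lcore_id() returns 5, get_previous_lcore_id(5) returns 2,
 * and get_previous_lcore_id(0) returns 0 since there is no previous enabled
 * lcore.
 */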
static inline void
pktmbuf_free_bulk(struct rte_mbuf *mbuf_table[], unsigned int n)
{
	unsigned int i;

	for (i = 0; i < n; i++)
		rte_pktmbuf_free(mbuf_table[i]);
}

/* display usage */
static void
print_usage(const char *prgname)
{
	printf("%s [EAL options] -- -p PORTMASK\n"
			"  -p PORTMASK: hexadecimal bitmask of ports to configure\n",
			prgname);
}

static int
parse_portmask(const char *portmask)
{
	unsigned long pm;
	char *end = NULL;

	/* parse hexadecimal string */
	pm = strtoul(portmask, &end, 16);
	if ((portmask[0] == '\0') || (end == NULL) || (*end != '\0'))
		return -1;

	if (pm == 0)
		return -1;

	return pm;
}

/* Parse the arguments given in the command line of the application */
static int
parse_args(int argc, char **argv)
{
	int opt;
	int ret;
	int option_index;
	char **argvopt;
	char *prgname = argv[0];
	static struct option lgopts[] = {
		{"disable-reorder", 0, 0, 0},
		{NULL, 0, 0, 0}
	};

	argvopt = argv;

	while ((opt = getopt_long(argc, argvopt, "p:",
					lgopts, &option_index)) != EOF) {
		switch (opt) {
		/* portmask */
		case 'p':
			ret = parse_portmask(optarg);
			if (ret <= 0) {
				printf("invalid portmask\n");
				print_usage(prgname);
				return -1;
			}
			portmask = ret;
			break;

		/* long options */
		case 0:
			if (!strcmp(lgopts[option_index].name, "disable-reorder")) {
				printf("reorder disabled\n");
				disable_reorder = 1;
			}
			break;

		default:
			print_usage(prgname);
			return -1;
		}
	}
	if (optind <= 1) {
		print_usage(prgname);
		return -1;
	}

	argv[optind-1] = prgname;
	optind = 1; /* reset getopt lib */
	return 0;
}

/*
 * Tx buffer error callback
 */
static void
flush_tx_error_callback(struct rte_mbuf **unsent, uint16_t count,
		void *userdata __rte_unused)
{
	/* free the mbufs which failed to transmit */
	app_stats.tx.ro_tx_failed_pkts += count;
	RTE_LOG_DP(DEBUG, REORDERAPP, "%s:Packet loss with tx_burst\n", __func__);
	pktmbuf_free_bulk(unsent, count);
}

static inline int
free_tx_buffers(struct rte_eth_dev_tx_buffer *tx_buffer[])
{
	const uint8_t nb_ports = rte_eth_dev_count();
	unsigned int port_id;

	/* free the TX buffers previously allocated for the enabled ports */
	for (port_id = 0; port_id < nb_ports; port_id++) {
		/* skip ports that are not enabled */
		if ((portmask & (1 << port_id)) == 0)
			continue;

		rte_free(tx_buffer[port_id]);
	}
	return 0;
}

static inline int
configure_tx_buffers(struct rte_eth_dev_tx_buffer *tx_buffer[])
{
	const uint8_t nb_ports = rte_eth_dev_count();
	unsigned int port_id;
	int ret;

	/* initialize buffers for all enabled ports */
	for (port_id = 0; port_id < nb_ports; port_id++) {
		/* skip ports that are not enabled */
		if ((portmask & (1 << port_id)) == 0)
			continue;

		/* Initialize TX buffers */
		tx_buffer[port_id] = rte_zmalloc_socket("tx_buffer",
				RTE_ETH_TX_BUFFER_SIZE(MAX_PKTS_BURST), 0,
				rte_eth_dev_socket_id(port_id));
		if (tx_buffer[port_id] == NULL)
			rte_exit(EXIT_FAILURE, "Cannot allocate buffer for tx on port %u\n",
					port_id);

		rte_eth_tx_buffer_init(tx_buffer[port_id], MAX_PKTS_BURST);

		ret = rte_eth_tx_buffer_set_err_callback(tx_buffer[port_id],
				flush_tx_error_callback, NULL);
		if (ret < 0)
			rte_exit(EXIT_FAILURE,
				"Cannot set error callback for tx buffer on port %u\n",
					port_id);
	}
	return 0;
}
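/*
 * Note on the TX buffers configured above: each one holds up to
 * MAX_PKTS_BURST packets. rte_eth_tx_buffer() queues a packet and
 * transparently flushes the buffer with rte_eth_tx_burst() once it is full;
 * any packets the PMD could not send on a flush are handed to
 * flush_tx_error_callback(), which counts and frees them.
 */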
static inline int
configure_eth_port(uint16_t port_id)
{
	struct ether_addr addr;
	const uint16_t rxRings = 1, txRings = 1;
	const uint8_t nb_ports = rte_eth_dev_count();
	int ret;
	uint16_t q;
	uint16_t nb_rxd = RX_DESC_PER_QUEUE;
	uint16_t nb_txd = TX_DESC_PER_QUEUE;

	if (port_id >= nb_ports)
		return -1;

	ret = rte_eth_dev_configure(port_id, rxRings, txRings, &port_conf_default);
	if (ret != 0)
		return ret;

	ret = rte_eth_dev_adjust_nb_rx_tx_desc(port_id, &nb_rxd, &nb_txd);
	if (ret != 0)
		return ret;

	for (q = 0; q < rxRings; q++) {
		ret = rte_eth_rx_queue_setup(port_id, q, nb_rxd,
				rte_eth_dev_socket_id(port_id), NULL,
				mbuf_pool);
		if (ret < 0)
			return ret;
	}

	for (q = 0; q < txRings; q++) {
		ret = rte_eth_tx_queue_setup(port_id, q, nb_txd,
				rte_eth_dev_socket_id(port_id), NULL);
		if (ret < 0)
			return ret;
	}

	ret = rte_eth_dev_start(port_id);
	if (ret < 0)
		return ret;

	rte_eth_macaddr_get(port_id, &addr);
	printf("Port %u MAC: %02"PRIx8" %02"PRIx8" %02"PRIx8
			" %02"PRIx8" %02"PRIx8" %02"PRIx8"\n",
			port_id,
			addr.addr_bytes[0], addr.addr_bytes[1],
			addr.addr_bytes[2], addr.addr_bytes[3],
			addr.addr_bytes[4], addr.addr_bytes[5]);

	rte_eth_promiscuous_enable(port_id);

	return 0;
}

static void
print_stats(void)
{
	const uint8_t nb_ports = rte_eth_dev_count();
	unsigned int i;
	struct rte_eth_stats eth_stats;

	printf("\nRX thread stats:\n");
	printf(" - Pkts rxd: %"PRIu64"\n",
			app_stats.rx.rx_pkts);
	printf(" - Pkts enqd to workers ring: %"PRIu64"\n",
			app_stats.rx.enqueue_pkts);

	printf("\nWorker thread stats:\n");
	printf(" - Pkts deqd from workers ring: %"PRIu64"\n",
			app_stats.wkr.dequeue_pkts);
	printf(" - Pkts enqd to tx ring: %"PRIu64"\n",
			app_stats.wkr.enqueue_pkts);
	printf(" - Pkts enq to tx failed: %"PRIu64"\n",
			app_stats.wkr.enqueue_failed_pkts);

	printf("\nTX stats:\n");
	printf(" - Pkts deqd from tx ring: %"PRIu64"\n",
			app_stats.tx.dequeue_pkts);
	printf(" - Ro Pkts transmitted: %"PRIu64"\n",
			app_stats.tx.ro_tx_pkts);
	printf(" - Ro Pkts tx failed: %"PRIu64"\n",
			app_stats.tx.ro_tx_failed_pkts);
	printf(" - Pkts transmitted w/o reorder: %"PRIu64"\n",
			app_stats.tx.early_pkts_txtd_woro);
	printf(" - Pkts tx failed w/o reorder: %"PRIu64"\n",
			app_stats.tx.early_pkts_tx_failed_woro);

	for (i = 0; i < nb_ports; i++) {
		rte_eth_stats_get(i, &eth_stats);
		printf("\nPort %u stats:\n", i);
		printf(" - Pkts in:   %"PRIu64"\n", eth_stats.ipackets);
		printf(" - Pkts out:  %"PRIu64"\n", eth_stats.opackets);
		printf(" - In Errs:   %"PRIu64"\n", eth_stats.ierrors);
		printf(" - Out Errs:  %"PRIu64"\n", eth_stats.oerrors);
		printf(" - Mbuf Errs: %"PRIu64"\n", eth_stats.rx_nombuf);
	}
}

static void
int_handler(int sig_num)
{
	printf("Exiting on signal %d\n", sig_num);
	quit_signal = 1;
}
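/*
 * Pipeline overview: the master lcore runs rx_thread(), which tags every
 * received mbuf with a sequence number and enqueues it to the rx_to_workers
 * ring. One or more worker lcores run worker_thread(), which dequeues the
 * mbufs, swaps the port value and enqueues them to the workers_to_tx ring.
 * The last slave lcore runs either send_thread(), which restores the original
 * packet order through a reorder buffer, or tx_thread(), which transmits in
 * arrival order, depending on the --disable-reorder option.
 */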
/**
 * This thread receives mbufs from the port and assigns them an internal
 * sequence number (stored in the mbuf) to keep track of their order of
 * arrival.
 * The mbufs are then passed to the worker threads via the rx_to_workers
 * ring.
 */
static int
rx_thread(struct rte_ring *ring_out)
{
	const uint8_t nb_ports = rte_eth_dev_count();
	uint32_t seqn = 0;
	uint16_t i, ret = 0;
	uint16_t nb_rx_pkts;
	uint16_t port_id;
	struct rte_mbuf *pkts[MAX_PKTS_BURST];

	RTE_LOG(INFO, REORDERAPP, "%s() started on lcore %u\n", __func__,
			rte_lcore_id());

	while (!quit_signal) {

		for (port_id = 0; port_id < nb_ports; port_id++) {
			if ((portmask & (1 << port_id)) != 0) {

				/* receive packets */
				nb_rx_pkts = rte_eth_rx_burst(port_id, 0,
						pkts, MAX_PKTS_BURST);
				if (nb_rx_pkts == 0) {
					RTE_LOG_DP(DEBUG, REORDERAPP,
						"%s():Received zero packets\n", __func__);
					continue;
				}
				app_stats.rx.rx_pkts += nb_rx_pkts;

				/* mark sequence number */
				for (i = 0; i < nb_rx_pkts; )
					pkts[i++]->seqn = seqn++;

				/* enqueue to rx_to_workers ring */
				ret = rte_ring_enqueue_burst(ring_out,
						(void *)pkts, nb_rx_pkts, NULL);
				app_stats.rx.enqueue_pkts += ret;
				if (unlikely(ret < nb_rx_pkts)) {
					app_stats.rx.enqueue_failed_pkts +=
						(nb_rx_pkts - ret);
					pktmbuf_free_bulk(&pkts[ret], nb_rx_pkts - ret);
				}
			}
		}
	}
	return 0;
}

/**
 * This thread dequeues bursts of packets from the rx_to_workers ring,
 * changes the input port value to the output port value and feeds the
 * packets to the workers_to_tx ring.
 */
static int
worker_thread(void *args_ptr)
{
	const uint8_t nb_ports = rte_eth_dev_count();
	uint16_t i, ret = 0;
	uint16_t burst_size = 0;
	struct worker_thread_args *args;
	struct rte_mbuf *burst_buffer[MAX_PKTS_BURST] = { NULL };
	struct rte_ring *ring_in, *ring_out;
	const unsigned int xor_val = (nb_ports > 1);

	args = (struct worker_thread_args *) args_ptr;
	ring_in = args->ring_in;
	ring_out = args->ring_out;

	RTE_LOG(INFO, REORDERAPP, "%s() started on lcore %u\n", __func__,
			rte_lcore_id());

	while (!quit_signal) {

		/* dequeue the mbufs from rx_to_workers ring */
		burst_size = rte_ring_dequeue_burst(ring_in,
				(void *)burst_buffer, MAX_PKTS_BURST, NULL);
		if (unlikely(burst_size == 0))
			continue;

		__sync_fetch_and_add(&app_stats.wkr.dequeue_pkts, burst_size);

		/*
		 * just do some operation on the mbuf: when more than one port is
		 * used, xor_val is 1 and packets are swapped between port pairs
		 * (0<->1, 2<->3, ...); with a single port the value is unchanged
		 */
		for (i = 0; i < burst_size;)
			burst_buffer[i++]->port ^= xor_val;

		/* enqueue the modified mbufs to workers_to_tx ring */
		ret = rte_ring_enqueue_burst(ring_out, (void *)burst_buffer,
				burst_size, NULL);
		__sync_fetch_and_add(&app_stats.wkr.enqueue_pkts, ret);
		if (unlikely(ret < burst_size)) {
			/* Return the mbufs to their respective pool, dropping packets */
			__sync_fetch_and_add(&app_stats.wkr.enqueue_failed_pkts,
					(int)burst_size - ret);
			pktmbuf_free_bulk(&burst_buffer[ret], burst_size - ret);
		}
	}
	return 0;
}
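/*
 * Reordering notes: the reorder library keys on the sequence number written
 * into each mbuf by rx_thread(). A packet that arrives too early for the
 * current reorder window makes rte_reorder_insert() fail: with rte_errno set
 * to ERANGE the packet is transmitted directly without reordering, and with
 * ENOSPC it is dropped, as handled in send_thread() below.
 */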
/**
 * Dequeue mbufs from the workers_to_tx ring and reorder them before
 * transmitting.
 */
static int
send_thread(struct send_thread_args *args)
{
	int ret;
	unsigned int i, dret;
	uint16_t nb_dq_mbufs;
	uint8_t outp;
	unsigned int sent;
	struct rte_mbuf *mbufs[MAX_PKTS_BURST];
	struct rte_mbuf *rombufs[MAX_PKTS_BURST] = {NULL};
	static struct rte_eth_dev_tx_buffer *tx_buffer[RTE_MAX_ETHPORTS];

	RTE_LOG(INFO, REORDERAPP, "%s() started on lcore %u\n", __func__, rte_lcore_id());

	configure_tx_buffers(tx_buffer);

	while (!quit_signal) {

		/* dequeue the mbufs from workers_to_tx ring */
		nb_dq_mbufs = rte_ring_dequeue_burst(args->ring_in,
				(void *)mbufs, MAX_PKTS_BURST, NULL);

		if (unlikely(nb_dq_mbufs == 0))
			continue;

		app_stats.tx.dequeue_pkts += nb_dq_mbufs;

		for (i = 0; i < nb_dq_mbufs; i++) {
			/* send dequeued mbufs for reordering */
			ret = rte_reorder_insert(args->buffer, mbufs[i]);

			if (ret == -1 && rte_errno == ERANGE) {
				/* Too early pkts should be transmitted out directly */
				RTE_LOG_DP(DEBUG, REORDERAPP,
						"%s():Cannot reorder early packet "
						"direct enqueuing to TX\n", __func__);
				outp = mbufs[i]->port;
				if ((portmask & (1 << outp)) == 0) {
					rte_pktmbuf_free(mbufs[i]);
					continue;
				}
				if (rte_eth_tx_burst(outp, 0, &mbufs[i], 1) != 1) {
					rte_pktmbuf_free(mbufs[i]);
					app_stats.tx.early_pkts_tx_failed_woro++;
				} else
					app_stats.tx.early_pkts_txtd_woro++;
			} else if (ret == -1 && rte_errno == ENOSPC) {
				/*
				 * Early pkts just outside of window should be dropped
				 */
				rte_pktmbuf_free(mbufs[i]);
			}
		}

		/*
		 * drain up to MAX_PKTS_BURST of reordered
		 * mbufs for transmit
		 */
		dret = rte_reorder_drain(args->buffer, rombufs, MAX_PKTS_BURST);
		for (i = 0; i < dret; i++) {

			struct rte_eth_dev_tx_buffer *outbuf;
			uint8_t outp1;

			outp1 = rombufs[i]->port;
			/* skip ports that are not enabled */
			if ((portmask & (1 << outp1)) == 0) {
				rte_pktmbuf_free(rombufs[i]);
				continue;
			}

			outbuf = tx_buffer[outp1];
			sent = rte_eth_tx_buffer(outp1, 0, outbuf, rombufs[i]);
			if (sent)
				app_stats.tx.ro_tx_pkts += sent;
		}
	}

	free_tx_buffers(tx_buffer);

	return 0;
}

/**
 * Dequeue mbufs from the workers_to_tx ring and transmit them.
 */
static int
tx_thread(struct rte_ring *ring_in)
{
	uint32_t i, dqnum;
	uint8_t outp;
	unsigned int sent;
	struct rte_mbuf *mbufs[MAX_PKTS_BURST];
	struct rte_eth_dev_tx_buffer *outbuf;
	static struct rte_eth_dev_tx_buffer *tx_buffer[RTE_MAX_ETHPORTS];

	RTE_LOG(INFO, REORDERAPP, "%s() started on lcore %u\n", __func__,
			rte_lcore_id());

	configure_tx_buffers(tx_buffer);

	while (!quit_signal) {

		/* dequeue the mbufs from workers_to_tx ring */
		dqnum = rte_ring_dequeue_burst(ring_in,
				(void *)mbufs, MAX_PKTS_BURST, NULL);

		if (unlikely(dqnum == 0))
			continue;

		app_stats.tx.dequeue_pkts += dqnum;

		for (i = 0; i < dqnum; i++) {
			outp = mbufs[i]->port;
			/* skip ports that are not enabled */
			if ((portmask & (1 << outp)) == 0) {
				rte_pktmbuf_free(mbufs[i]);
				continue;
			}

			outbuf = tx_buffer[outp];
			sent = rte_eth_tx_buffer(outp, 0, outbuf, mbufs[i]);
			if (sent)
				app_stats.tx.ro_tx_pkts += sent;
		}
	}

	return 0;
}
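/*
 * Example invocation (illustrative only; the binary name, lcore list and
 * portmask depend on the local build and setup): run with three lcores and
 * two ports, reordering enabled:
 *
 *     ./packet_ordering -l 0-2 -- -p 0x3
 *
 * Append --disable-reorder after the portmask to transmit packets in the
 * order in which they leave the workers.
 */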
int
main(int argc, char **argv)
{
	int ret;
	unsigned int nb_ports;
	unsigned int lcore_id, last_lcore_id, master_lcore_id;
	uint16_t port_id;
	uint16_t nb_ports_available;
	struct worker_thread_args worker_args = {NULL, NULL};
	struct send_thread_args send_args = {NULL, NULL};
	struct rte_ring *rx_to_workers;
	struct rte_ring *workers_to_tx;

	/* catch ctrl-c so we can print stats on exit */
	signal(SIGINT, int_handler);

	/* Initialize EAL */
	ret = rte_eal_init(argc, argv);
	if (ret < 0)
		return -1;

	argc -= ret;
	argv += ret;

	/* Parse the application specific arguments */
	ret = parse_args(argc, argv);
	if (ret < 0)
		return -1;

	/* Check if we have enough cores */
	if (rte_lcore_count() < 3)
		rte_exit(EXIT_FAILURE, "Error: this application needs at "
				"least 3 logical cores to run:\n"
				"1 lcore for packet RX\n"
				"1 lcore for packet TX\n"
				"and at least 1 lcore for worker threads\n");

	nb_ports = rte_eth_dev_count();
	if (nb_ports == 0)
		rte_exit(EXIT_FAILURE, "Error: no ethernet ports detected\n");
	if (nb_ports != 1 && (nb_ports & 1))
		rte_exit(EXIT_FAILURE, "Error: number of ports must be even, except "
				"when using a single port\n");

	mbuf_pool = rte_pktmbuf_pool_create("mbuf_pool", MBUF_PER_POOL,
			MBUF_POOL_CACHE_SIZE, 0, RTE_MBUF_DEFAULT_BUF_SIZE,
			rte_socket_id());
	if (mbuf_pool == NULL)
		rte_exit(EXIT_FAILURE, "%s\n", rte_strerror(rte_errno));

	nb_ports_available = nb_ports;

	/* initialize all ports */
	for (port_id = 0; port_id < nb_ports; port_id++) {
		/* skip ports that are not enabled */
		if ((portmask & (1 << port_id)) == 0) {
			printf("\nSkipping disabled port %d\n", port_id);
			nb_ports_available--;
			continue;
		}
		/* init port */
		printf("Initializing port %u... done\n", port_id);

		if (configure_eth_port(port_id) != 0)
			rte_exit(EXIT_FAILURE, "Cannot initialize port %u\n",
					port_id);
	}

	if (!nb_ports_available) {
		rte_exit(EXIT_FAILURE,
			"All available ports are disabled. Please set portmask.\n");
	}
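	/*
	 * Single-producer / single-consumer hints for the rings below: only the
	 * RX core enqueues to rx_to_workers (RING_F_SP_ENQ) and only the TX/send
	 * core dequeues from workers_to_tx (RING_F_SC_DEQ). The worker side of
	 * each ring stays multi-producer/multi-consumer because several worker
	 * lcores may run concurrently.
	 */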
	/* Create rings for inter core communication */
	rx_to_workers = rte_ring_create("rx_to_workers", RING_SIZE, rte_socket_id(),
			RING_F_SP_ENQ);
	if (rx_to_workers == NULL)
		rte_exit(EXIT_FAILURE, "%s\n", rte_strerror(rte_errno));

	workers_to_tx = rte_ring_create("workers_to_tx", RING_SIZE, rte_socket_id(),
			RING_F_SC_DEQ);
	if (workers_to_tx == NULL)
		rte_exit(EXIT_FAILURE, "%s\n", rte_strerror(rte_errno));

	if (!disable_reorder) {
		send_args.buffer = rte_reorder_create("PKT_RO", rte_socket_id(),
				REORDER_BUFFER_SIZE);
		if (send_args.buffer == NULL)
			rte_exit(EXIT_FAILURE, "%s\n", rte_strerror(rte_errno));
	}

	last_lcore_id = get_last_lcore_id();
	master_lcore_id = rte_get_master_lcore();

	worker_args.ring_in = rx_to_workers;
	worker_args.ring_out = workers_to_tx;

	/* Start worker_thread() on all the available slave cores except the last one */
	for (lcore_id = 0; lcore_id <= get_previous_lcore_id(last_lcore_id); lcore_id++)
		if (rte_lcore_is_enabled(lcore_id) && lcore_id != master_lcore_id)
			rte_eal_remote_launch(worker_thread, (void *)&worker_args,
					lcore_id);

	if (disable_reorder) {
		/* Start tx_thread() on the last slave core */
		rte_eal_remote_launch((lcore_function_t *)tx_thread, workers_to_tx,
				last_lcore_id);
	} else {
		send_args.ring_in = workers_to_tx;
		/* Start send_thread() on the last slave core */
		rte_eal_remote_launch((lcore_function_t *)send_thread,
				(void *)&send_args, last_lcore_id);
	}

	/* Start rx_thread() on the master core */
	rx_thread(rx_to_workers);

	RTE_LCORE_FOREACH_SLAVE(lcore_id) {
		if (rte_eal_wait_lcore(lcore_id) < 0)
			return -1;
	}

	print_stats();
	return 0;
}