/* SPDX-License-Identifier: BSD-3-Clause
 * Copyright(c) 2010-2016 Intel Corporation
 */

#include <signal.h>
#include <getopt.h>

#include <rte_eal.h>
#include <rte_common.h>
#include <rte_errno.h>
#include <rte_ethdev.h>
#include <rte_lcore.h>
#include <rte_malloc.h>
#include <rte_mbuf.h>
#include <rte_mempool.h>
#include <rte_ring.h>
#include <rte_reorder.h>

#define RX_DESC_PER_QUEUE 1024
#define TX_DESC_PER_QUEUE 1024

#define MAX_PKTS_BURST 32
#define REORDER_BUFFER_SIZE 8192
#define MBUF_PER_POOL 65535
#define MBUF_POOL_CACHE_SIZE 250

#define RING_SIZE 16384

/* Macros for printing using RTE_LOG */
#define RTE_LOGTYPE_REORDERAPP RTE_LOGTYPE_USER1

unsigned int portmask;
unsigned int disable_reorder;
volatile uint8_t quit_signal;

static struct rte_mempool *mbuf_pool;

static struct rte_eth_conf port_conf_default = {
	.rxmode = {
		.ignore_offload_bitfield = 1,
	},
};

struct worker_thread_args {
	struct rte_ring *ring_in;
	struct rte_ring *ring_out;
};

struct send_thread_args {
	struct rte_ring *ring_in;
	struct rte_reorder_buffer *buffer;
};

volatile struct app_stats {
	struct {
		uint64_t rx_pkts;
		uint64_t enqueue_pkts;
		uint64_t enqueue_failed_pkts;
	} rx __rte_cache_aligned;

	struct {
		uint64_t dequeue_pkts;
		uint64_t enqueue_pkts;
		uint64_t enqueue_failed_pkts;
	} wkr __rte_cache_aligned;

	struct {
		uint64_t dequeue_pkts;
		/* Too early pkts transmitted directly w/o reordering */
		uint64_t early_pkts_txtd_woro;
		/* Too early pkts failed from direct transmit */
		uint64_t early_pkts_tx_failed_woro;
		uint64_t ro_tx_pkts;
		uint64_t ro_tx_failed_pkts;
	} tx __rte_cache_aligned;
} app_stats;

/**
 * Get the last enabled lcore ID
 *
 * @return
 *   The last enabled lcore ID.
 */
static unsigned int
get_last_lcore_id(void)
{
	int i;

	for (i = RTE_MAX_LCORE - 1; i >= 0; i--)
		if (rte_lcore_is_enabled(i))
			return i;
	return 0;
}

/**
 * Get the previous enabled lcore ID
 * @param id
 *   The current lcore ID
 * @return
 *   The previous enabled lcore ID or the current lcore
 *   ID if it is the first available core.
 */
static unsigned int
get_previous_lcore_id(unsigned int id)
{
	int i;

	for (i = id - 1; i >= 0; i--)
		if (rte_lcore_is_enabled(i))
			return i;
	return id;
}

static inline void
pktmbuf_free_bulk(struct rte_mbuf *mbuf_table[], unsigned n)
{
	unsigned int i;

	for (i = 0; i < n; i++)
		rte_pktmbuf_free(mbuf_table[i]);
}

/* display usage */
static void
print_usage(const char *prgname)
{
	printf("%s [EAL options] -- -p PORTMASK [--disable-reorder]\n"
			"  -p PORTMASK: hexadecimal bitmask of ports to configure\n"
			"  --disable-reorder: disable packet reordering\n",
			prgname);
}

/* Return the parsed portmask, or 0 if the string is not a valid
 * non-zero hexadecimal bitmask.
 */
static int
parse_portmask(const char *portmask)
{
	unsigned long pm;
	char *end = NULL;

	/* parse hexadecimal string */
	pm = strtoul(portmask, &end, 16);
	if ((portmask[0] == '\0') || (end == NULL) || (*end != '\0'))
		return 0;

	return pm;
}

/* Parse the argument given in the command line of the application */
static int
parse_args(int argc, char **argv)
{
	int opt;
	int option_index;
	char **argvopt;
	char *prgname = argv[0];
	static struct option lgopts[] = {
		{"disable-reorder", 0, 0, 0},
		{NULL, 0, 0, 0}
	};

	argvopt = argv;

	while ((opt = getopt_long(argc, argvopt, "p:",
					lgopts, &option_index)) != EOF) {
		switch (opt) {
		/* portmask */
		case 'p':
			portmask = parse_portmask(optarg);
			if (portmask == 0) {
				printf("invalid portmask\n");
				print_usage(prgname);
				return -1;
			}
			break;
		/* long options */
		case 0:
			if (!strcmp(lgopts[option_index].name, "disable-reorder")) {
				printf("reorder disabled\n");
				disable_reorder = 1;
			}
			break;
		default:
			print_usage(prgname);
			return -1;
		}
	}
	if (optind <= 1) {
		print_usage(prgname);
		return -1;
	}

	argv[optind-1] = prgname;
	optind = 1; /* reset getopt lib */
	return 0;
}

/*
 * Tx buffer error callback
 */
static void
flush_tx_error_callback(struct rte_mbuf **unsent, uint16_t count,
		void *userdata __rte_unused) {

	/* free the mbufs which failed from transmit */
	app_stats.tx.ro_tx_failed_pkts += count;
	RTE_LOG_DP(DEBUG, REORDERAPP, "%s:Packet loss with tx_burst\n", __func__);
	pktmbuf_free_bulk(unsent, count);

}

static inline int
free_tx_buffers(struct rte_eth_dev_tx_buffer *tx_buffer[]) {
	uint16_t port_id;

	/* free the TX buffers of all enabled ports */
	RTE_ETH_FOREACH_DEV(port_id) {
		/* skip ports that are not enabled */
		if ((portmask & (1 << port_id)) == 0)
			continue;

		rte_free(tx_buffer[port_id]);
	}
	return 0;
}

static inline int
configure_tx_buffers(struct rte_eth_dev_tx_buffer *tx_buffer[])
{
	uint16_t port_id;
	int ret;

	/* initialize buffers for all ports */
	RTE_ETH_FOREACH_DEV(port_id) {
		/* skip ports that are not enabled */
		if ((portmask & (1 << port_id)) == 0)
			continue;

		/* Initialize TX buffers */
		tx_buffer[port_id] = rte_zmalloc_socket("tx_buffer",
				RTE_ETH_TX_BUFFER_SIZE(MAX_PKTS_BURST), 0,
				rte_eth_dev_socket_id(port_id));
		if (tx_buffer[port_id] == NULL)
			rte_exit(EXIT_FAILURE, "Cannot allocate buffer for tx on port %u\n",
					port_id);

		rte_eth_tx_buffer_init(tx_buffer[port_id], MAX_PKTS_BURST);

		ret = rte_eth_tx_buffer_set_err_callback(tx_buffer[port_id],
				flush_tx_error_callback, NULL);
		if (ret < 0)
			rte_exit(EXIT_FAILURE,
				"Cannot set error callback for tx buffer on port %u\n",
					port_id);
	}
	return 0;
}

static inline int
configure_eth_port(uint16_t port_id)
{
	struct ether_addr addr;
	const uint16_t rxRings = 1, txRings = 1;
	int ret;
	uint16_t q;
	uint16_t nb_rxd = RX_DESC_PER_QUEUE;
	uint16_t nb_txd = TX_DESC_PER_QUEUE;
	struct rte_eth_dev_info dev_info;
	struct rte_eth_txconf txconf;
	struct rte_eth_conf port_conf = port_conf_default;

	if (!rte_eth_dev_is_valid_port(port_id))
		return -1;

	rte_eth_dev_info_get(port_id, &dev_info);
	if (dev_info.tx_offload_capa & DEV_TX_OFFLOAD_MBUF_FAST_FREE)
		port_conf.txmode.offloads |=
			DEV_TX_OFFLOAD_MBUF_FAST_FREE;
	/* use the local port_conf (with the offloads set above), not the default */
	ret = rte_eth_dev_configure(port_id, rxRings, txRings, &port_conf);
	if (ret != 0)
		return ret;

	ret = rte_eth_dev_adjust_nb_rx_tx_desc(port_id, &nb_rxd, &nb_txd);
	if (ret != 0)
		return ret;

	for (q = 0; q < rxRings; q++) {
		ret = rte_eth_rx_queue_setup(port_id, q, nb_rxd,
				rte_eth_dev_socket_id(port_id), NULL,
				mbuf_pool);
		if (ret < 0)
			return ret;
	}

	txconf = dev_info.default_txconf;
	txconf.txq_flags = ETH_TXQ_FLAGS_IGNORE;
	txconf.offloads = port_conf.txmode.offloads;
	for (q = 0; q < txRings; q++) {
		ret = rte_eth_tx_queue_setup(port_id, q, nb_txd,
				rte_eth_dev_socket_id(port_id), &txconf);
		if (ret < 0)
			return ret;
	}

	ret = rte_eth_dev_start(port_id);
	if (ret < 0)
		return ret;

	rte_eth_macaddr_get(port_id, &addr);
	printf("Port %u MAC: %02"PRIx8" %02"PRIx8" %02"PRIx8
			" %02"PRIx8" %02"PRIx8" %02"PRIx8"\n",
			port_id,
			addr.addr_bytes[0], addr.addr_bytes[1],
			addr.addr_bytes[2], addr.addr_bytes[3],
			addr.addr_bytes[4], addr.addr_bytes[5]);

	rte_eth_promiscuous_enable(port_id);

	return 0;
}

static void
print_stats(void)
{
	uint16_t i;
	struct rte_eth_stats eth_stats;

	printf("\nRX thread stats:\n");
	printf(" - Pkts rxd: %"PRIu64"\n",
			app_stats.rx.rx_pkts);
	printf(" - Pkts enqd to workers ring: %"PRIu64"\n",
			app_stats.rx.enqueue_pkts);

	printf("\nWorker thread stats:\n");
	printf(" - Pkts deqd from workers ring: %"PRIu64"\n",
			app_stats.wkr.dequeue_pkts);
	printf(" - Pkts enqd to tx ring: %"PRIu64"\n",
			app_stats.wkr.enqueue_pkts);
	printf(" - Pkts enq to tx failed: %"PRIu64"\n",
			app_stats.wkr.enqueue_failed_pkts);

	printf("\nTX stats:\n");
	printf(" - Pkts deqd from tx ring: %"PRIu64"\n",
			app_stats.tx.dequeue_pkts);
	printf(" - Ro Pkts transmitted: %"PRIu64"\n",
			app_stats.tx.ro_tx_pkts);
	printf(" - Ro Pkts tx failed: %"PRIu64"\n",
			app_stats.tx.ro_tx_failed_pkts);
	printf(" - Pkts transmitted w/o reorder: %"PRIu64"\n",
			app_stats.tx.early_pkts_txtd_woro);
	printf(" - Pkts tx failed w/o reorder: %"PRIu64"\n",
			app_stats.tx.early_pkts_tx_failed_woro);

	RTE_ETH_FOREACH_DEV(i) {
		rte_eth_stats_get(i, &eth_stats);
		printf("\nPort %u stats:\n", i);
		printf(" - Pkts in:   %"PRIu64"\n", eth_stats.ipackets);
		printf(" - Pkts out:  %"PRIu64"\n", eth_stats.opackets);
		printf(" - In Errs:   %"PRIu64"\n", eth_stats.ierrors);
		printf(" - Out Errs:  %"PRIu64"\n", eth_stats.oerrors);
		printf(" - Mbuf Errs: %"PRIu64"\n", eth_stats.rx_nombuf);
	}
}

static void
int_handler(int sig_num)
{
	printf("Exiting on signal %d\n", sig_num);
	quit_signal = 1;
}

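/*
 * Overview of the data path implemented by the thread functions below
 * (lcore assignment is done in main()):
 *
 *   rx_thread (master lcore)
 *     --> rx_to_workers ring
 *       --> worker_thread (one instance per intermediate lcore)
 *         --> workers_to_tx ring
 *           --> send_thread (reordering) or tx_thread (--disable-reorder)
 *               on the last enabled lcore
 */
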
/**
 * This thread receives mbufs from the ports and assigns them an internal
 * sequence number (stored in the mbuf) to keep track of their order of
 * arrival.
 * The mbufs are then passed to the worker threads via the rx_to_workers
 * ring.
 */
static int
rx_thread(struct rte_ring *ring_out)
{
	uint32_t seqn = 0;
	uint16_t i, ret = 0;
	uint16_t nb_rx_pkts;
	uint16_t port_id;
	struct rte_mbuf *pkts[MAX_PKTS_BURST];

	RTE_LOG(INFO, REORDERAPP, "%s() started on lcore %u\n", __func__,
			rte_lcore_id());

	while (!quit_signal) {

		RTE_ETH_FOREACH_DEV(port_id) {
			if ((portmask & (1 << port_id)) != 0) {

				/* receive packets */
				nb_rx_pkts = rte_eth_rx_burst(port_id, 0,
						pkts, MAX_PKTS_BURST);
				if (nb_rx_pkts == 0) {
					RTE_LOG_DP(DEBUG, REORDERAPP,
						"%s():Received zero packets\n", __func__);
					continue;
				}
				app_stats.rx.rx_pkts += nb_rx_pkts;

				/* mark sequence number */
				for (i = 0; i < nb_rx_pkts; )
					pkts[i++]->seqn = seqn++;

				/* enqueue to rx_to_workers ring */
				ret = rte_ring_enqueue_burst(ring_out,
						(void *)pkts, nb_rx_pkts, NULL);
				app_stats.rx.enqueue_pkts += ret;
				if (unlikely(ret < nb_rx_pkts)) {
					app_stats.rx.enqueue_failed_pkts +=
						(nb_rx_pkts - ret);
					pktmbuf_free_bulk(&pkts[ret], nb_rx_pkts - ret);
				}
			}
		}
	}
	return 0;
}

/**
 * This thread takes bursts of packets from the rx_to_workers ring,
 * changes the input port value to the output port value and feeds the
 * packets to the workers_to_tx ring.
 */
static int
worker_thread(void *args_ptr)
{
	const uint16_t nb_ports = rte_eth_dev_count_avail();
	uint16_t i, ret = 0;
	uint16_t burst_size = 0;
	struct worker_thread_args *args;
	struct rte_mbuf *burst_buffer[MAX_PKTS_BURST] = { NULL };
	struct rte_ring *ring_in, *ring_out;
	const unsigned xor_val = (nb_ports > 1);

	args = (struct worker_thread_args *) args_ptr;
	ring_in = args->ring_in;
	ring_out = args->ring_out;

	RTE_LOG(INFO, REORDERAPP, "%s() started on lcore %u\n", __func__,
			rte_lcore_id());

	while (!quit_signal) {

		/* dequeue the mbufs from rx_to_workers ring */
		burst_size = rte_ring_dequeue_burst(ring_in,
				(void *)burst_buffer, MAX_PKTS_BURST, NULL);
		if (unlikely(burst_size == 0))
			continue;

		__sync_fetch_and_add(&app_stats.wkr.dequeue_pkts, burst_size);

		/* swap the port within each pair (0<->1, 2<->3, ...);
		 * xor_val is 0 (no-op) when only a single port is in use
		 */
		for (i = 0; i < burst_size;)
			burst_buffer[i++]->port ^= xor_val;

		/* enqueue the modified mbufs to workers_to_tx ring */
		ret = rte_ring_enqueue_burst(ring_out, (void *)burst_buffer,
				burst_size, NULL);
		__sync_fetch_and_add(&app_stats.wkr.enqueue_pkts, ret);
		if (unlikely(ret < burst_size)) {
			/* Return the mbufs to their respective pool, dropping packets */
			__sync_fetch_and_add(&app_stats.wkr.enqueue_failed_pkts,
					(int)burst_size - ret);
			pktmbuf_free_bulk(&burst_buffer[ret], burst_size - ret);
		}
	}
	return 0;
}

/**
 * Dequeue mbufs from the workers_to_tx ring and reorder them before
 * transmitting.
 */
static int
send_thread(struct send_thread_args *args)
{
	int ret;
	unsigned int i, dret;
	uint16_t nb_dq_mbufs;
	uint8_t outp;
	unsigned sent;
	struct rte_mbuf *mbufs[MAX_PKTS_BURST];
	struct rte_mbuf *rombufs[MAX_PKTS_BURST] = {NULL};
	static struct rte_eth_dev_tx_buffer *tx_buffer[RTE_MAX_ETHPORTS];

	RTE_LOG(INFO, REORDERAPP, "%s() started on lcore %u\n", __func__, rte_lcore_id());

	configure_tx_buffers(tx_buffer);

	while (!quit_signal) {

		/* dequeue the mbufs from workers_to_tx ring */
		nb_dq_mbufs = rte_ring_dequeue_burst(args->ring_in,
				(void *)mbufs, MAX_PKTS_BURST, NULL);

		if (unlikely(nb_dq_mbufs == 0))
			continue;

		app_stats.tx.dequeue_pkts += nb_dq_mbufs;

		for (i = 0; i < nb_dq_mbufs; i++) {
			/* send dequeued mbufs for reordering */
			ret = rte_reorder_insert(args->buffer, mbufs[i]);

			if (ret == -1 && rte_errno == ERANGE) {
				/* Too early pkts should be transmitted out directly */
				RTE_LOG_DP(DEBUG, REORDERAPP,
						"%s():Cannot reorder early packet "
						"direct enqueuing to TX\n", __func__);
				outp = mbufs[i]->port;
				if ((portmask & (1 << outp)) == 0) {
					rte_pktmbuf_free(mbufs[i]);
					continue;
				}
				if (rte_eth_tx_burst(outp, 0, &mbufs[i], 1) != 1) {
					rte_pktmbuf_free(mbufs[i]);
					app_stats.tx.early_pkts_tx_failed_woro++;
				} else
					app_stats.tx.early_pkts_txtd_woro++;
			} else if (ret == -1 && rte_errno == ENOSPC) {
				/*
				 * Early pkts just outside of window should be dropped
				 */
				rte_pktmbuf_free(mbufs[i]);
			}
		}

		/*
		 * drain MAX_PKTS_BURST of reordered
		 * mbufs for transmit
		 */
		dret = rte_reorder_drain(args->buffer, rombufs, MAX_PKTS_BURST);
		for (i = 0; i < dret; i++) {

			struct rte_eth_dev_tx_buffer *outbuf;
			uint8_t outp1;

			outp1 = rombufs[i]->port;
			/* skip ports that are not enabled */
			if ((portmask & (1 << outp1)) == 0) {
				rte_pktmbuf_free(rombufs[i]);
				continue;
			}

			outbuf = tx_buffer[outp1];
			sent = rte_eth_tx_buffer(outp1, 0, outbuf, rombufs[i]);
			if (sent)
				app_stats.tx.ro_tx_pkts += sent;
		}
	}

	free_tx_buffers(tx_buffer);

	return 0;
}

/**
 * Dequeue mbufs from the workers_to_tx ring and transmit them.
 */
static int
tx_thread(struct rte_ring *ring_in)
{
	uint32_t i, dqnum;
	uint8_t outp;
	unsigned sent;
	struct rte_mbuf *mbufs[MAX_PKTS_BURST];
	struct rte_eth_dev_tx_buffer *outbuf;
	static struct rte_eth_dev_tx_buffer *tx_buffer[RTE_MAX_ETHPORTS];

	RTE_LOG(INFO, REORDERAPP, "%s() started on lcore %u\n", __func__,
			rte_lcore_id());

	configure_tx_buffers(tx_buffer);

	while (!quit_signal) {

		/* dequeue the mbufs from workers_to_tx ring */
		dqnum = rte_ring_dequeue_burst(ring_in,
				(void *)mbufs, MAX_PKTS_BURST, NULL);

		if (unlikely(dqnum == 0))
			continue;

		app_stats.tx.dequeue_pkts += dqnum;

		for (i = 0; i < dqnum; i++) {
			outp = mbufs[i]->port;
			/* skip ports that are not enabled */
			if ((portmask & (1 << outp)) == 0) {
				rte_pktmbuf_free(mbufs[i]);
				continue;
			}

			outbuf = tx_buffer[outp];
			sent = rte_eth_tx_buffer(outp, 0, outbuf, mbufs[i]);
			if (sent)
				app_stats.tx.ro_tx_pkts += sent;
		}
	}

	return 0;
}

int
main(int argc, char **argv)
{
	int ret;
	unsigned nb_ports;
	unsigned int lcore_id, last_lcore_id, master_lcore_id;
	uint16_t port_id;
	uint16_t nb_ports_available;
	struct worker_thread_args worker_args = {NULL, NULL};
	struct send_thread_args send_args = {NULL, NULL};
	struct rte_ring *rx_to_workers;
	struct rte_ring *workers_to_tx;

	/* catch ctrl-c so we can print on exit */
	signal(SIGINT, int_handler);

	/* Initialize EAL */
	ret = rte_eal_init(argc, argv);
	if (ret < 0)
		return -1;

	argc -= ret;
	argv += ret;

	/* Parse the application specific arguments */
	ret = parse_args(argc, argv);
	if (ret < 0)
		return -1;

	/* Check if we have enough cores */
	if (rte_lcore_count() < 3)
		rte_exit(EXIT_FAILURE, "Error: this application needs at "
				"least 3 logical cores to run:\n"
				"1 lcore for packet RX\n"
				"1 lcore for packet TX\n"
				"and at least 1 lcore for worker threads\n");

	nb_ports = rte_eth_dev_count_avail();
	if (nb_ports == 0)
		rte_exit(EXIT_FAILURE, "Error: no ethernet ports detected\n");
	if (nb_ports != 1 && (nb_ports & 1))
		rte_exit(EXIT_FAILURE, "Error: number of ports must be even, except "
				"when using a single port\n");

	mbuf_pool = rte_pktmbuf_pool_create("mbuf_pool", MBUF_PER_POOL,
			MBUF_POOL_CACHE_SIZE, 0, RTE_MBUF_DEFAULT_BUF_SIZE,
			rte_socket_id());
	if (mbuf_pool == NULL)
		rte_exit(EXIT_FAILURE, "%s\n", rte_strerror(rte_errno));

	nb_ports_available = nb_ports;

	/* initialize all ports */
	RTE_ETH_FOREACH_DEV(port_id) {
		/* skip ports that are not enabled */
		if ((portmask & (1 << port_id)) == 0) {
			printf("\nSkipping disabled port %d\n", port_id);
			nb_ports_available--;
			continue;
		}
		/* init port */
		printf("Initializing port %u... done\n", port_id);

		if (configure_eth_port(port_id) != 0)
			rte_exit(EXIT_FAILURE, "Cannot initialize port %u\n",
					port_id);
	}

	if (!nb_ports_available) {
		rte_exit(EXIT_FAILURE,
			"All available ports are disabled. Please set portmask.\n");
	}

	/* Create rings for inter core communication */
	rx_to_workers = rte_ring_create("rx_to_workers", RING_SIZE, rte_socket_id(),
			RING_F_SP_ENQ);
	if (rx_to_workers == NULL)
		rte_exit(EXIT_FAILURE, "%s\n", rte_strerror(rte_errno));

	workers_to_tx = rte_ring_create("workers_to_tx", RING_SIZE, rte_socket_id(),
			RING_F_SC_DEQ);
	if (workers_to_tx == NULL)
		rte_exit(EXIT_FAILURE, "%s\n", rte_strerror(rte_errno));

	if (!disable_reorder) {
		send_args.buffer = rte_reorder_create("PKT_RO", rte_socket_id(),
				REORDER_BUFFER_SIZE);
		if (send_args.buffer == NULL)
			rte_exit(EXIT_FAILURE, "%s\n", rte_strerror(rte_errno));
	}

	last_lcore_id = get_last_lcore_id();
	master_lcore_id = rte_get_master_lcore();

	worker_args.ring_in = rx_to_workers;
	worker_args.ring_out = workers_to_tx;

	/* Start worker_thread() on all the available slave cores except the last one */
	for (lcore_id = 0; lcore_id <= get_previous_lcore_id(last_lcore_id); lcore_id++)
		if (rte_lcore_is_enabled(lcore_id) && lcore_id != master_lcore_id)
			rte_eal_remote_launch(worker_thread, (void *)&worker_args,
					lcore_id);

	if (disable_reorder) {
		/* Start tx_thread() on the last slave core */
		rte_eal_remote_launch((lcore_function_t *)tx_thread, workers_to_tx,
				last_lcore_id);
	} else {
		send_args.ring_in = workers_to_tx;
		/* Start send_thread() on the last slave core */
		rte_eal_remote_launch((lcore_function_t *)send_thread,
				(void *)&send_args, last_lcore_id);
	}

	/* Start rx_thread() on the master core */
	rx_thread(rx_to_workers);

	RTE_LCORE_FOREACH_SLAVE(lcore_id) {
		if (rte_eal_wait_lcore(lcore_id) < 0)
			return -1;
	}

	print_stats();
	return 0;
}