/* SPDX-License-Identifier: BSD-3-Clause
 * Copyright(c) 2010-2016 Intel Corporation
 */

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <inttypes.h>
#include <signal.h>
#include <getopt.h>

#include <rte_eal.h>
#include <rte_common.h>
#include <rte_errno.h>
#include <rte_ethdev.h>
#include <rte_lcore.h>
#include <rte_malloc.h>
#include <rte_mbuf.h>
#include <rte_mempool.h>
#include <rte_ring.h>
#include <rte_reorder.h>

#define RX_DESC_PER_QUEUE 128
#define TX_DESC_PER_QUEUE 512

#define MAX_PKTS_BURST 32
#define REORDER_BUFFER_SIZE 8192
#define MBUF_PER_POOL 65535
#define MBUF_POOL_CACHE_SIZE 250

#define RING_SIZE 16384

/* Macros for printing using RTE_LOG */
#define RTE_LOGTYPE_REORDERAPP RTE_LOGTYPE_USER1

unsigned int portmask;
unsigned int disable_reorder;
volatile uint8_t quit_signal;

static struct rte_mempool *mbuf_pool;

static struct rte_eth_conf port_conf_default = {
	.rxmode = {
		.ignore_offload_bitfield = 1,
	},
};

struct worker_thread_args {
	struct rte_ring *ring_in;
	struct rte_ring *ring_out;
};

struct send_thread_args {
	struct rte_ring *ring_in;
	struct rte_reorder_buffer *buffer;
};

volatile struct app_stats {
	struct {
		uint64_t rx_pkts;
		uint64_t enqueue_pkts;
		uint64_t enqueue_failed_pkts;
	} rx __rte_cache_aligned;

	struct {
		uint64_t dequeue_pkts;
		uint64_t enqueue_pkts;
		uint64_t enqueue_failed_pkts;
	} wkr __rte_cache_aligned;

	struct {
		uint64_t dequeue_pkts;
		/* Too early pkts transmitted directly w/o reordering */
		uint64_t early_pkts_txtd_woro;
		/* Too early pkts failed from direct transmit */
		uint64_t early_pkts_tx_failed_woro;
		uint64_t ro_tx_pkts;
		uint64_t ro_tx_failed_pkts;
	} tx __rte_cache_aligned;
} app_stats;
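/*
 * Note: each statistics block above is cache-line aligned so that the RX,
 * worker and TX lcores update their own group of counters without false
 * sharing of cache lines between cores.
 */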
/**
 * Get the last enabled lcore ID
 *
 * @return
 *   The last enabled lcore ID.
 */
static unsigned int
get_last_lcore_id(void)
{
	int i;

	for (i = RTE_MAX_LCORE - 1; i >= 0; i--)
		if (rte_lcore_is_enabled(i))
			return i;
	return 0;
}

/**
 * Get the previous enabled lcore ID
 *
 * @param id
 *   The current lcore ID
 * @return
 *   The previous enabled lcore ID or the current lcore
 *   ID if it is the first available core.
 */
static unsigned int
get_previous_lcore_id(unsigned int id)
{
	int i;

	for (i = id - 1; i >= 0; i--)
		if (rte_lcore_is_enabled(i))
			return i;
	return id;
}

static inline void
pktmbuf_free_bulk(struct rte_mbuf *mbuf_table[], unsigned int n)
{
	unsigned int i;

	for (i = 0; i < n; i++)
		rte_pktmbuf_free(mbuf_table[i]);
}

/* display usage */
static void
print_usage(const char *prgname)
{
	printf("%s [EAL options] -- -p PORTMASK [--disable-reorder]\n"
			"  -p PORTMASK: hexadecimal bitmask of ports to configure\n"
			"  --disable-reorder: disable packet reordering\n",
			prgname);
}

static int
parse_portmask(const char *portmask)
{
	unsigned long pm;
	char *end = NULL;

	/* parse hexadecimal string; return 0 on error so the caller
	 * can reject the value */
	pm = strtoul(portmask, &end, 16);
	if ((portmask[0] == '\0') || (end == NULL) || (*end != '\0'))
		return 0;

	return pm;
}

/* Parse the arguments given in the command line of the application */
static int
parse_args(int argc, char **argv)
{
	int opt;
	int option_index;
	char **argvopt;
	char *prgname = argv[0];
	static struct option lgopts[] = {
		{"disable-reorder", 0, 0, 0},
		{NULL, 0, 0, 0}
	};

	argvopt = argv;

	while ((opt = getopt_long(argc, argvopt, "p:",
					lgopts, &option_index)) != EOF) {
		switch (opt) {
		/* portmask */
		case 'p':
			portmask = parse_portmask(optarg);
			if (portmask == 0) {
				printf("invalid portmask\n");
				print_usage(prgname);
				return -1;
			}
			break;
		/* long options */
		case 0:
			if (!strcmp(lgopts[option_index].name, "disable-reorder")) {
				printf("reorder disabled\n");
				disable_reorder = 1;
			}
			break;
		default:
			print_usage(prgname);
			return -1;
		}
	}
	if (optind <= 1) {
		print_usage(prgname);
		return -1;
	}

	argv[optind - 1] = prgname;
	optind = 1; /* reset getopt lib */
	return 0;
}

/*
 * Tx buffer error callback
 */
static void
flush_tx_error_callback(struct rte_mbuf **unsent, uint16_t count,
		void *userdata __rte_unused)
{
	/* free the mbufs which failed from transmit */
	app_stats.tx.ro_tx_failed_pkts += count;
	RTE_LOG_DP(DEBUG, REORDERAPP, "%s:Packet loss with tx_burst\n", __func__);
	pktmbuf_free_bulk(unsent, count);
}

static inline int
free_tx_buffers(struct rte_eth_dev_tx_buffer *tx_buffer[])
{
	const uint8_t nb_ports = rte_eth_dev_count();
	unsigned int port_id;

	/* free the buffers of all enabled ports */
	for (port_id = 0; port_id < nb_ports; port_id++) {
		/* skip ports that are not enabled */
		if ((portmask & (1 << port_id)) == 0)
			continue;

		rte_free(tx_buffer[port_id]);
	}
	return 0;
}

static inline int
configure_tx_buffers(struct rte_eth_dev_tx_buffer *tx_buffer[])
{
	const uint8_t nb_ports = rte_eth_dev_count();
	unsigned int port_id;
	int ret;

	/* initialize buffers for all ports */
	for (port_id = 0; port_id < nb_ports; port_id++) {
		/* skip ports that are not enabled */
		if ((portmask & (1 << port_id)) == 0)
			continue;

		/* Initialize TX buffers */
		tx_buffer[port_id] = rte_zmalloc_socket("tx_buffer",
				RTE_ETH_TX_BUFFER_SIZE(MAX_PKTS_BURST), 0,
				rte_eth_dev_socket_id(port_id));
		if (tx_buffer[port_id] == NULL)
			rte_exit(EXIT_FAILURE, "Cannot allocate buffer for tx on port %u\n",
					port_id);

		rte_eth_tx_buffer_init(tx_buffer[port_id], MAX_PKTS_BURST);

		ret = rte_eth_tx_buffer_set_err_callback(tx_buffer[port_id],
				flush_tx_error_callback, NULL);
		if (ret < 0)
			rte_exit(EXIT_FAILURE,
				"Cannot set error callback for tx buffer on port %u\n",
					port_id);
	}
	return 0;
}
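/*
 * Note: rte_eth_tx_buffer() only hands packets to the PMD once a buffer
 * holds MAX_PKTS_BURST of them (or when it is flushed explicitly); packets
 * that the PMD cannot accept are passed to flush_tx_error_callback(), which
 * counts and frees them.
 */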
static inline int
configure_eth_port(uint16_t port_id)
{
	struct ether_addr addr;
	const uint16_t rxRings = 1, txRings = 1;
	const uint8_t nb_ports = rte_eth_dev_count();
	int ret;
	uint16_t q;
	uint16_t nb_rxd = RX_DESC_PER_QUEUE;
	uint16_t nb_txd = TX_DESC_PER_QUEUE;
	struct rte_eth_dev_info dev_info;
	struct rte_eth_txconf txconf;
	struct rte_eth_conf port_conf = port_conf_default;

	if (port_id >= nb_ports)
		return -1;

	rte_eth_dev_info_get(port_id, &dev_info);
	if (dev_info.tx_offload_capa & DEV_TX_OFFLOAD_MBUF_FAST_FREE)
		port_conf.txmode.offloads |=
			DEV_TX_OFFLOAD_MBUF_FAST_FREE;
	ret = rte_eth_dev_configure(port_id, rxRings, txRings, &port_conf);
	if (ret != 0)
		return ret;

	ret = rte_eth_dev_adjust_nb_rx_tx_desc(port_id, &nb_rxd, &nb_txd);
	if (ret != 0)
		return ret;

	for (q = 0; q < rxRings; q++) {
		ret = rte_eth_rx_queue_setup(port_id, q, nb_rxd,
				rte_eth_dev_socket_id(port_id), NULL,
				mbuf_pool);
		if (ret < 0)
			return ret;
	}

	txconf = dev_info.default_txconf;
	txconf.txq_flags = ETH_TXQ_FLAGS_IGNORE;
	txconf.offloads = port_conf.txmode.offloads;
	for (q = 0; q < txRings; q++) {
		ret = rte_eth_tx_queue_setup(port_id, q, nb_txd,
				rte_eth_dev_socket_id(port_id), &txconf);
		if (ret < 0)
			return ret;
	}

	ret = rte_eth_dev_start(port_id);
	if (ret < 0)
		return ret;

	rte_eth_macaddr_get(port_id, &addr);
	printf("Port %u MAC: %02"PRIx8" %02"PRIx8" %02"PRIx8
			" %02"PRIx8" %02"PRIx8" %02"PRIx8"\n",
			port_id,
			addr.addr_bytes[0], addr.addr_bytes[1],
			addr.addr_bytes[2], addr.addr_bytes[3],
			addr.addr_bytes[4], addr.addr_bytes[5]);

	rte_eth_promiscuous_enable(port_id);

	return 0;
}

static void
print_stats(void)
{
	const uint8_t nb_ports = rte_eth_dev_count();
	unsigned int i;
	struct rte_eth_stats eth_stats;

	printf("\nRX thread stats:\n");
	printf(" - Pkts rxd: %"PRIu64"\n",
			app_stats.rx.rx_pkts);
	printf(" - Pkts enqd to workers ring: %"PRIu64"\n",
			app_stats.rx.enqueue_pkts);

	printf("\nWorker thread stats:\n");
	printf(" - Pkts deqd from workers ring: %"PRIu64"\n",
			app_stats.wkr.dequeue_pkts);
	printf(" - Pkts enqd to tx ring: %"PRIu64"\n",
			app_stats.wkr.enqueue_pkts);
	printf(" - Pkts enq to tx failed: %"PRIu64"\n",
			app_stats.wkr.enqueue_failed_pkts);

	printf("\nTX stats:\n");
	printf(" - Pkts deqd from tx ring: %"PRIu64"\n",
			app_stats.tx.dequeue_pkts);
	printf(" - Ro Pkts transmitted: %"PRIu64"\n",
			app_stats.tx.ro_tx_pkts);
	printf(" - Ro Pkts tx failed: %"PRIu64"\n",
			app_stats.tx.ro_tx_failed_pkts);
	printf(" - Pkts transmitted w/o reorder: %"PRIu64"\n",
			app_stats.tx.early_pkts_txtd_woro);
	printf(" - Pkts tx failed w/o reorder: %"PRIu64"\n",
			app_stats.tx.early_pkts_tx_failed_woro);

	for (i = 0; i < nb_ports; i++) {
		rte_eth_stats_get(i, &eth_stats);
		printf("\nPort %u stats:\n", i);
		printf(" - Pkts in: %"PRIu64"\n", eth_stats.ipackets);
		printf(" - Pkts out: %"PRIu64"\n", eth_stats.opackets);
		printf(" - In Errs: %"PRIu64"\n", eth_stats.ierrors);
		printf(" - Out Errs: %"PRIu64"\n", eth_stats.oerrors);
		printf(" - Mbuf Errs: %"PRIu64"\n", eth_stats.rx_nombuf);
	}
}

static void
int_handler(int sig_num)
{
	printf("Exiting on signal %d\n", sig_num);
	quit_signal = 1;
}
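/*
 * Note: the sequence number written to mbuf->seqn below is what the
 * rte_reorder library later uses on the TX path to restore the original
 * arrival order of the packets.
 */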
/**
 * This thread receives mbufs from the ports and assigns each of them an
 * internal sequence number, stored in the mbuf itself, to keep track of
 * the order of arrival.
 * The mbufs are then passed to the worker threads via the rx_to_workers
 * ring.
 */
static int
rx_thread(struct rte_ring *ring_out)
{
	const uint8_t nb_ports = rte_eth_dev_count();
	uint32_t seqn = 0;
	uint16_t i, ret = 0;
	uint16_t nb_rx_pkts;
	uint16_t port_id;
	struct rte_mbuf *pkts[MAX_PKTS_BURST];

	RTE_LOG(INFO, REORDERAPP, "%s() started on lcore %u\n", __func__,
			rte_lcore_id());

	while (!quit_signal) {

		for (port_id = 0; port_id < nb_ports; port_id++) {
			if ((portmask & (1 << port_id)) != 0) {

				/* receive packets */
				nb_rx_pkts = rte_eth_rx_burst(port_id, 0,
						pkts, MAX_PKTS_BURST);
				if (nb_rx_pkts == 0) {
					RTE_LOG_DP(DEBUG, REORDERAPP,
						"%s():Received zero packets\n", __func__);
					continue;
				}
				app_stats.rx.rx_pkts += nb_rx_pkts;

				/* mark sequence number */
				for (i = 0; i < nb_rx_pkts; )
					pkts[i++]->seqn = seqn++;

				/* enqueue to rx_to_workers ring */
				ret = rte_ring_enqueue_burst(ring_out,
						(void *)pkts, nb_rx_pkts, NULL);
				app_stats.rx.enqueue_pkts += ret;
				if (unlikely(ret < nb_rx_pkts)) {
					app_stats.rx.enqueue_failed_pkts +=
						(nb_rx_pkts - ret);
					pktmbuf_free_bulk(&pkts[ret], nb_rx_pkts - ret);
				}
			}
		}
	}
	return 0;
}

/**
 * This thread takes bursts of packets from the rx_to_workers ring,
 * changes the input port value to the output port value and feeds
 * the packets to the workers_to_tx ring.
 */
static int
worker_thread(void *args_ptr)
{
	const uint8_t nb_ports = rte_eth_dev_count();
	uint16_t i, ret = 0;
	uint16_t burst_size = 0;
	struct worker_thread_args *args;
	struct rte_mbuf *burst_buffer[MAX_PKTS_BURST] = { NULL };
	struct rte_ring *ring_in, *ring_out;
	const unsigned int xor_val = (nb_ports > 1);

	args = (struct worker_thread_args *) args_ptr;
	ring_in = args->ring_in;
	ring_out = args->ring_out;

	RTE_LOG(INFO, REORDERAPP, "%s() started on lcore %u\n", __func__,
			rte_lcore_id());

	while (!quit_signal) {

		/* dequeue the mbufs from rx_to_workers ring */
		burst_size = rte_ring_dequeue_burst(ring_in,
				(void *)burst_buffer, MAX_PKTS_BURST, NULL);
		if (unlikely(burst_size == 0))
			continue;

		__sync_fetch_and_add(&app_stats.wkr.dequeue_pkts, burst_size);

		/* just do some operation on the mbuf: with more than one port
		 * in use, flip the lowest port bit so each port of a pair
		 * forwards to its peer (0 <-> 1, 2 <-> 3, ...) */
		for (i = 0; i < burst_size;)
			burst_buffer[i++]->port ^= xor_val;

		/* enqueue the modified mbufs to workers_to_tx ring */
		ret = rte_ring_enqueue_burst(ring_out, (void *)burst_buffer,
				burst_size, NULL);
		__sync_fetch_and_add(&app_stats.wkr.enqueue_pkts, ret);
		if (unlikely(ret < burst_size)) {
			/* Return the mbufs to their respective pool, dropping packets */
			__sync_fetch_and_add(&app_stats.wkr.enqueue_failed_pkts,
					(int)burst_size - ret);
			pktmbuf_free_bulk(&burst_buffer[ret], burst_size - ret);
		}
	}
	return 0;
}
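/*
 * Note: rte_reorder_insert() buffers mbufs according to mbuf->seqn. When a
 * packet cannot be held by the reorder window the call fails with rte_errno
 * set to ERANGE or ENOSPC; the handling below follows this sample's policy
 * of transmitting ERANGE ("too early") packets directly and dropping ENOSPC
 * ones, while rte_reorder_drain() returns the packets that are now in order.
 */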
/**
 * Dequeue mbufs from the workers_to_tx ring and reorder them before
 * transmitting.
 */
static int
send_thread(struct send_thread_args *args)
{
	int ret;
	unsigned int i, dret;
	uint16_t nb_dq_mbufs;
	uint8_t outp;
	unsigned int sent;
	struct rte_mbuf *mbufs[MAX_PKTS_BURST];
	struct rte_mbuf *rombufs[MAX_PKTS_BURST] = {NULL};
	static struct rte_eth_dev_tx_buffer *tx_buffer[RTE_MAX_ETHPORTS];

	RTE_LOG(INFO, REORDERAPP, "%s() started on lcore %u\n", __func__, rte_lcore_id());

	configure_tx_buffers(tx_buffer);

	while (!quit_signal) {

		/* dequeue the mbufs from workers_to_tx ring */
		nb_dq_mbufs = rte_ring_dequeue_burst(args->ring_in,
				(void *)mbufs, MAX_PKTS_BURST, NULL);

		if (unlikely(nb_dq_mbufs == 0))
			continue;

		app_stats.tx.dequeue_pkts += nb_dq_mbufs;

		for (i = 0; i < nb_dq_mbufs; i++) {
			/* send dequeued mbufs for reordering */
			ret = rte_reorder_insert(args->buffer, mbufs[i]);

			if (ret == -1 && rte_errno == ERANGE) {
				/* Too early pkts should be transmitted out directly */
				RTE_LOG_DP(DEBUG, REORDERAPP,
						"%s():Cannot reorder early packet "
						"direct enqueuing to TX\n", __func__);
				outp = mbufs[i]->port;
				if ((portmask & (1 << outp)) == 0) {
					rte_pktmbuf_free(mbufs[i]);
					continue;
				}
				if (rte_eth_tx_burst(outp, 0, &mbufs[i], 1) != 1) {
					rte_pktmbuf_free(mbufs[i]);
					app_stats.tx.early_pkts_tx_failed_woro++;
				} else
					app_stats.tx.early_pkts_txtd_woro++;
			} else if (ret == -1 && rte_errno == ENOSPC) {
				/**
				 * Early pkts just outside of window should be dropped
				 */
				rte_pktmbuf_free(mbufs[i]);
			}
		}

		/*
		 * drain MAX_PKTS_BURST of reordered
		 * mbufs for transmit
		 */
		dret = rte_reorder_drain(args->buffer, rombufs, MAX_PKTS_BURST);
		for (i = 0; i < dret; i++) {

			struct rte_eth_dev_tx_buffer *outbuf;
			uint8_t outp1;

			outp1 = rombufs[i]->port;
			/* skip ports that are not enabled */
			if ((portmask & (1 << outp1)) == 0) {
				rte_pktmbuf_free(rombufs[i]);
				continue;
			}

			outbuf = tx_buffer[outp1];
			sent = rte_eth_tx_buffer(outp1, 0, outbuf, rombufs[i]);
			if (sent)
				app_stats.tx.ro_tx_pkts += sent;
		}
	}

	free_tx_buffers(tx_buffer);

	return 0;
}

/**
 * Dequeue mbufs from the workers_to_tx ring and transmit them.
 */
static int
tx_thread(struct rte_ring *ring_in)
{
	uint32_t i, dqnum;
	uint8_t outp;
	unsigned int sent;
	struct rte_mbuf *mbufs[MAX_PKTS_BURST];
	struct rte_eth_dev_tx_buffer *outbuf;
	static struct rte_eth_dev_tx_buffer *tx_buffer[RTE_MAX_ETHPORTS];

	RTE_LOG(INFO, REORDERAPP, "%s() started on lcore %u\n", __func__,
			rte_lcore_id());

	configure_tx_buffers(tx_buffer);

	while (!quit_signal) {

		/* dequeue the mbufs from workers_to_tx ring */
		dqnum = rte_ring_dequeue_burst(ring_in,
				(void *)mbufs, MAX_PKTS_BURST, NULL);

		if (unlikely(dqnum == 0))
			continue;

		app_stats.tx.dequeue_pkts += dqnum;

		for (i = 0; i < dqnum; i++) {
			outp = mbufs[i]->port;
			/* skip ports that are not enabled */
			if ((portmask & (1 << outp)) == 0) {
				rte_pktmbuf_free(mbufs[i]);
				continue;
			}

			outbuf = tx_buffer[outp];
			sent = rte_eth_tx_buffer(outp, 0, outbuf, mbufs[i]);
			if (sent)
				app_stats.tx.ro_tx_pkts += sent;
		}
	}

	return 0;
}
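/*
 * main() maps the pipeline onto the available lcores: the master lcore runs
 * rx_thread(), the last slave lcore runs send_thread() (or tx_thread() when
 * --disable-reorder is given) and every other enabled slave lcore runs
 * worker_thread().
 */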
int
main(int argc, char **argv)
{
	int ret;
	unsigned int nb_ports;
	unsigned int lcore_id, last_lcore_id, master_lcore_id;
	uint16_t port_id;
	uint16_t nb_ports_available;
	struct worker_thread_args worker_args = {NULL, NULL};
	struct send_thread_args send_args = {NULL, NULL};
	struct rte_ring *rx_to_workers;
	struct rte_ring *workers_to_tx;

	/* catch ctrl-c so we can print on exit */
	signal(SIGINT, int_handler);

	/* Initialize EAL */
	ret = rte_eal_init(argc, argv);
	if (ret < 0)
		return -1;

	argc -= ret;
	argv += ret;

	/* Parse the application specific arguments */
	ret = parse_args(argc, argv);
	if (ret < 0)
		return -1;

	/* Check if we have enough cores */
	if (rte_lcore_count() < 3)
		rte_exit(EXIT_FAILURE, "Error: this application needs at "
				"least 3 logical cores to run:\n"
				"1 lcore for packet RX\n"
				"1 lcore for packet TX\n"
				"and at least 1 lcore for worker threads\n");

	nb_ports = rte_eth_dev_count();
	if (nb_ports == 0)
		rte_exit(EXIT_FAILURE, "Error: no ethernet ports detected\n");
	if (nb_ports != 1 && (nb_ports & 1))
		rte_exit(EXIT_FAILURE, "Error: number of ports must be even, except "
				"when using a single port\n");

	mbuf_pool = rte_pktmbuf_pool_create("mbuf_pool", MBUF_PER_POOL,
			MBUF_POOL_CACHE_SIZE, 0, RTE_MBUF_DEFAULT_BUF_SIZE,
			rte_socket_id());
	if (mbuf_pool == NULL)
		rte_exit(EXIT_FAILURE, "%s\n", rte_strerror(rte_errno));

	nb_ports_available = nb_ports;

	/* initialize all ports */
	for (port_id = 0; port_id < nb_ports; port_id++) {
		/* skip ports that are not enabled */
		if ((portmask & (1 << port_id)) == 0) {
			printf("\nSkipping disabled port %d\n", port_id);
			nb_ports_available--;
			continue;
		}
		/* init port */
		printf("Initializing port %u... done\n", port_id);

		if (configure_eth_port(port_id) != 0)
			rte_exit(EXIT_FAILURE, "Cannot initialize port %u\n",
					port_id);
	}

	if (!nb_ports_available) {
		rte_exit(EXIT_FAILURE,
			"All available ports are disabled. Please set portmask.\n");
	}
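	/*
	 * Note on the ring flags used below: only the RX core enqueues to
	 * rx_to_workers, so it is created single-producer (RING_F_SP_ENQ)
	 * while the workers dequeue from it multi-consumer; conversely, the
	 * workers enqueue to workers_to_tx multi-producer and only the TX
	 * core dequeues from it (RING_F_SC_DEQ).
	 */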
	/* Create rings for inter core communication */
	rx_to_workers = rte_ring_create("rx_to_workers", RING_SIZE, rte_socket_id(),
			RING_F_SP_ENQ);
	if (rx_to_workers == NULL)
		rte_exit(EXIT_FAILURE, "%s\n", rte_strerror(rte_errno));

	workers_to_tx = rte_ring_create("workers_to_tx", RING_SIZE, rte_socket_id(),
			RING_F_SC_DEQ);
	if (workers_to_tx == NULL)
		rte_exit(EXIT_FAILURE, "%s\n", rte_strerror(rte_errno));

	if (!disable_reorder) {
		send_args.buffer = rte_reorder_create("PKT_RO", rte_socket_id(),
				REORDER_BUFFER_SIZE);
		if (send_args.buffer == NULL)
			rte_exit(EXIT_FAILURE, "%s\n", rte_strerror(rte_errno));
	}

	last_lcore_id = get_last_lcore_id();
	master_lcore_id = rte_get_master_lcore();

	worker_args.ring_in = rx_to_workers;
	worker_args.ring_out = workers_to_tx;

	/* Start worker_thread() on all the available slave cores but the last 1 */
	for (lcore_id = 0; lcore_id <= get_previous_lcore_id(last_lcore_id); lcore_id++)
		if (rte_lcore_is_enabled(lcore_id) && lcore_id != master_lcore_id)
			rte_eal_remote_launch(worker_thread, (void *)&worker_args,
					lcore_id);

	if (disable_reorder) {
		/* Start tx_thread() on the last slave core */
		rte_eal_remote_launch((lcore_function_t *)tx_thread, workers_to_tx,
				last_lcore_id);
	} else {
		send_args.ring_in = workers_to_tx;
		/* Start send_thread() on the last slave core */
		rte_eal_remote_launch((lcore_function_t *)send_thread,
				(void *)&send_args, last_lcore_id);
	}

	/* Start rx_thread() on the master core */
	rx_thread(rx_to_workers);

	RTE_LCORE_FOREACH_SLAVE(lcore_id) {
		if (rte_eal_wait_lcore(lcore_id) < 0)
			return -1;
	}

	print_stats();
	return 0;
}