xref: /dpdk/examples/packet_ordering/main.c (revision fea1d908d39989a27890b29b5c0ec94c85c8257b)
/*-
 *   BSD LICENSE
 *
 *   Copyright(c) 2010-2014 Intel Corporation. All rights reserved.
 *   All rights reserved.
 *
 *   Redistribution and use in source and binary forms, with or without
 *   modification, are permitted provided that the following conditions
 *   are met:
 *
 *     * Redistributions of source code must retain the above copyright
 *       notice, this list of conditions and the following disclaimer.
 *     * Redistributions in binary form must reproduce the above copyright
 *       notice, this list of conditions and the following disclaimer in
 *       the documentation and/or other materials provided with the
 *       distribution.
 *     * Neither the name of Intel Corporation nor the names of its
 *       contributors may be used to endorse or promote products derived
 *       from this software without specific prior written permission.
 *
 *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <errno.h>
#include <inttypes.h>
#include <signal.h>
#include <getopt.h>

#include <rte_eal.h>
#include <rte_common.h>
#include <rte_errno.h>
#include <rte_ethdev.h>
#include <rte_lcore.h>
#include <rte_mbuf.h>
#include <rte_mempool.h>
#include <rte_ring.h>
#include <rte_reorder.h>

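/*
 * Packet-ordering pipeline:
 *   RX lcore      - assigns a sequence number to every received mbuf and
 *                   enqueues it on the rx_to_workers ring.
 *   worker lcores - dequeue bursts, update the output port of each mbuf
 *                   and enqueue them on the workers_to_tx ring.
 *   TX lcore      - restores the original order with librte_reorder before
 *                   transmitting, or transmits as-is with --disable-reorder.
 */
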
#define RX_DESC_PER_QUEUE 128
#define TX_DESC_PER_QUEUE 512

#define MAX_PKTS_BURST 32
#define REORDER_BUFFER_SIZE 8192
#define MBUF_PER_POOL 65535
#define MBUF_DATA_SIZE (1600 + RTE_PKTMBUF_HEADROOM)
#define MBUF_POOL_CACHE_SIZE 250

#define RING_SIZE 16384

/* uncomment the line below to enable debug logs */
/* #define DEBUG */

#ifdef DEBUG
#define LOG_LEVEL RTE_LOG_DEBUG
#define LOG_DEBUG(log_type, fmt, args...) RTE_LOG(DEBUG, log_type, fmt, ##args)
#else
#define LOG_LEVEL RTE_LOG_INFO
#define LOG_DEBUG(log_type, fmt, args...) do {} while (0)
#endif

/* Macros for printing using RTE_LOG */
#define RTE_LOGTYPE_REORDERAPP          RTE_LOGTYPE_USER1

unsigned int portmask;
unsigned int disable_reorder;
volatile uint8_t quit_signal;

static struct rte_mempool *mbuf_pool;

static struct rte_eth_conf port_conf_default;

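/* Rings used by each worker lcore: dequeue from ring_in, enqueue to ring_out */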
struct worker_thread_args {
	struct rte_ring *ring_in;
	struct rte_ring *ring_out;
};

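/* Input ring and reorder buffer used by the send (reordering TX) lcore */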
struct send_thread_args {
	struct rte_ring *ring_in;
	struct rte_reorder_buffer *buffer;
};

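/* Per-port TX staging buffer, flushed once MAX_PKTS_BURST mbufs accumulate */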
struct output_buffer {
	unsigned count;
	struct rte_mbuf *mbufs[MAX_PKTS_BURST];
};

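/*
 * Per-stage statistics. The rx and tx blocks each have a single writer
 * (the RX and TX lcores); the wkr block is shared by all worker lcores,
 * which is why worker_thread() updates it with __sync_fetch_and_add().
 */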
volatile struct app_stats {
	struct {
		uint64_t rx_pkts;
		uint64_t enqueue_pkts;
		uint64_t enqueue_failed_pkts;
	} rx __rte_cache_aligned;

	struct {
		uint64_t dequeue_pkts;
		uint64_t enqueue_pkts;
		uint64_t enqueue_failed_pkts;
	} wkr __rte_cache_aligned;

	struct {
		uint64_t dequeue_pkts;
		/* Too early pkts transmitted directly w/o reordering */
		uint64_t early_pkts_txtd_woro;
		/* Too early pkts failed from direct transmit */
		uint64_t early_pkts_tx_failed_woro;
		uint64_t ro_tx_pkts;
		uint64_t ro_tx_failed_pkts;
	} tx __rte_cache_aligned;
} app_stats;

/**
 * Get the last enabled lcore ID
 *
 * @return
 *   The last enabled lcore ID.
 */
static unsigned int
get_last_lcore_id(void)
{
	int i;

	for (i = RTE_MAX_LCORE - 1; i >= 0; i--)
		if (rte_lcore_is_enabled(i))
			return i;
	return 0;
}

/**
 * Get the previous enabled lcore ID
 * @param id
 *  The current lcore ID
 * @return
 *   The previous enabled lcore ID or the current lcore
 *   ID if it is the first available core.
 */
static unsigned int
get_previous_lcore_id(unsigned int id)
{
	int i;

	for (i = id - 1; i >= 0; i--)
		if (rte_lcore_is_enabled(i))
			return i;
	return id;
}

static inline void
pktmbuf_free_bulk(struct rte_mbuf *mbuf_table[], unsigned n)
{
	unsigned int i;

	for (i = 0; i < n; i++)
		rte_pktmbuf_free(mbuf_table[i]);
}

/* display usage */
static void
print_usage(const char *prgname)
{
	printf("%s [EAL options] -- -p PORTMASK [--disable-reorder]\n"
			"  -p PORTMASK: hexadecimal bitmask of ports to configure\n"
			"  --disable-reorder: disable packet reordering on the TX path\n",
			prgname);
}

static int
parse_portmask(const char *portmask)
{
	unsigned long pm;
	char *end = NULL;

	/* parse hexadecimal string */
	pm = strtoul(portmask, &end, 16);
	if ((portmask[0] == '\0') || (end == NULL) || (*end != '\0'))
		return -1;

	if (pm == 0)
		return -1;

	return pm;
}

/* Parse the argument given in the command line of the application */
static int
parse_args(int argc, char **argv)
{
	int opt;
	int option_index;
	char **argvopt;
	char *prgname = argv[0];
	static struct option lgopts[] = {
		{"disable-reorder", 0, 0, 0},
		{NULL, 0, 0, 0}
	};

	argvopt = argv;

	while ((opt = getopt_long(argc, argvopt, "p:",
					lgopts, &option_index)) != EOF) {
		switch (opt) {
		/* portmask */
		case 'p': {
			/* parse_portmask() returns -1 on error, so validate the
			 * signed result before storing it in the unsigned mask */
			int pm = parse_portmask(optarg);

			if (pm <= 0) {
				printf("invalid portmask\n");
				print_usage(prgname);
				return -1;
			}
			portmask = pm;
			break;
		}
		/* long options */
		case 0:
			if (!strcmp(lgopts[option_index].name, "disable-reorder")) {
				printf("reorder disabled\n");
				disable_reorder = 1;
			}
			break;
		default:
			print_usage(prgname);
			return -1;
		}
	}
	if (optind <= 1) {
		print_usage(prgname);
		return -1;
	}

	argv[optind-1] = prgname;
	optind = 0; /* reset getopt lib */
	return 0;
}

static inline int
configure_eth_port(uint8_t port_id)
{
	struct ether_addr addr;
	const uint16_t rxRings = 1, txRings = 1;
	const uint8_t nb_ports = rte_eth_dev_count();
	int ret;
	uint16_t q;

	if (port_id >= nb_ports)
		return -1;

	ret = rte_eth_dev_configure(port_id, rxRings, txRings, &port_conf_default);
	if (ret != 0)
		return ret;

	for (q = 0; q < rxRings; q++) {
		ret = rte_eth_rx_queue_setup(port_id, q, RX_DESC_PER_QUEUE,
				rte_eth_dev_socket_id(port_id), NULL,
				mbuf_pool);
		if (ret < 0)
			return ret;
	}

	for (q = 0; q < txRings; q++) {
		ret = rte_eth_tx_queue_setup(port_id, q, TX_DESC_PER_QUEUE,
				rte_eth_dev_socket_id(port_id), NULL);
		if (ret < 0)
			return ret;
	}

	ret = rte_eth_dev_start(port_id);
	if (ret < 0)
		return ret;

	rte_eth_macaddr_get(port_id, &addr);
	printf("Port %u MAC: %02"PRIx8" %02"PRIx8" %02"PRIx8
			" %02"PRIx8" %02"PRIx8" %02"PRIx8"\n",
			(unsigned)port_id,
			addr.addr_bytes[0], addr.addr_bytes[1],
			addr.addr_bytes[2], addr.addr_bytes[3],
			addr.addr_bytes[4], addr.addr_bytes[5]);

	rte_eth_promiscuous_enable(port_id);

	return 0;
}

static void
print_stats(void)
{
	const uint8_t nb_ports = rte_eth_dev_count();
	unsigned i;
	struct rte_eth_stats eth_stats;

	printf("\nRX thread stats:\n");
	printf(" - Pkts rxd:				%"PRIu64"\n",
						app_stats.rx.rx_pkts);
	printf(" - Pkts enqd to workers ring:		%"PRIu64"\n",
						app_stats.rx.enqueue_pkts);

	printf("\nWorker thread stats:\n");
	printf(" - Pkts deqd from workers ring:		%"PRIu64"\n",
						app_stats.wkr.dequeue_pkts);
	printf(" - Pkts enqd to tx ring:		%"PRIu64"\n",
						app_stats.wkr.enqueue_pkts);
	printf(" - Pkts enq to tx failed:		%"PRIu64"\n",
						app_stats.wkr.enqueue_failed_pkts);

	printf("\nTX stats:\n");
	printf(" - Pkts deqd from tx ring:		%"PRIu64"\n",
						app_stats.tx.dequeue_pkts);
	printf(" - Ro Pkts transmitted:			%"PRIu64"\n",
						app_stats.tx.ro_tx_pkts);
	printf(" - Ro Pkts tx failed:			%"PRIu64"\n",
						app_stats.tx.ro_tx_failed_pkts);
	printf(" - Pkts transmitted w/o reorder:	%"PRIu64"\n",
						app_stats.tx.early_pkts_txtd_woro);
	printf(" - Pkts tx failed w/o reorder:		%"PRIu64"\n",
						app_stats.tx.early_pkts_tx_failed_woro);

	for (i = 0; i < nb_ports; i++) {
		rte_eth_stats_get(i, &eth_stats);
		printf("\nPort %u stats:\n", i);
		printf(" - Pkts in:   %"PRIu64"\n", eth_stats.ipackets);
		printf(" - Pkts out:  %"PRIu64"\n", eth_stats.opackets);
		printf(" - In Errs:   %"PRIu64"\n", eth_stats.ierrors);
		printf(" - Out Errs:  %"PRIu64"\n", eth_stats.oerrors);
		printf(" - Mbuf Errs: %"PRIu64"\n", eth_stats.rx_nombuf);
	}
}

static void
int_handler(int sig_num)
{
	printf("Exiting on signal %d\n", sig_num);
	quit_signal = 1;
}

/**
 * This thread receives mbufs from the ports and assigns each one an
 * internal sequence number (stored in the mbuf) to keep track of its
 * order of arrival.
 * The mbufs are then passed to the worker threads via the rx_to_workers
 * ring.
 */
static int
rx_thread(struct rte_ring *ring_out)
{
	const uint8_t nb_ports = rte_eth_dev_count();
	uint32_t seqn = 0;
	uint16_t i, ret = 0;
	uint16_t nb_rx_pkts;
	uint8_t port_id;
	struct rte_mbuf *pkts[MAX_PKTS_BURST];

	RTE_LOG(INFO, REORDERAPP, "%s() started on lcore %u\n", __func__,
							rte_lcore_id());

	while (!quit_signal) {

		for (port_id = 0; port_id < nb_ports; port_id++) {
			if ((portmask & (1 << port_id)) != 0) {

				/* receive packets */
				nb_rx_pkts = rte_eth_rx_burst(port_id, 0,
								pkts, MAX_PKTS_BURST);
				if (nb_rx_pkts == 0) {
					LOG_DEBUG(REORDERAPP,
					"%s():Received zero packets\n", __func__);
					continue;
				}
				app_stats.rx.rx_pkts += nb_rx_pkts;

				/* mark sequence number */
				for (i = 0; i < nb_rx_pkts; )
					pkts[i++]->seqn = seqn++;

				/* enqueue to rx_to_workers ring */
				ret = rte_ring_enqueue_burst(ring_out, (void *) pkts,
								nb_rx_pkts);
				app_stats.rx.enqueue_pkts += ret;
				if (unlikely(ret < nb_rx_pkts)) {
					app_stats.rx.enqueue_failed_pkts +=
									(nb_rx_pkts-ret);
					pktmbuf_free_bulk(&pkts[ret], nb_rx_pkts - ret);
				}
			}
		}
	}
	return 0;
}

/**
 * This thread takes bursts of packets from the rx_to_workers ring,
 * changes each mbuf's input port value to the output port value and
 * feeds the burst to the workers_to_tx ring.
 */
static int
worker_thread(void *args_ptr)
{
	const uint8_t nb_ports = rte_eth_dev_count();
	uint16_t i, ret = 0;
	uint16_t burst_size = 0;
	struct worker_thread_args *args;
	struct rte_mbuf *burst_buffer[MAX_PKTS_BURST] = { NULL };
	struct rte_ring *ring_in, *ring_out;
	const unsigned xor_val = (nb_ports > 1);

	args = (struct worker_thread_args *) args_ptr;
	ring_in  = args->ring_in;
	ring_out = args->ring_out;

	RTE_LOG(INFO, REORDERAPP, "%s() started on lcore %u\n", __func__,
							rte_lcore_id());

	while (!quit_signal) {

		/* dequeue the mbufs from rx_to_workers ring */
		burst_size = rte_ring_dequeue_burst(ring_in,
				(void *)burst_buffer, MAX_PKTS_BURST);
		if (unlikely(burst_size == 0))
			continue;

		__sync_fetch_and_add(&app_stats.wkr.dequeue_pkts, burst_size);

		/* swap each packet to its paired output port (0<->1, 2<->3, ...);
		 * this is a no-op when only one port is in use */
		for (i = 0; i < burst_size;)
			burst_buffer[i++]->port ^= xor_val;

		/* enqueue the modified mbufs to workers_to_tx ring */
		ret = rte_ring_enqueue_burst(ring_out, (void *)burst_buffer, burst_size);
		__sync_fetch_and_add(&app_stats.wkr.enqueue_pkts, ret);
		if (unlikely(ret < burst_size)) {
			/* Return the mbufs to their respective pool, dropping packets */
			__sync_fetch_and_add(&app_stats.wkr.enqueue_failed_pkts,
					(int)burst_size - ret);
			pktmbuf_free_bulk(&burst_buffer[ret], burst_size - ret);
		}
	}
	return 0;
}

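/*
 * Transmit everything buffered for one port, free any mbufs the NIC did
 * not accept and reset the buffer.
 */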
static inline void
flush_one_port(struct output_buffer *outbuf, uint8_t outp)
{
	unsigned nb_tx = rte_eth_tx_burst(outp, 0, outbuf->mbufs,
			outbuf->count);
	app_stats.tx.ro_tx_pkts += nb_tx;

	if (unlikely(nb_tx < outbuf->count)) {
		/* free the mbufs which failed from transmit */
		app_stats.tx.ro_tx_failed_pkts += (outbuf->count - nb_tx);
		LOG_DEBUG(REORDERAPP, "%s:Packet loss with tx_burst\n", __func__);
		pktmbuf_free_bulk(&outbuf->mbufs[nb_tx], outbuf->count - nb_tx);
	}
	outbuf->count = 0;
}

/**
 * Dequeue mbufs from the workers_to_tx ring and reorder them before
 * transmitting.
 */
static int
send_thread(struct send_thread_args *args)
{
	int ret;
	unsigned int i, dret;
	uint16_t nb_dq_mbufs;
	uint8_t outp;
	static struct output_buffer tx_buffers[RTE_MAX_ETHPORTS];
	struct rte_mbuf *mbufs[MAX_PKTS_BURST];
	struct rte_mbuf *rombufs[MAX_PKTS_BURST] = {NULL};

	RTE_LOG(INFO, REORDERAPP, "%s() started on lcore %u\n", __func__, rte_lcore_id());

	while (!quit_signal) {

		/* dequeue the mbufs from workers_to_tx ring */
		nb_dq_mbufs = rte_ring_dequeue_burst(args->ring_in,
				(void *)mbufs, MAX_PKTS_BURST);

		if (unlikely(nb_dq_mbufs == 0))
			continue;

		app_stats.tx.dequeue_pkts += nb_dq_mbufs;

		for (i = 0; i < nb_dq_mbufs; i++) {
			/* send dequeued mbufs for reordering */
			ret = rte_reorder_insert(args->buffer, mbufs[i]);

			if (ret == -1 && rte_errno == ERANGE) {
				/* Too early pkts should be transmitted out directly */
				LOG_DEBUG(REORDERAPP, "%s():Cannot reorder early packet "
						"direct enqueuing to TX\n", __func__);
				outp = mbufs[i]->port;
				if ((portmask & (1 << outp)) == 0) {
					rte_pktmbuf_free(mbufs[i]);
					continue;
				}
				if (rte_eth_tx_burst(outp, 0, &mbufs[i], 1) != 1) {
					rte_pktmbuf_free(mbufs[i]);
					app_stats.tx.early_pkts_tx_failed_woro++;
				} else
					app_stats.tx.early_pkts_txtd_woro++;
			} else if (ret == -1 && rte_errno == ENOSPC) {
				/**
				 * Early pkts just outside of window should be dropped
				 */
				rte_pktmbuf_free(mbufs[i]);
			}
		}

		/*
		 * drain MAX_PKTS_BURST of reordered
		 * mbufs for transmit
		 */
		dret = rte_reorder_drain(args->buffer, rombufs, MAX_PKTS_BURST);
		for (i = 0; i < dret; i++) {

			struct output_buffer *outbuf;
			uint8_t outp1;

			outp1 = rombufs[i]->port;
			/* skip ports that are not enabled */
			if ((portmask & (1 << outp1)) == 0) {
				rte_pktmbuf_free(rombufs[i]);
				continue;
			}

			outbuf = &tx_buffers[outp1];
			outbuf->mbufs[outbuf->count++] = rombufs[i];
			if (outbuf->count == MAX_PKTS_BURST)
				flush_one_port(outbuf, outp1);
		}
	}
	return 0;
}

/**
 * Dequeue mbufs from the workers_to_tx ring and transmit them.
 */
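/* This path is used only when reordering is disabled (--disable-reorder). */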
static int
tx_thread(struct rte_ring *ring_in)
{
	uint32_t i, dqnum;
	uint8_t outp;
	static struct output_buffer tx_buffers[RTE_MAX_ETHPORTS];
	struct rte_mbuf *mbufs[MAX_PKTS_BURST];
	struct output_buffer *outbuf;

	RTE_LOG(INFO, REORDERAPP, "%s() started on lcore %u\n", __func__,
							rte_lcore_id());
	while (!quit_signal) {

		/* dequeue the mbufs from workers_to_tx ring */
		dqnum = rte_ring_dequeue_burst(ring_in,
				(void *)mbufs, MAX_PKTS_BURST);

		if (unlikely(dqnum == 0))
			continue;

		app_stats.tx.dequeue_pkts += dqnum;

		for (i = 0; i < dqnum; i++) {
			outp = mbufs[i]->port;
			/* skip ports that are not enabled */
			if ((portmask & (1 << outp)) == 0) {
				rte_pktmbuf_free(mbufs[i]);
				continue;
			}

			outbuf = &tx_buffers[outp];
			outbuf->mbufs[outbuf->count++] = mbufs[i];
			if (outbuf->count == MAX_PKTS_BURST)
				flush_one_port(outbuf, outp);
		}
	}

	return 0;
}

int
main(int argc, char **argv)
{
	int ret;
	unsigned nb_ports;
	unsigned int lcore_id, last_lcore_id, master_lcore_id;
	uint8_t port_id;
	uint8_t nb_ports_available;
	struct worker_thread_args worker_args = {NULL, NULL};
	struct send_thread_args send_args = {NULL, NULL};
	struct rte_ring *rx_to_workers;
	struct rte_ring *workers_to_tx;

	/* catch ctrl-c so we can print on exit */
	signal(SIGINT, int_handler);

	/* Initialize EAL */
	ret = rte_eal_init(argc, argv);
	if (ret < 0)
		return -1;

	argc -= ret;
	argv += ret;

	/* Parse the application specific arguments */
	ret = parse_args(argc, argv);
	if (ret < 0)
		return -1;

	/* Check if we have enough cores */
	if (rte_lcore_count() < 3)
		rte_exit(EXIT_FAILURE, "Error, this application needs at "
				"least 3 logical cores to run:\n"
				"1 lcore for packet RX\n"
				"1 lcore for packet TX\n"
				"and at least 1 lcore for worker threads\n");

	nb_ports = rte_eth_dev_count();
	if (nb_ports == 0)
		rte_exit(EXIT_FAILURE, "Error: no ethernet ports detected\n");
	if (nb_ports != 1 && (nb_ports & 1))
		rte_exit(EXIT_FAILURE, "Error: number of ports must be even, except "
				"when using a single port\n");

	mbuf_pool = rte_pktmbuf_pool_create("mbuf_pool", MBUF_PER_POOL,
			MBUF_POOL_CACHE_SIZE, 0, MBUF_DATA_SIZE,
			rte_socket_id());
	if (mbuf_pool == NULL)
		rte_exit(EXIT_FAILURE, "%s\n", rte_strerror(rte_errno));

	nb_ports_available = nb_ports;

	/* initialize all ports */
	for (port_id = 0; port_id < nb_ports; port_id++) {
		/* skip ports that are not enabled */
		if ((portmask & (1 << port_id)) == 0) {
			printf("\nSkipping disabled port %d\n", port_id);
			nb_ports_available--;
			continue;
		}
		/* init port */
		printf("Initializing port %u...\n", (unsigned) port_id);

		if (configure_eth_port(port_id) != 0)
			rte_exit(EXIT_FAILURE, "Cannot initialize port %"PRIu8"\n",
					port_id);
	}

	if (!nb_ports_available) {
		rte_exit(EXIT_FAILURE,
			"All available ports are disabled. Please set portmask.\n");
	}

	/* Create rings for inter core communication */
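	/*
	 * rx_to_workers is written only by the RX lcore (single producer, hence
	 * RING_F_SP_ENQ) and read by all workers; workers_to_tx is written by
	 * all workers and read only by the TX lcore (single consumer, hence
	 * RING_F_SC_DEQ).
	 */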
	rx_to_workers = rte_ring_create("rx_to_workers", RING_SIZE, rte_socket_id(),
			RING_F_SP_ENQ);
	if (rx_to_workers == NULL)
		rte_exit(EXIT_FAILURE, "%s\n", rte_strerror(rte_errno));

	workers_to_tx = rte_ring_create("workers_to_tx", RING_SIZE, rte_socket_id(),
			RING_F_SC_DEQ);
	if (workers_to_tx == NULL)
		rte_exit(EXIT_FAILURE, "%s\n", rte_strerror(rte_errno));

	if (!disable_reorder) {
		send_args.buffer = rte_reorder_create("PKT_RO", rte_socket_id(),
				REORDER_BUFFER_SIZE);
		if (send_args.buffer == NULL)
			rte_exit(EXIT_FAILURE, "%s\n", rte_strerror(rte_errno));
	}

	last_lcore_id   = get_last_lcore_id();
	master_lcore_id = rte_get_master_lcore();

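	/*
	 * Core layout: the master lcore runs rx_thread(), the last enabled
	 * lcore runs send_thread() (or tx_thread() when reordering is
	 * disabled), and every other slave lcore runs worker_thread().
	 */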
	worker_args.ring_in  = rx_to_workers;
	worker_args.ring_out = workers_to_tx;

	/* Start worker_thread() on all the available slave cores except the last one */
	for (lcore_id = 0; lcore_id <= get_previous_lcore_id(last_lcore_id); lcore_id++)
		if (rte_lcore_is_enabled(lcore_id) && lcore_id != master_lcore_id)
			rte_eal_remote_launch(worker_thread, (void *)&worker_args,
					lcore_id);

	if (disable_reorder) {
		/* Start tx_thread() on the last slave core */
		rte_eal_remote_launch((lcore_function_t *)tx_thread, workers_to_tx,
				last_lcore_id);
	} else {
		send_args.ring_in = workers_to_tx;
		/* Start send_thread() on the last slave core */
		rte_eal_remote_launch((lcore_function_t *)send_thread,
				(void *)&send_args, last_lcore_id);
	}

	/* Start rx_thread() on the master core */
	rx_thread(rx_to_workers);

	RTE_LCORE_FOREACH_SLAVE(lcore_id) {
		if (rte_eal_wait_lcore(lcore_id) < 0)
			return -1;
	}

	print_stats();
	return 0;
}