xref: /dpdk/examples/distributor/main.c (revision f30a1bbd63f494f5ba623582d7e9166c817794a4)
1 /* SPDX-License-Identifier: BSD-3-Clause
2  * Copyright(c) 2010-2017 Intel Corporation
3  */
4 
5 #include <stdint.h>
6 #include <stdlib.h>
7 #include <inttypes.h>
8 #include <unistd.h>
9 #include <signal.h>
10 #include <getopt.h>
11 
12 #include <rte_eal.h>
13 #include <rte_ethdev.h>
14 #include <rte_cycles.h>
15 #include <rte_malloc.h>
16 #include <rte_debug.h>
17 #include <rte_prefetch.h>
18 #include <rte_distributor.h>
19 #include <rte_pause.h>
20 #include <rte_power_cpufreq.h>
21 
22 #define RX_RING_SIZE 1024
23 #define TX_RING_SIZE 1024
24 #define NUM_MBUFS ((64*1024)-1)
25 #define MBUF_CACHE_SIZE 128
26 #define BURST_SIZE 64
27 #define SCHED_RX_RING_SZ 8192
28 #define SCHED_TX_RING_SZ 65536
29 #define BURST_SIZE_TX 32
30 
31 #define RTE_LOGTYPE_DISTRAPP RTE_LOGTYPE_USER1
32 
33 #define ANSI_COLOR_RED     "\x1b[31m"
34 #define ANSI_COLOR_RESET   "\x1b[0m"
35 
36 /* mask of enabled ports */
37 static uint32_t enabled_port_mask;
38 volatile uint8_t quit_signal;
39 volatile uint8_t quit_signal_rx;
40 volatile uint8_t quit_signal_dist;
41 volatile uint8_t quit_signal_work;
42 unsigned int power_lib_initialised;
43 bool enable_lcore_rx_distributor;
44 unsigned int num_workers;
45 
46 static volatile struct app_stats {
47 	alignas(RTE_CACHE_LINE_SIZE) struct {
48 		uint64_t rx_pkts;
49 		uint64_t returned_pkts;
50 		uint64_t enqueued_pkts;
51 		uint64_t enqdrop_pkts;
52 	} rx;
53 	alignas(RTE_CACHE_LINE_SIZE) int pad1;
54 
55 	alignas(RTE_CACHE_LINE_SIZE) struct {
56 		uint64_t in_pkts;
57 		uint64_t ret_pkts;
58 		uint64_t sent_pkts;
59 		uint64_t enqdrop_pkts;
60 	} dist;
61 	alignas(RTE_CACHE_LINE_SIZE) int pad2;
62 
63 	alignas(RTE_CACHE_LINE_SIZE) struct {
64 		uint64_t dequeue_pkts;
65 		uint64_t tx_pkts;
66 		uint64_t enqdrop_pkts;
67 	} tx;
68 	alignas(RTE_CACHE_LINE_SIZE) int pad3;
69 
70 	alignas(RTE_CACHE_LINE_SIZE) uint64_t worker_pkts[64];
71 
72 	alignas(RTE_CACHE_LINE_SIZE) int pad4;
73 
74 	alignas(RTE_CACHE_LINE_SIZE) uint64_t worker_bursts[64][8];
75 
76 	alignas(RTE_CACHE_LINE_SIZE) int pad5;
77 
78 	alignas(RTE_CACHE_LINE_SIZE) uint64_t port_rx_pkts[64];
79 	alignas(RTE_CACHE_LINE_SIZE) uint64_t port_tx_pkts[64];
80 } app_stats;
81 
82 struct app_stats prev_app_stats;
83 
84 static const struct rte_eth_conf port_conf_default = {
85 	.rxmode = {
86 		.mq_mode = RTE_ETH_MQ_RX_RSS,
87 	},
88 	.txmode = {
89 		.mq_mode = RTE_ETH_MQ_TX_NONE,
90 	},
91 	.rx_adv_conf = {
92 		.rss_conf = {
93 			.rss_hf = RTE_ETH_RSS_IP | RTE_ETH_RSS_UDP |
94 				RTE_ETH_RSS_TCP | RTE_ETH_RSS_SCTP,
95 		}
96 	},
97 };
98 
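/* Per-port buffer of packets awaiting transmission by the TX thread. */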
99 struct output_buffer {
100 	unsigned count;
101 	struct rte_mbuf *mbufs[BURST_SIZE];
102 };
103 
104 static void print_stats(void);
105 
106 /*
107  * Initialises a given port using global settings, with the RX buffers
108  * coming from the mbuf_pool passed as a parameter.
109  */
110 static inline int
111 port_init(uint16_t port, struct rte_mempool *mbuf_pool)
112 {
113 	struct rte_eth_conf port_conf = port_conf_default;
114 	const uint16_t rxRings = 1, txRings = 1;
115 	int retval;
116 	uint16_t q;
117 	uint16_t nb_rxd = RX_RING_SIZE;
118 	uint16_t nb_txd = TX_RING_SIZE;
119 	struct rte_eth_dev_info dev_info;
120 	struct rte_eth_txconf txconf;
121 
122 	if (!rte_eth_dev_is_valid_port(port))
123 		return -1;
124 
125 	retval = rte_eth_dev_info_get(port, &dev_info);
126 	if (retval != 0) {
127 		printf("Error getting device (port %u) info: %s\n",
128 				port, strerror(-retval));
129 		return retval;
130 	}
131 
132 	if (dev_info.tx_offload_capa & RTE_ETH_TX_OFFLOAD_MBUF_FAST_FREE)
133 		port_conf.txmode.offloads |=
134 			RTE_ETH_TX_OFFLOAD_MBUF_FAST_FREE;
135 
136 	port_conf.rx_adv_conf.rss_conf.rss_hf &=
137 		dev_info.flow_type_rss_offloads;
138 	if (port_conf.rx_adv_conf.rss_conf.rss_hf !=
139 			port_conf_default.rx_adv_conf.rss_conf.rss_hf) {
140 		printf("Port %u modified RSS hash function based on hardware support,"
141 			" requested:%#"PRIx64" configured:%#"PRIx64"\n",
142 			port,
143 			port_conf_default.rx_adv_conf.rss_conf.rss_hf,
144 			port_conf.rx_adv_conf.rss_conf.rss_hf);
145 	}
146 
147 	retval = rte_eth_dev_configure(port, rxRings, txRings, &port_conf);
148 	if (retval != 0)
149 		return retval;
150 
151 	retval = rte_eth_dev_adjust_nb_rx_tx_desc(port, &nb_rxd, &nb_txd);
152 	if (retval != 0)
153 		return retval;
154 
155 	for (q = 0; q < rxRings; q++) {
156 		retval = rte_eth_rx_queue_setup(port, q, nb_rxd,
157 						rte_eth_dev_socket_id(port),
158 						NULL, mbuf_pool);
159 		if (retval < 0)
160 			return retval;
161 	}
162 
163 	txconf = dev_info.default_txconf;
164 	txconf.offloads = port_conf.txmode.offloads;
165 	for (q = 0; q < txRings; q++) {
166 		retval = rte_eth_tx_queue_setup(port, q, nb_txd,
167 						rte_eth_dev_socket_id(port),
168 						&txconf);
169 		if (retval < 0)
170 			return retval;
171 	}
172 
173 	retval = rte_eth_dev_start(port);
174 	if (retval < 0)
175 		return retval;
176 
177 	struct rte_eth_link link;
178 	do {
179 		retval = rte_eth_link_get_nowait(port, &link);
180 		if (retval < 0) {
181 			printf("Failed link get (port %u): %s\n",
182 				port, rte_strerror(-retval));
183 			return retval;
184 		} else if (link.link_status)
185 			break;
186 
187 		printf("Waiting for Link up on port %"PRIu16"\n", port);
188 		sleep(1);
189 	} while (!link.link_status);
190 
191 	if (!link.link_status) {
192 		printf("Link down on port %"PRIu16"\n", port);
193 		return 0;
194 	}
195 
196 	struct rte_ether_addr addr;
197 	retval = rte_eth_macaddr_get(port, &addr);
198 	if (retval < 0) {
199 		printf("Failed to get MAC address (port %u): %s\n",
200 				port, rte_strerror(-retval));
201 		return retval;
202 	}
203 
204 	printf("Port %u MAC: %02"PRIx8" %02"PRIx8" %02"PRIx8
205 			" %02"PRIx8" %02"PRIx8" %02"PRIx8"\n",
206 			port, RTE_ETHER_ADDR_BYTES(&addr));
207 
208 	retval = rte_eth_promiscuous_enable(port);
209 	if (retval != 0)
210 		return retval;
211 
212 	return 0;
213 }
214 
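/*
 * Parameters handed to each launched lcore: its worker id, the distributor
 * instance, the two inter-core rings and the mbuf pool.
 */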
215 struct lcore_params {
216 	unsigned worker_id;
217 	struct rte_distributor *d;
218 	struct rte_ring *rx_dist_ring;
219 	struct rte_ring *dist_tx_ring;
220 	struct rte_mempool *mem_pool;
221 };
222 
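/*
 * RX thread: polls the enabled ports in round-robin fashion and enqueues
 * the received mbufs onto the RX->distributor ring, freeing any that do
 * not fit.
 */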
223 static int
224 lcore_rx(struct lcore_params *p)
225 {
226 	const uint16_t nb_ports = rte_eth_dev_count_avail();
227 	const int socket_id = rte_socket_id();
228 	uint16_t port;
229 	struct rte_mbuf *bufs[BURST_SIZE*2];
230 
231 	RTE_ETH_FOREACH_DEV(port) {
232 		/* skip ports that are not enabled */
233 		if ((enabled_port_mask & (1 << port)) == 0)
234 			continue;
235 
236 		if (rte_eth_dev_socket_id(port) >= 0 &&
237 				rte_eth_dev_socket_id(port) != socket_id)
238 			printf("WARNING, port %u is on remote NUMA node to "
239 					"RX thread.\n\tPerformance will not "
240 					"be optimal.\n", port);
241 	}
242 
243 	printf("\nCore %u doing packet RX.\n", rte_lcore_id());
244 	port = 0;
245 	while (!quit_signal_rx) {
246 
247 		/* skip ports that are not enabled */
248 		if ((enabled_port_mask & (1 << port)) == 0) {
249 			if (++port == nb_ports)
250 				port = 0;
251 			continue;
252 		}
253 		const uint16_t nb_rx = rte_eth_rx_burst(port, 0, bufs,
254 				BURST_SIZE);
255 		if (unlikely(nb_rx == 0)) {
256 			if (++port == nb_ports)
257 				port = 0;
258 			continue;
259 		}
260 		app_stats.rx.rx_pkts += nb_rx;
261 
262 		/*
263 		 * Swap the following two lines if you want the rx traffic
264 		 * to go directly to tx, no distribution.
265 		 */
266 		struct rte_ring *out_ring = p->rx_dist_ring;
267 		/* struct rte_ring *out_ring = p->dist_tx_ring; */
268 
269 		uint16_t sent = rte_ring_enqueue_burst(out_ring,
270 				(void *)bufs, nb_rx, NULL);
271 
272 		app_stats.rx.enqueued_pkts += sent;
273 		if (unlikely(sent < nb_rx)) {
274 			app_stats.rx.enqdrop_pkts +=  nb_rx - sent;
275 			RTE_LOG_DP(DEBUG, DISTRAPP,
276 				"%s:Packet loss due to full ring\n", __func__);
277 			while (sent < nb_rx)
278 				rte_pktmbuf_free(bufs[sent++]);
279 		}
280 		if (++port == nb_ports)
281 			port = 0;
282 	}
283 	if (power_lib_initialised)
284 		rte_power_exit(rte_lcore_id());
285 	printf("\nCore %u exiting rx task.\n", rte_lcore_id());
286 	/* set distributor threads quit flag */
287 	quit_signal_dist = 1;
288 	return 0;
289 }
290 
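/*
 * Combined RX and distributor thread (enabled with -c): receives packets,
 * runs the distributor on the same core and enqueues the returned packets
 * straight onto the distributor->TX ring.
 */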
291 static int
292 lcore_rx_and_distributor(struct lcore_params *p)
293 {
294 	struct rte_distributor *d = p->d;
295 	const uint16_t nb_ports = rte_eth_dev_count_avail();
296 	const int socket_id = rte_socket_id();
297 	uint16_t port;
298 	struct rte_mbuf *bufs[BURST_SIZE*2];
299 
300 	RTE_ETH_FOREACH_DEV(port) {
301 		/* skip ports that are not enabled */
302 		if ((enabled_port_mask & (1 << port)) == 0)
303 			continue;
304 
305 		if (rte_eth_dev_socket_id(port) >= 0 &&
306 				rte_eth_dev_socket_id(port) != socket_id)
307 			printf("WARNING, port %u is on remote NUMA node to "
308 					"RX thread.\n\tPerformance will not "
309 					"be optimal.\n", port);
310 	}
311 
312 	printf("\nCore %u doing packet RX and Distributor.\n", rte_lcore_id());
313 	port = 0;
314 	while (!quit_signal_rx) {
315 
316 		/* skip ports that are not enabled */
317 		if ((enabled_port_mask & (1 << port)) == 0) {
318 			if (++port == nb_ports)
319 				port = 0;
320 			continue;
321 		}
322 		const uint16_t nb_rx = rte_eth_rx_burst(port, 0, bufs,
323 				BURST_SIZE);
324 		if (unlikely(nb_rx == 0)) {
325 			if (++port == nb_ports)
326 				port = 0;
327 			continue;
328 		}
329 		app_stats.rx.rx_pkts += nb_rx;
330 
331 		/*
332 		 * Run the distributor on the rx core. Returned
333 		 * packets are then sent straight to the tx core.
334 		 */
335 		rte_distributor_process(d, bufs, nb_rx);
336 		const uint16_t nb_ret = rte_distributor_returned_pkts(d,
337 				bufs, BURST_SIZE*2);
338 
339 		app_stats.rx.returned_pkts += nb_ret;
340 		if (unlikely(nb_ret == 0)) {
341 			if (++port == nb_ports)
342 				port = 0;
343 			continue;
344 		}
345 
346 		struct rte_ring *tx_ring = p->dist_tx_ring;
347 		uint16_t sent = rte_ring_enqueue_burst(tx_ring,
348 				(void *)bufs, nb_ret, NULL);
349 
350 		app_stats.rx.enqueued_pkts += sent;
351 		if (unlikely(sent < nb_ret)) {
352 			app_stats.rx.enqdrop_pkts +=  nb_ret - sent;
353 			RTE_LOG_DP(DEBUG, DISTRAPP,
354 				"%s:Packet loss due to full ring\n", __func__);
355 			while (sent < nb_ret)
356 				rte_pktmbuf_free(bufs[sent++]);
357 		}
358 		if (++port == nb_ports)
359 			port = 0;
360 	}
361 	if (power_lib_initialised)
362 		rte_power_exit(rte_lcore_id());
363 	printf("\nCore %u exiting rx task.\n", rte_lcore_id());
364 	/* set tx threads quit flag */
365 	quit_signal = 1;
366 	/* set worker threads quit flag */
367 	quit_signal_work = 1;
368 	rte_distributor_flush(d);
369 	/* Unblock any returns so workers can exit */
370 	rte_distributor_clear_returns(d);
371 	return 0;
372 }
373 
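/* Send the buffered packets of one port, freeing any the NIC did not accept. */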
374 static inline void
375 flush_one_port(struct output_buffer *outbuf, uint8_t outp)
376 {
377 	unsigned int nb_tx = rte_eth_tx_burst(outp, 0,
378 			outbuf->mbufs, outbuf->count);
379 	app_stats.tx.tx_pkts += outbuf->count;
380 
381 	if (unlikely(nb_tx < outbuf->count)) {
382 		app_stats.tx.enqdrop_pkts +=  outbuf->count - nb_tx;
383 		do {
384 			rte_pktmbuf_free(outbuf->mbufs[nb_tx]);
385 		} while (++nb_tx < outbuf->count);
386 	}
387 	outbuf->count = 0;
388 }
389 
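/* Flush the pending TX buffers of all enabled ports. */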
390 static inline void
391 flush_all_ports(struct output_buffer *tx_buffers)
392 {
393 	uint16_t outp;
394 
395 	RTE_ETH_FOREACH_DEV(outp) {
396 		/* skip ports that are not enabled */
397 		if ((enabled_port_mask & (1 << outp)) == 0)
398 			continue;
399 
400 		if (tx_buffers[outp].count == 0)
401 			continue;
402 
403 		flush_one_port(&tx_buffers[outp], outp);
404 	}
405 }
406 
407 
408 
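/*
 * Distributor thread: dequeues bursts from the RX ring, hands them to the
 * distributor for the workers to process, and enqueues the returned
 * packets onto the TX ring.
 */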
409 static int
410 lcore_distributor(struct lcore_params *p)
411 {
412 	struct rte_ring *in_r = p->rx_dist_ring;
413 	struct rte_ring *out_r = p->dist_tx_ring;
414 	struct rte_mbuf *bufs[BURST_SIZE * 4];
415 	struct rte_distributor *d = p->d;
416 
417 	printf("\nCore %u acting as distributor core.\n", rte_lcore_id());
418 	while (!quit_signal_dist) {
419 		const uint16_t nb_rx = rte_ring_dequeue_burst(in_r,
420 				(void *)bufs, BURST_SIZE*1, NULL);
421 		if (nb_rx) {
422 			app_stats.dist.in_pkts += nb_rx;
423 
424 			/* Distribute the packets */
425 			rte_distributor_process(d, bufs, nb_rx);
426 			/* Handle Returns */
427 			const uint16_t nb_ret =
428 				rte_distributor_returned_pkts(d,
429 					bufs, BURST_SIZE*2);
430 
431 			if (unlikely(nb_ret == 0))
432 				continue;
433 			app_stats.dist.ret_pkts += nb_ret;
434 
435 			uint16_t sent = rte_ring_enqueue_burst(out_r,
436 					(void *)bufs, nb_ret, NULL);
437 			app_stats.dist.sent_pkts += sent;
438 			if (unlikely(sent < nb_ret)) {
439 				app_stats.dist.enqdrop_pkts += nb_ret - sent;
440 				RTE_LOG(DEBUG, DISTRAPP,
441 					"%s:Packet loss due to full out ring\n",
442 					__func__);
443 				while (sent < nb_ret)
444 					rte_pktmbuf_free(bufs[sent++]);
445 			}
446 		}
447 	}
448 	if (power_lib_initialised)
449 		rte_power_exit(rte_lcore_id());
450 	printf("\nCore %u exiting distributor task.\n", rte_lcore_id());
451 	/* set tx threads quit flag */
452 	quit_signal = 1;
453 	/* set worker threads quit flag */
454 	quit_signal_work = 1;
455 	rte_distributor_flush(d);
456 	/* Unblock any returns so workers can exit */
457 	rte_distributor_clear_returns(d);
458 	return 0;
459 }
460 
461 
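/*
 * TX thread: dequeues packets from the distributor->TX ring, buffers them
 * per output port (taken from mbuf->port, set by the workers) and sends
 * them in bursts of BURST_SIZE_TX, flushing the buffers when idle.
 */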
462 static int
463 lcore_tx(struct rte_ring *in_r)
464 {
465 	static struct output_buffer tx_buffers[RTE_MAX_ETHPORTS];
466 	const int socket_id = rte_socket_id();
467 	uint16_t port;
468 
469 	RTE_ETH_FOREACH_DEV(port) {
470 		/* skip ports that are not enabled */
471 		if ((enabled_port_mask & (1 << port)) == 0)
472 			continue;
473 
474 		if (rte_eth_dev_socket_id(port) >= 0 &&
475 				rte_eth_dev_socket_id(port) != socket_id)
476 			printf("WARNING, port %u is on remote NUMA node to "
477 					"TX thread.\n\tPerformance will not "
478 					"be optimal.\n", port);
479 	}
480 
481 	printf("\nCore %u doing packet TX.\n", rte_lcore_id());
482 	while (!quit_signal) {
483 
484 		RTE_ETH_FOREACH_DEV(port) {
485 			/* skip ports that are not enabled */
486 			if ((enabled_port_mask & (1 << port)) == 0)
487 				continue;
488 
489 			struct rte_mbuf *bufs[BURST_SIZE_TX];
490 			const uint16_t nb_rx = rte_ring_dequeue_burst(in_r,
491 					(void *)bufs, BURST_SIZE_TX, NULL);
492 			app_stats.tx.dequeue_pkts += nb_rx;
493 
494 			/* if we get no traffic, flush anything we have */
495 			if (unlikely(nb_rx == 0)) {
496 				flush_all_ports(tx_buffers);
497 				continue;
498 			}
499 
500 			/* for traffic we receive, queue it up for transmit */
501 			uint16_t i;
502 			rte_prefetch_non_temporal((void *)bufs[0]);
503 			rte_prefetch_non_temporal((void *)bufs[1]);
504 			rte_prefetch_non_temporal((void *)bufs[2]);
505 			for (i = 0; i < nb_rx; i++) {
506 				struct output_buffer *outbuf;
507 				uint8_t outp;
508 				rte_prefetch_non_temporal((void *)bufs[i + 3]);
509 				/*
510 				 * workers should update in_port to hold the
511 				 * output port value
512 				 */
513 				outp = bufs[i]->port;
514 				/* skip ports that are not enabled */
515 				if ((enabled_port_mask & (1 << outp)) == 0)
516 					continue;
517 
518 				outbuf = &tx_buffers[outp];
519 				outbuf->mbufs[outbuf->count++] = bufs[i];
520 				if (outbuf->count == BURST_SIZE_TX)
521 					flush_one_port(outbuf, outp);
522 			}
523 		}
524 	}
525 	if (power_lib_initialised)
526 		rte_power_exit(rte_lcore_id());
527 	printf("\nCore %u exiting tx task.\n", rte_lcore_id());
528 	return 0;
529 }
530 
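/* SIGINT handler: signal the RX thread to quit; the remaining threads are stopped in turn. */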
531 static void
532 int_handler(int sig_num)
533 {
534 	printf("Exiting on signal %d\n", sig_num);
535 	/* set quit flag for rx thread to exit */
536 	quit_signal_rx = 1;
537 }
538 
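/*
 * Print the packet counts (in millions) accumulated since the previous
 * call for the RX, distributor, TX and worker threads, along with the
 * per-port RX/TX totals.
 */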
539 static void
540 print_stats(void)
541 {
542 	struct rte_eth_stats eth_stats;
543 	unsigned int i, j;
544 
545 	RTE_ETH_FOREACH_DEV(i) {
546 		rte_eth_stats_get(i, &eth_stats);
547 		app_stats.port_rx_pkts[i] = eth_stats.ipackets;
548 		app_stats.port_tx_pkts[i] = eth_stats.opackets;
549 	}
550 
551 	printf("\n\nRX Thread:\n");
552 	RTE_ETH_FOREACH_DEV(i) {
553 		printf("Port %u Pktsin : %5.2f\n", i,
554 				(app_stats.port_rx_pkts[i] -
555 				prev_app_stats.port_rx_pkts[i])/1000000.0);
556 		prev_app_stats.port_rx_pkts[i] = app_stats.port_rx_pkts[i];
557 	}
558 	printf(" - Received:    %5.2f\n",
559 			(app_stats.rx.rx_pkts -
560 			prev_app_stats.rx.rx_pkts)/1000000.0);
561 	printf(" - Returned:    %5.2f\n",
562 			(app_stats.rx.returned_pkts -
563 			prev_app_stats.rx.returned_pkts)/1000000.0);
564 	printf(" - Enqueued:    %5.2f\n",
565 			(app_stats.rx.enqueued_pkts -
566 			prev_app_stats.rx.enqueued_pkts)/1000000.0);
567 	printf(" - Dropped:     %s%5.2f%s\n", ANSI_COLOR_RED,
568 			(app_stats.rx.enqdrop_pkts -
569 			prev_app_stats.rx.enqdrop_pkts)/1000000.0,
570 			ANSI_COLOR_RESET);
571 
572 	if (!enable_lcore_rx_distributor) {
573 		printf("Distributor thread:\n");
574 		printf(" - In:          %5.2f\n",
575 				(app_stats.dist.in_pkts -
576 				prev_app_stats.dist.in_pkts)/1000000.0);
577 		printf(" - Returned:    %5.2f\n",
578 				(app_stats.dist.ret_pkts -
579 				prev_app_stats.dist.ret_pkts)/1000000.0);
580 		printf(" - Sent:        %5.2f\n",
581 				(app_stats.dist.sent_pkts -
582 				prev_app_stats.dist.sent_pkts)/1000000.0);
583 		printf(" - Dropped      %s%5.2f%s\n", ANSI_COLOR_RED,
584 				(app_stats.dist.enqdrop_pkts -
585 				prev_app_stats.dist.enqdrop_pkts)/1000000.0,
586 				ANSI_COLOR_RESET);
587 	}
588 
589 	printf("TX thread:\n");
590 	printf(" - Dequeued:    %5.2f\n",
591 			(app_stats.tx.dequeue_pkts -
592 			prev_app_stats.tx.dequeue_pkts)/1000000.0);
593 	RTE_ETH_FOREACH_DEV(i) {
594 		printf("Port %u Pktsout: %5.2f\n",
595 				i, (app_stats.port_tx_pkts[i] -
596 				prev_app_stats.port_tx_pkts[i])/1000000.0);
597 		prev_app_stats.port_tx_pkts[i] = app_stats.port_tx_pkts[i];
598 	}
599 	printf(" - Transmitted: %5.2f\n",
600 			(app_stats.tx.tx_pkts -
601 			prev_app_stats.tx.tx_pkts)/1000000.0);
602 	printf(" - Dropped:     %s%5.2f%s\n", ANSI_COLOR_RED,
603 			(app_stats.tx.enqdrop_pkts -
604 			prev_app_stats.tx.enqdrop_pkts)/1000000.0,
605 			ANSI_COLOR_RESET);
606 
607 	prev_app_stats.rx.rx_pkts = app_stats.rx.rx_pkts;
608 	prev_app_stats.rx.returned_pkts = app_stats.rx.returned_pkts;
609 	prev_app_stats.rx.enqueued_pkts = app_stats.rx.enqueued_pkts;
610 	prev_app_stats.rx.enqdrop_pkts = app_stats.rx.enqdrop_pkts;
611 	prev_app_stats.dist.in_pkts = app_stats.dist.in_pkts;
612 	prev_app_stats.dist.ret_pkts = app_stats.dist.ret_pkts;
613 	prev_app_stats.dist.sent_pkts = app_stats.dist.sent_pkts;
614 	prev_app_stats.dist.enqdrop_pkts = app_stats.dist.enqdrop_pkts;
615 	prev_app_stats.tx.dequeue_pkts = app_stats.tx.dequeue_pkts;
616 	prev_app_stats.tx.tx_pkts = app_stats.tx.tx_pkts;
617 	prev_app_stats.tx.enqdrop_pkts = app_stats.tx.enqdrop_pkts;
618 
619 	for (i = 0; i < num_workers; i++) {
620 		printf("Worker %02u Pkts: %5.2f. Bursts(1-8): ", i,
621 				(app_stats.worker_pkts[i] -
622 				prev_app_stats.worker_pkts[i])/1000000.0);
623 		for (j = 0; j < 8; j++) {
624 			printf("%"PRIu64" ", app_stats.worker_bursts[i][j]);
625 			app_stats.worker_bursts[i][j] = 0;
626 		}
627 		printf("\n");
628 		prev_app_stats.worker_pkts[i] = app_stats.worker_pkts[i];
629 	}
630 }
631 
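/*
 * Worker thread: returns the previous burst to the distributor while
 * fetching a new one, spins for ~100 TSC cycles per packet as dummy work
 * and flips the output port (0<->1, 2<->3, ...) when more than one port
 * is in use.
 */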
632 static int
633 lcore_worker(struct lcore_params *p)
634 {
635 	struct rte_distributor *d = p->d;
636 	const unsigned id = p->worker_id;
637 	unsigned int num = 0;
638 	unsigned int i;
639 
640 	/*
641 	 * for a single port, xor_val will be zero so we won't modify the output
642 	 * port; otherwise we send traffic from 0 to 1, 2 to 3, and vice versa
643 	 */
644 	const unsigned xor_val = (rte_eth_dev_count_avail() > 1);
645 	alignas(RTE_CACHE_LINE_SIZE) struct rte_mbuf *buf[8];
646 
647 	for (i = 0; i < 8; i++)
648 		buf[i] = NULL;
649 
650 	app_stats.worker_pkts[p->worker_id] = 1;
651 
652 	printf("\nCore %u acting as worker core.\n", rte_lcore_id());
653 	while (!quit_signal_work) {
654 		num = rte_distributor_get_pkt(d, id, buf, buf, num);
655 		/* Do a little bit of work for each packet */
656 		for (i = 0; i < num; i++) {
657 			uint64_t t = rte_rdtsc()+100;
658 
659 			while (rte_rdtsc() < t)
660 				rte_pause();
661 			buf[i]->port ^= xor_val;
662 		}
663 
664 		app_stats.worker_pkts[p->worker_id] += num;
665 		if (num > 0)
666 			app_stats.worker_bursts[p->worker_id][num-1]++;
667 	}
668 	if (power_lib_initialised)
669 		rte_power_exit(rte_lcore_id());
670 	rte_free(p);
671 	return 0;
672 }
673 
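/*
 * Initialise the power library on every worker lcore; returns non-zero on
 * the first failure so the application falls back to running without
 * frequency scaling.
 */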
674 static int
675 init_power_library(void)
676 {
677 	int ret = 0, lcore_id;
678 	RTE_LCORE_FOREACH_WORKER(lcore_id) {
679 		/* init power management library */
680 		ret = rte_power_init(lcore_id);
681 		if (ret) {
682 			fprintf(stderr,
683 				"Library initialization failed on core %u\n",
684 				lcore_id);
685 			/*
686 			 * Return on first failure, we'll fall back
687 			 * to non-power operation
688 			 */
689 			return ret;
690 		}
691 	}
692 	return ret;
693 }
694 
695 /* display usage */
696 static void
697 print_usage(const char *prgname)
698 {
699 	printf("%s [EAL options] -- -p PORTMASK [-c]\n"
700 			"  -p PORTMASK: hexadecimal bitmask of ports to configure\n"
701 			"  -c: Combines the RX core with the distribution core\n",
702 			prgname);
703 }
704 
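/* Parse the hexadecimal portmask string; returns 0 if it is not a valid mask. */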
705 static int
706 parse_portmask(const char *portmask)
707 {
708 	char *end = NULL;
709 	unsigned long pm;
710 
711 	/* parse hexadecimal string */
712 	pm = strtoul(portmask, &end, 16);
713 	if ((portmask[0] == '\0') || (end == NULL) || (*end != '\0'))
714 		return 0;
715 
716 	return pm;
717 }
718 
719 /* Parse the argument given in the command line of the application */
720 static int
721 parse_args(int argc, char **argv)
722 {
723 	int opt;
724 	char **argvopt;
725 	int option_index;
726 	char *prgname = argv[0];
727 	static struct option lgopts[] = {
728 		{NULL, 0, 0, 0}
729 	};
730 
731 	argvopt = argv;
732 	enable_lcore_rx_distributor = false;
733 	while ((opt = getopt_long(argc, argvopt, "cp:",
734 			lgopts, &option_index)) != EOF) {
735 
736 		switch (opt) {
737 		/* portmask */
738 		case 'p':
739 			enabled_port_mask = parse_portmask(optarg);
740 			if (enabled_port_mask == 0) {
741 				printf("invalid portmask\n");
742 				print_usage(prgname);
743 				return -1;
744 			}
745 			break;
746 
747 		case 'c':
748 			enable_lcore_rx_distributor = true;
749 			break;
750 
751 		default:
752 			print_usage(prgname);
753 			return -1;
754 		}
755 	}
756 
757 	if (optind <= 1) {
758 		print_usage(prgname);
759 		return -1;
760 	}
761 
762 	argv[optind-1] = prgname;
763 
764 	optind = 1; /* reset getopt lib */
765 	return 0;
766 }
767 
768 /* Main function, does initialization and calls the per-lcore functions */
769 int
770 main(int argc, char *argv[])
771 {
772 	struct rte_mempool *mbuf_pool;
773 	struct rte_distributor *d;
774 	struct rte_ring *dist_tx_ring;
775 	struct rte_ring *rx_dist_ring;
776 	struct rte_power_core_capabilities lcore_cap;
777 	unsigned int lcore_id, worker_id = 0;
778 	int distr_core_id = -1, rx_core_id = -1, tx_core_id = -1;
779 	unsigned nb_ports;
780 	unsigned int min_cores;
781 	uint16_t portid;
782 	uint16_t nb_ports_available;
783 	uint64_t t, freq;
784 
785 	/* catch ctrl-c so we can print on exit */
786 	signal(SIGINT, int_handler);
787 
788 	/* init EAL */
789 	int ret = rte_eal_init(argc, argv);
790 	if (ret < 0)
791 		rte_exit(EXIT_FAILURE, "Error with EAL initialization\n");
792 	argc -= ret;
793 	argv += ret;
794 
795 	/* parse application arguments (after the EAL ones) */
796 	ret = parse_args(argc, argv);
797 	if (ret < 0)
798 		rte_exit(EXIT_FAILURE, "Invalid distributor parameters\n");
799 
800 	if (enable_lcore_rx_distributor) {
801 		/* RX and distributor combined: stats, RX+distributor and TX cores, plus at least 1 worker */
802 		min_cores = 4;
803 		num_workers = rte_lcore_count() - 3;
804 	} else {
805 		/* separate RX and distributor: stats, RX, distributor and TX cores, plus at least 1 worker */
806 		min_cores = 5;
807 		num_workers = rte_lcore_count() - 4;
808 	}
809 
810 	if (rte_lcore_count() < min_cores)
811 		rte_exit(EXIT_FAILURE, "Error, this application needs at "
812 				"least %u logical cores to run:\n"
813 				"1 lcore for stats (can be core 0)\n"
814 				"1 or 2 lcores for packet RX and distribution\n"
815 				"1 lcore for packet TX\n"
816 				"and at least 1 lcore for worker threads\n", min_cores);
817 
818 	if (init_power_library() == 0)
819 		power_lib_initialised = 1;
820 
821 	nb_ports = rte_eth_dev_count_avail();
822 	if (nb_ports == 0)
823 		rte_exit(EXIT_FAILURE, "Error: no ethernet ports detected\n");
824 	if (nb_ports != 1 && (nb_ports & 1))
825 		rte_exit(EXIT_FAILURE, "Error: number of ports must be even, except "
826 				"when using a single port\n");
827 
828 	mbuf_pool = rte_pktmbuf_pool_create("MBUF_POOL",
829 		NUM_MBUFS * nb_ports, MBUF_CACHE_SIZE, 0,
830 		RTE_MBUF_DEFAULT_BUF_SIZE, rte_socket_id());
831 	if (mbuf_pool == NULL)
832 		rte_exit(EXIT_FAILURE, "Cannot create mbuf pool\n");
833 	nb_ports_available = nb_ports;
834 
835 	/* initialize all ports */
836 	RTE_ETH_FOREACH_DEV(portid) {
837 		/* skip ports that are not enabled */
838 		if ((enabled_port_mask & (1 << portid)) == 0) {
839 			printf("\nSkipping disabled port %d\n", portid);
840 			nb_ports_available--;
841 			continue;
842 		}
843 		/* init port */
844 		printf("Initializing port %u...\n", portid);
845 
846 		if (port_init(portid, mbuf_pool) != 0)
847 			rte_exit(EXIT_FAILURE, "Cannot initialize port %u\n",
848 					portid);
849 	}
850 
851 	if (!nb_ports_available) {
852 		rte_exit(EXIT_FAILURE,
853 				"All available ports are disabled. Please set portmask.\n");
854 	}
855 
856 	d = rte_distributor_create("PKT_DIST", rte_socket_id(),
857 			num_workers,
858 			RTE_DIST_ALG_BURST);
859 	if (d == NULL)
860 		rte_exit(EXIT_FAILURE, "Cannot create distributor\n");
861 
862 	/*
863 	 * The output ring is read by the TX core and written to by the
864 	 * distributor core (or by the RX core when -c combines the two).
865 	 */
866 	dist_tx_ring = rte_ring_create("Output_ring", SCHED_TX_RING_SZ,
867 			rte_socket_id(), RING_F_SC_DEQ | RING_F_SP_ENQ);
868 	if (dist_tx_ring == NULL)
869 		rte_exit(EXIT_FAILURE, "Cannot create output ring\n");
870 
871 	rx_dist_ring = rte_ring_create("Input_ring", SCHED_RX_RING_SZ,
872 			rte_socket_id(), RING_F_SC_DEQ | RING_F_SP_ENQ);
873 	if (rx_dist_ring == NULL)
874 		rte_exit(EXIT_FAILURE, "Cannot create input ring\n");
875 
876 	if (power_lib_initialised) {
877 		/*
878 		 * Pre-assign lcore ids to the rx, tx and distributor
879 		 * workloads on cores that can run at a higher frequency,
880 		 * e.g. when Turbo Boost is enabled.
881 		 * Cores are assigned in a specific order, so that if
882 		 * fewer than three such cores are available, the
883 		 * higher-frequency cores go to the distributor first,
884 		 * then rx, then tx.
885 		 */
886 		RTE_LCORE_FOREACH_WORKER(lcore_id) {
887 
888 			rte_power_get_capabilities(lcore_id, &lcore_cap);
889 
890 			if (lcore_cap.priority != 1)
891 				continue;
892 
893 			if (distr_core_id < 0 && !enable_lcore_rx_distributor) {
894 				distr_core_id = lcore_id;
895 				printf("Distributor on priority core %d\n",
896 					lcore_id);
897 				continue;
898 			}
899 			if (rx_core_id < 0) {
900 				rx_core_id = lcore_id;
901 				printf("Rx on priority core %d\n",
902 					lcore_id);
903 				continue;
904 			}
905 			if (tx_core_id < 0) {
906 				tx_core_id = lcore_id;
907 				printf("Tx on priority core %d\n",
908 					lcore_id);
909 				continue;
910 			}
911 		}
912 	}
913 
914 	/*
915 	 * If any of the key workloads is still without an lcore_id after
916 	 * the high-frequency core assignment above, assign one of the
917 	 * remaining lcores here.
918 	 */
919 	RTE_LCORE_FOREACH_WORKER(lcore_id) {
920 		if (lcore_id == (unsigned int)distr_core_id ||
921 				lcore_id == (unsigned int)rx_core_id ||
922 				lcore_id == (unsigned int)tx_core_id)
923 			continue;
924 		if (distr_core_id < 0 && !enable_lcore_rx_distributor) {
925 			distr_core_id = lcore_id;
926 			printf("Distributor on core %d\n", lcore_id);
927 			continue;
928 		}
929 		if (rx_core_id < 0) {
930 			rx_core_id = lcore_id;
931 			printf("Rx on core %d\n", lcore_id);
932 			continue;
933 		}
934 		if (tx_core_id < 0) {
935 			tx_core_id = lcore_id;
936 			printf("Tx on core %d\n", lcore_id);
937 			continue;
938 		}
939 	}
940 
941 	if (enable_lcore_rx_distributor)
942 		printf(" tx id %d, rx id %d\n",
943 			tx_core_id,
944 			rx_core_id);
945 	else
946 		printf(" tx id %d, dist id %d, rx id %d\n",
947 			tx_core_id,
948 			distr_core_id,
949 			rx_core_id);
950 
951 	/*
952 	 * Kick off all the worker threads first, avoiding the pre-assigned
953 	 * lcore_ids for tx, rx and distributor workloads.
954 	 */
955 	RTE_LCORE_FOREACH_WORKER(lcore_id) {
956 		if (lcore_id == (unsigned int)distr_core_id ||
957 				lcore_id == (unsigned int)rx_core_id ||
958 				lcore_id == (unsigned int)tx_core_id)
959 			continue;
960 		printf("Starting thread %d as worker, lcore_id %d\n",
961 				worker_id, lcore_id);
962 		struct lcore_params *p =
963 			rte_malloc(NULL, sizeof(*p), 0);
964 		if (!p)
965 			rte_panic("malloc failure\n");
966 		*p = (struct lcore_params){worker_id++, d, rx_dist_ring,
967 			dist_tx_ring, mbuf_pool};
968 
969 		rte_eal_remote_launch((lcore_function_t *)lcore_worker,
970 				p, lcore_id);
971 	}
972 
973 	/* Start tx core */
974 	rte_eal_remote_launch((lcore_function_t *)lcore_tx,
975 			dist_tx_ring, tx_core_id);
976 
977 	/* Start distributor core */
978 	struct lcore_params *pd = NULL;
979 	if (!enable_lcore_rx_distributor) {
980 		pd = rte_malloc(NULL, sizeof(*pd), 0);
981 		if (!pd)
982 			rte_panic("malloc failure\n");
983 		*pd = (struct lcore_params){worker_id++, d,
984 			rx_dist_ring, dist_tx_ring, mbuf_pool};
985 		rte_eal_remote_launch((lcore_function_t *)lcore_distributor,
986 				pd, distr_core_id);
987 	}
988 
989 	/* Start rx core */
990 	struct lcore_params *pr =
991 		rte_malloc(NULL, sizeof(*pr), 0);
992 	if (!pr)
993 		rte_panic("malloc failure\n");
994 	*pr = (struct lcore_params){worker_id++, d, rx_dist_ring,
995 		dist_tx_ring, mbuf_pool};
996 	if (enable_lcore_rx_distributor)
997 		rte_eal_remote_launch((lcore_function_t *)lcore_rx_and_distributor,
998 				pr, rx_core_id);
999 	else
1000 		rte_eal_remote_launch((lcore_function_t *)lcore_rx,
1001 				pr, rx_core_id);
1002 
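	/* Main lcore: print statistics roughly once per second until SIGINT is received. */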
1003 	freq = rte_get_timer_hz();
1004 	t = rte_rdtsc() + freq;
1005 	while (!quit_signal) {
1006 		if (t < rte_rdtsc()) {
1007 			print_stats();
1008 			t = rte_rdtsc() + freq;
1009 		}
1010 		usleep(1000);
1011 	}
1012 
1013 	RTE_LCORE_FOREACH_WORKER(lcore_id) {
1014 		if (rte_eal_wait_lcore(lcore_id) < 0)
1015 			return -1;
1016 	}
1017 
1018 	print_stats();
1019 
1020 	rte_free(pd);
1021 	rte_free(pr);
1022 
1023 	/* clean up the EAL */
1024 	rte_eal_cleanup();
1025 
1026 	return 0;
1027 }
1028