xref: /dpdk/examples/eventdev_pipeline/main.c (revision 2717246ecd7d27125a346a2c5c55b53a9c251a93)
1 /* SPDX-License-Identifier: BSD-3-Clause
2  * Copyright(c) 2016-2017 Intel Corporation
3  */
4 
5 #include <getopt.h>
6 #include <stdint.h>
7 #include <stdio.h>
8 #include <signal.h>
9 #include <sched.h>
10 
11 #include "pipeline_common.h"
12 
13 struct config_data cdata = {
14 	.num_packets = (1L << 25), /* do ~32M packets */
15 	.num_fids = 512,
16 	.queue_type = RTE_SCHED_TYPE_ATOMIC,
17 	.next_qid = {-1},
18 	.qid = {-1},
19 	.num_stages = 1,
20 	.worker_cq_depth = 16
21 };
22 
23 static bool
24 core_in_use(unsigned int lcore_id) {
25 	return (fdata->rx_core[lcore_id] || fdata->sched_core[lcore_id] ||
26 		fdata->tx_core[lcore_id] || fdata->worker_core[lcore_id]);
27 }
28 
/*
 * TX buffer error callback: keep re-submitting the packets that could not
 * be sent until every one of them has been accepted.
 *
 * pkts     - array of packets that were left unsent
 * unsent   - number of entries in pkts
 * userdata - the ethdev port id, smuggled through the pointer value
 *
 * The loop has no back-off or give-up condition, so it spins for as long
 * as the device refuses packets. TX queue 0 is always used.
 */
static void
eth_tx_buffer_retry(struct rte_mbuf **pkts, uint16_t unsent,
			void *userdata)
{
	const int port_id = (uintptr_t) userdata;
	unsigned int done = 0;

	do {
		/* Note: hard-coded TX queue */
		done += rte_eth_tx_burst(port_id, 0, pkts + done,
					 unsent - done);
	} while (done != unsent);
}
42 
/*
 * Coremask parsing helpers. A coremask is given on the command line as a
 * hexadecimal string; xdigit2val() converts one hexadecimal digit to its
 * numeric value and parse_coremask() converts the whole string into a
 * 64-bit mask that is returned to the caller.
 */
/*
 * Convert one hexadecimal digit character to its numeric value (0-15).
 *
 * The caller must have validated c with isxdigit(): any character that is
 * neither a decimal digit nor an upper-case letter is treated as a
 * lower-case hex letter without further checking.
 */
static int xdigit2val(unsigned char c)
{
	if (isdigit(c))
		return c - '0';
	if (isupper(c))
		return c - 'A' + 10;

	return c - 'a' + 10;
}
60 
61 static uint64_t
62 parse_coremask(const char *coremask)
63 {
64 	int i, j, idx = 0;
65 	unsigned int count = 0;
66 	char c;
67 	int val;
68 	uint64_t mask = 0;
69 	const int32_t BITS_HEX = 4;
70 
71 	if (coremask == NULL)
72 		return -1;
73 	/* Remove all blank characters ahead and after .
74 	 * Remove 0x/0X if exists.
75 	 */
76 	while (isblank(*coremask))
77 		coremask++;
78 	if (coremask[0] == '0' && ((coremask[1] == 'x')
79 		|| (coremask[1] == 'X')))
80 		coremask += 2;
81 	i = strlen(coremask);
82 	while ((i > 0) && isblank(coremask[i - 1]))
83 		i--;
84 	if (i == 0)
85 		return -1;
86 
87 	for (i = i - 1; i >= 0 && idx < MAX_NUM_CORE; i--) {
88 		c = coremask[i];
89 		if (isxdigit(c) == 0) {
90 			/* invalid characters */
91 			return -1;
92 		}
93 		val = xdigit2val(c);
94 		for (j = 0; j < BITS_HEX && idx < MAX_NUM_CORE; j++, idx++) {
95 			if ((1 << j) & val) {
96 				mask |= (1UL << idx);
97 				count++;
98 			}
99 		}
100 	}
101 	for (; i >= 0; i--)
102 		if (coremask[i] != '0')
103 			return -1;
104 	if (count == 0)
105 		return -1;
106 	return mask;
107 }
108 
/*
 * Long command-line options accepted by parse_app_args(). Every entry
 * maps to the short option character in its last field; the flag pointer
 * is unused, so getopt_long() returns that character directly.
 */
static struct option long_options[] = {
	/* Options taking a value. */
	{"workers",        required_argument, NULL, 'w'},
	{"packets",        required_argument, NULL, 'n'},
	{"atomic-flows",   required_argument, NULL, 'f'},
	{"num_stages",     required_argument, NULL, 's'},
	{"rx-mask",        required_argument, NULL, 'r'},
	{"tx-mask",        required_argument, NULL, 't'},
	{"sched-mask",     required_argument, NULL, 'e'},
	{"cq-depth",       required_argument, NULL, 'c'},
	{"work-cycles",    required_argument, NULL, 'W'},
	{"mempool-size",   required_argument, NULL, 'm'},
	/* Boolean switches. */
	{"queue-priority", no_argument,       NULL, 'P'},
	{"parallel",       no_argument,       NULL, 'p'},
	{"ordered",        no_argument,       NULL, 'o'},
	{"quiet",          no_argument,       NULL, 'q'},
	{"use-atq",        no_argument,       NULL, 'a'},
	{"dump",           no_argument,       NULL, 'D'},
	/* Terminator. */
	{NULL, 0, NULL, 0}
};
128 
/*
 * Print the command-line help text to stderr and terminate the process
 * with exit status 1. Never returns.
 *
 * The text is kept in sync with the option table and the built-in
 * defaults: the worker core mask is selected with --workers and the
 * default number of flows is 512.
 */
static void
usage(void)
{
	const char *usage_str =
		"  Usage: eventdev_demo [options]\n"
		"  Options:\n"
		"  -n, --packets=N              Send N packets (default ~32M), 0 implies no limit\n"
		"  -f, --atomic-flows=N         Use N random flows from 1 to N (default 512)\n"
		"  -s, --num_stages=N           Use N atomic stages (default 1)\n"
		"  -r, --rx-mask=core mask      Run NIC rx on CPUs in core mask\n"
		"  -w, --workers=core mask      Run worker on CPUs in core mask\n"
		"  -t, --tx-mask=core mask      Run NIC tx on CPUs in core mask\n"
		"  -e  --sched-mask=core mask   Run scheduler on CPUs in core mask\n"
		"  -c  --cq-depth=N             Worker CQ depth (default 16)\n"
		"  -W  --work-cycles=N          Worker cycles (default 0)\n"
		"  -P  --queue-priority         Enable scheduler queue prioritization\n"
		"  -o, --ordered                Use ordered scheduling\n"
		"  -p, --parallel               Use parallel scheduling\n"
		"  -q, --quiet                  Minimize printed output\n"
		"  -a, --use-atq                Use all type queues\n"
		"  -m, --mempool-size=N         Dictate the mempool size\n"
		"  -D, --dump                   Print detailed statistics before exit"
		"\n";
	fprintf(stderr, "%s", usage_str);
	exit(1);
}
155 
156 static void
157 parse_app_args(int argc, char **argv)
158 {
159 	/* Parse cli options*/
160 	int option_index;
161 	int c;
162 	opterr = 0;
163 	uint64_t rx_lcore_mask = 0;
164 	uint64_t tx_lcore_mask = 0;
165 	uint64_t sched_lcore_mask = 0;
166 	uint64_t worker_lcore_mask = 0;
167 	int i;
168 
169 	for (;;) {
170 		c = getopt_long(argc, argv, "r:t:e:c:w:n:f:s:m:paoPqDW:",
171 				long_options, &option_index);
172 		if (c == -1)
173 			break;
174 
175 		int popcnt = 0;
176 		switch (c) {
177 		case 'n':
178 			cdata.num_packets = (int64_t)atol(optarg);
179 			if (cdata.num_packets == 0)
180 				cdata.num_packets = INT64_MAX;
181 			break;
182 		case 'f':
183 			cdata.num_fids = (unsigned int)atoi(optarg);
184 			break;
185 		case 's':
186 			cdata.num_stages = (unsigned int)atoi(optarg);
187 			break;
188 		case 'c':
189 			cdata.worker_cq_depth = (unsigned int)atoi(optarg);
190 			break;
191 		case 'W':
192 			cdata.worker_cycles = (unsigned int)atoi(optarg);
193 			break;
194 		case 'P':
195 			cdata.enable_queue_priorities = 1;
196 			break;
197 		case 'o':
198 			cdata.queue_type = RTE_SCHED_TYPE_ORDERED;
199 			break;
200 		case 'p':
201 			cdata.queue_type = RTE_SCHED_TYPE_PARALLEL;
202 			break;
203 		case 'a':
204 			cdata.all_type_queues = 1;
205 			break;
206 		case 'q':
207 			cdata.quiet = 1;
208 			break;
209 		case 'D':
210 			cdata.dump_dev = 1;
211 			break;
212 		case 'w':
213 			worker_lcore_mask = parse_coremask(optarg);
214 			break;
215 		case 'r':
216 			rx_lcore_mask = parse_coremask(optarg);
217 			popcnt = __builtin_popcountll(rx_lcore_mask);
218 			fdata->rx_single = (popcnt == 1);
219 			break;
220 		case 't':
221 			tx_lcore_mask = parse_coremask(optarg);
222 			popcnt = __builtin_popcountll(tx_lcore_mask);
223 			fdata->tx_single = (popcnt == 1);
224 			break;
225 		case 'e':
226 			sched_lcore_mask = parse_coremask(optarg);
227 			popcnt = __builtin_popcountll(sched_lcore_mask);
228 			fdata->sched_single = (popcnt == 1);
229 			break;
230 		case 'm':
231 			cdata.num_mbuf = (uint64_t)atol(optarg);
232 			break;
233 		default:
234 			usage();
235 		}
236 	}
237 
238 	cdata.worker_lcore_mask = worker_lcore_mask;
239 	cdata.sched_lcore_mask = sched_lcore_mask;
240 	cdata.rx_lcore_mask = rx_lcore_mask;
241 	cdata.tx_lcore_mask = tx_lcore_mask;
242 
243 	if (cdata.num_stages == 0 || cdata.num_stages > MAX_NUM_STAGES)
244 		usage();
245 
246 	for (i = 0; i < MAX_NUM_CORE; i++) {
247 		fdata->rx_core[i] = !!(rx_lcore_mask & (1UL << i));
248 		fdata->tx_core[i] = !!(tx_lcore_mask & (1UL << i));
249 		fdata->sched_core[i] = !!(sched_lcore_mask & (1UL << i));
250 		fdata->worker_core[i] = !!(worker_lcore_mask & (1UL << i));
251 
252 		if (fdata->worker_core[i])
253 			cdata.num_workers++;
254 		if (core_in_use(i))
255 			cdata.active_cores++;
256 	}
257 }
258 
259 /*
260  * Initializes a given port using global settings and with the RX buffers
261  * coming from the mbuf_pool passed as a parameter.
262  */
263 static inline int
264 port_init(uint8_t port, struct rte_mempool *mbuf_pool)
265 {
266 	static const struct rte_eth_conf port_conf_default = {
267 		.rxmode = {
268 			.mq_mode = ETH_MQ_RX_RSS,
269 			.max_rx_pkt_len = ETHER_MAX_LEN,
270 		},
271 		.rx_adv_conf = {
272 			.rss_conf = {
273 				.rss_hf = ETH_RSS_IP |
274 					  ETH_RSS_TCP |
275 					  ETH_RSS_UDP,
276 			}
277 		}
278 	};
279 	const uint16_t rx_rings = 1, tx_rings = 1;
280 	const uint16_t rx_ring_size = 512, tx_ring_size = 512;
281 	struct rte_eth_conf port_conf = port_conf_default;
282 	int retval;
283 	uint16_t q;
284 	struct rte_eth_dev_info dev_info;
285 	struct rte_eth_txconf txconf;
286 
287 	if (!rte_eth_dev_is_valid_port(port))
288 		return -1;
289 
290 	rte_eth_dev_info_get(port, &dev_info);
291 	if (dev_info.tx_offload_capa & DEV_TX_OFFLOAD_MBUF_FAST_FREE)
292 		port_conf.txmode.offloads |=
293 			DEV_TX_OFFLOAD_MBUF_FAST_FREE;
294 
295 	port_conf.rx_adv_conf.rss_conf.rss_hf &=
296 		dev_info.flow_type_rss_offloads;
297 	if (port_conf.rx_adv_conf.rss_conf.rss_hf !=
298 			port_conf_default.rx_adv_conf.rss_conf.rss_hf) {
299 		printf("Port %u modified RSS hash function based on hardware support,"
300 			"requested:%#"PRIx64" configured:%#"PRIx64"\n",
301 			port,
302 			port_conf_default.rx_adv_conf.rss_conf.rss_hf,
303 			port_conf.rx_adv_conf.rss_conf.rss_hf);
304 	}
305 
306 	/* Configure the Ethernet device. */
307 	retval = rte_eth_dev_configure(port, rx_rings, tx_rings, &port_conf);
308 	if (retval != 0)
309 		return retval;
310 
311 	/* Allocate and set up 1 RX queue per Ethernet port. */
312 	for (q = 0; q < rx_rings; q++) {
313 		retval = rte_eth_rx_queue_setup(port, q, rx_ring_size,
314 				rte_eth_dev_socket_id(port), NULL, mbuf_pool);
315 		if (retval < 0)
316 			return retval;
317 	}
318 
319 	txconf = dev_info.default_txconf;
320 	txconf.offloads = port_conf_default.txmode.offloads;
321 	/* Allocate and set up 1 TX queue per Ethernet port. */
322 	for (q = 0; q < tx_rings; q++) {
323 		retval = rte_eth_tx_queue_setup(port, q, tx_ring_size,
324 				rte_eth_dev_socket_id(port), &txconf);
325 		if (retval < 0)
326 			return retval;
327 	}
328 
329 	/* Start the Ethernet port. */
330 	retval = rte_eth_dev_start(port);
331 	if (retval < 0)
332 		return retval;
333 
334 	/* Display the port MAC address. */
335 	struct ether_addr addr;
336 	rte_eth_macaddr_get(port, &addr);
337 	printf("Port %u MAC: %02" PRIx8 " %02" PRIx8 " %02" PRIx8
338 			   " %02" PRIx8 " %02" PRIx8 " %02" PRIx8 "\n",
339 			(unsigned int)port,
340 			addr.addr_bytes[0], addr.addr_bytes[1],
341 			addr.addr_bytes[2], addr.addr_bytes[3],
342 			addr.addr_bytes[4], addr.addr_bytes[5]);
343 
344 	/* Enable RX in promiscuous mode for the Ethernet device. */
345 	rte_eth_promiscuous_enable(port);
346 
347 	return 0;
348 }
349 
350 static int
351 init_ports(uint16_t num_ports)
352 {
353 	uint16_t portid, i;
354 
355 	if (!cdata.num_mbuf)
356 		cdata.num_mbuf = 16384 * num_ports;
357 
358 	struct rte_mempool *mp = rte_pktmbuf_pool_create("packet_pool",
359 			/* mbufs */ cdata.num_mbuf,
360 			/* cache_size */ 512,
361 			/* priv_size*/ 0,
362 			/* data_room_size */ RTE_MBUF_DEFAULT_BUF_SIZE,
363 			rte_socket_id());
364 
365 	RTE_ETH_FOREACH_DEV(portid)
366 		if (port_init(portid, mp) != 0)
367 			rte_exit(EXIT_FAILURE, "Cannot init port %"PRIu16 "\n",
368 					portid);
369 
370 	RTE_ETH_FOREACH_DEV(i) {
371 		void *userdata = (void *)(uintptr_t) i;
372 		fdata->tx_buf[i] =
373 			rte_malloc(NULL, RTE_ETH_TX_BUFFER_SIZE(32), 0);
374 		if (fdata->tx_buf[i] == NULL)
375 			rte_panic("Out of memory\n");
376 		rte_eth_tx_buffer_init(fdata->tx_buf[i], 32);
377 		rte_eth_tx_buffer_set_err_callback(fdata->tx_buf[i],
378 						   eth_tx_buffer_retry,
379 						   userdata);
380 	}
381 
382 	return 0;
383 }
384 
385 static void
386 do_capability_setup(uint8_t eventdev_id)
387 {
388 	uint16_t i;
389 	uint8_t mt_unsafe = 0;
390 	uint8_t burst = 0;
391 
392 	RTE_ETH_FOREACH_DEV(i) {
393 		struct rte_eth_dev_info dev_info;
394 		memset(&dev_info, 0, sizeof(struct rte_eth_dev_info));
395 
396 		rte_eth_dev_info_get(i, &dev_info);
397 		/* Check if it is safe ask worker to tx. */
398 		mt_unsafe |= !(dev_info.tx_offload_capa &
399 				DEV_TX_OFFLOAD_MT_LOCKFREE);
400 	}
401 
402 	struct rte_event_dev_info eventdev_info;
403 	memset(&eventdev_info, 0, sizeof(struct rte_event_dev_info));
404 
405 	rte_event_dev_info_get(eventdev_id, &eventdev_info);
406 	burst = eventdev_info.event_dev_cap & RTE_EVENT_DEV_CAP_BURST_MODE ? 1 :
407 		0;
408 
409 	if (mt_unsafe)
410 		set_worker_generic_setup_data(&fdata->cap, burst);
411 	else
412 		set_worker_tx_setup_data(&fdata->cap, burst);
413 }
414 
415 static void
416 signal_handler(int signum)
417 {
418 	if (fdata->done)
419 		rte_exit(1, "Exiting on signal %d\n", signum);
420 	if (signum == SIGINT || signum == SIGTERM) {
421 		printf("\n\nSignal %d received, preparing to exit...\n",
422 				signum);
423 		fdata->done = 1;
424 	}
425 	if (signum == SIGTSTP)
426 		rte_event_dev_dump(0, stdout);
427 }
428 
/*
 * Fetch the "port_<p>_rx" extended statistic of an event device.
 *
 * dev_id - event device id
 * p      - event port number used to build the statistic name
 *
 * Returns whatever rte_event_dev_xstats_by_name_get() returns for that
 * name.
 */
static inline uint64_t
port_stat(int dev_id, int32_t p)
{
	char statname[64];

	/* %u expects an unsigned int, so convert explicitly. */
	snprintf(statname, sizeof(statname), "port_%u_rx", (unsigned int)p);
	return rte_event_dev_xstats_by_name_get(dev_id, statname, NULL);
}
436 
437 int
438 main(int argc, char **argv)
439 {
440 	struct worker_data *worker_data;
441 	uint16_t num_ports;
442 	int lcore_id;
443 	int err;
444 
445 	signal(SIGINT, signal_handler);
446 	signal(SIGTERM, signal_handler);
447 	signal(SIGTSTP, signal_handler);
448 
449 	err = rte_eal_init(argc, argv);
450 	if (err < 0)
451 		rte_panic("Invalid EAL arguments\n");
452 
453 	argc -= err;
454 	argv += err;
455 
456 	fdata = rte_malloc(NULL, sizeof(struct fastpath_data), 0);
457 	if (fdata == NULL)
458 		rte_panic("Out of memory\n");
459 
460 	/* Parse cli options*/
461 	parse_app_args(argc, argv);
462 
463 	num_ports = rte_eth_dev_count_avail();
464 	if (num_ports == 0)
465 		rte_panic("No ethernet ports found\n");
466 
467 	const unsigned int cores_needed = cdata.active_cores;
468 
469 	if (!cdata.quiet) {
470 		printf("  Config:\n");
471 		printf("\tports: %u\n", num_ports);
472 		printf("\tworkers: %u\n", cdata.num_workers);
473 		printf("\tpackets: %"PRIi64"\n", cdata.num_packets);
474 		printf("\tQueue-prio: %u\n", cdata.enable_queue_priorities);
475 		if (cdata.queue_type == RTE_SCHED_TYPE_ORDERED)
476 			printf("\tqid0 type: ordered\n");
477 		if (cdata.queue_type == RTE_SCHED_TYPE_ATOMIC)
478 			printf("\tqid0 type: atomic\n");
479 		printf("\tCores available: %u\n", rte_lcore_count());
480 		printf("\tCores used: %u\n", cores_needed);
481 	}
482 
483 	if (rte_lcore_count() < cores_needed)
484 		rte_panic("Too few cores (%d < %d)\n", rte_lcore_count(),
485 				cores_needed);
486 
487 	const unsigned int ndevs = rte_event_dev_count();
488 	if (ndevs == 0)
489 		rte_panic("No dev_id devs found. Pasl in a --vdev eventdev.\n");
490 	if (ndevs > 1)
491 		fprintf(stderr, "Warning: More than one eventdev, using idx 0");
492 
493 
494 	do_capability_setup(0);
495 	fdata->cap.check_opt();
496 
497 	worker_data = rte_calloc(0, cdata.num_workers,
498 			sizeof(worker_data[0]), 0);
499 	if (worker_data == NULL)
500 		rte_panic("rte_calloc failed\n");
501 
502 	int dev_id = fdata->cap.evdev_setup(&cons_data, worker_data);
503 	if (dev_id < 0)
504 		rte_exit(EXIT_FAILURE, "Error setting up eventdev\n");
505 
506 	init_ports(num_ports);
507 	fdata->cap.adptr_setup(num_ports);
508 
509 	int worker_idx = 0;
510 	RTE_LCORE_FOREACH_SLAVE(lcore_id) {
511 		if (lcore_id >= MAX_NUM_CORE)
512 			break;
513 
514 		if (!fdata->rx_core[lcore_id] &&
515 			!fdata->worker_core[lcore_id] &&
516 			!fdata->tx_core[lcore_id] &&
517 			!fdata->sched_core[lcore_id])
518 			continue;
519 
520 		if (fdata->rx_core[lcore_id])
521 			printf(
522 				"[%s()] lcore %d executing NIC Rx\n",
523 				__func__, lcore_id);
524 
525 		if (fdata->tx_core[lcore_id])
526 			printf(
527 				"[%s()] lcore %d executing NIC Tx, and using eventdev port %u\n",
528 				__func__, lcore_id, cons_data.port_id);
529 
530 		if (fdata->sched_core[lcore_id])
531 			printf("[%s()] lcore %d executing scheduler\n",
532 					__func__, lcore_id);
533 
534 		if (fdata->worker_core[lcore_id])
535 			printf(
536 				"[%s()] lcore %d executing worker, using eventdev port %u\n",
537 				__func__, lcore_id,
538 				worker_data[worker_idx].port_id);
539 
540 		err = rte_eal_remote_launch(fdata->cap.worker,
541 				&worker_data[worker_idx], lcore_id);
542 		if (err) {
543 			rte_panic("Failed to launch worker on core %d\n",
544 					lcore_id);
545 			continue;
546 		}
547 		if (fdata->worker_core[lcore_id])
548 			worker_idx++;
549 	}
550 
551 	lcore_id = rte_lcore_id();
552 
553 	if (core_in_use(lcore_id))
554 		fdata->cap.worker(&worker_data[worker_idx++]);
555 
556 	rte_eal_mp_wait_lcore();
557 
558 	if (cdata.dump_dev)
559 		rte_event_dev_dump(dev_id, stdout);
560 
561 	if (!cdata.quiet && (port_stat(dev_id, worker_data[0].port_id) !=
562 			(uint64_t)-ENOTSUP)) {
563 		printf("\nPort Workload distribution:\n");
564 		uint32_t i;
565 		uint64_t tot_pkts = 0;
566 		uint64_t pkts_per_wkr[RTE_MAX_LCORE] = {0};
567 		for (i = 0; i < cdata.num_workers; i++) {
568 			pkts_per_wkr[i] =
569 				port_stat(dev_id, worker_data[i].port_id);
570 			tot_pkts += pkts_per_wkr[i];
571 		}
572 		for (i = 0; i < cdata.num_workers; i++) {
573 			float pc = pkts_per_wkr[i]  * 100 /
574 				((float)tot_pkts);
575 			printf("worker %i :\t%.1f %% (%"PRIu64" pkts)\n",
576 					i, pc, pkts_per_wkr[i]);
577 		}
578 
579 	}
580 
581 	return 0;
582 }
583