xref: /dpdk/examples/eventdev_pipeline/main.c (revision 198b5448433ed329becaf47003faf038132fbb7f)
1 /* SPDX-License-Identifier: BSD-3-Clause
2  * Copyright(c) 2016-2017 Intel Corporation
3  */
4 
#include <ctype.h>
#include <errno.h>
#include <getopt.h>
#include <limits.h>
#include <sched.h>
#include <signal.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

#include "pipeline_common.h"
12 
13 struct fastpath_data *fdata;
14 
15 struct config_data cdata = {
16 	.num_packets = (1L << 25), /* do ~32M packets */
17 	.num_fids = 512,
18 	.queue_type = RTE_SCHED_TYPE_ATOMIC,
19 	.next_qid = {-1},
20 	.qid = {-1},
21 	.num_stages = 1,
22 	.worker_cq_depth = 16
23 };
24 
25 static void
26 dump_core_info(unsigned int lcore_id, struct worker_data *data,
27 		unsigned int worker_idx)
28 {
29 	if (fdata->rx_core[lcore_id])
30 		printf(
31 			"[%s()] lcore %d executing NIC Rx\n",
32 			__func__, lcore_id);
33 
34 	if (fdata->tx_core[lcore_id])
35 		printf(
36 			"[%s()] lcore %d executing NIC Tx\n",
37 			__func__, lcore_id);
38 
39 	if (fdata->sched_core[lcore_id])
40 		printf(
41 			"[%s()] lcore %d executing scheduler\n",
42 			__func__, lcore_id);
43 
44 	if (fdata->worker_core[lcore_id])
45 		printf(
46 			"[%s()] lcore %d executing worker, using eventdev port %u\n",
47 			__func__, lcore_id,
48 			data[worker_idx].port_id);
49 }
50 
51 static bool
52 core_in_use(unsigned int lcore_id) {
53 	return (fdata->rx_core[lcore_id] || fdata->sched_core[lcore_id] ||
54 		fdata->tx_core[lcore_id] || fdata->worker_core[lcore_id]);
55 }
56 
/*
 * Coremask parsing helpers: convert a coremask given as argument
 * (hexadecimal string) into a 64-bit lcore bitmask. The caller stores
 * the parsed value in the global configuration.
 */
/*
 * Convert one hexadecimal digit character to its numeric value.
 *
 * Decimal digits map to 0-9; any other character is treated as a letter
 * and mapped relative to 'A' (upper case) or 'a' (everything else), so the
 * caller is expected to have validated c with isxdigit() beforehand.
 */
static int xdigit2val(unsigned char c)
{
	if (isdigit(c))
		return c - '0';

	return c - (isupper(c) ? 'A' : 'a') + 10;
}
74 
75 static uint64_t
76 parse_coremask(const char *coremask)
77 {
78 	int i, j, idx = 0;
79 	unsigned int count = 0;
80 	char c;
81 	int val;
82 	uint64_t mask = 0;
83 	const int32_t BITS_HEX = 4;
84 
85 	if (coremask == NULL)
86 		return -1;
87 	/* Remove all blank characters ahead and after .
88 	 * Remove 0x/0X if exists.
89 	 */
90 	while (isblank(*coremask))
91 		coremask++;
92 	if (coremask[0] == '0' && ((coremask[1] == 'x')
93 		|| (coremask[1] == 'X')))
94 		coremask += 2;
95 	i = strlen(coremask);
96 	while ((i > 0) && isblank(coremask[i - 1]))
97 		i--;
98 	if (i == 0)
99 		return -1;
100 
101 	for (i = i - 1; i >= 0 && idx < MAX_NUM_CORE; i--) {
102 		c = coremask[i];
103 		if (isxdigit(c) == 0) {
104 			/* invalid characters */
105 			return -1;
106 		}
107 		val = xdigit2val(c);
108 		for (j = 0; j < BITS_HEX && idx < MAX_NUM_CORE; j++, idx++) {
109 			if ((1 << j) & val) {
110 				mask |= (1ULL << idx);
111 				count++;
112 			}
113 		}
114 	}
115 	for (; i >= 0; i--)
116 		if (coremask[i] != '0')
117 			return -1;
118 	if (count == 0)
119 		return -1;
120 	return mask;
121 }
122 
/*
 * Long command-line options, each mapped onto the short option character
 * handled in parse_app_args(). The table is terminated by an all-zero entry
 * and is never modified, hence const.
 */
static const struct option long_options[] = {
	{"workers", required_argument, 0, 'w'},
	/* Spelling shown by usage(); kept as an alias of --workers. */
	{"worker-mask", required_argument, 0, 'w'},
	{"packets", required_argument, 0, 'n'},
	{"atomic-flows", required_argument, 0, 'f'},
	{"num_stages", required_argument, 0, 's'},
	{"rx-mask", required_argument, 0, 'r'},
	{"tx-mask", required_argument, 0, 't'},
	{"sched-mask", required_argument, 0, 'e'},
	{"cq-depth", required_argument, 0, 'c'},
	{"work-cycles", required_argument, 0, 'W'},
	{"mempool-size", required_argument, 0, 'm'},
	{"queue-priority", no_argument, 0, 'P'},
	{"parallel", no_argument, 0, 'p'},
	{"ordered", no_argument, 0, 'o'},
	{"quiet", no_argument, 0, 'q'},
	{"use-atq", no_argument, 0, 'a'},
	{"dump", no_argument, 0, 'D'},
	{0, 0, 0, 0}
};
142 
/*
 * Print the command-line help to stderr and terminate the process with
 * exit status 1. This function does not return.
 */
static void
usage(void)
{
	/*
	 * Keep this text in sync with long_options[] and with the defaults
	 * in cdata (e.g. num_fids = 512, worker_cq_depth = 16).
	 */
	const char *usage_str =
		"  Usage: eventdev_demo [options]\n"
		"  Options:\n"
		"  -n, --packets=N              Send N packets (default ~32M), 0 implies no limit\n"
		"  -f, --atomic-flows=N         Use N random flows from 1 to N (default 512)\n"
		"  -s, --num_stages=N           Use N atomic stages (default 1)\n"
		"  -r, --rx-mask=core mask      Run NIC rx on CPUs in core mask\n"
		"  -w, --workers=core mask      Run worker on CPUs in core mask\n"
		"  -t, --tx-mask=core mask      Run NIC tx on CPUs in core mask\n"
		"  -e  --sched-mask=core mask   Run scheduler on CPUs in core mask\n"
		"  -c  --cq-depth=N             Worker CQ depth (default 16)\n"
		"  -W  --work-cycles=N          Worker cycles (default 0)\n"
		"  -P  --queue-priority         Enable scheduler queue prioritization\n"
		"  -o, --ordered                Use ordered scheduling\n"
		"  -p, --parallel               Use parallel scheduling\n"
		"  -q, --quiet                  Minimize printed output\n"
		"  -a, --use-atq                Use all type queues\n"
		"  -m, --mempool-size=N         Dictate the mempool size\n"
		"  -D, --dump                   Print detailed statistics before exit"
		"\n";
	fprintf(stderr, "%s", usage_str);
	exit(1);
}
169 
170 static void
171 parse_app_args(int argc, char **argv)
172 {
173 	/* Parse cli options*/
174 	int option_index;
175 	int c;
176 	opterr = 0;
177 	uint64_t rx_lcore_mask = 0;
178 	uint64_t tx_lcore_mask = 0;
179 	uint64_t sched_lcore_mask = 0;
180 	uint64_t worker_lcore_mask = 0;
181 	int i;
182 
183 	for (;;) {
184 		c = getopt_long(argc, argv, "r:t:e:c:w:n:f:s:m:paoPqDW:",
185 				long_options, &option_index);
186 		if (c == -1)
187 			break;
188 
189 		int popcnt = 0;
190 		switch (c) {
191 		case 'n':
192 			cdata.num_packets = (int64_t)atol(optarg);
193 			if (cdata.num_packets == 0)
194 				cdata.num_packets = INT64_MAX;
195 			break;
196 		case 'f':
197 			cdata.num_fids = (unsigned int)atoi(optarg);
198 			break;
199 		case 's':
200 			cdata.num_stages = (unsigned int)atoi(optarg);
201 			break;
202 		case 'c':
203 			cdata.worker_cq_depth = (unsigned int)atoi(optarg);
204 			break;
205 		case 'W':
206 			cdata.worker_cycles = (unsigned int)atoi(optarg);
207 			break;
208 		case 'P':
209 			cdata.enable_queue_priorities = 1;
210 			break;
211 		case 'o':
212 			cdata.queue_type = RTE_SCHED_TYPE_ORDERED;
213 			break;
214 		case 'p':
215 			cdata.queue_type = RTE_SCHED_TYPE_PARALLEL;
216 			break;
217 		case 'a':
218 			cdata.all_type_queues = 1;
219 			break;
220 		case 'q':
221 			cdata.quiet = 1;
222 			break;
223 		case 'D':
224 			cdata.dump_dev = 1;
225 			break;
226 		case 'w':
227 			worker_lcore_mask = parse_coremask(optarg);
228 			break;
229 		case 'r':
230 			rx_lcore_mask = parse_coremask(optarg);
231 			popcnt = __builtin_popcountll(rx_lcore_mask);
232 			fdata->rx_single = (popcnt == 1);
233 			break;
234 		case 't':
235 			tx_lcore_mask = parse_coremask(optarg);
236 			popcnt = __builtin_popcountll(tx_lcore_mask);
237 			fdata->tx_single = (popcnt == 1);
238 			break;
239 		case 'e':
240 			sched_lcore_mask = parse_coremask(optarg);
241 			popcnt = __builtin_popcountll(sched_lcore_mask);
242 			fdata->sched_single = (popcnt == 1);
243 			break;
244 		case 'm':
245 			cdata.num_mbuf = (uint64_t)atol(optarg);
246 			break;
247 		default:
248 			usage();
249 		}
250 	}
251 
252 	cdata.worker_lcore_mask = worker_lcore_mask;
253 	cdata.sched_lcore_mask = sched_lcore_mask;
254 	cdata.rx_lcore_mask = rx_lcore_mask;
255 	cdata.tx_lcore_mask = tx_lcore_mask;
256 
257 	if (cdata.num_stages == 0 || cdata.num_stages > MAX_NUM_STAGES)
258 		usage();
259 
260 	for (i = 0; i < MAX_NUM_CORE; i++) {
261 		fdata->rx_core[i] = !!(rx_lcore_mask & (1ULL << i));
262 		fdata->tx_core[i] = !!(tx_lcore_mask & (1ULL << i));
263 		fdata->sched_core[i] = !!(sched_lcore_mask & (1ULL << i));
264 		fdata->worker_core[i] = !!(worker_lcore_mask & (1ULL << i));
265 
266 		if (fdata->worker_core[i])
267 			cdata.num_workers++;
268 		if (core_in_use(i)) {
269 			if (!rte_lcore_is_enabled(i)) {
270 				printf("lcore %d is not enabled in lcore list\n",
271 					i);
272 				rte_exit(EXIT_FAILURE,
273 					"check lcore params failed\n");
274 			}
275 			cdata.active_cores++;
276 		}
277 	}
278 }
279 
280 static void
281 do_capability_setup(uint8_t eventdev_id)
282 {
283 	int ret;
284 	uint16_t i;
285 	uint8_t generic_pipeline = 0;
286 	uint8_t burst = 0;
287 
288 	RTE_ETH_FOREACH_DEV(i) {
289 		uint32_t caps = 0;
290 
291 		ret = rte_event_eth_tx_adapter_caps_get(eventdev_id, i, &caps);
292 		if (ret)
293 			rte_exit(EXIT_FAILURE,
294 				"Invalid capability for Tx adptr port %d\n", i);
295 		generic_pipeline |= !(caps &
296 				RTE_EVENT_ETH_TX_ADAPTER_CAP_INTERNAL_PORT);
297 	}
298 
299 	struct rte_event_dev_info eventdev_info;
300 	memset(&eventdev_info, 0, sizeof(struct rte_event_dev_info));
301 
302 	rte_event_dev_info_get(eventdev_id, &eventdev_info);
303 	burst = eventdev_info.event_dev_cap & RTE_EVENT_DEV_CAP_BURST_MODE ? 1 :
304 		0;
305 
306 	if (generic_pipeline)
307 		set_worker_generic_setup_data(&fdata->cap, burst);
308 	else
309 		set_worker_tx_enq_setup_data(&fdata->cap, burst);
310 }
311 
312 static void
313 signal_handler(int signum)
314 {
315 	static uint8_t once;
316 	uint16_t portid;
317 
318 	if (fdata->done)
319 		rte_exit(1, "Exiting on signal %d\n", signum);
320 	if ((signum == SIGINT || signum == SIGTERM) && !once) {
321 		printf("\n\nSignal %d received, preparing to exit...\n",
322 				signum);
323 		if (cdata.dump_dev)
324 			rte_event_dev_dump(0, stdout);
325 		once = 1;
326 		fdata->done = 1;
327 		rte_smp_wmb();
328 
329 		RTE_ETH_FOREACH_DEV(portid) {
330 			rte_event_eth_rx_adapter_stop(portid);
331 			rte_event_eth_tx_adapter_stop(portid);
332 			if (rte_eth_dev_stop(portid) < 0)
333 				printf("Failed to stop port %u", portid);
334 		}
335 
336 		rte_eal_mp_wait_lcore();
337 
338 	}
339 	if (signum == SIGTSTP)
340 		rte_event_dev_dump(0, stdout);
341 }
342 
/*
 * Read the "port_<p>_rx" extended statistic of event device dev_id.
 *
 * Returns whatever rte_event_dev_xstats_by_name_get() reports for that
 * name; main() compares the result against (uint64_t)-ENOTSUP to detect
 * devices without this statistic.
 */
static inline uint64_t
port_stat(int dev_id, int32_t p)
{
	char statname[64];

	/* %u expects an unsigned int, so convert the signed port id. */
	snprintf(statname, sizeof(statname), "port_%u_rx", (unsigned int)p);
	return rte_event_dev_xstats_by_name_get(dev_id, statname, NULL);
}
350 
351 int
352 main(int argc, char **argv)
353 {
354 	struct worker_data *worker_data;
355 	uint16_t num_ports;
356 	uint16_t portid;
357 	int lcore_id;
358 	int err;
359 
360 	signal(SIGINT, signal_handler);
361 	signal(SIGTERM, signal_handler);
362 	signal(SIGTSTP, signal_handler);
363 
364 	err = rte_eal_init(argc, argv);
365 	if (err < 0)
366 		rte_panic("Invalid EAL arguments\n");
367 
368 	argc -= err;
369 	argv += err;
370 
371 	fdata = rte_malloc(NULL, sizeof(struct fastpath_data), 0);
372 	if (fdata == NULL)
373 		rte_panic("Out of memory\n");
374 
375 	/* Parse cli options*/
376 	parse_app_args(argc, argv);
377 
378 	num_ports = rte_eth_dev_count_avail();
379 	if (num_ports == 0)
380 		rte_panic("No ethernet ports found\n");
381 
382 	const unsigned int cores_needed = cdata.active_cores;
383 
384 	if (!cdata.quiet) {
385 		printf("  Config:\n");
386 		printf("\tports: %u\n", num_ports);
387 		printf("\tworkers: %u\n", cdata.num_workers);
388 		printf("\tpackets: %"PRIi64"\n", cdata.num_packets);
389 		printf("\tQueue-prio: %u\n", cdata.enable_queue_priorities);
390 		if (cdata.queue_type == RTE_SCHED_TYPE_ORDERED)
391 			printf("\tqid0 type: ordered\n");
392 		if (cdata.queue_type == RTE_SCHED_TYPE_ATOMIC)
393 			printf("\tqid0 type: atomic\n");
394 		printf("\tCores available: %u\n", rte_lcore_count());
395 		printf("\tCores used: %u\n", cores_needed);
396 	}
397 
398 	if (rte_lcore_count() < cores_needed)
399 		rte_panic("Too few cores (%d < %d)\n", rte_lcore_count(),
400 				cores_needed);
401 
402 	const unsigned int ndevs = rte_event_dev_count();
403 	if (ndevs == 0)
404 		rte_panic("No dev_id devs found. Pasl in a --vdev eventdev.\n");
405 	if (ndevs > 1)
406 		fprintf(stderr, "Warning: More than one eventdev, using idx 0");
407 
408 
409 	do_capability_setup(0);
410 	fdata->cap.check_opt();
411 
412 	worker_data = rte_calloc(0, cdata.num_workers,
413 			sizeof(worker_data[0]), 0);
414 	if (worker_data == NULL)
415 		rte_panic("rte_calloc failed\n");
416 
417 	int dev_id = fdata->cap.evdev_setup(worker_data);
418 	if (dev_id < 0)
419 		rte_exit(EXIT_FAILURE, "Error setting up eventdev\n");
420 
421 	fdata->cap.adptr_setup(num_ports);
422 
423 	/* Start the Ethernet port. */
424 	RTE_ETH_FOREACH_DEV(portid) {
425 		err = rte_eth_dev_start(portid);
426 		if (err < 0)
427 			rte_exit(EXIT_FAILURE, "Error starting ethdev %d\n",
428 					portid);
429 	}
430 
431 	int worker_idx = 0;
432 	RTE_LCORE_FOREACH_WORKER(lcore_id) {
433 		if (lcore_id >= MAX_NUM_CORE)
434 			break;
435 
436 		if (!fdata->rx_core[lcore_id] &&
437 			!fdata->worker_core[lcore_id] &&
438 			!fdata->tx_core[lcore_id] &&
439 			!fdata->sched_core[lcore_id])
440 			continue;
441 
442 		dump_core_info(lcore_id, worker_data, worker_idx);
443 
444 		err = rte_eal_remote_launch(fdata->cap.worker,
445 				&worker_data[worker_idx], lcore_id);
446 		if (err) {
447 			rte_panic("Failed to launch worker on core %d\n",
448 					lcore_id);
449 			continue;
450 		}
451 		if (fdata->worker_core[lcore_id])
452 			worker_idx++;
453 	}
454 
455 	lcore_id = rte_lcore_id();
456 
457 	if (core_in_use(lcore_id)) {
458 		dump_core_info(lcore_id, worker_data, worker_idx);
459 		fdata->cap.worker(&worker_data[worker_idx]);
460 
461 		if (fdata->worker_core[lcore_id])
462 			worker_idx++;
463 	}
464 
465 	rte_eal_mp_wait_lcore();
466 
467 	if (!cdata.quiet && (port_stat(dev_id, worker_data[0].port_id) !=
468 			(uint64_t)-ENOTSUP)) {
469 		printf("\nPort Workload distribution:\n");
470 		uint32_t i;
471 		uint64_t tot_pkts = 0;
472 		uint64_t pkts_per_wkr[RTE_MAX_LCORE] = {0};
473 		for (i = 0; i < cdata.num_workers; i++) {
474 			pkts_per_wkr[i] =
475 				port_stat(dev_id, worker_data[i].port_id);
476 			tot_pkts += pkts_per_wkr[i];
477 		}
478 		for (i = 0; i < cdata.num_workers; i++) {
479 			float pc = pkts_per_wkr[i]  * 100 /
480 				((float)tot_pkts);
481 			printf("worker %i :\t%.1f %% (%"PRIu64" pkts)\n",
482 					i, pc, pkts_per_wkr[i]);
483 		}
484 
485 	}
486 
487 	RTE_ETH_FOREACH_DEV(portid) {
488 		rte_eth_dev_close(portid);
489 	}
490 
491 	rte_event_dev_stop(0);
492 	rte_event_dev_close(0);
493 
494 	rte_eal_cleanup();
495 
496 	return 0;
497 }
498