xref: /dpdk/examples/eventdev_pipeline/main.c (revision 573ef95dc7c9e4991da41fa1fc3d45c5c4076deb)
1 /* SPDX-License-Identifier: BSD-3-Clause
2  * Copyright(c) 2016-2017 Intel Corporation
3  */
4 
5 #include <getopt.h>
6 #include <stdint.h>
7 #include <stdio.h>
8 #include <signal.h>
9 #include <sched.h>
10 
11 #include "pipeline_common.h"
12 
13 struct fastpath_data *fdata;
14 
15 struct config_data cdata = {
16 	.num_packets = (1L << 25), /* do ~32M packets */
17 	.num_fids = 512,
18 	.queue_type = RTE_SCHED_TYPE_ATOMIC,
19 	.next_qid = {-1},
20 	.qid = {-1},
21 	.num_stages = 1,
22 	.worker_cq_depth = 16
23 };
24 
25 static bool
26 core_in_use(unsigned int lcore_id) {
27 	return (fdata->rx_core[lcore_id] || fdata->sched_core[lcore_id] ||
28 		fdata->tx_core[lcore_id] || fdata->worker_core[lcore_id]);
29 }
30 
/*
 * Helpers for parsing a core mask given as a hexadecimal string.
 * parse_coremask() returns the mask as a bitmap; parse_app_args() then
 * uses it to fill the global configuration (core roles and core counts).
 */
/*
 * Convert one hexadecimal digit character to its numeric value.
 *
 * The input is not validated: anything that is neither a decimal digit
 * nor an uppercase letter is treated as a lowercase hex letter.
 */
static int xdigit2val(unsigned char c)
{
	if (isdigit(c))
		return c - '0';

	const int base = isupper(c) ? 'A' : 'a';

	return c - base + 10;
}
48 
49 static uint64_t
50 parse_coremask(const char *coremask)
51 {
52 	int i, j, idx = 0;
53 	unsigned int count = 0;
54 	char c;
55 	int val;
56 	uint64_t mask = 0;
57 	const int32_t BITS_HEX = 4;
58 
59 	if (coremask == NULL)
60 		return -1;
61 	/* Remove all blank characters ahead and after .
62 	 * Remove 0x/0X if exists.
63 	 */
64 	while (isblank(*coremask))
65 		coremask++;
66 	if (coremask[0] == '0' && ((coremask[1] == 'x')
67 		|| (coremask[1] == 'X')))
68 		coremask += 2;
69 	i = strlen(coremask);
70 	while ((i > 0) && isblank(coremask[i - 1]))
71 		i--;
72 	if (i == 0)
73 		return -1;
74 
75 	for (i = i - 1; i >= 0 && idx < MAX_NUM_CORE; i--) {
76 		c = coremask[i];
77 		if (isxdigit(c) == 0) {
78 			/* invalid characters */
79 			return -1;
80 		}
81 		val = xdigit2val(c);
82 		for (j = 0; j < BITS_HEX && idx < MAX_NUM_CORE; j++, idx++) {
83 			if ((1 << j) & val) {
84 				mask |= (1UL << idx);
85 				count++;
86 			}
87 		}
88 	}
89 	for (; i >= 0; i--)
90 		if (coremask[i] != '0')
91 			return -1;
92 	if (count == 0)
93 		return -1;
94 	return mask;
95 }
96 
/*
 * Long command-line options. Each entry maps to the short option letter
 * handled by the switch in parse_app_args().
 */
static struct option long_options[] = {
	/* Options that take a value. */
	{ "workers",        required_argument, NULL, 'w' },
	{ "packets",        required_argument, NULL, 'n' },
	{ "atomic-flows",   required_argument, NULL, 'f' },
	{ "num_stages",     required_argument, NULL, 's' },
	{ "rx-mask",        required_argument, NULL, 'r' },
	{ "tx-mask",        required_argument, NULL, 't' },
	{ "sched-mask",     required_argument, NULL, 'e' },
	{ "cq-depth",       required_argument, NULL, 'c' },
	{ "work-cycles",    required_argument, NULL, 'W' },
	{ "mempool-size",   required_argument, NULL, 'm' },
	/* Plain flags. */
	{ "queue-priority", no_argument,       NULL, 'P' },
	{ "parallel",       no_argument,       NULL, 'p' },
	{ "ordered",        no_argument,       NULL, 'o' },
	{ "quiet",          no_argument,       NULL, 'q' },
	{ "use-atq",        no_argument,       NULL, 'a' },
	{ "dump",           no_argument,       NULL, 'D' },
	/* End-of-table marker required by getopt_long(). */
	{ NULL, 0, NULL, 0 }
};
116 
/*
 * Print the command-line help on stderr and terminate the process with
 * exit status 1. Does not return.
 *
 * The text is kept in step with long_options[] (the worker mask is
 * registered as "--workers") and with the defaults in cdata (num_fids
 * defaults to 512).
 */
static void
usage(void)
{
	const char *usage_str =
		"  Usage: eventdev_demo [options]\n"
		"  Options:\n"
		"  -n, --packets=N              Send N packets (default ~32M), 0 implies no limit\n"
		"  -f, --atomic-flows=N         Use N random flows from 1 to N (default 512)\n"
		"  -s, --num_stages=N           Use N atomic stages (default 1)\n"
		"  -r, --rx-mask=core mask      Run NIC rx on CPUs in core mask\n"
		"  -w, --workers=core mask      Run worker on CPUs in core mask\n"
		"  -t, --tx-mask=core mask      Run NIC tx on CPUs in core mask\n"
		"  -e  --sched-mask=core mask   Run scheduler on CPUs in core mask\n"
		"  -c  --cq-depth=N             Worker CQ depth (default 16)\n"
		"  -W  --work-cycles=N          Worker cycles (default 0)\n"
		"  -P  --queue-priority         Enable scheduler queue prioritization\n"
		"  -o, --ordered                Use ordered scheduling\n"
		"  -p, --parallel               Use parallel scheduling\n"
		"  -q, --quiet                  Minimize printed output\n"
		"  -a, --use-atq                Use all type queues\n"
		"  -m, --mempool-size=N         Dictate the mempool size\n"
		"  -D, --dump                   Print detailed statistics before exit"
		"\n";
	fprintf(stderr, "%s", usage_str);
	exit(1);
}
143 
144 static void
145 parse_app_args(int argc, char **argv)
146 {
147 	/* Parse cli options*/
148 	int option_index;
149 	int c;
150 	opterr = 0;
151 	uint64_t rx_lcore_mask = 0;
152 	uint64_t tx_lcore_mask = 0;
153 	uint64_t sched_lcore_mask = 0;
154 	uint64_t worker_lcore_mask = 0;
155 	int i;
156 
157 	for (;;) {
158 		c = getopt_long(argc, argv, "r:t:e:c:w:n:f:s:m:paoPqDW:",
159 				long_options, &option_index);
160 		if (c == -1)
161 			break;
162 
163 		int popcnt = 0;
164 		switch (c) {
165 		case 'n':
166 			cdata.num_packets = (int64_t)atol(optarg);
167 			if (cdata.num_packets == 0)
168 				cdata.num_packets = INT64_MAX;
169 			break;
170 		case 'f':
171 			cdata.num_fids = (unsigned int)atoi(optarg);
172 			break;
173 		case 's':
174 			cdata.num_stages = (unsigned int)atoi(optarg);
175 			break;
176 		case 'c':
177 			cdata.worker_cq_depth = (unsigned int)atoi(optarg);
178 			break;
179 		case 'W':
180 			cdata.worker_cycles = (unsigned int)atoi(optarg);
181 			break;
182 		case 'P':
183 			cdata.enable_queue_priorities = 1;
184 			break;
185 		case 'o':
186 			cdata.queue_type = RTE_SCHED_TYPE_ORDERED;
187 			break;
188 		case 'p':
189 			cdata.queue_type = RTE_SCHED_TYPE_PARALLEL;
190 			break;
191 		case 'a':
192 			cdata.all_type_queues = 1;
193 			break;
194 		case 'q':
195 			cdata.quiet = 1;
196 			break;
197 		case 'D':
198 			cdata.dump_dev = 1;
199 			break;
200 		case 'w':
201 			worker_lcore_mask = parse_coremask(optarg);
202 			break;
203 		case 'r':
204 			rx_lcore_mask = parse_coremask(optarg);
205 			popcnt = __builtin_popcountll(rx_lcore_mask);
206 			fdata->rx_single = (popcnt == 1);
207 			break;
208 		case 't':
209 			tx_lcore_mask = parse_coremask(optarg);
210 			popcnt = __builtin_popcountll(tx_lcore_mask);
211 			fdata->tx_single = (popcnt == 1);
212 			break;
213 		case 'e':
214 			sched_lcore_mask = parse_coremask(optarg);
215 			popcnt = __builtin_popcountll(sched_lcore_mask);
216 			fdata->sched_single = (popcnt == 1);
217 			break;
218 		case 'm':
219 			cdata.num_mbuf = (uint64_t)atol(optarg);
220 			break;
221 		default:
222 			usage();
223 		}
224 	}
225 
226 	cdata.worker_lcore_mask = worker_lcore_mask;
227 	cdata.sched_lcore_mask = sched_lcore_mask;
228 	cdata.rx_lcore_mask = rx_lcore_mask;
229 	cdata.tx_lcore_mask = tx_lcore_mask;
230 
231 	if (cdata.num_stages == 0 || cdata.num_stages > MAX_NUM_STAGES)
232 		usage();
233 
234 	for (i = 0; i < MAX_NUM_CORE; i++) {
235 		fdata->rx_core[i] = !!(rx_lcore_mask & (1UL << i));
236 		fdata->tx_core[i] = !!(tx_lcore_mask & (1UL << i));
237 		fdata->sched_core[i] = !!(sched_lcore_mask & (1UL << i));
238 		fdata->worker_core[i] = !!(worker_lcore_mask & (1UL << i));
239 
240 		if (fdata->worker_core[i])
241 			cdata.num_workers++;
242 		if (core_in_use(i))
243 			cdata.active_cores++;
244 	}
245 }
246 
247 static void
248 do_capability_setup(uint8_t eventdev_id)
249 {
250 	int ret;
251 	uint16_t i;
252 	uint8_t generic_pipeline = 0;
253 	uint8_t burst = 0;
254 
255 	RTE_ETH_FOREACH_DEV(i) {
256 		uint32_t caps = 0;
257 
258 		ret = rte_event_eth_tx_adapter_caps_get(eventdev_id, i, &caps);
259 		if (ret)
260 			rte_exit(EXIT_FAILURE,
261 				"Invalid capability for Tx adptr port %d\n", i);
262 		generic_pipeline |= !(caps &
263 				RTE_EVENT_ETH_TX_ADAPTER_CAP_INTERNAL_PORT);
264 	}
265 
266 	struct rte_event_dev_info eventdev_info;
267 	memset(&eventdev_info, 0, sizeof(struct rte_event_dev_info));
268 
269 	rte_event_dev_info_get(eventdev_id, &eventdev_info);
270 	burst = eventdev_info.event_dev_cap & RTE_EVENT_DEV_CAP_BURST_MODE ? 1 :
271 		0;
272 
273 	if (generic_pipeline)
274 		set_worker_generic_setup_data(&fdata->cap, burst);
275 	else
276 		set_worker_tx_enq_setup_data(&fdata->cap, burst);
277 }
278 
279 static void
280 signal_handler(int signum)
281 {
282 	static uint8_t once;
283 	uint16_t portid;
284 
285 	if (fdata->done)
286 		rte_exit(1, "Exiting on signal %d\n", signum);
287 	if ((signum == SIGINT || signum == SIGTERM) && !once) {
288 		printf("\n\nSignal %d received, preparing to exit...\n",
289 				signum);
290 		if (cdata.dump_dev)
291 			rte_event_dev_dump(0, stdout);
292 		once = 1;
293 		fdata->done = 1;
294 		rte_smp_wmb();
295 
296 		RTE_ETH_FOREACH_DEV(portid) {
297 			rte_event_eth_rx_adapter_stop(portid);
298 			rte_event_eth_tx_adapter_stop(portid);
299 			rte_eth_dev_stop(portid);
300 		}
301 
302 		rte_eal_mp_wait_lcore();
303 
304 		RTE_ETH_FOREACH_DEV(portid) {
305 			rte_eth_dev_close(portid);
306 		}
307 
308 		rte_event_dev_stop(0);
309 		rte_event_dev_close(0);
310 	}
311 	if (signum == SIGTSTP)
312 		rte_event_dev_dump(0, stdout);
313 }
314 
/*
 * Look up the extended statistic named "port_<p>_rx" on event device
 * dev_id and return the value reported by
 * rte_event_dev_xstats_by_name_get(). main() compares the result with
 * (uint64_t)-ENOTSUP to detect a device without this statistic.
 */
static inline uint64_t
port_stat(int dev_id, int32_t p)
{
	char statname[64];

	/* %u expects an unsigned argument, so convert explicitly */
	snprintf(statname, sizeof(statname), "port_%u_rx", (unsigned int)p);
	return rte_event_dev_xstats_by_name_get(dev_id, statname, NULL);
}
322 
323 int
324 main(int argc, char **argv)
325 {
326 	struct worker_data *worker_data;
327 	uint16_t num_ports;
328 	uint16_t portid;
329 	int lcore_id;
330 	int err;
331 
332 	signal(SIGINT, signal_handler);
333 	signal(SIGTERM, signal_handler);
334 	signal(SIGTSTP, signal_handler);
335 
336 	err = rte_eal_init(argc, argv);
337 	if (err < 0)
338 		rte_panic("Invalid EAL arguments\n");
339 
340 	argc -= err;
341 	argv += err;
342 
343 	fdata = rte_malloc(NULL, sizeof(struct fastpath_data), 0);
344 	if (fdata == NULL)
345 		rte_panic("Out of memory\n");
346 
347 	/* Parse cli options*/
348 	parse_app_args(argc, argv);
349 
350 	num_ports = rte_eth_dev_count_avail();
351 	if (num_ports == 0)
352 		rte_panic("No ethernet ports found\n");
353 
354 	const unsigned int cores_needed = cdata.active_cores;
355 
356 	if (!cdata.quiet) {
357 		printf("  Config:\n");
358 		printf("\tports: %u\n", num_ports);
359 		printf("\tworkers: %u\n", cdata.num_workers);
360 		printf("\tpackets: %"PRIi64"\n", cdata.num_packets);
361 		printf("\tQueue-prio: %u\n", cdata.enable_queue_priorities);
362 		if (cdata.queue_type == RTE_SCHED_TYPE_ORDERED)
363 			printf("\tqid0 type: ordered\n");
364 		if (cdata.queue_type == RTE_SCHED_TYPE_ATOMIC)
365 			printf("\tqid0 type: atomic\n");
366 		printf("\tCores available: %u\n", rte_lcore_count());
367 		printf("\tCores used: %u\n", cores_needed);
368 	}
369 
370 	if (rte_lcore_count() < cores_needed)
371 		rte_panic("Too few cores (%d < %d)\n", rte_lcore_count(),
372 				cores_needed);
373 
374 	const unsigned int ndevs = rte_event_dev_count();
375 	if (ndevs == 0)
376 		rte_panic("No dev_id devs found. Pasl in a --vdev eventdev.\n");
377 	if (ndevs > 1)
378 		fprintf(stderr, "Warning: More than one eventdev, using idx 0");
379 
380 
381 	do_capability_setup(0);
382 	fdata->cap.check_opt();
383 
384 	worker_data = rte_calloc(0, cdata.num_workers,
385 			sizeof(worker_data[0]), 0);
386 	if (worker_data == NULL)
387 		rte_panic("rte_calloc failed\n");
388 
389 	int dev_id = fdata->cap.evdev_setup(worker_data);
390 	if (dev_id < 0)
391 		rte_exit(EXIT_FAILURE, "Error setting up eventdev\n");
392 
393 	fdata->cap.adptr_setup(num_ports);
394 
395 	/* Start the Ethernet port. */
396 	RTE_ETH_FOREACH_DEV(portid) {
397 		err = rte_eth_dev_start(portid);
398 		if (err < 0)
399 			rte_exit(EXIT_FAILURE, "Error starting ethdev %d\n",
400 					portid);
401 	}
402 
403 	int worker_idx = 0;
404 	RTE_LCORE_FOREACH_SLAVE(lcore_id) {
405 		if (lcore_id >= MAX_NUM_CORE)
406 			break;
407 
408 		if (!fdata->rx_core[lcore_id] &&
409 			!fdata->worker_core[lcore_id] &&
410 			!fdata->tx_core[lcore_id] &&
411 			!fdata->sched_core[lcore_id])
412 			continue;
413 
414 		if (fdata->rx_core[lcore_id])
415 			printf(
416 				"[%s()] lcore %d executing NIC Rx\n",
417 				__func__, lcore_id);
418 
419 		if (fdata->tx_core[lcore_id])
420 			printf(
421 				"[%s()] lcore %d executing NIC Tx\n",
422 				__func__, lcore_id);
423 
424 		if (fdata->sched_core[lcore_id])
425 			printf("[%s()] lcore %d executing scheduler\n",
426 					__func__, lcore_id);
427 
428 		if (fdata->worker_core[lcore_id])
429 			printf(
430 				"[%s()] lcore %d executing worker, using eventdev port %u\n",
431 				__func__, lcore_id,
432 				worker_data[worker_idx].port_id);
433 
434 		err = rte_eal_remote_launch(fdata->cap.worker,
435 				&worker_data[worker_idx], lcore_id);
436 		if (err) {
437 			rte_panic("Failed to launch worker on core %d\n",
438 					lcore_id);
439 			continue;
440 		}
441 		if (fdata->worker_core[lcore_id])
442 			worker_idx++;
443 	}
444 
445 	lcore_id = rte_lcore_id();
446 
447 	if (core_in_use(lcore_id))
448 		fdata->cap.worker(&worker_data[worker_idx++]);
449 
450 	rte_eal_mp_wait_lcore();
451 
452 	if (!cdata.quiet && (port_stat(dev_id, worker_data[0].port_id) !=
453 			(uint64_t)-ENOTSUP)) {
454 		printf("\nPort Workload distribution:\n");
455 		uint32_t i;
456 		uint64_t tot_pkts = 0;
457 		uint64_t pkts_per_wkr[RTE_MAX_LCORE] = {0};
458 		for (i = 0; i < cdata.num_workers; i++) {
459 			pkts_per_wkr[i] =
460 				port_stat(dev_id, worker_data[i].port_id);
461 			tot_pkts += pkts_per_wkr[i];
462 		}
463 		for (i = 0; i < cdata.num_workers; i++) {
464 			float pc = pkts_per_wkr[i]  * 100 /
465 				((float)tot_pkts);
466 			printf("worker %i :\t%.1f %% (%"PRIu64" pkts)\n",
467 					i, pc, pkts_per_wkr[i]);
468 		}
469 
470 	}
471 
472 	return 0;
473 }
474