xref: /dpdk/app/test-eventdev/test_pipeline_queue.c (revision 314bcf58ca8fcd3e013f36729f9e6323a412cd1a)
/*
 * SPDX-License-Identifier: BSD-3-Clause
 * Copyright 2017 Cavium, Inc.
 */

#include "test_pipeline_common.h"

/* See http://dpdk.org/doc/guides/tools/testeventdev.html for test details */

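/*
 * Worst-case event queue count: one queue per pipeline stage plus a Tx queue
 * for each ethdev. Used for option dump and capability checks.
 */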
static __rte_always_inline int
pipeline_queue_nb_event_queues(struct evt_options *opt)
{
	uint16_t eth_count = rte_eth_dev_count();

	return (eth_count * opt->nb_stages) + eth_count;
}

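/*
 * Single stage, non-burst, worker Tx variant: events that reached the ATOMIC
 * Tx queue are transmitted directly by the worker; all other events are
 * forwarded to the next queue as ATOMIC so Tx ordering is preserved.
 */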
static int
pipeline_queue_worker_single_stage_tx(void *arg)
{
	PIPELINE_WROKER_SINGLE_STAGE_INIT;

	while (t->done == false) {
		uint16_t event = rte_event_dequeue_burst(dev, port, &ev, 1, 0);

		if (!event) {
			rte_pause();
			continue;
		}

		if (ev.sched_type == RTE_SCHED_TYPE_ATOMIC) {
			pipeline_tx_pkt(ev.mbuf);
			w->processed_pkts++;
		} else {
			ev.queue_id++;
			pipeline_fwd_event(&ev, RTE_SCHED_TYPE_ATOMIC);
			pipeline_event_enqueue(dev, port, &ev);
		}
	}

	return 0;
}

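/*
 * Single stage, non-burst, Tx service variant: the ethdev Tx path is not
 * multi-thread safe, so every event is redirected to the single-link Tx
 * service queue instead of being transmitted from the worker.
 */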
static int
pipeline_queue_worker_single_stage_fwd(void *arg)
{
	PIPELINE_WROKER_SINGLE_STAGE_INIT;
	const uint8_t tx_queue = t->tx_service.queue_id;

	while (t->done == false) {
		uint16_t event = rte_event_dequeue_burst(dev, port, &ev, 1, 0);

		if (!event) {
			rte_pause();
			continue;
		}

		ev.queue_id = tx_queue;
		pipeline_fwd_event(&ev, RTE_SCHED_TYPE_ATOMIC);
		pipeline_event_enqueue(dev, port, &ev);
		w->processed_pkts++;
	}

	return 0;
}

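/*
 * Burst variant of the single stage worker Tx: dequeue up to BURST_SIZE
 * events, transmit those that reached the ATOMIC Tx stage and forward the
 * rest; the whole burst is enqueued back, with transmitted events marked
 * RTE_EVENT_OP_RELEASE.
 */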
static int
pipeline_queue_worker_single_stage_burst_tx(void *arg)
{
	PIPELINE_WROKER_SINGLE_STAGE_BURST_INIT;

	while (t->done == false) {
		uint16_t nb_rx = rte_event_dequeue_burst(dev, port, ev,
				BURST_SIZE, 0);

		if (!nb_rx) {
			rte_pause();
			continue;
		}

		for (i = 0; i < nb_rx; i++) {
			rte_prefetch0(ev[i + 1].mbuf);
			if (ev[i].sched_type == RTE_SCHED_TYPE_ATOMIC) {
				pipeline_tx_pkt(ev[i].mbuf);
				ev[i].op = RTE_EVENT_OP_RELEASE;
				w->processed_pkts++;
			} else {
				ev[i].queue_id++;
				pipeline_fwd_event(&ev[i],
						RTE_SCHED_TYPE_ATOMIC);
			}
		}

		pipeline_event_enqueue_burst(dev, port, ev, nb_rx);
	}

	return 0;
}

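/*
 * Burst variant of the single stage Tx service worker: every dequeued event
 * is redirected to the Tx service queue as ATOMIC and enqueued back as a
 * burst.
 */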
static int
pipeline_queue_worker_single_stage_burst_fwd(void *arg)
{
	PIPELINE_WROKER_SINGLE_STAGE_BURST_INIT;
	const uint8_t tx_queue = t->tx_service.queue_id;

	while (t->done == false) {
		uint16_t nb_rx = rte_event_dequeue_burst(dev, port, ev,
				BURST_SIZE, 0);

		if (!nb_rx) {
			rte_pause();
			continue;
		}

		for (i = 0; i < nb_rx; i++) {
			rte_prefetch0(ev[i + 1].mbuf);
			ev[i].queue_id = tx_queue;
			pipeline_fwd_event(&ev[i], RTE_SCHED_TYPE_ATOMIC);
			w->processed_pkts++;
		}

		pipeline_event_enqueue_burst(dev, port, ev, nb_rx);
	}

	return 0;
}

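/*
 * Multi stage, non-burst, worker Tx variant: the current stage is derived
 * from the event queue id. Events already on the ATOMIC Tx queue are
 * transmitted by the worker, events on the last worker stage are moved to
 * the Tx queue as ATOMIC, and earlier stages forward to the next queue with
 * the configured schedule type.
 */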
static int
pipeline_queue_worker_multi_stage_tx(void *arg)
{
	PIPELINE_WROKER_MULTI_STAGE_INIT;
	const uint8_t nb_stages = t->opt->nb_stages + 1;

	while (t->done == false) {
		uint16_t event = rte_event_dequeue_burst(dev, port, &ev, 1, 0);

		if (!event) {
			rte_pause();
			continue;
		}

		cq_id = ev.queue_id % nb_stages;

		if (cq_id >= last_queue) {
			if (ev.sched_type == RTE_SCHED_TYPE_ATOMIC) {
				pipeline_tx_pkt(ev.mbuf);
				w->processed_pkts++;
				continue;
			}
			ev.queue_id += (cq_id == last_queue) ? 1 : 0;
			pipeline_fwd_event(&ev, RTE_SCHED_TYPE_ATOMIC);
		} else {
			ev.queue_id++;
			pipeline_fwd_event(&ev, sched_type_list[cq_id]);
		}

		pipeline_event_enqueue(dev, port, &ev);
	}
	return 0;
}

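/*
 * Multi stage, non-burst, Tx service variant: events that complete the last
 * worker stage are redirected to the Tx service queue; all other events move
 * to the next stage with the configured schedule type.
 */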
static int
pipeline_queue_worker_multi_stage_fwd(void *arg)
{
	PIPELINE_WROKER_MULTI_STAGE_INIT;
	const uint8_t nb_stages = t->opt->nb_stages + 1;
	const uint8_t tx_queue = t->tx_service.queue_id;

	while (t->done == false) {
		uint16_t event = rte_event_dequeue_burst(dev, port, &ev, 1, 0);

		if (!event) {
			rte_pause();
			continue;
		}

		cq_id = ev.queue_id % nb_stages;

		if (cq_id == last_queue) {
			ev.queue_id = tx_queue;
			pipeline_fwd_event(&ev, RTE_SCHED_TYPE_ATOMIC);
			w->processed_pkts++;
		} else {
			ev.queue_id++;
			pipeline_fwd_event(&ev, sched_type_list[cq_id]);
		}

		pipeline_event_enqueue(dev, port, &ev);
	}
	return 0;
}

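/*
 * Burst variant of the multi stage worker Tx: same stage logic as the
 * non-burst version, with transmitted events marked RTE_EVENT_OP_RELEASE
 * before the burst is enqueued back.
 */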
static int
pipeline_queue_worker_multi_stage_burst_tx(void *arg)
{
	PIPELINE_WROKER_MULTI_STAGE_BURST_INIT;
	const uint8_t nb_stages = t->opt->nb_stages + 1;

	while (t->done == false) {
		uint16_t nb_rx = rte_event_dequeue_burst(dev, port, ev,
				BURST_SIZE, 0);

		if (!nb_rx) {
			rte_pause();
			continue;
		}

		for (i = 0; i < nb_rx; i++) {
			rte_prefetch0(ev[i + 1].mbuf);
			cq_id = ev[i].queue_id % nb_stages;

			if (cq_id >= last_queue) {
				if (ev[i].sched_type == RTE_SCHED_TYPE_ATOMIC) {
					pipeline_tx_pkt(ev[i].mbuf);
					ev[i].op = RTE_EVENT_OP_RELEASE;
					w->processed_pkts++;
					continue;
				}

				ev[i].queue_id += (cq_id == last_queue) ? 1 : 0;
				pipeline_fwd_event(&ev[i],
						RTE_SCHED_TYPE_ATOMIC);
			} else {
				ev[i].queue_id++;
				pipeline_fwd_event(&ev[i],
						sched_type_list[cq_id]);
			}
		}

		pipeline_event_enqueue_burst(dev, port, ev, nb_rx);
	}
	return 0;
}

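/*
 * Burst variant of the multi stage Tx service worker: events that complete
 * the last worker stage are redirected to the Tx service queue, the rest
 * advance to the next stage, and the whole burst is enqueued back.
 */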
static int
pipeline_queue_worker_multi_stage_burst_fwd(void *arg)
{
	PIPELINE_WROKER_MULTI_STAGE_BURST_INIT;
	const uint8_t nb_stages = t->opt->nb_stages + 1;
	const uint8_t tx_queue = t->tx_service.queue_id;

	while (t->done == false) {
		uint16_t nb_rx = rte_event_dequeue_burst(dev, port, ev,
				BURST_SIZE, 0);

		if (!nb_rx) {
			rte_pause();
			continue;
		}

		for (i = 0; i < nb_rx; i++) {
			rte_prefetch0(ev[i + 1].mbuf);
			cq_id = ev[i].queue_id % nb_stages;

			if (cq_id == last_queue) {
				ev[i].queue_id = tx_queue;
				pipeline_fwd_event(&ev[i],
						RTE_SCHED_TYPE_ATOMIC);
				w->processed_pkts++;
			} else {
				ev[i].queue_id++;
				pipeline_fwd_event(&ev[i],
						sched_type_list[cq_id]);
			}
		}

		pipeline_event_enqueue_burst(dev, port, ev, nb_rx);
	}
	return 0;
}

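/*
 * Pick the worker variant from the number of stages, the device's burst
 * dequeue capability and whether the ethdev Tx path is multi-thread safe.
 */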
static int
worker_wrapper(void *arg)
{
	struct worker_data *w = arg;
	struct evt_options *opt = w->t->opt;
	const bool burst = evt_has_burst_mode(w->dev_id);
	const bool mt_safe = !w->t->mt_unsafe;
	const uint8_t nb_stages = opt->nb_stages;
	RTE_SET_USED(opt);

	if (nb_stages == 1) {
		if (!burst && mt_safe)
			return pipeline_queue_worker_single_stage_tx(arg);
		else if (!burst && !mt_safe)
			return pipeline_queue_worker_single_stage_fwd(arg);
		else if (burst && mt_safe)
			return pipeline_queue_worker_single_stage_burst_tx(arg);
		else if (burst && !mt_safe)
			return pipeline_queue_worker_single_stage_burst_fwd(
					arg);
	} else {
		if (!burst && mt_safe)
			return pipeline_queue_worker_multi_stage_tx(arg);
		else if (!burst && !mt_safe)
			return pipeline_queue_worker_multi_stage_fwd(arg);
		else if (burst && mt_safe)
			return pipeline_queue_worker_multi_stage_burst_tx(arg);
		else if (burst && !mt_safe)
			return pipeline_queue_worker_multi_stage_burst_fwd(arg);
	}
	rte_panic("invalid worker\n");
}

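/*
 * When the ethdev Tx path is not multi-thread safe, enable the Tx service
 * component before launching the worker lcores.
 */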
static int
pipeline_queue_launch_lcores(struct evt_test *test, struct evt_options *opt)
{
	struct test_pipeline *t = evt_test_priv(test);

	if (t->mt_unsafe)
		rte_service_component_runstate_set(t->tx_service.service_id, 1);
	return pipeline_launch_lcores(test, opt, worker_wrapper);
}

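/*
 * Configure the event device: one event port per worker lcore (plus one for
 * the Tx service when Tx is not MT safe) and nb_stages queues per ethdev,
 * with either one ATOMIC Tx queue per ethdev (MT-safe Tx) or a single
 * SINGLE_LINK ATOMIC queue feeding the Tx service (MT-unsafe Tx).
 */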
static int
pipeline_queue_eventdev_setup(struct evt_test *test, struct evt_options *opt)
{
	int ret;
	int nb_ports;
	int nb_queues;
	int nb_stages = opt->nb_stages;
	uint8_t queue;
	struct rte_event_dev_info info;
	struct test_pipeline *t = evt_test_priv(test);
	uint8_t tx_evqueue_id = 0;
	uint8_t queue_arr[RTE_EVENT_MAX_QUEUES_PER_DEV];
	uint8_t nb_worker_queues = 0;

	nb_ports = evt_nr_active_lcores(opt->wlcores);
	nb_queues = rte_eth_dev_count() * (nb_stages);

	/* Extra port for Tx service. */
	if (t->mt_unsafe) {
		tx_evqueue_id = nb_queues;
		nb_ports++;
		nb_queues++;
	} else
		nb_queues += rte_eth_dev_count();

	rte_event_dev_info_get(opt->dev_id, &info);

	const struct rte_event_dev_config config = {
			.nb_event_queues = nb_queues,
			.nb_event_ports = nb_ports,
			.nb_events_limit  = info.max_num_events,
			.nb_event_queue_flows = opt->nb_flows,
			.nb_event_port_dequeue_depth =
				info.max_event_port_dequeue_depth,
			.nb_event_port_enqueue_depth =
				info.max_event_port_enqueue_depth,
	};
	ret = rte_event_dev_configure(opt->dev_id, &config);
	if (ret) {
		evt_err("failed to configure eventdev %d", opt->dev_id);
		return ret;
	}

	struct rte_event_queue_conf q_conf = {
			.priority = RTE_EVENT_DEV_PRIORITY_NORMAL,
			.nb_atomic_flows = opt->nb_flows,
			.nb_atomic_order_sequences = opt->nb_flows,
	};
	/* queue configurations */
	for (queue = 0; queue < nb_queues; queue++) {
		uint8_t slot;

		if (!t->mt_unsafe) {
			slot = queue % (nb_stages + 1);
			q_conf.schedule_type = slot == nb_stages ?
				RTE_SCHED_TYPE_ATOMIC :
				opt->sched_type_list[slot];
		} else {
			slot = queue % nb_stages;

			if (queue == tx_evqueue_id) {
				q_conf.schedule_type = RTE_SCHED_TYPE_ATOMIC;
				q_conf.event_queue_cfg =
					RTE_EVENT_QUEUE_CFG_SINGLE_LINK;
			} else {
				q_conf.schedule_type =
					opt->sched_type_list[slot];
				queue_arr[nb_worker_queues] = queue;
				nb_worker_queues++;
			}
		}

		ret = rte_event_queue_setup(opt->dev_id, queue, &q_conf);
		if (ret) {
			evt_err("failed to setup queue=%d", queue);
			return ret;
		}
	}

	/* port configuration */
	const struct rte_event_port_conf p_conf = {
			.dequeue_depth = opt->wkr_deq_dep,
			.enqueue_depth = info.max_event_port_dequeue_depth,
			.new_event_threshold = info.max_num_events,
	};

	/*
	 * If Tx is multi-thread safe then let the workers transmit directly,
	 * else use the Tx service to transmit packets.
	 */
	if (t->mt_unsafe) {
		ret = pipeline_event_port_setup(test, opt, queue_arr,
				nb_worker_queues, p_conf);
		if (ret)
			return ret;

		ret = pipeline_event_tx_service_setup(test, opt, tx_evqueue_id,
				nb_ports - 1, p_conf);
	} else
		ret = pipeline_event_port_setup(test, opt, NULL, nb_queues,
				p_conf);

	if (ret)
		return ret;

	/*
	 * The pipelines are set up in the following manner:
	 *
	 * eth_dev_count = 2, nb_stages = 2.
	 *
	 * Multi-thread safe:
	 *	queues = 6
	 *	stride = 3
	 *
	 *	event queue pipelines:
	 *	eth0 -> q0 -> q1 -> (q2->tx)
	 *	eth1 -> q3 -> q4 -> (q5->tx)
	 *
	 *	q2, q5 configured as ATOMIC
	 *
	 * Multi-thread unsafe:
	 *	queues = 5
	 *	stride = 2
	 *
	 *	event queue pipelines:
	 *	eth0 -> q0 -> q1
	 *			} (q4->tx) Tx service
	 *	eth1 -> q2 -> q3
	 *
	 *	q4 configured as SINGLE_LINK|ATOMIC
	 */
	ret = pipeline_event_rx_adapter_setup(opt,
			t->mt_unsafe ? nb_stages : nb_stages + 1, p_conf);
	if (ret)
		return ret;

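	/* If the scheduler is not distributed, run it on a service lcore. */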
	if (!evt_has_distributed_sched(opt->dev_id)) {
		uint32_t service_id;

		rte_event_dev_service_id_get(opt->dev_id, &service_id);
		ret = evt_service_setup(service_id);
		if (ret) {
			evt_err("No service lcore found to run event dev.");
			return ret;
		}
	}

	ret = rte_event_dev_start(opt->dev_id);
	if (ret) {
		evt_err("failed to start eventdev %d", opt->dev_id);
		return ret;
	}

	return 0;
}

static void
pipeline_queue_opt_dump(struct evt_options *opt)
{
	pipeline_opt_dump(opt, pipeline_queue_nb_event_queues(opt));
}

static int
pipeline_queue_opt_check(struct evt_options *opt)
{
	return pipeline_opt_check(opt, pipeline_queue_nb_event_queues(opt));
}

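/*
 * Verify that the event device exposes enough queues and ports for the
 * requested pipeline before the test is set up.
 */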
static bool
pipeline_queue_capability_check(struct evt_options *opt)
{
	struct rte_event_dev_info dev_info;

	rte_event_dev_info_get(opt->dev_id, &dev_info);
	if (dev_info.max_event_queues < pipeline_queue_nb_event_queues(opt) ||
			dev_info.max_event_ports <
			evt_nr_active_lcores(opt->wlcores)) {
		evt_err("not enough eventdev queues=%d/%d or ports=%d/%d",
			pipeline_queue_nb_event_queues(opt),
			dev_info.max_event_queues,
			evt_nr_active_lcores(opt->wlcores),
			dev_info.max_event_ports);
		return false;
	}

	return true;
}

static const struct evt_test_ops pipeline_queue = {
	.cap_check          = pipeline_queue_capability_check,
	.opt_check          = pipeline_queue_opt_check,
	.opt_dump           = pipeline_queue_opt_dump,
	.test_setup         = pipeline_test_setup,
	.mempool_setup      = pipeline_mempool_setup,
	.ethdev_setup       = pipeline_ethdev_setup,
	.eventdev_setup     = pipeline_queue_eventdev_setup,
	.launch_lcores      = pipeline_queue_launch_lcores,
	.eventdev_destroy   = pipeline_eventdev_destroy,
	.mempool_destroy    = pipeline_mempool_destroy,
	.ethdev_destroy     = pipeline_ethdev_destroy,
	.test_result        = pipeline_test_result,
	.test_destroy       = pipeline_test_destroy,
};

EVT_TEST_REGISTER(pipeline_queue);