xref: /dpdk/app/test-eventdev/test_pipeline_atq.c (revision 945acb4a0d644d194f1823084a234f9c286dcf8c)
/*
 * SPDX-License-Identifier: BSD-3-Clause
 * Copyright 2017 Cavium, Inc.
 */

#include "test_pipeline_common.h"

/* See http://dpdk.org/doc/guides/tools/testeventdev.html for test details */

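/*
 * In ATQ (all types queue) mode a single event queue per ethernet device is
 * enough, as every pipeline stage is enqueued back to the same queue.
 */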
static __rte_always_inline int
pipeline_atq_nb_event_queues(struct evt_options *opt)
{
	RTE_SET_USED(opt);

	return rte_eth_dev_count();
}

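/*
 * Single stage worker, one event per dequeue, direct Tx from the worker:
 * ATOMIC events are transmitted right away, any other event is forwarded
 * back to the same queue as ATOMIC so that Tx happens in order.
 */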
static int
pipeline_atq_worker_single_stage_tx(void *arg)
{
	PIPELINE_WROKER_SINGLE_STAGE_INIT;

	while (t->done == false) {
		uint16_t event = rte_event_dequeue_burst(dev, port, &ev, 1, 0);

		if (!event) {
			rte_pause();
			continue;
		}

		if (ev.sched_type == RTE_SCHED_TYPE_ATOMIC) {
			pipeline_tx_pkt(ev.mbuf);
			w->processed_pkts++;
			continue;
		}
		pipeline_fwd_event(&ev, RTE_SCHED_TYPE_ATOMIC);
		pipeline_event_enqueue(dev, port, &ev);
	}

	return 0;
}

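/*
 * Single stage worker for ethdevs whose Tx path is not multi-thread safe:
 * instead of transmitting, events are forwarded to the single-link Tx
 * service queue.
 */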
static int
pipeline_atq_worker_single_stage_fwd(void *arg)
{
	PIPELINE_WROKER_SINGLE_STAGE_INIT;
	const uint8_t tx_queue = t->tx_service.queue_id;

	while (t->done == false) {
		uint16_t event = rte_event_dequeue_burst(dev, port, &ev, 1, 0);

		if (!event) {
			rte_pause();
			continue;
		}

		w->processed_pkts++;
		ev.queue_id = tx_queue;
		pipeline_fwd_event(&ev, RTE_SCHED_TYPE_ATOMIC);
		pipeline_event_enqueue(dev, port, &ev);
	}

	return 0;
}

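/*
 * Burst variant of the single stage direct Tx worker; transmitted events
 * are marked RTE_EVENT_OP_RELEASE before the burst enqueue.
 */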
static int
pipeline_atq_worker_single_stage_burst_tx(void *arg)
{
	PIPELINE_WROKER_SINGLE_STAGE_BURST_INIT;

	while (t->done == false) {
		uint16_t nb_rx = rte_event_dequeue_burst(dev, port, ev,
				BURST_SIZE, 0);

		if (!nb_rx) {
			rte_pause();
			continue;
		}

		for (i = 0; i < nb_rx; i++) {
			rte_prefetch0(ev[i + 1].mbuf);
			if (ev[i].sched_type == RTE_SCHED_TYPE_ATOMIC) {
				pipeline_tx_pkt(ev[i].mbuf);
				ev[i].op = RTE_EVENT_OP_RELEASE;
				w->processed_pkts++;
			} else
				pipeline_fwd_event(&ev[i],
						RTE_SCHED_TYPE_ATOMIC);
		}

		pipeline_event_enqueue_burst(dev, port, ev, nb_rx);
	}

	return 0;
}

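/* Burst variant of the single stage Tx service (fwd) worker. */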
static int
pipeline_atq_worker_single_stage_burst_fwd(void *arg)
{
	PIPELINE_WROKER_SINGLE_STAGE_BURST_INIT;
	const uint8_t tx_queue = t->tx_service.queue_id;

	while (t->done == false) {
		uint16_t nb_rx = rte_event_dequeue_burst(dev, port, ev,
				BURST_SIZE, 0);

		if (!nb_rx) {
			rte_pause();
			continue;
		}

		for (i = 0; i < nb_rx; i++) {
			rte_prefetch0(ev[i + 1].mbuf);
			ev[i].queue_id = tx_queue;
			pipeline_fwd_event(&ev[i], RTE_SCHED_TYPE_ATOMIC);
			w->processed_pkts++;
		}

		pipeline_event_enqueue_burst(dev, port, ev, nb_rx);
	}

	return 0;
}

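/*
 * Multi stage worker with direct Tx: the stage index is carried in
 * ev.sub_event_type since all stages share the same all types queue.
 */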
static int
pipeline_atq_worker_multi_stage_tx(void *arg)
{
	PIPELINE_WROKER_MULTI_STAGE_INIT;
	const uint8_t nb_stages = t->opt->nb_stages;

	while (t->done == false) {
		uint16_t event = rte_event_dequeue_burst(dev, port, &ev, 1, 0);

		if (!event) {
			rte_pause();
			continue;
		}

		cq_id = ev.sub_event_type % nb_stages;

		if (cq_id == last_queue) {
			if (ev.sched_type == RTE_SCHED_TYPE_ATOMIC) {
				pipeline_tx_pkt(ev.mbuf);
				w->processed_pkts++;
				continue;
			}
			pipeline_fwd_event(&ev, RTE_SCHED_TYPE_ATOMIC);
		} else {
			ev.sub_event_type++;
			pipeline_fwd_event(&ev, sched_type_list[cq_id]);
		}

		pipeline_event_enqueue(dev, port, &ev);
	}
	return 0;
}

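/*
 * Multi stage worker that forwards the last stage to the single-link Tx
 * service queue instead of transmitting directly.
 */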
static int
pipeline_atq_worker_multi_stage_fwd(void *arg)
{
	PIPELINE_WROKER_MULTI_STAGE_INIT;
	const uint8_t nb_stages = t->opt->nb_stages;
	const uint8_t tx_queue = t->tx_service.queue_id;

	while (t->done == false) {
		uint16_t event = rte_event_dequeue_burst(dev, port, &ev, 1, 0);

		if (!event) {
			rte_pause();
			continue;
		}

		cq_id = ev.sub_event_type % nb_stages;

		if (cq_id == last_queue) {
			w->processed_pkts++;
			ev.queue_id = tx_queue;
			pipeline_fwd_event(&ev, RTE_SCHED_TYPE_ATOMIC);
		} else {
			ev.sub_event_type++;
			pipeline_fwd_event(&ev, sched_type_list[cq_id]);
		}

		pipeline_event_enqueue(dev, port, &ev);
	}
	return 0;
}

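/* Burst variant of the multi stage direct Tx worker. */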
static int
pipeline_atq_worker_multi_stage_burst_tx(void *arg)
{
	PIPELINE_WROKER_MULTI_STAGE_BURST_INIT;
	const uint8_t nb_stages = t->opt->nb_stages;

	while (t->done == false) {
		uint16_t nb_rx = rte_event_dequeue_burst(dev, port, ev,
				BURST_SIZE, 0);

		if (!nb_rx) {
			rte_pause();
			continue;
		}

		for (i = 0; i < nb_rx; i++) {
			rte_prefetch0(ev[i + 1].mbuf);
			cq_id = ev[i].sub_event_type % nb_stages;

			if (cq_id == last_queue) {
				if (ev[i].sched_type == RTE_SCHED_TYPE_ATOMIC) {
					pipeline_tx_pkt(ev[i].mbuf);
					ev[i].op = RTE_EVENT_OP_RELEASE;
					w->processed_pkts++;
					continue;
				}

				pipeline_fwd_event(&ev[i],
						RTE_SCHED_TYPE_ATOMIC);
			} else {
				ev[i].sub_event_type++;
				pipeline_fwd_event(&ev[i],
						sched_type_list[cq_id]);
			}
		}

		pipeline_event_enqueue_burst(dev, port, ev, nb_rx);
	}
	return 0;
}

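/* Burst variant of the multi stage Tx service (fwd) worker. */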
static int
pipeline_atq_worker_multi_stage_burst_fwd(void *arg)
{
	PIPELINE_WROKER_MULTI_STAGE_BURST_INIT;
	const uint8_t nb_stages = t->opt->nb_stages;
	const uint8_t tx_queue = t->tx_service.queue_id;

	while (t->done == false) {
		uint16_t nb_rx = rte_event_dequeue_burst(dev, port, ev,
				BURST_SIZE, 0);

		if (!nb_rx) {
			rte_pause();
			continue;
		}

		for (i = 0; i < nb_rx; i++) {
			rte_prefetch0(ev[i + 1].mbuf);
			cq_id = ev[i].sub_event_type % nb_stages;

			if (cq_id == last_queue) {
				w->processed_pkts++;
				ev[i].queue_id = tx_queue;
				pipeline_fwd_event(&ev[i],
						RTE_SCHED_TYPE_ATOMIC);
			} else {
				ev[i].sub_event_type++;
				pipeline_fwd_event(&ev[i],
						sched_type_list[cq_id]);
			}
		}

		pipeline_event_enqueue_burst(dev, port, ev, nb_rx);
	}
	return 0;
}

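/*
 * Pick the worker function based on the number of stages, the event device
 * burst mode capability and whether the ethdev Tx path is multi-thread safe
 * (direct Tx) or not (Tx service).
 */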
static int
worker_wrapper(void *arg)
{
	struct worker_data *w  = arg;
	struct evt_options *opt = w->t->opt;
	const bool burst = evt_has_burst_mode(w->dev_id);
	const bool mt_safe = !w->t->mt_unsafe;
	const uint8_t nb_stages = opt->nb_stages;
	RTE_SET_USED(opt);

	if (nb_stages == 1) {
		if (!burst && mt_safe)
			return pipeline_atq_worker_single_stage_tx(arg);
		else if (!burst && !mt_safe)
			return pipeline_atq_worker_single_stage_fwd(arg);
		else if (burst && mt_safe)
			return pipeline_atq_worker_single_stage_burst_tx(arg);
		else if (burst && !mt_safe)
			return pipeline_atq_worker_single_stage_burst_fwd(arg);
	} else {
		if (!burst && mt_safe)
			return pipeline_atq_worker_multi_stage_tx(arg);
		else if (!burst && !mt_safe)
			return pipeline_atq_worker_multi_stage_fwd(arg);
		else if (burst && mt_safe)
			return pipeline_atq_worker_multi_stage_burst_tx(arg);
		else if (burst && !mt_safe)
			return pipeline_atq_worker_multi_stage_burst_fwd(arg);
	}
	rte_panic("invalid worker\n");
}

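/*
 * Enable the Tx service component when the ethdev Tx path is not
 * multi-thread safe, then launch the worker lcores.
 */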
static int
pipeline_atq_launch_lcores(struct evt_test *test, struct evt_options *opt)
{
	struct test_pipeline *t = evt_test_priv(test);

	if (t->mt_unsafe)
		rte_service_component_runstate_set(t->tx_service.service_id, 1);
	return pipeline_launch_lcores(test, opt, worker_wrapper);
}

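/*
 * Event device setup: one event queue per ethernet device and one event
 * port per worker lcore; an extra single-link queue and port are added
 * when the Tx service is required.
 */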
static int
pipeline_atq_eventdev_setup(struct evt_test *test, struct evt_options *opt)
{
	int ret;
	int nb_ports;
	int nb_queues;
	uint8_t queue;
	struct rte_event_dev_info info;
	struct test_pipeline *t = evt_test_priv(test);
	uint8_t tx_evqueue_id = 0;
	uint8_t queue_arr[RTE_EVENT_MAX_QUEUES_PER_DEV];
	uint8_t nb_worker_queues = 0;

	nb_ports = evt_nr_active_lcores(opt->wlcores);
	nb_queues = rte_eth_dev_count();

	/* One extra port and queue for the Tx service */
	if (t->mt_unsafe) {
		tx_evqueue_id = nb_queues;
		nb_ports++;
		nb_queues++;
	}

	rte_event_dev_info_get(opt->dev_id, &info);

	const struct rte_event_dev_config config = {
			.nb_event_queues = nb_queues,
			.nb_event_ports = nb_ports,
			.nb_events_limit  = info.max_num_events,
			.nb_event_queue_flows = opt->nb_flows,
			.nb_event_port_dequeue_depth =
				info.max_event_port_dequeue_depth,
			.nb_event_port_enqueue_depth =
				info.max_event_port_enqueue_depth,
	};
	ret = rte_event_dev_configure(opt->dev_id, &config);
	if (ret) {
		evt_err("failed to configure eventdev %d", opt->dev_id);
		return ret;
	}

	struct rte_event_queue_conf q_conf = {
			.priority = RTE_EVENT_DEV_PRIORITY_NORMAL,
			.nb_atomic_flows = opt->nb_flows,
			.nb_atomic_order_sequences = opt->nb_flows,
	};
	/* queue configurations */
	for (queue = 0; queue < nb_queues; queue++) {
		q_conf.event_queue_cfg = RTE_EVENT_QUEUE_CFG_ALL_TYPES;

		if (t->mt_unsafe) {
			if (queue == tx_evqueue_id) {
				q_conf.event_queue_cfg =
					RTE_EVENT_QUEUE_CFG_SINGLE_LINK;
			} else {
				queue_arr[nb_worker_queues] = queue;
				nb_worker_queues++;
			}
		}

		ret = rte_event_queue_setup(opt->dev_id, queue, &q_conf);
		if (ret) {
			evt_err("failed to setup queue=%d", queue);
			return ret;
		}
	}

	/* port configuration */
	const struct rte_event_port_conf p_conf = {
			.dequeue_depth = opt->wkr_deq_dep,
			.enqueue_depth = info.max_event_port_dequeue_depth,
			.new_event_threshold = info.max_num_events,
	};

	if (t->mt_unsafe) {
		ret = pipeline_event_port_setup(test, opt, queue_arr,
				nb_worker_queues, p_conf);
		if (ret)
			return ret;

		ret = pipeline_event_tx_service_setup(test, opt, tx_evqueue_id,
				nb_ports - 1, p_conf);
	} else
		ret = pipeline_event_port_setup(test, opt, NULL, nb_queues,
				p_conf);

	if (ret)
		return ret;

	/*
	 * The pipelines are set up in the following manner:
	 *
	 * eth_dev_count = 2, nb_stages = 2, atq mode
	 *
	 * Multi thread safe :
	 *	queues = 2
	 *	stride = 1
	 *
	 *	event queue pipelines:
	 *	eth0 -> q0 ->tx
	 *	eth1 -> q1 ->tx
	 *
	 *	q0, q1 are configured as ATQ so all the different stages can
	 *	be enqueued on the same queue.
	 *
	 * Multi thread unsafe :
	 *	queues = 3
	 *	stride = 1
	 *
	 *	event queue pipelines:
	 *	eth0 -> q0
	 *		  } (q2->tx) Tx service
	 *	eth1 -> q1
	 *
	 *	q0, q1 are configured as stated above.
	 *	q2 is configured as SINGLE_LINK|ATOMIC.
	 */
	ret = pipeline_event_rx_adapter_setup(opt, 1, p_conf);
	if (ret)
		return ret;

	if (!evt_has_distributed_sched(opt->dev_id)) {
		uint32_t service_id;
		rte_event_dev_service_id_get(opt->dev_id, &service_id);
		ret = evt_service_setup(service_id);
		if (ret) {
			evt_err("No service lcore found to run event dev.");
			return ret;
		}
	}

	ret = rte_event_dev_start(opt->dev_id);
	if (ret) {
		evt_err("failed to start eventdev %d", opt->dev_id);
		return ret;
	}

	return 0;
}

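/* Option dump and check helpers, both based on the ATQ queue count. */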
static void
pipeline_atq_opt_dump(struct evt_options *opt)
{
	pipeline_opt_dump(opt, pipeline_atq_nb_event_queues(opt));
}

static int
pipeline_atq_opt_check(struct evt_options *opt)
{
	return pipeline_opt_check(opt, pipeline_atq_nb_event_queues(opt));
}

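/*
 * Check that the event device exposes enough event queues and ports for
 * the requested worker lcores.
 */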
static bool
pipeline_atq_capability_check(struct evt_options *opt)
{
	struct rte_event_dev_info dev_info;

	rte_event_dev_info_get(opt->dev_id, &dev_info);
	if (dev_info.max_event_queues < pipeline_atq_nb_event_queues(opt) ||
			dev_info.max_event_ports <
			evt_nr_active_lcores(opt->wlcores)) {
		evt_err("not enough eventdev queues=%d/%d or ports=%d/%d",
			pipeline_atq_nb_event_queues(opt),
			dev_info.max_event_queues,
			evt_nr_active_lcores(opt->wlcores),
			dev_info.max_event_ports);
		return false;
	}

	return true;
}

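/* Test operations registered with the test-eventdev framework. */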
static const struct evt_test_ops pipeline_atq = {
	.cap_check          = pipeline_atq_capability_check,
	.opt_check          = pipeline_atq_opt_check,
	.opt_dump           = pipeline_atq_opt_dump,
	.test_setup         = pipeline_test_setup,
	.mempool_setup      = pipeline_mempool_setup,
	.ethdev_setup       = pipeline_ethdev_setup,
	.eventdev_setup     = pipeline_atq_eventdev_setup,
	.launch_lcores      = pipeline_atq_launch_lcores,
	.eventdev_destroy   = pipeline_eventdev_destroy,
	.mempool_destroy    = pipeline_mempool_destroy,
	.ethdev_destroy     = pipeline_ethdev_destroy,
	.test_result        = pipeline_test_result,
	.test_destroy       = pipeline_test_destroy,
};

EVT_TEST_REGISTER(pipeline_atq);