xref: /dpdk/app/test-eventdev/test_pipeline_atq.c (revision 6491dbbecebb1e4f07fc970ef90b34119d8be2e3)
1 /*
2  * SPDX-License-Identifier: BSD-3-Clause
3  * Copyright 2017 Cavium, Inc.
4  */
5 
6 #include "test_pipeline_common.h"
7 
8 /* See http://dpdk.org/doc/guides/tools/testeventdev.html for test details */
9 
10 static __rte_always_inline int
11 pipeline_atq_nb_event_queues(struct evt_options *opt)
12 {
13 	RTE_SET_USED(opt);
14 
15 	return rte_eth_dev_count_avail();
16 }
17 
18 static int
19 pipeline_atq_worker_single_stage_tx(void *arg)
20 {
21 	PIPELINE_WROKER_SINGLE_STAGE_INIT;
22 
23 	while (t->done == false) {
24 		uint16_t event = rte_event_dequeue_burst(dev, port, &ev, 1, 0);
25 
26 		if (!event) {
27 			rte_pause();
28 			continue;
29 		}
30 
31 		if (ev.sched_type == RTE_SCHED_TYPE_ATOMIC) {
32 			pipeline_tx_pkt(ev.mbuf);
33 			w->processed_pkts++;
34 			continue;
35 		}
36 		pipeline_fwd_event(&ev, RTE_SCHED_TYPE_ATOMIC);
37 		pipeline_event_enqueue(dev, port, &ev);
38 	}
39 
40 	return 0;
41 }
42 
43 static int
44 pipeline_atq_worker_single_stage_fwd(void *arg)
45 {
46 	PIPELINE_WROKER_SINGLE_STAGE_INIT;
47 	const uint8_t tx_queue = t->tx_service.queue_id;
48 
49 	while (t->done == false) {
50 		uint16_t event = rte_event_dequeue_burst(dev, port, &ev, 1, 0);
51 
52 		if (!event) {
53 			rte_pause();
54 			continue;
55 		}
56 
57 		w->processed_pkts++;
58 		ev.queue_id = tx_queue;
59 		pipeline_fwd_event(&ev, RTE_SCHED_TYPE_ATOMIC);
60 		pipeline_event_enqueue(dev, port, &ev);
61 	}
62 
63 	return 0;
64 }
65 
66 static int
67 pipeline_atq_worker_single_stage_burst_tx(void *arg)
68 {
69 	PIPELINE_WROKER_SINGLE_STAGE_BURST_INIT;
70 
71 	while (t->done == false) {
72 		uint16_t nb_rx = rte_event_dequeue_burst(dev, port, ev,
73 				BURST_SIZE, 0);
74 
75 		if (!nb_rx) {
76 			rte_pause();
77 			continue;
78 		}
79 
80 		for (i = 0; i < nb_rx; i++) {
81 			rte_prefetch0(ev[i + 1].mbuf);
82 			if (ev[i].sched_type == RTE_SCHED_TYPE_ATOMIC) {
83 
84 				pipeline_tx_pkt(ev[i].mbuf);
85 				ev[i].op = RTE_EVENT_OP_RELEASE;
86 				w->processed_pkts++;
87 			} else
88 				pipeline_fwd_event(&ev[i],
89 						RTE_SCHED_TYPE_ATOMIC);
90 		}
91 
92 		pipeline_event_enqueue_burst(dev, port, ev, nb_rx);
93 	}
94 
95 	return 0;
96 }
97 
98 static int
99 pipeline_atq_worker_single_stage_burst_fwd(void *arg)
100 {
101 	PIPELINE_WROKER_SINGLE_STAGE_BURST_INIT;
102 	const uint8_t tx_queue = t->tx_service.queue_id;
103 
104 	while (t->done == false) {
105 		uint16_t nb_rx = rte_event_dequeue_burst(dev, port, ev,
106 				BURST_SIZE, 0);
107 
108 		if (!nb_rx) {
109 			rte_pause();
110 			continue;
111 		}
112 
113 		for (i = 0; i < nb_rx; i++) {
114 			rte_prefetch0(ev[i + 1].mbuf);
115 			ev[i].queue_id = tx_queue;
116 			pipeline_fwd_event(&ev[i], RTE_SCHED_TYPE_ATOMIC);
117 			w->processed_pkts++;
118 		}
119 
120 		pipeline_event_enqueue_burst(dev, port, ev, nb_rx);
121 	}
122 
123 	return 0;
124 }
125 
126 static int
127 pipeline_atq_worker_multi_stage_tx(void *arg)
128 {
129 	PIPELINE_WROKER_MULTI_STAGE_INIT;
130 	const uint8_t nb_stages = t->opt->nb_stages;
131 
132 
133 	while (t->done == false) {
134 		uint16_t event = rte_event_dequeue_burst(dev, port, &ev, 1, 0);
135 
136 		if (!event) {
137 			rte_pause();
138 			continue;
139 		}
140 
141 		cq_id = ev.sub_event_type % nb_stages;
142 
143 		if (cq_id == last_queue) {
144 			if (ev.sched_type == RTE_SCHED_TYPE_ATOMIC) {
145 
146 				pipeline_tx_pkt(ev.mbuf);
147 				w->processed_pkts++;
148 				continue;
149 			}
150 			pipeline_fwd_event(&ev, RTE_SCHED_TYPE_ATOMIC);
151 		} else {
152 			ev.sub_event_type++;
153 			pipeline_fwd_event(&ev, sched_type_list[cq_id]);
154 		}
155 
156 		pipeline_event_enqueue(dev, port, &ev);
157 	}
158 	return 0;
159 }
160 
161 static int
162 pipeline_atq_worker_multi_stage_fwd(void *arg)
163 {
164 	PIPELINE_WROKER_MULTI_STAGE_INIT;
165 	const uint8_t nb_stages = t->opt->nb_stages;
166 	const uint8_t tx_queue = t->tx_service.queue_id;
167 
168 	while (t->done == false) {
169 		uint16_t event = rte_event_dequeue_burst(dev, port, &ev, 1, 0);
170 
171 		if (!event) {
172 			rte_pause();
173 			continue;
174 		}
175 
176 		cq_id = ev.sub_event_type % nb_stages;
177 
178 		if (cq_id == last_queue) {
179 			w->processed_pkts++;
180 			ev.queue_id = tx_queue;
181 			pipeline_fwd_event(&ev, RTE_SCHED_TYPE_ATOMIC);
182 		} else {
183 			ev.sub_event_type++;
184 			pipeline_fwd_event(&ev, sched_type_list[cq_id]);
185 		}
186 
187 		pipeline_event_enqueue(dev, port, &ev);
188 	}
189 	return 0;
190 }
191 
192 static int
193 pipeline_atq_worker_multi_stage_burst_tx(void *arg)
194 {
195 	PIPELINE_WROKER_MULTI_STAGE_BURST_INIT;
196 	const uint8_t nb_stages = t->opt->nb_stages;
197 
198 	while (t->done == false) {
199 		uint16_t nb_rx = rte_event_dequeue_burst(dev, port, ev,
200 				BURST_SIZE, 0);
201 
202 		if (!nb_rx) {
203 			rte_pause();
204 			continue;
205 		}
206 
207 		for (i = 0; i < nb_rx; i++) {
208 			rte_prefetch0(ev[i + 1].mbuf);
209 			cq_id = ev[i].sub_event_type % nb_stages;
210 
211 			if (cq_id == last_queue) {
212 				if (ev[i].sched_type == RTE_SCHED_TYPE_ATOMIC) {
213 
214 					pipeline_tx_pkt(ev[i].mbuf);
215 					ev[i].op = RTE_EVENT_OP_RELEASE;
216 					w->processed_pkts++;
217 					continue;
218 				}
219 
220 				pipeline_fwd_event(&ev[i],
221 						RTE_SCHED_TYPE_ATOMIC);
222 			} else {
223 				ev[i].sub_event_type++;
224 				pipeline_fwd_event(&ev[i],
225 						sched_type_list[cq_id]);
226 			}
227 		}
228 
229 		pipeline_event_enqueue_burst(dev, port, ev, nb_rx);
230 	}
231 	return 0;
232 }
233 
234 static int
235 pipeline_atq_worker_multi_stage_burst_fwd(void *arg)
236 {
237 	PIPELINE_WROKER_MULTI_STAGE_BURST_INIT;
238 	const uint8_t nb_stages = t->opt->nb_stages;
239 	const uint8_t tx_queue = t->tx_service.queue_id;
240 
241 	while (t->done == false) {
242 		uint16_t nb_rx = rte_event_dequeue_burst(dev, port, ev,
243 				BURST_SIZE, 0);
244 
245 		if (!nb_rx) {
246 			rte_pause();
247 			continue;
248 		}
249 
250 		for (i = 0; i < nb_rx; i++) {
251 			rte_prefetch0(ev[i + 1].mbuf);
252 			cq_id = ev[i].sub_event_type % nb_stages;
253 
254 			if (cq_id == last_queue) {
255 				w->processed_pkts++;
256 				ev[i].queue_id = tx_queue;
257 				pipeline_fwd_event(&ev[i],
258 						RTE_SCHED_TYPE_ATOMIC);
259 			} else {
260 				ev[i].sub_event_type++;
261 				pipeline_fwd_event(&ev[i],
262 						sched_type_list[cq_id]);
263 			}
264 		}
265 
266 		pipeline_event_enqueue_burst(dev, port, ev, nb_rx);
267 	}
268 	return 0;
269 }
270 
271 static int
272 worker_wrapper(void *arg)
273 {
274 	struct worker_data *w  = arg;
275 	struct evt_options *opt = w->t->opt;
276 	const bool burst = evt_has_burst_mode(w->dev_id);
277 	const bool mt_safe = !w->t->mt_unsafe;
278 	const uint8_t nb_stages = opt->nb_stages;
279 	RTE_SET_USED(opt);
280 
281 	if (nb_stages == 1) {
282 		if (!burst && mt_safe)
283 			return pipeline_atq_worker_single_stage_tx(arg);
284 		else if (!burst && !mt_safe)
285 			return pipeline_atq_worker_single_stage_fwd(arg);
286 		else if (burst && mt_safe)
287 			return pipeline_atq_worker_single_stage_burst_tx(arg);
288 		else if (burst && !mt_safe)
289 			return pipeline_atq_worker_single_stage_burst_fwd(arg);
290 	} else {
291 		if (!burst && mt_safe)
292 			return pipeline_atq_worker_multi_stage_tx(arg);
293 		else if (!burst && !mt_safe)
294 			return pipeline_atq_worker_multi_stage_fwd(arg);
295 		if (burst && mt_safe)
296 			return pipeline_atq_worker_multi_stage_burst_tx(arg);
297 		else if (burst && !mt_safe)
298 			return pipeline_atq_worker_multi_stage_burst_fwd(arg);
299 	}
300 	rte_panic("invalid worker\n");
301 }
302 
303 static int
304 pipeline_atq_launch_lcores(struct evt_test *test, struct evt_options *opt)
305 {
306 	struct test_pipeline *t = evt_test_priv(test);
307 
308 	if (t->mt_unsafe)
309 		rte_service_component_runstate_set(t->tx_service.service_id, 1);
310 	return pipeline_launch_lcores(test, opt, worker_wrapper);
311 }
312 
313 static int
314 pipeline_atq_eventdev_setup(struct evt_test *test, struct evt_options *opt)
315 {
316 	int ret;
317 	int nb_ports;
318 	int nb_queues;
319 	uint8_t queue;
320 	struct rte_event_dev_info info;
321 	struct test_pipeline *t = evt_test_priv(test);
322 	uint8_t tx_evqueue_id = 0;
323 	uint8_t queue_arr[RTE_EVENT_MAX_QUEUES_PER_DEV];
324 	uint8_t nb_worker_queues = 0;
325 
326 	nb_ports = evt_nr_active_lcores(opt->wlcores);
327 	nb_queues = rte_eth_dev_count_avail();
328 
329 	/* One extra port and queueu for Tx service */
330 	if (t->mt_unsafe) {
331 		tx_evqueue_id = nb_queues;
332 		nb_ports++;
333 		nb_queues++;
334 	}
335 
336 
337 	rte_event_dev_info_get(opt->dev_id, &info);
338 
339 	const struct rte_event_dev_config config = {
340 			.nb_event_queues = nb_queues,
341 			.nb_event_ports = nb_ports,
342 			.nb_events_limit  = info.max_num_events,
343 			.nb_event_queue_flows = opt->nb_flows,
344 			.nb_event_port_dequeue_depth =
345 				info.max_event_port_dequeue_depth,
346 			.nb_event_port_enqueue_depth =
347 				info.max_event_port_enqueue_depth,
348 	};
349 	ret = rte_event_dev_configure(opt->dev_id, &config);
350 	if (ret) {
351 		evt_err("failed to configure eventdev %d", opt->dev_id);
352 		return ret;
353 	}
354 
355 	struct rte_event_queue_conf q_conf = {
356 			.priority = RTE_EVENT_DEV_PRIORITY_NORMAL,
357 			.nb_atomic_flows = opt->nb_flows,
358 			.nb_atomic_order_sequences = opt->nb_flows,
359 	};
360 	/* queue configurations */
361 	for (queue = 0; queue < nb_queues; queue++) {
362 		q_conf.event_queue_cfg = RTE_EVENT_QUEUE_CFG_ALL_TYPES;
363 
364 		if (t->mt_unsafe) {
365 			if (queue == tx_evqueue_id) {
366 				q_conf.event_queue_cfg =
367 					RTE_EVENT_QUEUE_CFG_SINGLE_LINK;
368 			} else {
369 				queue_arr[nb_worker_queues] = queue;
370 				nb_worker_queues++;
371 			}
372 		}
373 
374 		ret = rte_event_queue_setup(opt->dev_id, queue, &q_conf);
375 		if (ret) {
376 			evt_err("failed to setup queue=%d", queue);
377 			return ret;
378 		}
379 	}
380 
381 	if (opt->wkr_deq_dep > info.max_event_port_dequeue_depth)
382 		opt->wkr_deq_dep = info.max_event_port_dequeue_depth;
383 
384 	/* port configuration */
385 	const struct rte_event_port_conf p_conf = {
386 			.dequeue_depth = opt->wkr_deq_dep,
387 			.enqueue_depth = info.max_event_port_dequeue_depth,
388 			.new_event_threshold = info.max_num_events,
389 	};
390 
391 	if (t->mt_unsafe) {
392 		ret = pipeline_event_port_setup(test, opt, queue_arr,
393 				nb_worker_queues, p_conf);
394 		if (ret)
395 			return ret;
396 
397 		ret = pipeline_event_tx_service_setup(test, opt, tx_evqueue_id,
398 				nb_ports - 1, p_conf);
399 	} else
400 		ret = pipeline_event_port_setup(test, opt, NULL, nb_queues,
401 				p_conf);
402 
403 	if (ret)
404 		return ret;
405 
406 	/*
407 	 * The pipelines are setup in the following manner:
408 	 *
409 	 * eth_dev_count = 2, nb_stages = 2, atq mode
410 	 *
411 	 * Multi thread safe :
412 	 *	queues = 2
413 	 *	stride = 1
414 	 *
415 	 *	event queue pipelines:
416 	 *	eth0 -> q0 ->tx
417 	 *	eth1 -> q1 ->tx
418 	 *
419 	 *	q0, q1 are configured as ATQ so, all the different stages can
420 	 *	be enqueued on the same queue.
421 	 *
422 	 * Multi thread unsafe :
423 	 *	queues = 3
424 	 *	stride = 1
425 	 *
426 	 *	event queue pipelines:
427 	 *	eth0 -> q0
428 	 *		  } (q3->tx) Tx service
429 	 *	eth1 -> q1
430 	 *
431 	 *	q0,q1 are configured as stated above.
432 	 *	q3 configured as SINGLE_LINK|ATOMIC.
433 	 */
434 	ret = pipeline_event_rx_adapter_setup(opt, 1, p_conf);
435 	if (ret)
436 		return ret;
437 
438 	if (!evt_has_distributed_sched(opt->dev_id)) {
439 		uint32_t service_id;
440 		rte_event_dev_service_id_get(opt->dev_id, &service_id);
441 		ret = evt_service_setup(service_id);
442 		if (ret) {
443 			evt_err("No service lcore found to run event dev.");
444 			return ret;
445 		}
446 	}
447 
448 	ret = rte_event_dev_start(opt->dev_id);
449 	if (ret) {
450 		evt_err("failed to start eventdev %d", opt->dev_id);
451 		return ret;
452 	}
453 
454 	return 0;
455 }
456 
/*
 * Dump the test options through pipeline_opt_dump(), passing the queue count
 * computed by pipeline_atq_nb_event_queues().
 */
static void
pipeline_atq_opt_dump(struct evt_options *opt)
{
	const int nb_queues = pipeline_atq_nb_event_queues(opt);

	pipeline_opt_dump(opt, nb_queues);
}
462 
/*
 * Validate the test options through pipeline_opt_check(), passing the queue
 * count computed by pipeline_atq_nb_event_queues(). The result of
 * pipeline_opt_check() is returned unchanged.
 */
static int
pipeline_atq_opt_check(struct evt_options *opt)
{
	const int nb_queues = pipeline_atq_nb_event_queues(opt);

	return pipeline_opt_check(opt, nb_queues);
}
468 
469 static bool
470 pipeline_atq_capability_check(struct evt_options *opt)
471 {
472 	struct rte_event_dev_info dev_info;
473 
474 	rte_event_dev_info_get(opt->dev_id, &dev_info);
475 	if (dev_info.max_event_queues < pipeline_atq_nb_event_queues(opt) ||
476 			dev_info.max_event_ports <
477 			evt_nr_active_lcores(opt->wlcores)) {
478 		evt_err("not enough eventdev queues=%d/%d or ports=%d/%d",
479 			pipeline_atq_nb_event_queues(opt),
480 			dev_info.max_event_queues,
481 			evt_nr_active_lcores(opt->wlcores),
482 			dev_info.max_event_ports);
483 	}
484 
485 	return true;
486 }
487 
488 static const struct evt_test_ops pipeline_atq =  {
489 	.cap_check          = pipeline_atq_capability_check,
490 	.opt_check          = pipeline_atq_opt_check,
491 	.opt_dump           = pipeline_atq_opt_dump,
492 	.test_setup         = pipeline_test_setup,
493 	.mempool_setup      = pipeline_mempool_setup,
494 	.ethdev_setup	    = pipeline_ethdev_setup,
495 	.eventdev_setup     = pipeline_atq_eventdev_setup,
496 	.launch_lcores      = pipeline_atq_launch_lcores,
497 	.eventdev_destroy   = pipeline_eventdev_destroy,
498 	.mempool_destroy    = pipeline_mempool_destroy,
499 	.ethdev_destroy	    = pipeline_ethdev_destroy,
500 	.test_result        = pipeline_test_result,
501 	.test_destroy       = pipeline_test_destroy,
502 };
503 
504 EVT_TEST_REGISTER(pipeline_atq);
505