xref: /dpdk/app/test-eventdev/test_pipeline_common.c (revision f9e1d67f237a00cf94feb4413e3d978fdd632052)
/*
 * SPDX-License-Identifier: BSD-3-Clause
 * Copyright 2017 Cavium, Inc.
 */

#include "test_pipeline_common.h"

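/* Print the per-worker packet distribution and return the test verdict. */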
int
pipeline_test_result(struct evt_test *test, struct evt_options *opt)
{
	RTE_SET_USED(opt);
	int i;
	uint64_t total = 0;
	struct test_pipeline *t = evt_test_priv(test);

	evt_info("Packet distribution across worker cores:");
	for (i = 0; i < t->nb_workers; i++)
		total += t->worker[i].processed_pkts;
	for (i = 0; i < t->nb_workers; i++)
		evt_info("Worker %d packets: "CLGRN"%"PRIu64""CLNRM" percentage:"
				CLGRN" %3.2f"CLNRM, i,
				t->worker[i].processed_pkts,
				(((double)t->worker[i].processed_pkts)/total)
				* 100);
	return t->result;
}

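/* Dump the pipeline test configuration derived from the command line. */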
void
pipeline_opt_dump(struct evt_options *opt, uint8_t nb_queues)
{
	evt_dump("nb_worker_lcores", "%d", evt_nr_active_lcores(opt->wlcores));
	evt_dump_worker_lcores(opt);
	evt_dump_nb_stages(opt);
	evt_dump("nb_evdev_ports", "%d", pipeline_nb_event_ports(opt));
	evt_dump("nb_evdev_queues", "%d", nb_queues);
	evt_dump_queue_priority(opt);
	evt_dump_sched_type_list(opt);
	evt_dump_producer_type(opt);
	evt_dump("nb_eth_rx_queues", "%d", opt->eth_queues);
	evt_dump("event_vector", "%d", opt->ena_vector);
	if (opt->ena_vector) {
		evt_dump("vector_size", "%d", opt->vector_size);
		evt_dump("vector_tmo_ns", "%" PRIu64 "", opt->vector_tmo_nsec);
	}
}

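/* Total packets processed so far, summed across all workers. */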
static inline uint64_t
processed_pkts(struct test_pipeline *t)
{
	uint8_t i;
	uint64_t total = 0;

	for (i = 0; i < t->nb_workers; i++)
		total += t->worker[i].processed_pkts;

	return total;
}

/* RFC863 discard port */
#define UDP_SRC_PORT 9
#define UDP_DST_PORT 9

/* RFC2544 reserved test subnet 198.18.0.0 */
#define IP_SRC_ADDR(x, y) ((198U << 24) | (18 << 16) | ((x) << 8) | (y))
#define IP_DST_ADDR(x, y) ((198U << 24) | (18 << 16) | ((x) << 8) | (y))

#define IP_DEFTTL  64 /* from RFC 1340. */
#define IP_VERSION 0x40
#define IP_HDRLEN  0x05 /* default IP header length == five 32-bit words. */
#define IP_VHL_DEF (IP_VERSION | IP_HDRLEN)

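/*
 * Fill in the UDP and IPv4 headers of a generated packet; lengths are
 * derived from the payload size and the IPv4 checksum is computed last.
 */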
static void
setup_pkt_udp_ip_headers(struct rte_ipv4_hdr *ip_hdr,
			 struct rte_udp_hdr *udp_hdr, uint16_t pkt_data_len,
			 uint8_t port, uint8_t flow)
{
	uint16_t pkt_len;

	/*
	 * Initialize UDP header.
	 */
	pkt_len = (uint16_t)(pkt_data_len + sizeof(struct rte_udp_hdr));
	udp_hdr->src_port = rte_cpu_to_be_16(UDP_SRC_PORT);
	udp_hdr->dst_port = rte_cpu_to_be_16(UDP_DST_PORT);
	udp_hdr->dgram_len = rte_cpu_to_be_16(pkt_len);
	udp_hdr->dgram_cksum = 0; /* No UDP checksum. */

	/*
	 * Initialize IP header.
	 */
	pkt_len = (uint16_t)(pkt_len + sizeof(struct rte_ipv4_hdr));
	ip_hdr->version_ihl = IP_VHL_DEF;
	ip_hdr->type_of_service = 0;
	ip_hdr->fragment_offset = 0;
	ip_hdr->time_to_live = IP_DEFTTL;
	ip_hdr->next_proto_id = IPPROTO_UDP;
	ip_hdr->packet_id = 0;
	ip_hdr->total_length = rte_cpu_to_be_16(pkt_len);
	ip_hdr->src_addr = rte_cpu_to_be_32(IP_SRC_ADDR(port, 1));
	ip_hdr->dst_addr = rte_cpu_to_be_32(IP_DST_ADDR(port + 1, flow));

	/*
	 * Compute IP header checksum.
	 */
	ip_hdr->hdr_checksum = rte_ipv4_cksum_simple(ip_hdr);
}

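/*
 * With --tx_first, prime every ethdev with a burst of synthetic UDP/IPv4
 * packets before the workers are launched.
 */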
static void
pipeline_tx_first(struct test_pipeline *t, struct evt_options *opt)
{
#define TX_DEF_PACKET_LEN 64
	uint16_t eth_port_id = 0;
	uint16_t pkt_sz, rc;
	uint32_t i;

	pkt_sz = opt->tx_pkt_sz;
	if (pkt_sz > opt->max_pkt_sz)
		pkt_sz = opt->max_pkt_sz;
	if (!pkt_sz)
		pkt_sz = TX_DEF_PACKET_LEN;

	RTE_ETH_FOREACH_DEV(eth_port_id) {
		struct rte_ether_addr src_mac;
		struct rte_ether_addr dst_mac;
		struct rte_ether_hdr eth_hdr;

		/* Send to the same dest. MAC as the port's own MAC */
		rte_eth_macaddr_get(eth_port_id, &dst_mac);
		rte_eth_random_addr((uint8_t *)&src_mac);

		rte_ether_addr_copy(&dst_mac, &eth_hdr.dst_addr);
		rte_ether_addr_copy(&src_mac, &eth_hdr.src_addr);
		eth_hdr.ether_type = rte_cpu_to_be_16(RTE_ETHER_TYPE_IPV4);

		for (i = 0; i < opt->tx_first; i++) {
			struct rte_udp_hdr *pkt_udp_hdr;
			struct rte_ipv4_hdr ip_hdr;
			struct rte_udp_hdr udp_hdr;
			struct rte_mbuf *mbuf;

			/* Per-port pools are indexed by port id, not by the
			 * packet counter.
			 */
			mbuf = rte_pktmbuf_alloc(opt->per_port_pool ?
						 t->pool[eth_port_id] :
						 t->pool[0]);
			if (mbuf == NULL)
				continue;

			setup_pkt_udp_ip_headers(
				&ip_hdr, &udp_hdr,
				pkt_sz - sizeof(struct rte_ether_hdr) -
					sizeof(struct rte_ipv4_hdr) -
					sizeof(struct rte_udp_hdr),
				eth_port_id, i);
			mbuf->port = eth_port_id;
			mbuf->data_len = pkt_sz;
			mbuf->pkt_len = pkt_sz;

			/* Copy Ethernet header */
			rte_memcpy(rte_pktmbuf_mtod_offset(mbuf, char *, 0),
				   &eth_hdr, sizeof(struct rte_ether_hdr));

			/* Copy IPv4 header */
			rte_memcpy(rte_pktmbuf_mtod_offset(
					   mbuf, char *,
					   sizeof(struct rte_ether_hdr)),
				   &ip_hdr, sizeof(struct rte_ipv4_hdr));

			/* Copy UDP header */
			rte_memcpy(
				rte_pktmbuf_mtod_offset(
					mbuf, char *,
					sizeof(struct rte_ether_hdr) +
						sizeof(struct rte_ipv4_hdr)),
				&udp_hdr, sizeof(struct rte_udp_hdr));
			pkt_udp_hdr = rte_pktmbuf_mtod_offset(
				mbuf, struct rte_udp_hdr *,
				sizeof(struct rte_ether_hdr) +
					sizeof(struct rte_ipv4_hdr));
			/* Vary the flow by incrementing the ports per packet */
			pkt_udp_hdr->src_port =
				rte_cpu_to_be_16(UDP_SRC_PORT + i);
			pkt_udp_hdr->dst_port =
				rte_cpu_to_be_16(UDP_DST_PORT + i);

			rc = rte_eth_tx_burst(eth_port_id, 0, &mbuf, 1);
			if (rc == 0)
				rte_pktmbuf_free(mbuf);
		}
	}
}

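/*
 * Launch one worker per active worker lcore, then sample and print the
 * aggregate throughput roughly once per second until the test completes.
 */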
int
pipeline_launch_lcores(struct evt_test *test, struct evt_options *opt,
		int (*worker)(void *))
{
	struct test_pipeline *t = evt_test_priv(test);
	int ret, lcore_id;
	int port_idx = 0;

	if (opt->tx_first)
		pipeline_tx_first(t, opt);

	/* launch workers */
	RTE_LCORE_FOREACH_WORKER(lcore_id) {
		if (!(opt->wlcores[lcore_id]))
			continue;

		ret = rte_eal_remote_launch(worker,
				 &t->worker[port_idx], lcore_id);
		if (ret) {
			evt_err("failed to launch worker %d", lcore_id);
			return ret;
		}
		port_idx++;
	}

	uint64_t perf_cycles = rte_get_timer_cycles();
	const uint64_t perf_sample = rte_get_timer_hz();

	static float total_mpps;
	static uint64_t samples;

	uint64_t prev_pkts = 0;

	while (t->done == false) {
		const uint64_t new_cycles = rte_get_timer_cycles();

		if ((new_cycles - perf_cycles) > perf_sample) {
			const uint64_t curr_pkts = processed_pkts(t);

			float mpps = (float)(curr_pkts - prev_pkts)/1000000;

			prev_pkts = curr_pkts;
			perf_cycles = new_cycles;
			total_mpps += mpps;
			++samples;
			printf(CLGRN"\r%.3f mpps avg %.3f mpps"CLNRM,
					mpps, total_mpps/samples);
			fflush(stdout);
		}
	}
	printf("\n");
	return 0;
}

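/* Validate the option combinations common to all pipeline test variants. */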
int
pipeline_opt_check(struct evt_options *opt, uint64_t nb_queues)
{
	unsigned int lcores;

	/* Minimum: 1 worker + main */
	lcores = 2;

	if (opt->prod_type != EVT_PROD_TYPE_ETH_RX_ADPTR) {
		evt_err("Invalid producer type '%s' valid producer '%s'",
			evt_prod_id_to_name(opt->prod_type),
			evt_prod_id_to_name(EVT_PROD_TYPE_ETH_RX_ADPTR));
		return -1;
	}

	if (!rte_eth_dev_count_avail()) {
		evt_err("test needs minimum 1 ethernet dev");
		return -1;
	}

	if (rte_lcore_count() < lcores) {
		evt_err("test needs minimum %d lcores", lcores);
		return -1;
	}

	/* Validate worker lcores */
	if (evt_lcores_has_overlap(opt->wlcores, rte_get_main_lcore())) {
		evt_err("worker lcores overlap with main lcore");
		return -1;
	}
	if (evt_has_disabled_lcore(opt->wlcores)) {
		evt_err("one or more worker lcores are not enabled");
		return -1;
	}
	if (!evt_has_active_lcore(opt->wlcores)) {
		evt_err("minimum one worker is required");
		return -1;
	}

	if (nb_queues > EVT_MAX_QUEUES) {
		evt_err("number of queues exceeds %d", EVT_MAX_QUEUES);
		return -1;
	}
	if (pipeline_nb_event_ports(opt) > EVT_MAX_PORTS) {
		evt_err("number of ports exceeds %d", EVT_MAX_PORTS);
		return -1;
	}

	if (evt_has_invalid_stage(opt))
		return -1;

	if (evt_has_invalid_sched_type(opt))
		return -1;

	return 0;
}

#define NB_RX_DESC			128
#define NB_TX_DESC			512
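/*
 * Configure every available ethdev: RSS-enabled Rx queues, one Tx queue,
 * promiscuous mode, and detection of internal event port support.
 */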
int
pipeline_ethdev_setup(struct evt_test *test, struct evt_options *opt)
{
	uint16_t i, j;
	int ret;
	uint8_t nb_queues = 1;
	struct test_pipeline *t = evt_test_priv(test);
	struct rte_eth_rxconf rx_conf;
	struct rte_eth_conf port_conf = {
		.rxmode = {
			.mq_mode = RTE_ETH_MQ_RX_RSS,
		},
		.rx_adv_conf = {
			.rss_conf = {
				.rss_key = NULL,
				.rss_hf = RTE_ETH_RSS_IP,
			},
		},
	};

	if (!rte_eth_dev_count_avail()) {
		evt_err("No ethernet ports found.");
		return -ENODEV;
	}

	if (opt->max_pkt_sz < RTE_ETHER_MIN_LEN) {
		evt_err("max_pkt_sz can not be less than %d",
			RTE_ETHER_MIN_LEN);
		return -EINVAL;
	}

	port_conf.rxmode.mtu = opt->max_pkt_sz - RTE_ETHER_HDR_LEN -
		RTE_ETHER_CRC_LEN;

	t->internal_port = 1;
	RTE_ETH_FOREACH_DEV(i) {
		struct rte_eth_dev_info dev_info;
		struct rte_eth_conf local_port_conf = port_conf;
		uint32_t caps = 0;

		ret = rte_event_eth_tx_adapter_caps_get(opt->dev_id, i, &caps);
		if (ret != 0) {
			evt_err("failed to get event tx adapter[%d] caps", i);
			return ret;
		}

		if (!(caps & RTE_EVENT_ETH_TX_ADAPTER_CAP_INTERNAL_PORT))
			t->internal_port = 0;

		ret = rte_event_eth_rx_adapter_caps_get(opt->dev_id, i, &caps);
		if (ret != 0) {
			evt_err("failed to get event rx adapter[%d] caps", i);
			return ret;
		}

		if (!(caps & RTE_EVENT_ETH_RX_ADAPTER_CAP_INTERNAL_PORT))
			local_port_conf.rxmode.offloads |=
				RTE_ETH_RX_OFFLOAD_RSS_HASH;

		ret = rte_eth_dev_info_get(i, &dev_info);
		if (ret != 0) {
			evt_err("Error during getting device (port %u) info: %s\n",
				i, strerror(-ret));
			return ret;
		}

		/* Enable mbuf fast free if PMD has the capability. */
		if (dev_info.tx_offload_capa & RTE_ETH_TX_OFFLOAD_MBUF_FAST_FREE)
			local_port_conf.txmode.offloads |=
				RTE_ETH_TX_OFFLOAD_MBUF_FAST_FREE;

		/* Use the local copy so the RSS hash offload added above is
		 * propagated to the Rx queue configuration.
		 */
		rx_conf = dev_info.default_rxconf;
		rx_conf.offloads = local_port_conf.rxmode.offloads;

		local_port_conf.rx_adv_conf.rss_conf.rss_hf &=
			dev_info.flow_type_rss_offloads;
		if (local_port_conf.rx_adv_conf.rss_conf.rss_hf !=
				port_conf.rx_adv_conf.rss_conf.rss_hf) {
			evt_info("Port %u modified RSS hash function based on hardware support, "
				"requested:%#"PRIx64" configured:%#"PRIx64"",
				i,
				port_conf.rx_adv_conf.rss_conf.rss_hf,
				local_port_conf.rx_adv_conf.rss_conf.rss_hf);
		}

		if (rte_eth_dev_configure(i, opt->eth_queues, nb_queues,
					  &local_port_conf) < 0) {
			evt_err("Failed to configure eth port [%d]", i);
			return -EINVAL;
		}

		for (j = 0; j < opt->eth_queues; j++) {
			if (rte_eth_rx_queue_setup(
				    i, j, NB_RX_DESC, rte_socket_id(), &rx_conf,
				    opt->per_port_pool ? t->pool[i] :
							 t->pool[0]) < 0) {
				evt_err("Failed to setup eth port [%d] rx_queue: %d.",
					i, j);
				return -EINVAL;
			}
		}

		if (rte_eth_tx_queue_setup(i, 0, NB_TX_DESC,
					rte_socket_id(), NULL) < 0) {
			evt_err("Failed to setup eth port [%d] tx_queue: %d.",
					i, 0);
			return -EINVAL;
		}

		ret = rte_eth_promiscuous_enable(i);
		if (ret != 0) {
			evt_err("Failed to enable promiscuous mode for eth port [%d]: %s",
				i, rte_strerror(-ret));
			return ret;
		}
	}

	return 0;
}

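/* Set up one event port per worker and link it to the given queues. */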
int
pipeline_event_port_setup(struct evt_test *test, struct evt_options *opt,
		uint8_t *queue_arr, uint8_t nb_queues,
		const struct rte_event_port_conf p_conf)
{
	int ret;
	uint8_t port;
	struct test_pipeline *t = evt_test_priv(test);

	/* setup one port per worker, linking to all queues */
	for (port = 0; port < evt_nr_active_lcores(opt->wlcores); port++) {
		struct worker_data *w = &t->worker[port];

		w->dev_id = opt->dev_id;
		w->port_id = port;
		w->t = t;
		w->processed_pkts = 0;

		ret = rte_event_port_setup(opt->dev_id, port, &p_conf);
		if (ret) {
			evt_err("failed to setup port %d", port);
			return ret;
		}

		if (rte_event_port_link(opt->dev_id, port, queue_arr, NULL,
					nb_queues) != nb_queues)
			goto link_fail;
	}

	return 0;

link_fail:
	evt_err("failed to link queues to port %d", port);
	return -EINVAL;
}

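/*
 * Create one Rx adapter per ethdev (optionally in event vector mode) and
 * add all of its Rx queues; adapters without an internal event port are
 * driven by a service core.
 */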
int
pipeline_event_rx_adapter_setup(struct evt_options *opt, uint8_t stride,
		struct rte_event_port_conf prod_conf)
{
	int ret = 0;
	uint16_t prod;
	struct rte_mempool *vector_pool = NULL;
	struct rte_event_eth_rx_adapter_queue_conf queue_conf;

	memset(&queue_conf, 0,
			sizeof(struct rte_event_eth_rx_adapter_queue_conf));
	queue_conf.ev.sched_type = opt->sched_type_list[0];
	if (opt->ena_vector) {
		unsigned int nb_elem = (opt->pool_sz / opt->vector_size) << 1;

		nb_elem = RTE_MAX(512U, nb_elem);
		nb_elem += evt_nr_active_lcores(opt->wlcores) * 32;
		vector_pool = rte_event_vector_pool_create(
			"vector_pool", nb_elem, 32, opt->vector_size,
			opt->socket_id);
		if (vector_pool == NULL) {
			evt_err("failed to create event vector pool");
			return -ENOMEM;
		}
	}
	RTE_ETH_FOREACH_DEV(prod) {
		struct rte_event_eth_rx_adapter_vector_limits limits;
		uint32_t cap;

		ret = rte_event_eth_rx_adapter_caps_get(opt->dev_id,
				prod, &cap);
		if (ret) {
			evt_err("failed to get event rx adapter[%d]"
					" capabilities",
					prod);
			return ret;
		}

		if (opt->ena_vector) {
			memset(&limits, 0, sizeof(limits));
			ret = rte_event_eth_rx_adapter_vector_limits_get(
				opt->dev_id, prod, &limits);
			if (ret) {
				evt_err("failed to get vector limits");
				return ret;
			}

			if (opt->vector_size < limits.min_sz ||
			    opt->vector_size > limits.max_sz) {
				evt_err("Vector size [%d] not within limits max[%d] min[%d]",
					opt->vector_size, limits.max_sz,
					limits.min_sz);
				return -EINVAL;
			}

			if (limits.log2_sz &&
			    !rte_is_power_of_2(opt->vector_size)) {
				evt_err("Vector size [%d] not power of 2",
					opt->vector_size);
				return -EINVAL;
			}

			if (opt->vector_tmo_nsec > limits.max_timeout_ns ||
			    opt->vector_tmo_nsec < limits.min_timeout_ns) {
				evt_err("Vector timeout [%" PRIu64
					"] not within limits max[%" PRIu64
					"] min[%" PRIu64 "]",
					opt->vector_tmo_nsec,
					limits.max_timeout_ns,
					limits.min_timeout_ns);
				return -EINVAL;
			}

			if (cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_EVENT_VECTOR) {
				queue_conf.vector_sz = opt->vector_size;
				queue_conf.vector_timeout_ns =
					opt->vector_tmo_nsec;
				queue_conf.rx_queue_flags |=
				RTE_EVENT_ETH_RX_ADAPTER_QUEUE_EVENT_VECTOR;
				queue_conf.vector_mp = vector_pool;
			} else {
				evt_err("Rx adapter doesn't support event vector");
				return -EINVAL;
			}
		}
		queue_conf.ev.queue_id = prod * stride;
		ret = rte_event_eth_rx_adapter_create(prod, opt->dev_id,
				&prod_conf);
		if (ret) {
			evt_err("failed to create rx adapter[%d]", prod);
			return ret;
		}
		ret = rte_event_eth_rx_adapter_queue_add(prod, prod, -1,
				&queue_conf);
		if (ret) {
			evt_err("failed to add rx queues to adapter[%d]", prod);
			return ret;
		}

		if (!(cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_INTERNAL_PORT)) {
			uint32_t service_id = -1U;

			rte_event_eth_rx_adapter_service_id_get(prod,
					&service_id);
			ret = evt_service_setup(service_id);
			if (ret) {
				evt_err("Failed to setup service core"
						" for Rx adapter");
				return ret;
			}
		}

		evt_info("Port[%d] Rx adapter[%d] configured", prod, prod);
	}

	return ret;
}

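/*
 * Create one Tx adapter per ethdev and add all of its Tx queues; adapters
 * without an internal event port are driven by a service core.
 */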
int
pipeline_event_tx_adapter_setup(struct evt_options *opt,
		struct rte_event_port_conf port_conf)
{
	int ret = 0;
	uint16_t consm;

	RTE_ETH_FOREACH_DEV(consm) {
		uint32_t cap;

		ret = rte_event_eth_tx_adapter_caps_get(opt->dev_id,
				consm, &cap);
		if (ret) {
			evt_err("failed to get event tx adapter[%d] caps",
					consm);
			return ret;
		}

		if (opt->ena_vector) {
			if (!(cap &
			      RTE_EVENT_ETH_TX_ADAPTER_CAP_EVENT_VECTOR)) {
				evt_err("Tx adapter doesn't support event vector");
				return -EINVAL;
			}
		}

		ret = rte_event_eth_tx_adapter_create(consm, opt->dev_id,
				&port_conf);
		if (ret) {
			evt_err("failed to create tx adapter[%d]", consm);
			return ret;
		}

		ret = rte_event_eth_tx_adapter_queue_add(consm, consm, -1);
		if (ret) {
			evt_err("failed to add tx queues to adapter[%d]",
					consm);
			return ret;
		}

		if (!(cap & RTE_EVENT_ETH_TX_ADAPTER_CAP_INTERNAL_PORT)) {
			uint32_t service_id = -1U;

			ret = rte_event_eth_tx_adapter_service_id_get(consm,
								   &service_id);
			if (ret != -ESRCH && ret != 0) {
				evt_err("Failed to get Tx adapter service ID");
				return ret;
			}
			ret = evt_service_setup(service_id);
			if (ret) {
				evt_err("Failed to setup service core"
						" for Tx adapter");
				return ret;
			}
		}

		evt_info("Port[%d] Tx adapter[%d] configured", consm, consm);
	}

	return ret;
}

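/* Free the mbufs carried by an array of event vectors, then the vectors. */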
static void
pipeline_vector_array_free(struct rte_event events[], uint16_t num)
{
	uint16_t i;

	for (i = 0; i < num; i++) {
		rte_pktmbuf_free_bulk(
			&events[i].vec->mbufs[events[i].vec->elem_offset],
			events[i].vec->nb_elem);
		rte_mempool_put(rte_mempool_from_obj(events[i].vec),
				events[i].vec);
	}
}

/* Flush callback used by rte_event_port_quiesce(). */
static void
pipeline_event_port_flush(uint8_t dev_id __rte_unused, struct rte_event ev,
			  void *args __rte_unused)
{
	if (ev.event_type & RTE_EVENT_TYPE_VECTOR)
		pipeline_vector_array_free(&ev, 1);
	else
		rte_pktmbuf_free(ev.mbuf);
}

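/*
 * Tear down a worker's in-flight events: free events that were dequeued
 * but not re-enqueued, release the rest back to the device and quiesce
 * the event port.
 */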
void
pipeline_worker_cleanup(uint8_t dev, uint8_t port, struct rte_event ev[],
			uint16_t enq, uint16_t deq)
{
	int i;

	if (deq) {
		for (i = enq; i < deq; i++) {
			if (ev[i].op == RTE_EVENT_OP_RELEASE)
				continue;
			if (ev[i].event_type & RTE_EVENT_TYPE_VECTOR)
				pipeline_vector_array_free(&ev[i], 1);
			else
				rte_pktmbuf_free(ev[i].mbuf);
		}

		for (i = 0; i < deq; i++)
			ev[i].op = RTE_EVENT_OP_RELEASE;

		rte_event_enqueue_burst(dev, port, ev, deq);
	}

	rte_event_port_quiesce(dev, port, pipeline_event_port_flush, NULL);
}

void
pipeline_ethdev_rx_stop(struct evt_test *test, struct evt_options *opt)
{
	uint16_t i, j;
	RTE_SET_USED(test);

	if (opt->prod_type == EVT_PROD_TYPE_ETH_RX_ADPTR) {
		RTE_ETH_FOREACH_DEV(i) {
			rte_event_eth_rx_adapter_stop(i);
			rte_event_eth_rx_adapter_queue_del(i, i, -1);
			for (j = 0; j < opt->eth_queues; j++)
				rte_eth_dev_rx_queue_stop(i, j);
		}
	}
}

void
pipeline_ethdev_destroy(struct evt_test *test, struct evt_options *opt)
{
	uint16_t i;
	RTE_SET_USED(test);
	RTE_SET_USED(opt);

	RTE_ETH_FOREACH_DEV(i) {
		rte_event_eth_tx_adapter_stop(i);
		rte_event_eth_tx_adapter_queue_del(i, i, -1);
		rte_eth_dev_tx_queue_stop(i, 0);
		rte_eth_dev_stop(i);
	}
}

void
pipeline_eventdev_destroy(struct evt_test *test, struct evt_options *opt)
{
	RTE_SET_USED(test);

	rte_event_dev_stop(opt->dev_id);
	rte_event_dev_close(opt->dev_id);
}

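/*
 * Create the packet mempool(s): one per ethdev with --per_port_pool,
 * otherwise a single shared pool; mbuf size is scaled up when the PMD
 * limits the number of mtu-sized segments per packet.
 */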
int
pipeline_mempool_setup(struct evt_test *test, struct evt_options *opt)
{
	struct test_pipeline *t = evt_test_priv(test);
	int i, ret;

	if (!opt->mbuf_sz)
		opt->mbuf_sz = RTE_MBUF_DEFAULT_BUF_SIZE;

	if (!opt->max_pkt_sz)
		opt->max_pkt_sz = RTE_ETHER_MAX_LEN;

	RTE_ETH_FOREACH_DEV(i) {
		struct rte_eth_dev_info dev_info;
		uint16_t data_size = 0;

		memset(&dev_info, 0, sizeof(dev_info));
		ret = rte_eth_dev_info_get(i, &dev_info);
		if (ret != 0) {
			evt_err("Error during getting device (port %u) info: %s\n",
				i, strerror(-ret));
			return ret;
		}

		if (dev_info.rx_desc_lim.nb_mtu_seg_max != UINT16_MAX &&
				dev_info.rx_desc_lim.nb_mtu_seg_max != 0) {
			data_size = opt->max_pkt_sz /
				dev_info.rx_desc_lim.nb_mtu_seg_max;
			data_size += RTE_PKTMBUF_HEADROOM;

			if (data_size > opt->mbuf_sz)
				opt->mbuf_sz = data_size;
		}
		if (opt->per_port_pool) {
			char name[RTE_MEMPOOL_NAMESIZE];

			snprintf(name, RTE_MEMPOOL_NAMESIZE, "%s-%d",
				 test->name, i);
			t->pool[i] = rte_pktmbuf_pool_create(
				name,	      /* mempool name */
				opt->pool_sz, /* number of elements */
				0,	      /* cache size */
				0,	      /* private data size */
				opt->mbuf_sz, /* data room size */
				opt->socket_id);

			if (t->pool[i] == NULL) {
				evt_err("failed to create mempool %s", name);
				return -ENOMEM;
			}
		}
	}

	if (!opt->per_port_pool) {
		t->pool[0] = rte_pktmbuf_pool_create(
			test->name,   /* mempool name */
			opt->pool_sz, /* number of elements */
			0,	      /* cache size */
			0,	      /* private data size */
			opt->mbuf_sz, /* data room size */
			opt->socket_id);

		if (t->pool[0] == NULL) {
			evt_err("failed to create mempool");
			return -ENOMEM;
		}
	}

	return 0;
}

void
pipeline_mempool_destroy(struct evt_test *test, struct evt_options *opt)
{
	struct test_pipeline *t = evt_test_priv(test);
	int i;

	if (opt->per_port_pool) {
		RTE_ETH_FOREACH_DEV(i)
			rte_mempool_free(t->pool[i]);
	} else {
		rte_mempool_free(t->pool[0]);
	}
}

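/* Allocate and initialize the test's private state. */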
int
pipeline_test_setup(struct evt_test *test, struct evt_options *opt)
{
	void *test_pipeline;

	test_pipeline = rte_zmalloc_socket(test->name,
			sizeof(struct test_pipeline), RTE_CACHE_LINE_SIZE,
			opt->socket_id);
	if (test_pipeline == NULL) {
		evt_err("failed to allocate test_pipeline memory");
		goto nomem;
	}
	test->test_priv = test_pipeline;

	struct test_pipeline *t = evt_test_priv(test);

	t->nb_workers = evt_nr_active_lcores(opt->wlcores);
	t->outstand_pkts = opt->nb_pkts * evt_nr_active_lcores(opt->wlcores);
	t->done = false;
	t->nb_flows = opt->nb_flows;
	t->result = EVT_TEST_FAILED;
	t->opt = opt;
	opt->prod_type = EVT_PROD_TYPE_ETH_RX_ADPTR;
	memcpy(t->sched_type_list, opt->sched_type_list,
			sizeof(opt->sched_type_list));
	return 0;
nomem:
	return -ENOMEM;
}

void
pipeline_test_destroy(struct evt_test *test, struct evt_options *opt)
{
	RTE_SET_USED(opt);

	rte_free(test->test_priv);
}
855