/*
 * SPDX-License-Identifier: BSD-3-Clause
 * Copyright 2017 Cavium, Inc.
 */

#include "test_pipeline_common.h"

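/*
 * Summarize the run: print each worker's packet count and its share of
 * the total, then return the verdict stored in the test private data.
 */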
int
pipeline_test_result(struct evt_test *test, struct evt_options *opt)
{
	RTE_SET_USED(opt);
	int i;
	uint64_t total = 0;
	struct test_pipeline *t = evt_test_priv(test);

	evt_info("Packet distribution across worker cores:");
	for (i = 0; i < t->nb_workers; i++)
		total += t->worker[i].processed_pkts;
	for (i = 0; i < t->nb_workers; i++)
		evt_info("Worker %d packets: "CLGRN"%"PRIu64 CLNRM" percentage:"
				CLGRN" %3.2f"CLNRM, i,
				t->worker[i].processed_pkts,
				(((double)t->worker[i].processed_pkts)/total)
				* 100);
	return t->result;
}

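/* Dump the pipeline test configuration derived from the command line. */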
void
pipeline_opt_dump(struct evt_options *opt, uint8_t nb_queues)
{
	evt_dump("nb_worker_lcores", "%d", evt_nr_active_lcores(opt->wlcores));
	evt_dump_worker_lcores(opt);
	evt_dump_nb_stages(opt);
	evt_dump("nb_evdev_ports", "%d", pipeline_nb_event_ports(opt));
	evt_dump("nb_evdev_queues", "%d", nb_queues);
	evt_dump_queue_priority(opt);
	evt_dump_sched_type_list(opt);
	evt_dump_producer_type(opt);
	evt_dump("nb_eth_rx_queues", "%d", opt->eth_queues);
	evt_dump("event_vector", "%d", opt->ena_vector);
	if (opt->ena_vector) {
		evt_dump("vector_size", "%d", opt->vector_size);
		evt_dump("vector_tmo_ns", "%" PRIu64, opt->vector_tmo_nsec);
	}
}

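/* Total packets processed so far, summed across all worker lcores. */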
static inline uint64_t
processed_pkts(struct test_pipeline *t)
{
	uint8_t i;
	uint64_t total = 0;

	for (i = 0; i < t->nb_workers; i++)
		total += t->worker[i].processed_pkts;

	return total;
}

/* RFC 863 discard port */
#define UDP_SRC_PORT 9
#define UDP_DST_PORT 9

/* RFC 2544 reserved benchmarking subnet 198.18.0.0/15 */
#define IP_SRC_ADDR(x, y) ((198U << 24) | (18 << 16) | ((x) << 8) | (y))
#define IP_DST_ADDR(x, y) ((198U << 24) | (18 << 16) | ((x) << 8) | (y))

#define IP_DEFTTL  64 /* from RFC 1340. */
#define IP_VERSION 0x40
#define IP_HDRLEN  0x05 /* default IP header length == five 32-bit words. */
#define IP_VHL_DEF (IP_VERSION | IP_HDRLEN)

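/*
 * Fill in template IPv4 and UDP headers for a payload of pkt_data_len
 * bytes. Addresses are derived from the port and flow ids so that each
 * (port, flow) pair yields a distinct 5-tuple; the IPv4 header checksum
 * is computed in software.
 */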
static void
setup_pkt_udp_ip_headers(struct rte_ipv4_hdr *ip_hdr,
			 struct rte_udp_hdr *udp_hdr, uint16_t pkt_data_len,
			 uint8_t port, uint8_t flow)
{
	uint16_t *ptr16;
	uint32_t ip_cksum;
	uint16_t pkt_len;

	/*
	 * Initialize UDP header.
	 */
	pkt_len = (uint16_t)(pkt_data_len + sizeof(struct rte_udp_hdr));
	udp_hdr->src_port = rte_cpu_to_be_16(UDP_SRC_PORT);
	udp_hdr->dst_port = rte_cpu_to_be_16(UDP_DST_PORT);
	udp_hdr->dgram_len = rte_cpu_to_be_16(pkt_len);
	udp_hdr->dgram_cksum = 0; /* No UDP checksum. */

	/*
	 * Initialize IP header.
	 */
	pkt_len = (uint16_t)(pkt_len + sizeof(struct rte_ipv4_hdr));
	ip_hdr->version_ihl = IP_VHL_DEF;
	ip_hdr->type_of_service = 0;
	ip_hdr->fragment_offset = 0;
	ip_hdr->time_to_live = IP_DEFTTL;
	ip_hdr->next_proto_id = IPPROTO_UDP;
	ip_hdr->packet_id = 0;
	ip_hdr->total_length = rte_cpu_to_be_16(pkt_len);
	ip_hdr->src_addr = rte_cpu_to_be_32(IP_SRC_ADDR(port, 1));
	ip_hdr->dst_addr = rte_cpu_to_be_32(IP_DST_ADDR(port + 1, flow));

	/*
	 * Compute IP header checksum.
	 */
	ptr16 = (unaligned_uint16_t *)ip_hdr;
	ip_cksum = 0;
	ip_cksum += ptr16[0];
	ip_cksum += ptr16[1];
	ip_cksum += ptr16[2];
	ip_cksum += ptr16[3];
	ip_cksum += ptr16[4];
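	/* ptr16[5] is the hdr_checksum field itself and is skipped. */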
	ip_cksum += ptr16[6];
	ip_cksum += ptr16[7];
	ip_cksum += ptr16[8];
	ip_cksum += ptr16[9];

	/*
	 * Reduce 32 bit checksum to 16 bits and complement it.
	 */
	ip_cksum = ((ip_cksum & 0xFFFF0000) >> 16) + (ip_cksum & 0x0000FFFF);
	if (ip_cksum > 65535)
		ip_cksum -= 65535;
	ip_cksum = (~ip_cksum) & 0x0000FFFF;
	if (ip_cksum == 0)
		ip_cksum = 0xFFFF;
	ip_hdr->hdr_checksum = (uint16_t)ip_cksum;
}

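/*
 * Inject opt->tx_first Ether/IPv4/UDP frames on queue 0 of every ethdev
 * to prime the pipeline. UDP ports are varied per packet so that RSS
 * spreads the resulting flows across the configured Rx queues.
 */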
static void
pipeline_tx_first(struct test_pipeline *t, struct evt_options *opt)
{
#define TX_DEF_PACKET_LEN 64
	uint16_t eth_port_id = 0;
	uint16_t pkt_sz, rc;
	uint32_t i;

	pkt_sz = opt->tx_pkt_sz;
	if (pkt_sz > opt->max_pkt_sz)
		pkt_sz = opt->max_pkt_sz;
	if (!pkt_sz)
		pkt_sz = TX_DEF_PACKET_LEN;

	RTE_ETH_FOREACH_DEV(eth_port_id) {
		struct rte_ether_addr src_mac;
		struct rte_ether_addr dst_mac;
		struct rte_ether_hdr eth_hdr;

		/* Use the port's own MAC address as the destination. */
		rte_eth_macaddr_get(eth_port_id, &dst_mac);
		rte_eth_random_addr((uint8_t *)&src_mac);

		rte_ether_addr_copy(&dst_mac, &eth_hdr.dst_addr);
		rte_ether_addr_copy(&src_mac, &eth_hdr.src_addr);
		eth_hdr.ether_type = rte_cpu_to_be_16(RTE_ETHER_TYPE_IPV4);

		for (i = 0; i < opt->tx_first; i++) {
			struct rte_udp_hdr *pkt_udp_hdr;
			struct rte_ipv4_hdr ip_hdr;
			struct rte_udp_hdr udp_hdr;
			struct rte_mbuf *mbuf;

			/* Per-port pools are indexed by ethdev port id. */
			mbuf = rte_pktmbuf_alloc(
				opt->per_port_pool ? t->pool[eth_port_id] :
						     t->pool[0]);
			if (mbuf == NULL)
				continue;

			setup_pkt_udp_ip_headers(
				&ip_hdr, &udp_hdr,
				pkt_sz - sizeof(struct rte_ether_hdr) -
					sizeof(struct rte_ipv4_hdr) -
					sizeof(struct rte_udp_hdr),
				eth_port_id, i);
			mbuf->port = eth_port_id;
			mbuf->data_len = pkt_sz;
			mbuf->pkt_len = pkt_sz;

			/* Copy Ethernet header */
			rte_memcpy(rte_pktmbuf_mtod_offset(mbuf, char *, 0),
				   &eth_hdr, sizeof(struct rte_ether_hdr));

			/* Copy IPv4 header */
			rte_memcpy(rte_pktmbuf_mtod_offset(
					   mbuf, char *,
					   sizeof(struct rte_ether_hdr)),
				   &ip_hdr, sizeof(struct rte_ipv4_hdr));

			/* Copy UDP header */
			rte_memcpy(
				rte_pktmbuf_mtod_offset(
					mbuf, char *,
					sizeof(struct rte_ipv4_hdr) +
						sizeof(struct rte_ether_hdr)),
				&udp_hdr, sizeof(struct rte_udp_hdr));
			pkt_udp_hdr = rte_pktmbuf_mtod_offset(
				mbuf, struct rte_udp_hdr *,
				sizeof(struct rte_ipv4_hdr) +
					sizeof(struct rte_ether_hdr));
			/* Vary the ports per packet to create distinct flows. */
			pkt_udp_hdr->src_port =
				rte_cpu_to_be_16(UDP_SRC_PORT + i);
			pkt_udp_hdr->dst_port =
				rte_cpu_to_be_16(UDP_DST_PORT + i);

			rc = rte_eth_tx_burst(eth_port_id, 0, &mbuf, 1);
			if (rc == 0)
				rte_pktmbuf_free(mbuf);
		}
	}
}

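/*
 * Launch the given worker on every enabled worker lcore, then sample the
 * packet counters roughly once per second from the main lcore, printing
 * the current and average Mpps until the test sets t->done.
 */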
int
pipeline_launch_lcores(struct evt_test *test, struct evt_options *opt,
		int (*worker)(void *))
{
	struct test_pipeline *t = evt_test_priv(test);
	int ret, lcore_id;
	int port_idx = 0;

	if (opt->tx_first)
		pipeline_tx_first(t, opt);

	/* launch workers */
	RTE_LCORE_FOREACH_WORKER(lcore_id) {
		if (!(opt->wlcores[lcore_id]))
			continue;

		ret = rte_eal_remote_launch(worker,
				 &t->worker[port_idx], lcore_id);
		if (ret) {
			evt_err("failed to launch worker %d", lcore_id);
			return ret;
		}
		port_idx++;
	}

	uint64_t perf_cycles = rte_get_timer_cycles();
	const uint64_t perf_sample = rte_get_timer_hz();

	static float total_mpps;
	static uint64_t samples;

	uint64_t prev_pkts = 0;

	while (t->done == false) {
		const uint64_t new_cycles = rte_get_timer_cycles();

		if ((new_cycles - perf_cycles) > perf_sample) {
			const uint64_t curr_pkts = processed_pkts(t);

			float mpps = (float)(curr_pkts - prev_pkts)/1000000;

			prev_pkts = curr_pkts;
			perf_cycles = new_cycles;
			total_mpps += mpps;
			++samples;
			printf(CLGRN"\r%.3f mpps avg %.3f mpps"CLNRM,
					mpps, total_mpps/samples);
			fflush(stdout);
		}
	}
	printf("\n");
	return 0;
}

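/*
 * Validate the command-line options: producer type, available ethdevs
 * and lcores, worker lcore mask, and queue/port/stage/sched-type limits.
 */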
int
pipeline_opt_check(struct evt_options *opt, uint64_t nb_queues)
{
	unsigned int lcores;

	/* Minimum lcores: one worker + main */
	lcores = 2;

	if (opt->prod_type != EVT_PROD_TYPE_ETH_RX_ADPTR) {
		evt_err("Invalid producer type '%s' valid producer '%s'",
			evt_prod_id_to_name(opt->prod_type),
			evt_prod_id_to_name(EVT_PROD_TYPE_ETH_RX_ADPTR));
		return -1;
	}

	if (!rte_eth_dev_count_avail()) {
		evt_err("test needs at least 1 ethernet dev");
		return -1;
	}

	if (rte_lcore_count() < lcores) {
		evt_err("test needs a minimum of %d lcores", lcores);
		return -1;
	}

	/* Validate worker lcores */
	if (evt_lcores_has_overlap(opt->wlcores, rte_get_main_lcore())) {
		evt_err("worker lcores overlap with the main lcore");
		return -1;
	}
	if (evt_has_disabled_lcore(opt->wlcores)) {
		evt_err("one or more worker lcores are not enabled");
		return -1;
	}
	if (!evt_has_active_lcore(opt->wlcores)) {
		evt_err("minimum one worker is required");
		return -1;
	}

	if (nb_queues > EVT_MAX_QUEUES) {
		evt_err("number of queues exceeds %d", EVT_MAX_QUEUES);
		return -1;
	}
	if (pipeline_nb_event_ports(opt) > EVT_MAX_PORTS) {
		evt_err("number of ports exceeds %d", EVT_MAX_PORTS);
		return -1;
	}

	if (evt_has_invalid_stage(opt))
		return -1;

	if (evt_has_invalid_sched_type(opt))
		return -1;

	return 0;
}

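/*
 * Configure every ethdev: RSS over opt->eth_queues Rx queues, a single
 * Tx queue, an MTU derived from opt->max_pkt_sz and promiscuous mode.
 * t->internal_port stays set only when all ports support the Tx adapter
 * internal-port capability.
 */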
#define NB_RX_DESC			128
#define NB_TX_DESC			512
int
pipeline_ethdev_setup(struct evt_test *test, struct evt_options *opt)
{
	uint16_t i, j;
	int ret;
	uint8_t nb_queues = 1;
	struct test_pipeline *t = evt_test_priv(test);
	struct rte_eth_rxconf rx_conf;
	struct rte_eth_conf port_conf = {
		.rxmode = {
			.mq_mode = RTE_ETH_MQ_RX_RSS,
		},
		.rx_adv_conf = {
			.rss_conf = {
				.rss_key = NULL,
				.rss_hf = RTE_ETH_RSS_IP,
			},
		},
	};

	if (!rte_eth_dev_count_avail()) {
		evt_err("No ethernet ports found.");
		return -ENODEV;
	}

	if (opt->max_pkt_sz < RTE_ETHER_MIN_LEN) {
		evt_err("max_pkt_sz cannot be less than %d",
			RTE_ETHER_MIN_LEN);
		return -EINVAL;
	}

	port_conf.rxmode.mtu = opt->max_pkt_sz - RTE_ETHER_HDR_LEN -
		RTE_ETHER_CRC_LEN;

	t->internal_port = 1;
	RTE_ETH_FOREACH_DEV(i) {
		struct rte_eth_dev_info dev_info;
		struct rte_eth_conf local_port_conf = port_conf;
		uint32_t caps = 0;

		ret = rte_event_eth_tx_adapter_caps_get(opt->dev_id, i, &caps);
		if (ret != 0) {
			evt_err("failed to get event tx adapter[%d] caps", i);
			return ret;
		}

		if (!(caps & RTE_EVENT_ETH_TX_ADAPTER_CAP_INTERNAL_PORT))
			t->internal_port = 0;

		ret = rte_event_eth_rx_adapter_caps_get(opt->dev_id, i, &caps);
		if (ret != 0) {
			evt_err("failed to get event rx adapter[%d] caps", i);
			return ret;
		}

		if (!(caps & RTE_EVENT_ETH_RX_ADAPTER_CAP_INTERNAL_PORT))
			local_port_conf.rxmode.offloads |=
				RTE_ETH_RX_OFFLOAD_RSS_HASH;

		ret = rte_eth_dev_info_get(i, &dev_info);
		if (ret != 0) {
			evt_err("failed to get device (port %u) info: %s",
				i, strerror(-ret));
			return ret;
		}

		/* Enable mbuf fast free if PMD has the capability. */
		if (dev_info.tx_offload_capa & RTE_ETH_TX_OFFLOAD_MBUF_FAST_FREE)
			local_port_conf.txmode.offloads |=
				RTE_ETH_TX_OFFLOAD_MBUF_FAST_FREE;

		rx_conf = dev_info.default_rxconf;
		rx_conf.offloads = local_port_conf.rxmode.offloads;

		local_port_conf.rx_adv_conf.rss_conf.rss_hf &=
			dev_info.flow_type_rss_offloads;
		if (local_port_conf.rx_adv_conf.rss_conf.rss_hf !=
				port_conf.rx_adv_conf.rss_conf.rss_hf) {
			evt_info("Port %u modified RSS hash function based on hardware support, "
				"requested:%#"PRIx64" configured:%#"PRIx64,
				i,
				port_conf.rx_adv_conf.rss_conf.rss_hf,
				local_port_conf.rx_adv_conf.rss_conf.rss_hf);
		}

		if (rte_eth_dev_configure(i, opt->eth_queues, nb_queues,
					  &local_port_conf) < 0) {
			evt_err("Failed to configure eth port [%d]", i);
			return -EINVAL;
		}

		for (j = 0; j < opt->eth_queues; j++) {
			if (rte_eth_rx_queue_setup(
				    i, j, NB_RX_DESC, rte_socket_id(), &rx_conf,
				    opt->per_port_pool ? t->pool[i] :
							      t->pool[0]) < 0) {
				evt_err("Failed to setup eth port [%d] rx_queue: %d.",
					i, j);
				return -EINVAL;
			}
		}

		if (rte_eth_tx_queue_setup(i, 0, NB_TX_DESC,
					rte_socket_id(), NULL) < 0) {
			evt_err("Failed to setup eth port [%d] tx_queue: %d.",
					i, 0);
			return -EINVAL;
		}

		ret = rte_eth_promiscuous_enable(i);
		if (ret != 0) {
			evt_err("Failed to enable promiscuous mode for eth port [%d]: %s",
				i, rte_strerror(-ret));
			return ret;
		}
	}

	return 0;
}

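/*
 * Create one event port per worker lcore, initialize the matching
 * worker_data and link the port to all queues in queue_arr.
 */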
int
pipeline_event_port_setup(struct evt_test *test, struct evt_options *opt,
		uint8_t *queue_arr, uint8_t nb_queues,
		const struct rte_event_port_conf p_conf)
{
	int ret;
	uint8_t port;
	struct test_pipeline *t = evt_test_priv(test);

	/* setup one port per worker, linking to all queues */
	for (port = 0; port < evt_nr_active_lcores(opt->wlcores); port++) {
		struct worker_data *w = &t->worker[port];

		w->dev_id = opt->dev_id;
		w->port_id = port;
		w->t = t;
		w->processed_pkts = 0;

		ret = rte_event_port_setup(opt->dev_id, port, &p_conf);
		if (ret) {
			evt_err("failed to setup port %d", port);
			return ret;
		}

		if (rte_event_port_link(opt->dev_id, port, queue_arr, NULL,
					nb_queues) != nb_queues)
			goto link_fail;
	}

	return 0;

link_fail:
	evt_err("failed to link queues to port %d", port);
	return -EINVAL;
}

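/*
 * Create an Rx adapter per ethdev and add all of its Rx queues, mapped
 * to event queue (port id * stride). When event vectorization is enabled
 * (opt->ena_vector), the vector size and timeout are validated against
 * the adapter limits and a shared event vector pool is attached.
 * Adapters without the internal-port capability are driven by a
 * service core.
 */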
int
pipeline_event_rx_adapter_setup(struct evt_options *opt, uint8_t stride,
		struct rte_event_port_conf prod_conf)
{
	int ret = 0;
	uint16_t prod;
	struct rte_mempool *vector_pool = NULL;
	struct rte_event_eth_rx_adapter_queue_conf queue_conf;

	memset(&queue_conf, 0,
			sizeof(struct rte_event_eth_rx_adapter_queue_conf));
	queue_conf.ev.sched_type = opt->sched_type_list[0];
	if (opt->ena_vector) {
		unsigned int nb_elem = (opt->pool_sz / opt->vector_size) << 1;

		nb_elem = RTE_MAX(512U, nb_elem);
		nb_elem += evt_nr_active_lcores(opt->wlcores) * 32;
		vector_pool = rte_event_vector_pool_create(
			"vector_pool", nb_elem, 32, opt->vector_size,
			opt->socket_id);
		if (vector_pool == NULL) {
			evt_err("failed to create event vector pool");
			return -ENOMEM;
		}
	}
	RTE_ETH_FOREACH_DEV(prod) {
		struct rte_event_eth_rx_adapter_vector_limits limits;
		uint32_t cap;

		ret = rte_event_eth_rx_adapter_caps_get(opt->dev_id,
				prod, &cap);
		if (ret) {
			evt_err("failed to get event rx adapter[%d] capabilities",
					prod);
			return ret;
		}

		if (opt->ena_vector) {
			memset(&limits, 0, sizeof(limits));
			ret = rte_event_eth_rx_adapter_vector_limits_get(
				opt->dev_id, prod, &limits);
			if (ret) {
				evt_err("failed to get vector limits");
				return ret;
			}

			if (opt->vector_size < limits.min_sz ||
			    opt->vector_size > limits.max_sz) {
				evt_err("Vector size [%d] not within limits min[%d] max[%d]",
					opt->vector_size, limits.min_sz,
					limits.max_sz);
				return -EINVAL;
			}

			if (limits.log2_sz &&
			    !rte_is_power_of_2(opt->vector_size)) {
				evt_err("Vector size [%d] not power of 2",
					opt->vector_size);
				return -EINVAL;
			}

			if (opt->vector_tmo_nsec > limits.max_timeout_ns ||
			    opt->vector_tmo_nsec < limits.min_timeout_ns) {
				evt_err("Vector timeout [%" PRIu64
					"] not within limits min[%" PRIu64
					"] max[%" PRIu64 "]",
					opt->vector_tmo_nsec,
					limits.min_timeout_ns,
					limits.max_timeout_ns);
				return -EINVAL;
			}

			if (cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_EVENT_VECTOR) {
				queue_conf.vector_sz = opt->vector_size;
				queue_conf.vector_timeout_ns =
					opt->vector_tmo_nsec;
				queue_conf.rx_queue_flags |=
				RTE_EVENT_ETH_RX_ADAPTER_QUEUE_EVENT_VECTOR;
				queue_conf.vector_mp = vector_pool;
			} else {
				evt_err("Rx adapter doesn't support event vector");
				return -EINVAL;
			}
		}
		queue_conf.ev.queue_id = prod * stride;
		ret = rte_event_eth_rx_adapter_create(prod, opt->dev_id,
				&prod_conf);
		if (ret) {
			evt_err("failed to create rx adapter[%d]", prod);
			return ret;
		}
		ret = rte_event_eth_rx_adapter_queue_add(prod, prod, -1,
				&queue_conf);
		if (ret) {
			evt_err("failed to add rx queues to adapter[%d]", prod);
			return ret;
		}

		if (!(cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_INTERNAL_PORT)) {
			uint32_t service_id = -1U;

			ret = rte_event_eth_rx_adapter_service_id_get(prod,
					&service_id);
			if (ret != -ESRCH && ret != 0) {
				evt_err("Failed to get Rx adapter service ID");
				return ret;
			}
			ret = evt_service_setup(service_id);
			if (ret) {
				evt_err("Failed to setup service core"
						" for Rx adapter");
				return ret;
			}
		}

		evt_info("Port[%d] using Rx adapter[%d] configured", prod,
				prod);
	}

	return ret;
}

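/*
 * Create a Tx adapter per ethdev and add all of its Tx queues. Adapters
 * without the internal-port capability are driven by a service core.
 */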
int
pipeline_event_tx_adapter_setup(struct evt_options *opt,
		struct rte_event_port_conf port_conf)
{
	int ret = 0;
	uint16_t consm;

	RTE_ETH_FOREACH_DEV(consm) {
		uint32_t cap;

		ret = rte_event_eth_tx_adapter_caps_get(opt->dev_id,
				consm, &cap);
		if (ret) {
			evt_err("failed to get event tx adapter[%d] caps",
					consm);
			return ret;
		}

		if (opt->ena_vector) {
			if (!(cap &
			      RTE_EVENT_ETH_TX_ADAPTER_CAP_EVENT_VECTOR)) {
				evt_err("Tx adapter doesn't support event vector");
				return -EINVAL;
			}
		}

		ret = rte_event_eth_tx_adapter_create(consm, opt->dev_id,
				&port_conf);
		if (ret) {
			evt_err("failed to create tx adapter[%d]", consm);
			return ret;
		}

		ret = rte_event_eth_tx_adapter_queue_add(consm, consm, -1);
		if (ret) {
			evt_err("failed to add tx queues to adapter[%d]",
					consm);
			return ret;
		}

		if (!(cap & RTE_EVENT_ETH_TX_ADAPTER_CAP_INTERNAL_PORT)) {
			uint32_t service_id = -1U;

			ret = rte_event_eth_tx_adapter_service_id_get(consm,
								   &service_id);
			if (ret != -ESRCH && ret != 0) {
				evt_err("Failed to get Tx adapter service ID");
				return ret;
			}
			ret = evt_service_setup(service_id);
			if (ret) {
				evt_err("Failed to setup service core"
						" for Tx adapter");
				return ret;
			}
		}

		evt_info("Port[%d] using Tx adapter[%d] configured", consm,
				consm);
	}

	return ret;
}

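/* Free the mbufs carried by each event vector, then the vectors themselves. */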
669 
670 static void
671 pipeline_vector_array_free(struct rte_event events[], uint16_t num)
672 {
673 	uint16_t i;
674 
675 	for (i = 0; i < num; i++) {
676 		rte_pktmbuf_free_bulk(
677 			&events[i].vec->mbufs[events[i].vec->elem_offset],
678 			events[i].vec->nb_elem);
679 		rte_mempool_put(rte_mempool_from_obj(events[i].vec),
680 				events[i].vec);
681 	}
682 }
683 
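/* rte_event_port_quiesce() callback: drop an event still held by the port. */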
static void
pipeline_event_port_flush(uint8_t dev_id __rte_unused, struct rte_event ev,
			  void *args __rte_unused)
{
	if (ev.event_type & RTE_EVENT_TYPE_VECTOR)
		pipeline_vector_array_free(&ev, 1);
	else
		rte_pktmbuf_free(ev.mbuf);
}

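/*
 * Drop events that were dequeued but not yet enqueued ([enq, deq)),
 * release the whole burst back to the device and quiesce the port.
 */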
void
pipeline_worker_cleanup(uint8_t dev, uint8_t port, struct rte_event ev[],
			uint16_t enq, uint16_t deq)
{
	int i;

	if (deq) {
		for (i = enq; i < deq; i++) {
			if (ev[i].op == RTE_EVENT_OP_RELEASE)
				continue;
			if (ev[i].event_type & RTE_EVENT_TYPE_VECTOR)
				pipeline_vector_array_free(&ev[i], 1);
			else
				rte_pktmbuf_free(ev[i].mbuf);
		}

		for (i = 0; i < deq; i++)
			ev[i].op = RTE_EVENT_OP_RELEASE;

		rte_event_enqueue_burst(dev, port, ev, deq);
	}

	rte_event_port_quiesce(dev, port, pipeline_event_port_flush, NULL);
}

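/* Stop the Rx adapters and Rx queues of every ethdev. */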
void
pipeline_ethdev_rx_stop(struct evt_test *test, struct evt_options *opt)
{
	uint16_t i, j;
	RTE_SET_USED(test);

	if (opt->prod_type == EVT_PROD_TYPE_ETH_RX_ADPTR) {
		RTE_ETH_FOREACH_DEV(i) {
			rte_event_eth_rx_adapter_stop(i);
			rte_event_eth_rx_adapter_queue_del(i, i, -1);
			for (j = 0; j < opt->eth_queues; j++)
				rte_eth_dev_rx_queue_stop(i, j);
		}
	}
}

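/* Stop the Tx adapters, Tx queues and finally the ethdevs themselves. */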
void
pipeline_ethdev_destroy(struct evt_test *test, struct evt_options *opt)
{
	uint16_t i;
	RTE_SET_USED(test);
	RTE_SET_USED(opt);

	RTE_ETH_FOREACH_DEV(i) {
		rte_event_eth_tx_adapter_stop(i);
		rte_event_eth_tx_adapter_queue_del(i, i, -1);
		rte_eth_dev_tx_queue_stop(i, 0);
		rte_eth_dev_stop(i);
	}
}

void
pipeline_eventdev_destroy(struct evt_test *test, struct evt_options *opt)
{
	RTE_SET_USED(test);

	rte_event_dev_stop(opt->dev_id);
	rte_event_dev_close(opt->dev_id);
}

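/*
 * Create the packet mempool(s): one per ethdev when opt->per_port_pool
 * is set, otherwise a single shared pool. The mbuf data room is enlarged
 * when a port's Rx segment limit requires it to fit opt->max_pkt_sz.
 */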
int
pipeline_mempool_setup(struct evt_test *test, struct evt_options *opt)
{
	struct test_pipeline *t = evt_test_priv(test);
	int i, ret;

	if (!opt->mbuf_sz)
		opt->mbuf_sz = RTE_MBUF_DEFAULT_BUF_SIZE;

	if (!opt->max_pkt_sz)
		opt->max_pkt_sz = RTE_ETHER_MAX_LEN;

	RTE_ETH_FOREACH_DEV(i) {
		struct rte_eth_dev_info dev_info;
		uint16_t data_size = 0;

		memset(&dev_info, 0, sizeof(dev_info));
		ret = rte_eth_dev_info_get(i, &dev_info);
		if (ret != 0) {
			evt_err("failed to get device (port %u) info: %s",
				i, strerror(-ret));
			return ret;
		}

		if (dev_info.rx_desc_lim.nb_mtu_seg_max != UINT16_MAX &&
				dev_info.rx_desc_lim.nb_mtu_seg_max != 0) {
			data_size = opt->max_pkt_sz /
				dev_info.rx_desc_lim.nb_mtu_seg_max;
			data_size += RTE_PKTMBUF_HEADROOM;

			if (data_size > opt->mbuf_sz)
				opt->mbuf_sz = data_size;
		}
		if (opt->per_port_pool) {
			char name[RTE_MEMPOOL_NAMESIZE];

			snprintf(name, RTE_MEMPOOL_NAMESIZE, "%s-%d",
				 test->name, i);
			t->pool[i] = rte_pktmbuf_pool_create(
				name,	      /* mempool name */
				opt->pool_sz, /* number of elements */
				0,	      /* cache size */
				0,	      /* private data size */
				opt->mbuf_sz, /* data room size */
				opt->socket_id);

			if (t->pool[i] == NULL) {
				evt_err("failed to create mempool %s", name);
				return -ENOMEM;
			}
		}
	}

	if (!opt->per_port_pool) {
		t->pool[0] = rte_pktmbuf_pool_create(
			test->name,   /* mempool name */
			opt->pool_sz, /* number of elements */
			0,	      /* cache size */
			0,	      /* private data size */
			opt->mbuf_sz, /* data room size */
			opt->socket_id);

		if (t->pool[0] == NULL) {
			evt_err("failed to create mempool");
			return -ENOMEM;
		}
	}

	return 0;
}

void
pipeline_mempool_destroy(struct evt_test *test, struct evt_options *opt)
{
	struct test_pipeline *t = evt_test_priv(test);
	int i;

	if (opt->per_port_pool) {
		RTE_ETH_FOREACH_DEV(i)
			rte_mempool_free(t->pool[i]);
	} else {
		rte_mempool_free(t->pool[0]);
	}
}

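/*
 * Allocate the per-test private state and seed it from the options. The
 * producer type is forced to the ethdev Rx adapter, the only producer
 * this test supports.
 */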
int
pipeline_test_setup(struct evt_test *test, struct evt_options *opt)
{
	void *test_pipeline;

	test_pipeline = rte_zmalloc_socket(test->name,
			sizeof(struct test_pipeline), RTE_CACHE_LINE_SIZE,
			opt->socket_id);
	if (test_pipeline == NULL) {
		evt_err("failed to allocate test_pipeline memory");
		goto nomem;
	}
	test->test_priv = test_pipeline;

	struct test_pipeline *t = evt_test_priv(test);

	t->nb_workers = evt_nr_active_lcores(opt->wlcores);
	t->outstand_pkts = opt->nb_pkts * evt_nr_active_lcores(opt->wlcores);
	t->done = false;
	t->nb_flows = opt->nb_flows;
	t->result = EVT_TEST_FAILED;
	t->opt = opt;
	opt->prod_type = EVT_PROD_TYPE_ETH_RX_ADPTR;
	memcpy(t->sched_type_list, opt->sched_type_list,
			sizeof(opt->sched_type_list));
	return 0;
nomem:
	return -ENOMEM;
}

void
pipeline_test_destroy(struct evt_test *test, struct evt_options *opt)
{
	RTE_SET_USED(opt);

	rte_free(test->test_priv);
}