xref: /dpdk/lib/eventdev/rte_event_eth_tx_adapter.c (revision bc84182d6ae809d131467516056c7ea98bb4d961)
1 /* SPDX-License-Identifier: BSD-3-Clause
2  * Copyright(c) 2018 Intel Corporation.
3  */
4 #include <rte_spinlock.h>
5 #include <rte_service_component.h>
6 #include <ethdev_driver.h>
7 
8 #include "eventdev_pmd.h"
9 #include "eventdev_trace.h"
10 #include "rte_event_eth_tx_adapter.h"
11 
12 #define TXA_BATCH_SIZE		32
13 #define TXA_SERVICE_NAME_LEN	32
14 #define TXA_MEM_NAME_LEN	32
15 #define TXA_FLUSH_THRESHOLD	1024
16 #define TXA_RETRY_CNT		100
17 #define TXA_MAX_NB_TX		128
18 #define TXA_INVALID_DEV_ID	INT32_C(-1)
19 #define TXA_INVALID_SERVICE_ID	INT64_C(-1)
20 
21 #define TXA_ADAPTER_ARRAY "txa_adapter_array"
22 #define TXA_SERVICE_DATA_ARRAY "txa_service_data_array"
23 
24 #define txa_evdev(id) (&rte_eventdevs[txa_dev_id_array[(id)]])
25 
26 #define txa_dev_caps_get(id) txa_evdev((id))->dev_ops->eth_tx_adapter_caps_get
27 
28 #define txa_dev_adapter_create(t) txa_evdev(t)->dev_ops->eth_tx_adapter_create
29 
30 #define txa_dev_adapter_create_ext(t) \
31 				txa_evdev(t)->dev_ops->eth_tx_adapter_create
32 
33 #define txa_dev_adapter_free(t) txa_evdev(t)->dev_ops->eth_tx_adapter_free
34 
35 #define txa_dev_queue_add(id) txa_evdev(id)->dev_ops->eth_tx_adapter_queue_add
36 
37 #define txa_dev_queue_del(t) txa_evdev(t)->dev_ops->eth_tx_adapter_queue_del
38 
39 #define txa_dev_start(t) txa_evdev(t)->dev_ops->eth_tx_adapter_start
40 
41 #define txa_dev_stop(t) txa_evdev(t)->dev_ops->eth_tx_adapter_stop
42 
43 #define txa_dev_stats_reset(t) txa_evdev(t)->dev_ops->eth_tx_adapter_stats_reset
44 
45 #define txa_dev_stats_get(t) txa_evdev(t)->dev_ops->eth_tx_adapter_stats_get
46 
47 #define txa_dev_instance_get(id) \
48 			txa_evdev(id)->dev_ops->eth_tx_adapter_instance_get
49 
50 #define txa_dev_queue_start(id) \
51 			txa_evdev(id)->dev_ops->eth_tx_adapter_queue_start
52 
53 #define txa_dev_queue_stop(id) \
54 			txa_evdev(id)->dev_ops->eth_tx_adapter_queue_stop
55 
56 #define RTE_EVENT_ETH_TX_ADAPTER_ID_VALID_OR_ERR_RET(id, retval) \
57 do { \
58 	if (!txa_valid_id(id)) { \
59 		RTE_EDEV_LOG_ERR("Invalid eth Tx adapter id = %d", id); \
60 		return retval; \
61 	} \
62 } while (0)
63 
64 #define TXA_CHECK_OR_ERR_RET(id) \
65 do {\
66 	int ret; \
67 	RTE_EVENT_ETH_TX_ADAPTER_ID_VALID_OR_ERR_RET((id), -EINVAL); \
68 	ret = txa_init(); \
69 	if (ret != 0) \
70 		return ret; \
71 	if (!txa_adapter_exist((id))) \
72 		return -EINVAL; \
73 } while (0)
74 
75 #define TXA_CHECK_TXQ(dev, queue) \
76 do {\
77 	if ((dev)->data->nb_tx_queues == 0) { \
78 		RTE_EDEV_LOG_ERR("No tx queues configured"); \
79 		return -EINVAL; \
80 	} \
81 	if ((queue) != -1 && \
82 		(uint16_t)(queue) >= (dev)->data->nb_tx_queues) { \
83 		RTE_EDEV_LOG_ERR("Invalid tx queue_id %" PRIu16, \
84 				(uint16_t)(queue)); \
85 		return -EINVAL; \
86 	} \
87 } while (0)
88 
89 /* Tx retry callback structure */
90 struct txa_retry {
91 	/* Ethernet port id */
92 	uint16_t port_id;
93 	/* Tx queue */
94 	uint16_t tx_queue;
95 	/* Adapter ID */
96 	uint8_t id;
97 };
98 
99 /* Per queue structure */
100 struct txa_service_queue_info {
101 	/* Queue has been added */
102 	uint8_t added;
103 	/* Queue is stopped */
104 	bool stopped;
105 	/* Retry callback argument */
106 	struct txa_retry txa_retry;
107 	/* Tx buffer */
108 	struct rte_eth_dev_tx_buffer *tx_buf;
109 };
110 
111 /* PMD private structure */
112 struct txa_service_data {
113 	/* Max mbufs processed in any service function invocation */
114 	uint32_t max_nb_tx;
115 	/* Number of Tx queues in adapter */
116 	uint32_t nb_queues;
117 	/*  Synchronization with data path */
118 	rte_spinlock_t tx_lock;
119 	/* Event port ID */
120 	uint8_t port_id;
121 	/* Event device identifier */
122 	uint8_t eventdev_id;
123 	/* Highest port id supported + 1 */
124 	uint16_t dev_count;
125 	/* Loop count to flush Tx buffers */
126 	int loop_cnt;
127 	/* Per ethernet device structure */
128 	struct txa_service_ethdev *txa_ethdev;
129 	/* Statistics */
130 	struct rte_event_eth_tx_adapter_stats stats;
131 	/* Adapter Identifier */
132 	uint8_t id;
133 	/* Conf arg must be freed */
134 	uint8_t conf_free;
135 	/* Configuration callback */
136 	rte_event_eth_tx_adapter_conf_cb conf_cb;
137 	/* Configuration callback argument */
138 	void *conf_arg;
139 	/* socket id */
140 	int socket_id;
141 	/* Per adapter EAL service */
142 	int64_t service_id;
143 	/* Memory allocation name */
144 	char mem_name[TXA_MEM_NAME_LEN];
145 } __rte_cache_aligned;
146 
147 /* Per eth device structure */
148 struct txa_service_ethdev {
149 	/* Pointer to ethernet device */
150 	struct rte_eth_dev *dev;
151 	/* Number of queues added */
152 	uint16_t nb_queues;
153 	/* PMD specific queue data */
154 	void *queues;
155 };
156 
157 /* Array of adapter instances, initialized with event device id
158  * when adapter is created
159  */
160 static int *txa_dev_id_array;
161 
162 /* Array of pointers to service implementation data */
163 static struct txa_service_data **txa_service_data_array;
164 
165 static int32_t txa_service_func(void *args);
166 static int txa_service_adapter_create_ext(uint8_t id,
167 			struct rte_eventdev *dev,
168 			rte_event_eth_tx_adapter_conf_cb conf_cb,
169 			void *conf_arg);
170 static int txa_service_queue_del(uint8_t id,
171 				const struct rte_eth_dev *dev,
172 				int32_t tx_queue_id);
173 
174 static int
175 txa_adapter_exist(uint8_t id)
176 {
177 	return txa_dev_id_array[id] != TXA_INVALID_DEV_ID;
178 }
179 
180 static inline int
181 txa_valid_id(uint8_t id)
182 {
183 	return id < RTE_EVENT_ETH_TX_ADAPTER_MAX_INSTANCE;
184 }
185 
186 static void *
187 txa_memzone_array_get(const char *name, unsigned int elt_size, int nb_elems)
188 {
189 	const struct rte_memzone *mz;
190 	unsigned int sz;
191 
192 	sz = elt_size * nb_elems;
193 	sz = RTE_ALIGN(sz, RTE_CACHE_LINE_SIZE);
194 
195 	mz = rte_memzone_lookup(name);
196 	if (mz == NULL) {
197 		mz = rte_memzone_reserve_aligned(name, sz, rte_socket_id(), 0,
198 						 RTE_CACHE_LINE_SIZE);
199 		if (mz == NULL) {
200 			RTE_EDEV_LOG_ERR("failed to reserve memzone"
201 					" name = %s err = %"
202 					PRId32, name, rte_errno);
203 			return NULL;
204 		}
205 	}
206 
207 	return  mz->addr;
208 }
209 
210 static int
211 txa_lookup(void)
212 {
213 	const struct rte_memzone *mz;
214 
215 	if (txa_dev_id_array == NULL) {
216 		mz = rte_memzone_lookup(TXA_ADAPTER_ARRAY);
217 		if (mz == NULL)
218 			return -ENOMEM;
219 		txa_dev_id_array = mz->addr;
220 	}
221 
222 	if (txa_service_data_array == NULL) {
223 		mz = rte_memzone_lookup(TXA_SERVICE_DATA_ARRAY);
224 		if (mz == NULL)
225 			return -ENOMEM;
226 		txa_service_data_array = mz->addr;
227 	}
228 
229 	return 0;
230 }
231 
232 static int
233 txa_dev_id_array_init(void)
234 {
235 	if (txa_dev_id_array == NULL) {
236 		int i;
237 
238 		txa_dev_id_array = txa_memzone_array_get(TXA_ADAPTER_ARRAY,
239 					sizeof(int),
240 					RTE_EVENT_ETH_TX_ADAPTER_MAX_INSTANCE);
241 		if (txa_dev_id_array == NULL)
242 			return -ENOMEM;
243 
244 		for (i = 0; i < RTE_EVENT_ETH_TX_ADAPTER_MAX_INSTANCE; i++)
245 			txa_dev_id_array[i] = TXA_INVALID_DEV_ID;
246 	}
247 
248 	return 0;
249 }
250 
251 static int
252 txa_init(void)
253 {
254 	return txa_dev_id_array_init();
255 }
256 
257 static int
258 txa_service_data_init(void)
259 {
260 	if (txa_service_data_array == NULL) {
261 		int i;
262 
263 		txa_service_data_array =
264 				txa_memzone_array_get(TXA_SERVICE_DATA_ARRAY,
265 					sizeof(*txa_service_data_array),
266 					RTE_EVENT_ETH_TX_ADAPTER_MAX_INSTANCE);
267 		if (txa_service_data_array == NULL)
268 			return -ENOMEM;
269 
270 		/* Reset the txa service pointers */
271 		for (i = 0; i < RTE_EVENT_ETH_TX_ADAPTER_MAX_INSTANCE; i++)
272 			txa_service_data_array[i] = NULL;
273 	}
274 
275 	return 0;
276 }
277 
278 static inline struct txa_service_data *
279 txa_service_id_to_data(uint8_t id)
280 {
281 	return txa_service_data_array[id];
282 }
283 
284 static inline struct txa_service_queue_info *
285 txa_service_queue(struct txa_service_data *txa, uint16_t port_id,
286 		uint16_t tx_queue_id)
287 {
288 	struct txa_service_queue_info *tqi;
289 
290 	if (unlikely(txa->txa_ethdev == NULL || txa->dev_count < port_id + 1))
291 		return NULL;
292 
293 	tqi = txa->txa_ethdev[port_id].queues;
294 
295 	return likely(tqi != NULL) ? tqi + tx_queue_id : NULL;
296 }
297 
298 static int
299 txa_service_conf_cb(uint8_t __rte_unused id, uint8_t dev_id,
300 		struct rte_event_eth_tx_adapter_conf *conf, void *arg)
301 {
302 	int ret;
303 	struct rte_eventdev *dev;
304 	struct rte_event_port_conf *pc;
305 	struct rte_event_dev_config dev_conf;
306 	int started;
307 	uint8_t port_id;
308 
309 	pc = arg;
310 	dev = &rte_eventdevs[dev_id];
311 	dev_conf = dev->data->dev_conf;
312 
313 	started = dev->data->dev_started;
314 	if (started)
315 		rte_event_dev_stop(dev_id);
316 
317 	port_id = dev_conf.nb_event_ports;
318 	dev_conf.nb_event_ports += 1;
319 	if (pc->event_port_cfg & RTE_EVENT_PORT_CFG_SINGLE_LINK)
320 		dev_conf.nb_single_link_event_port_queues += 1;
321 
322 	ret = rte_event_dev_configure(dev_id, &dev_conf);
323 	if (ret) {
324 		RTE_EDEV_LOG_ERR("failed to configure event dev %u",
325 						dev_id);
326 		if (started) {
327 			if (rte_event_dev_start(dev_id))
328 				return -EIO;
329 		}
330 		return ret;
331 	}
332 
333 	ret = rte_event_port_setup(dev_id, port_id, pc);
334 	if (ret) {
335 		RTE_EDEV_LOG_ERR("failed to setup event port %u\n",
336 					port_id);
337 		if (started) {
338 			if (rte_event_dev_start(dev_id))
339 				return -EIO;
340 		}
341 		return ret;
342 	}
343 
344 	conf->event_port_id = port_id;
345 	conf->max_nb_tx = TXA_MAX_NB_TX;
346 	if (started)
347 		ret = rte_event_dev_start(dev_id);
348 	return ret;
349 }
350 
351 static int
352 txa_service_ethdev_alloc(struct txa_service_data *txa)
353 {
354 	struct txa_service_ethdev *txa_ethdev;
355 	uint16_t i, dev_count;
356 
357 	dev_count = rte_eth_dev_count_avail();
358 	if (txa->txa_ethdev && dev_count == txa->dev_count)
359 		return 0;
360 
361 	txa_ethdev = rte_zmalloc_socket(txa->mem_name,
362 					dev_count * sizeof(*txa_ethdev),
363 					0,
364 					txa->socket_id);
365 	if (txa_ethdev == NULL) {
366 		RTE_EDEV_LOG_ERR("Failed to alloc txa::txa_ethdev ");
367 		return -ENOMEM;
368 	}
369 
370 	if (txa->dev_count)
371 		memcpy(txa_ethdev, txa->txa_ethdev,
372 			txa->dev_count * sizeof(*txa_ethdev));
373 
374 	RTE_ETH_FOREACH_DEV(i) {
375 		if (i == dev_count)
376 			break;
377 		txa_ethdev[i].dev = &rte_eth_devices[i];
378 	}
379 
380 	txa->txa_ethdev = txa_ethdev;
381 	txa->dev_count = dev_count;
382 	return 0;
383 }
384 
385 static int
386 txa_service_queue_array_alloc(struct txa_service_data *txa,
387 			uint16_t port_id)
388 {
389 	struct txa_service_queue_info *tqi;
390 	uint16_t nb_queue;
391 	int ret;
392 
393 	ret = txa_service_ethdev_alloc(txa);
394 	if (ret != 0)
395 		return ret;
396 
397 	if (txa->txa_ethdev[port_id].queues)
398 		return 0;
399 
400 	nb_queue = txa->txa_ethdev[port_id].dev->data->nb_tx_queues;
401 	tqi = rte_zmalloc_socket(txa->mem_name,
402 				nb_queue *
403 				sizeof(struct txa_service_queue_info), 0,
404 				txa->socket_id);
405 	if (tqi == NULL)
406 		return -ENOMEM;
407 	txa->txa_ethdev[port_id].queues = tqi;
408 	return 0;
409 }
410 
411 static void
412 txa_service_queue_array_free(struct txa_service_data *txa,
413 			uint16_t port_id)
414 {
415 	struct txa_service_ethdev *txa_ethdev;
416 	struct txa_service_queue_info *tqi;
417 
418 	txa_ethdev = &txa->txa_ethdev[port_id];
419 	if (txa->txa_ethdev == NULL || txa_ethdev->nb_queues != 0)
420 		return;
421 
422 	tqi = txa_ethdev->queues;
423 	txa_ethdev->queues = NULL;
424 	rte_free(tqi);
425 
426 	if (txa->nb_queues == 0) {
427 		rte_free(txa->txa_ethdev);
428 		txa->txa_ethdev = NULL;
429 	}
430 }
431 
432 static void
433 txa_service_unregister(struct txa_service_data *txa)
434 {
435 	if (txa->service_id != TXA_INVALID_SERVICE_ID) {
436 		rte_service_component_runstate_set(txa->service_id, 0);
437 		while (rte_service_may_be_active(txa->service_id))
438 			rte_pause();
439 		rte_service_component_unregister(txa->service_id);
440 	}
441 	txa->service_id = TXA_INVALID_SERVICE_ID;
442 }
443 
444 static int
445 txa_service_register(struct txa_service_data *txa)
446 {
447 	int ret;
448 	struct rte_service_spec service;
449 	struct rte_event_eth_tx_adapter_conf conf;
450 
451 	if (txa->service_id != TXA_INVALID_SERVICE_ID)
452 		return 0;
453 
454 	memset(&service, 0, sizeof(service));
455 	snprintf(service.name, TXA_SERVICE_NAME_LEN, "txa_%d", txa->id);
456 	service.socket_id = txa->socket_id;
457 	service.callback = txa_service_func;
458 	service.callback_userdata = txa;
459 	service.capabilities = RTE_SERVICE_CAP_MT_SAFE;
460 	ret = rte_service_component_register(&service,
461 					(uint32_t *)&txa->service_id);
462 	if (ret) {
463 		RTE_EDEV_LOG_ERR("failed to register service %s err = %"
464 				 PRId32, service.name, ret);
465 		return ret;
466 	}
467 
468 	ret = txa->conf_cb(txa->id, txa->eventdev_id, &conf, txa->conf_arg);
469 	if (ret) {
470 		txa_service_unregister(txa);
471 		return ret;
472 	}
473 
474 	rte_service_component_runstate_set(txa->service_id, 1);
475 	txa->port_id = conf.event_port_id;
476 	txa->max_nb_tx = conf.max_nb_tx;
477 	return 0;
478 }
479 
480 static struct rte_eth_dev_tx_buffer *
481 txa_service_tx_buf_alloc(struct txa_service_data *txa,
482 			const struct rte_eth_dev *dev)
483 {
484 	struct rte_eth_dev_tx_buffer *tb;
485 	uint16_t port_id;
486 
487 	port_id = dev->data->port_id;
488 	tb = rte_zmalloc_socket(txa->mem_name,
489 				RTE_ETH_TX_BUFFER_SIZE(TXA_BATCH_SIZE),
490 				0,
491 				rte_eth_dev_socket_id(port_id));
492 	if (tb == NULL)
493 		RTE_EDEV_LOG_ERR("Failed to allocate memory for tx buffer");
494 	return tb;
495 }
496 
497 static int
498 txa_service_is_queue_added(struct txa_service_data *txa,
499 			const struct rte_eth_dev *dev,
500 			uint16_t tx_queue_id)
501 {
502 	struct txa_service_queue_info *tqi;
503 
504 	tqi = txa_service_queue(txa, dev->data->port_id, tx_queue_id);
505 	return tqi && tqi->added;
506 }
507 
508 static int
509 txa_service_ctrl(uint8_t id, int start)
510 {
511 	int ret;
512 	struct txa_service_data *txa;
513 
514 	txa = txa_service_id_to_data(id);
515 	if (txa == NULL || txa->service_id == TXA_INVALID_SERVICE_ID)
516 		return 0;
517 
518 	rte_spinlock_lock(&txa->tx_lock);
519 	ret = rte_service_runstate_set(txa->service_id, start);
520 	rte_spinlock_unlock(&txa->tx_lock);
521 
522 	return ret;
523 }
524 
525 static void
526 txa_service_buffer_retry(struct rte_mbuf **pkts, uint16_t unsent,
527 			void *userdata)
528 {
529 	struct txa_retry *tr;
530 	struct txa_service_data *data;
531 	struct rte_event_eth_tx_adapter_stats *stats;
532 	uint16_t sent = 0;
533 	unsigned int retry = 0;
534 	uint16_t i, n;
535 
536 	tr = (struct txa_retry *)(uintptr_t)userdata;
537 	data = txa_service_id_to_data(tr->id);
538 	stats = &data->stats;
539 
540 	do {
541 		n = rte_eth_tx_burst(tr->port_id, tr->tx_queue,
542 			       &pkts[sent], unsent - sent);
543 
544 		sent += n;
545 	} while (sent != unsent && retry++ < TXA_RETRY_CNT);
546 
547 	for (i = sent; i < unsent; i++)
548 		rte_pktmbuf_free(pkts[i]);
549 
550 	stats->tx_retry += retry;
551 	stats->tx_packets += sent;
552 	stats->tx_dropped += unsent - sent;
553 }
554 
555 static uint16_t
556 txa_process_event_vector(struct txa_service_data *txa,
557 			 struct rte_event_vector *vec)
558 {
559 	struct txa_service_queue_info *tqi;
560 	uint16_t port, queue, nb_tx = 0;
561 	struct rte_mbuf **mbufs;
562 	int i;
563 
564 	mbufs = (struct rte_mbuf **)vec->mbufs;
565 	if (vec->attr_valid) {
566 		port = vec->port;
567 		queue = vec->queue;
568 		tqi = txa_service_queue(txa, port, queue);
569 		if (unlikely(tqi == NULL || !tqi->added || tqi->stopped)) {
570 			rte_pktmbuf_free_bulk(&mbufs[vec->elem_offset],
571 					      vec->nb_elem);
572 			rte_mempool_put(rte_mempool_from_obj(vec), vec);
573 			return 0;
574 		}
575 		for (i = 0; i < vec->nb_elem; i++) {
576 			nb_tx += rte_eth_tx_buffer(port, queue, tqi->tx_buf,
577 						   mbufs[i + vec->elem_offset]);
578 		}
579 	} else {
580 		for (i = vec->elem_offset; i < vec->elem_offset + vec->nb_elem;
581 		     i++) {
582 			port = mbufs[i]->port;
583 			queue = rte_event_eth_tx_adapter_txq_get(mbufs[i]);
584 			tqi = txa_service_queue(txa, port, queue);
585 			if (unlikely(tqi == NULL || !tqi->added ||
586 				     tqi->stopped)) {
587 				rte_pktmbuf_free(mbufs[i]);
588 				continue;
589 			}
590 			nb_tx += rte_eth_tx_buffer(port, queue, tqi->tx_buf,
591 						   mbufs[i]);
592 		}
593 	}
594 	rte_mempool_put(rte_mempool_from_obj(vec), vec);
595 
596 	return nb_tx;
597 }
598 
599 static void
600 txa_service_tx(struct txa_service_data *txa, struct rte_event *ev,
601 	uint32_t n)
602 {
603 	uint32_t i;
604 	uint16_t nb_tx;
605 	struct rte_event_eth_tx_adapter_stats *stats;
606 
607 	stats = &txa->stats;
608 
609 	nb_tx = 0;
610 	for (i = 0; i < n; i++) {
611 		uint16_t port;
612 		uint16_t queue;
613 		struct txa_service_queue_info *tqi;
614 
615 		if (!(ev[i].event_type & RTE_EVENT_TYPE_VECTOR)) {
616 			struct rte_mbuf *m;
617 
618 			m = ev[i].mbuf;
619 			port = m->port;
620 			queue = rte_event_eth_tx_adapter_txq_get(m);
621 
622 			tqi = txa_service_queue(txa, port, queue);
623 			if (unlikely(tqi == NULL || !tqi->added ||
624 				     tqi->stopped)) {
625 				rte_pktmbuf_free(m);
626 				continue;
627 			}
628 
629 			nb_tx += rte_eth_tx_buffer(port, queue, tqi->tx_buf, m);
630 		} else {
631 			nb_tx += txa_process_event_vector(txa, ev[i].vec);
632 		}
633 	}
634 
635 	stats->tx_packets += nb_tx;
636 }
637 
638 static int32_t
639 txa_service_func(void *args)
640 {
641 	struct txa_service_data *txa = args;
642 	uint8_t dev_id;
643 	uint8_t port;
644 	int ret = -EAGAIN;
645 	uint16_t n;
646 	uint32_t nb_tx, max_nb_tx;
647 	struct rte_event ev[TXA_BATCH_SIZE];
648 
649 	dev_id = txa->eventdev_id;
650 	max_nb_tx = txa->max_nb_tx;
651 	port = txa->port_id;
652 
653 	if (txa->nb_queues == 0)
654 		return ret;
655 
656 	if (!rte_spinlock_trylock(&txa->tx_lock))
657 		return ret;
658 
659 	for (nb_tx = 0; nb_tx < max_nb_tx; nb_tx += n) {
660 
661 		n = rte_event_dequeue_burst(dev_id, port, ev, RTE_DIM(ev), 0);
662 		if (!n)
663 			break;
664 		txa_service_tx(txa, ev, n);
665 		ret = 0;
666 	}
667 
668 	if ((txa->loop_cnt++ & (TXA_FLUSH_THRESHOLD - 1)) == 0) {
669 
670 		struct txa_service_ethdev *tdi;
671 		struct txa_service_queue_info *tqi;
672 		struct rte_eth_dev *dev;
673 		uint16_t i;
674 
675 		tdi = txa->txa_ethdev;
676 		nb_tx = 0;
677 
678 		RTE_ETH_FOREACH_DEV(i) {
679 			uint16_t q;
680 
681 			if (i >= txa->dev_count)
682 				break;
683 
684 			dev = tdi[i].dev;
685 			if (tdi[i].nb_queues == 0)
686 				continue;
687 			for (q = 0; q < dev->data->nb_tx_queues; q++) {
688 
689 				tqi = txa_service_queue(txa, i, q);
690 				if (unlikely(tqi == NULL || !tqi->added ||
691 					     tqi->stopped))
692 					continue;
693 
694 				nb_tx += rte_eth_tx_buffer_flush(i, q,
695 							tqi->tx_buf);
696 			}
697 		}
698 
699 		if (likely(nb_tx > 0)) {
700 			txa->stats.tx_packets += nb_tx;
701 			ret = 0;
702 		}
703 	}
704 	rte_spinlock_unlock(&txa->tx_lock);
705 	return ret;
706 }
707 
708 static int
709 txa_service_adapter_create(uint8_t id, struct rte_eventdev *dev,
710 			struct rte_event_port_conf *port_conf)
711 {
712 	struct txa_service_data *txa;
713 	struct rte_event_port_conf *cb_conf;
714 	int ret;
715 
716 	cb_conf = rte_malloc(NULL, sizeof(*cb_conf), 0);
717 	if (cb_conf == NULL)
718 		return -ENOMEM;
719 
720 	*cb_conf = *port_conf;
721 	ret = txa_service_adapter_create_ext(id, dev, txa_service_conf_cb,
722 					cb_conf);
723 	if (ret) {
724 		rte_free(cb_conf);
725 		return ret;
726 	}
727 
728 	txa = txa_service_id_to_data(id);
729 	txa->conf_free = 1;
730 	return ret;
731 }
732 
733 static int
734 txa_service_adapter_create_ext(uint8_t id, struct rte_eventdev *dev,
735 			rte_event_eth_tx_adapter_conf_cb conf_cb,
736 			void *conf_arg)
737 {
738 	struct txa_service_data *txa;
739 	int socket_id;
740 	char mem_name[TXA_SERVICE_NAME_LEN];
741 	int ret;
742 
743 	if (conf_cb == NULL)
744 		return -EINVAL;
745 
746 	socket_id = dev->data->socket_id;
747 	snprintf(mem_name, TXA_MEM_NAME_LEN,
748 		"rte_event_eth_txa_%d",
749 		id);
750 
751 	ret = txa_service_data_init();
752 	if (ret != 0)
753 		return ret;
754 
755 	txa = rte_zmalloc_socket(mem_name,
756 				sizeof(*txa),
757 				RTE_CACHE_LINE_SIZE, socket_id);
758 	if (txa == NULL) {
759 		RTE_EDEV_LOG_ERR("failed to get mem for tx adapter");
760 		return -ENOMEM;
761 	}
762 
763 	txa->id = id;
764 	txa->eventdev_id = dev->data->dev_id;
765 	txa->socket_id = socket_id;
766 	strncpy(txa->mem_name, mem_name, TXA_SERVICE_NAME_LEN);
767 	txa->conf_cb = conf_cb;
768 	txa->conf_arg = conf_arg;
769 	txa->service_id = TXA_INVALID_SERVICE_ID;
770 	rte_spinlock_init(&txa->tx_lock);
771 	txa_service_data_array[id] = txa;
772 
773 	return 0;
774 }
775 
776 static int
777 txa_service_event_port_get(uint8_t id, uint8_t *port)
778 {
779 	struct txa_service_data *txa;
780 
781 	txa = txa_service_id_to_data(id);
782 	if (txa->service_id == TXA_INVALID_SERVICE_ID)
783 		return -ENODEV;
784 
785 	*port = txa->port_id;
786 	return 0;
787 }
788 
789 static int
790 txa_service_adapter_free(uint8_t id)
791 {
792 	struct txa_service_data *txa;
793 
794 	txa = txa_service_id_to_data(id);
795 	if (txa->nb_queues) {
796 		RTE_EDEV_LOG_ERR("%" PRIu16 " Tx queues not deleted",
797 				txa->nb_queues);
798 		return -EBUSY;
799 	}
800 
801 	if (txa->conf_free)
802 		rte_free(txa->conf_arg);
803 	rte_free(txa);
804 	return 0;
805 }
806 
807 static int
808 txa_service_queue_add(uint8_t id,
809 		__rte_unused struct rte_eventdev *dev,
810 		const struct rte_eth_dev *eth_dev,
811 		int32_t tx_queue_id)
812 {
813 	struct txa_service_data *txa;
814 	struct txa_service_ethdev *tdi;
815 	struct txa_service_queue_info *tqi;
816 	struct rte_eth_dev_tx_buffer *tb;
817 	struct txa_retry *txa_retry;
818 	int ret = 0;
819 
820 	txa = txa_service_id_to_data(id);
821 
822 	if (tx_queue_id == -1) {
823 		int nb_queues;
824 		uint16_t i, j;
825 		uint16_t *qdone;
826 
827 		nb_queues = eth_dev->data->nb_tx_queues;
828 		if (txa->dev_count > eth_dev->data->port_id) {
829 			tdi = &txa->txa_ethdev[eth_dev->data->port_id];
830 			nb_queues -= tdi->nb_queues;
831 		}
832 
833 		qdone = rte_zmalloc(txa->mem_name,
834 				nb_queues * sizeof(*qdone), 0);
835 		if (qdone == NULL)
836 			return -ENOMEM;
837 		j = 0;
838 		for (i = 0; i < nb_queues; i++) {
839 			if (txa_service_is_queue_added(txa, eth_dev, i))
840 				continue;
841 			ret = txa_service_queue_add(id, dev, eth_dev, i);
842 			if (ret == 0)
843 				qdone[j++] = i;
844 			else
845 				break;
846 		}
847 
848 		if (i != nb_queues) {
849 			for (i = 0; i < j; i++)
850 				txa_service_queue_del(id, eth_dev, qdone[i]);
851 		}
852 		rte_free(qdone);
853 		return ret;
854 	}
855 
856 	ret = txa_service_register(txa);
857 	if (ret)
858 		return ret;
859 
860 	rte_spinlock_lock(&txa->tx_lock);
861 
862 	if (txa_service_is_queue_added(txa, eth_dev, tx_queue_id))
863 		goto ret_unlock;
864 
865 	ret = txa_service_queue_array_alloc(txa, eth_dev->data->port_id);
866 	if (ret)
867 		goto err_unlock;
868 
869 	tb = txa_service_tx_buf_alloc(txa, eth_dev);
870 	if (tb == NULL)
871 		goto err_unlock;
872 
873 	tdi = &txa->txa_ethdev[eth_dev->data->port_id];
874 	tqi = txa_service_queue(txa, eth_dev->data->port_id, tx_queue_id);
875 	if (tqi == NULL)
876 		goto err_unlock;
877 
878 	txa_retry = &tqi->txa_retry;
879 	txa_retry->id = txa->id;
880 	txa_retry->port_id = eth_dev->data->port_id;
881 	txa_retry->tx_queue = tx_queue_id;
882 
883 	rte_eth_tx_buffer_init(tb, TXA_BATCH_SIZE);
884 	rte_eth_tx_buffer_set_err_callback(tb,
885 		txa_service_buffer_retry, txa_retry);
886 
887 	tqi->tx_buf = tb;
888 	tqi->added = 1;
889 	tqi->stopped = false;
890 	tdi->nb_queues++;
891 	txa->nb_queues++;
892 
893 ret_unlock:
894 	rte_spinlock_unlock(&txa->tx_lock);
895 	return 0;
896 
897 err_unlock:
898 	if (txa->nb_queues == 0) {
899 		txa_service_queue_array_free(txa,
900 					eth_dev->data->port_id);
901 		txa_service_unregister(txa);
902 	}
903 
904 	rte_spinlock_unlock(&txa->tx_lock);
905 	return -1;
906 }
907 
908 static inline void
909 txa_txq_buffer_drain(struct txa_service_queue_info *tqi)
910 {
911 	struct rte_eth_dev_tx_buffer *b;
912 	uint16_t i;
913 
914 	b = tqi->tx_buf;
915 
916 	for (i = 0; i < b->length; i++)
917 		rte_pktmbuf_free(b->pkts[i]);
918 
919 	b->length = 0;
920 }
921 
922 static int
923 txa_service_queue_del(uint8_t id,
924 		const struct rte_eth_dev *dev,
925 		int32_t tx_queue_id)
926 {
927 	struct txa_service_data *txa;
928 	struct txa_service_queue_info *tqi;
929 	struct rte_eth_dev_tx_buffer *tb;
930 	uint16_t port_id;
931 
932 	txa = txa_service_id_to_data(id);
933 	port_id = dev->data->port_id;
934 
935 	if (tx_queue_id == -1) {
936 		uint16_t i, q, nb_queues;
937 		int ret = 0;
938 
939 		if (txa->txa_ethdev == NULL)
940 			return 0;
941 		nb_queues = txa->txa_ethdev[port_id].nb_queues;
942 		if (nb_queues == 0)
943 			return 0;
944 
945 		i = 0;
946 		q = 0;
947 		tqi = txa->txa_ethdev[port_id].queues;
948 
949 		while (i < nb_queues) {
950 
951 			if (tqi[q].added) {
952 				ret = txa_service_queue_del(id, dev, q);
953 				i++;
954 				if (ret != 0)
955 					break;
956 			}
957 			q++;
958 		}
959 		return ret;
960 	}
961 
962 	txa = txa_service_id_to_data(id);
963 
964 	rte_spinlock_lock(&txa->tx_lock);
965 	tqi = txa_service_queue(txa, port_id, tx_queue_id);
966 	if (tqi == NULL || !tqi->added)
967 		goto ret_unlock;
968 
969 	/* Drain the buffered mbufs */
970 	txa_txq_buffer_drain(tqi);
971 	tb = tqi->tx_buf;
972 	tqi->added = 0;
973 	tqi->tx_buf = NULL;
974 	rte_free(tb);
975 	txa->nb_queues--;
976 	txa->txa_ethdev[port_id].nb_queues--;
977 
978 	txa_service_queue_array_free(txa, port_id);
979 
980 ret_unlock:
981 	rte_spinlock_unlock(&txa->tx_lock);
982 	return 0;
983 }
984 
985 static int
986 txa_service_id_get(uint8_t id, uint32_t *service_id)
987 {
988 	struct txa_service_data *txa;
989 
990 	txa = txa_service_id_to_data(id);
991 	if (txa->service_id == TXA_INVALID_SERVICE_ID)
992 		return -ESRCH;
993 
994 	if (service_id == NULL)
995 		return -EINVAL;
996 
997 	*service_id = txa->service_id;
998 
999 	rte_eventdev_trace_eth_tx_adapter_service_id_get(id, *service_id);
1000 	return 0;
1001 }
1002 
1003 static int
1004 txa_service_start(uint8_t id)
1005 {
1006 	return txa_service_ctrl(id, 1);
1007 }
1008 
1009 static int
1010 txa_service_stats_get(uint8_t id,
1011 		struct rte_event_eth_tx_adapter_stats *stats)
1012 {
1013 	struct txa_service_data *txa;
1014 
1015 	txa = txa_service_id_to_data(id);
1016 	*stats = txa->stats;
1017 	return 0;
1018 }
1019 
1020 static int
1021 txa_service_stats_reset(uint8_t id)
1022 {
1023 	struct txa_service_data *txa;
1024 
1025 	txa = txa_service_id_to_data(id);
1026 	memset(&txa->stats, 0, sizeof(txa->stats));
1027 	return 0;
1028 }
1029 
1030 static int
1031 txa_service_stop(uint8_t id)
1032 {
1033 	return txa_service_ctrl(id, 0);
1034 }
1035 
1036 
1037 int
1038 rte_event_eth_tx_adapter_create(uint8_t id, uint8_t dev_id,
1039 				struct rte_event_port_conf *port_conf)
1040 {
1041 	struct rte_eventdev *dev;
1042 	int ret;
1043 
1044 	if (port_conf == NULL)
1045 		return -EINVAL;
1046 
1047 	RTE_EVENT_ETH_TX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
1048 	RTE_EVENTDEV_VALID_DEVID_OR_ERR_RET(dev_id, -EINVAL);
1049 
1050 	dev = &rte_eventdevs[dev_id];
1051 
1052 	ret = txa_init();
1053 	if (ret != 0)
1054 		return ret;
1055 
1056 	if (txa_adapter_exist(id))
1057 		return -EEXIST;
1058 
1059 	txa_dev_id_array[id] = dev_id;
1060 	if (txa_dev_adapter_create(id))
1061 		ret = txa_dev_adapter_create(id)(id, dev);
1062 
1063 	if (ret != 0) {
1064 		txa_dev_id_array[id] = TXA_INVALID_DEV_ID;
1065 		return ret;
1066 	}
1067 
1068 	ret = txa_service_adapter_create(id, dev, port_conf);
1069 	if (ret != 0) {
1070 		if (txa_dev_adapter_free(id))
1071 			txa_dev_adapter_free(id)(id, dev);
1072 		txa_dev_id_array[id] = TXA_INVALID_DEV_ID;
1073 		return ret;
1074 	}
1075 	rte_eventdev_trace_eth_tx_adapter_create(id, dev_id, NULL, port_conf,
1076 		ret);
1077 	txa_dev_id_array[id] = dev_id;
1078 	return 0;
1079 }
1080 
1081 int
1082 rte_event_eth_tx_adapter_create_ext(uint8_t id, uint8_t dev_id,
1083 				rte_event_eth_tx_adapter_conf_cb conf_cb,
1084 				void *conf_arg)
1085 {
1086 	struct rte_eventdev *dev;
1087 	int ret;
1088 
1089 	RTE_EVENT_ETH_TX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
1090 	RTE_EVENTDEV_VALID_DEVID_OR_ERR_RET(dev_id, -EINVAL);
1091 
1092 	ret = txa_init();
1093 	if (ret != 0)
1094 		return ret;
1095 
1096 	if (txa_adapter_exist(id))
1097 		return -EINVAL;
1098 
1099 	dev = &rte_eventdevs[dev_id];
1100 
1101 	txa_dev_id_array[id] = dev_id;
1102 	if (txa_dev_adapter_create_ext(id))
1103 		ret = txa_dev_adapter_create_ext(id)(id, dev);
1104 
1105 	if (ret != 0) {
1106 		txa_dev_id_array[id] = TXA_INVALID_DEV_ID;
1107 		return ret;
1108 	}
1109 
1110 	ret = txa_service_adapter_create_ext(id, dev, conf_cb, conf_arg);
1111 	if (ret != 0) {
1112 		if (txa_dev_adapter_free(id))
1113 			txa_dev_adapter_free(id)(id, dev);
1114 		txa_dev_id_array[id] = TXA_INVALID_DEV_ID;
1115 		return ret;
1116 	}
1117 
1118 	rte_eventdev_trace_eth_tx_adapter_create(id, dev_id, conf_cb, conf_arg,
1119 		ret);
1120 	txa_dev_id_array[id] = dev_id;
1121 	return 0;
1122 }
1123 
1124 
1125 int
1126 rte_event_eth_tx_adapter_event_port_get(uint8_t id, uint8_t *event_port_id)
1127 {
1128 	rte_eventdev_trace_eth_tx_adapter_event_port_get(id);
1129 
1130 	TXA_CHECK_OR_ERR_RET(id);
1131 
1132 	return txa_service_event_port_get(id, event_port_id);
1133 }
1134 
1135 int
1136 rte_event_eth_tx_adapter_free(uint8_t id)
1137 {
1138 	int ret;
1139 
1140 	TXA_CHECK_OR_ERR_RET(id);
1141 
1142 	ret = txa_dev_adapter_free(id) ?
1143 		txa_dev_adapter_free(id)(id, txa_evdev(id)) :
1144 		0;
1145 
1146 	if (ret == 0)
1147 		ret = txa_service_adapter_free(id);
1148 	txa_dev_id_array[id] = TXA_INVALID_DEV_ID;
1149 
1150 	rte_eventdev_trace_eth_tx_adapter_free(id, ret);
1151 	return ret;
1152 }
1153 
1154 int
1155 rte_event_eth_tx_adapter_queue_add(uint8_t id,
1156 				uint16_t eth_dev_id,
1157 				int32_t queue)
1158 {
1159 	struct rte_eth_dev *eth_dev;
1160 	int ret;
1161 	uint32_t caps;
1162 
1163 	RTE_ETH_VALID_PORTID_OR_ERR_RET(eth_dev_id, -EINVAL);
1164 	TXA_CHECK_OR_ERR_RET(id);
1165 
1166 	eth_dev = &rte_eth_devices[eth_dev_id];
1167 	TXA_CHECK_TXQ(eth_dev, queue);
1168 
1169 	caps = 0;
1170 	if (txa_dev_caps_get(id))
1171 		txa_dev_caps_get(id)(txa_evdev(id), eth_dev, &caps);
1172 
1173 	if (caps & RTE_EVENT_ETH_TX_ADAPTER_CAP_INTERNAL_PORT)
1174 		ret =  txa_dev_queue_add(id) ?
1175 					txa_dev_queue_add(id)(id,
1176 							txa_evdev(id),
1177 							eth_dev,
1178 							queue) : 0;
1179 	else
1180 		ret = txa_service_queue_add(id, txa_evdev(id), eth_dev, queue);
1181 
1182 	rte_eventdev_trace_eth_tx_adapter_queue_add(id, eth_dev_id, queue,
1183 		ret);
1184 	return ret;
1185 }
1186 
1187 int
1188 rte_event_eth_tx_adapter_queue_del(uint8_t id,
1189 				uint16_t eth_dev_id,
1190 				int32_t queue)
1191 {
1192 	struct rte_eth_dev *eth_dev;
1193 	int ret;
1194 	uint32_t caps;
1195 
1196 	RTE_ETH_VALID_PORTID_OR_ERR_RET(eth_dev_id, -EINVAL);
1197 	TXA_CHECK_OR_ERR_RET(id);
1198 
1199 	eth_dev = &rte_eth_devices[eth_dev_id];
1200 
1201 	caps = 0;
1202 
1203 	if (txa_dev_caps_get(id))
1204 		txa_dev_caps_get(id)(txa_evdev(id), eth_dev, &caps);
1205 
1206 	if (caps & RTE_EVENT_ETH_TX_ADAPTER_CAP_INTERNAL_PORT)
1207 		ret =  txa_dev_queue_del(id) ?
1208 					txa_dev_queue_del(id)(id, txa_evdev(id),
1209 							eth_dev,
1210 							queue) : 0;
1211 	else
1212 		ret = txa_service_queue_del(id, eth_dev, queue);
1213 
1214 	rte_eventdev_trace_eth_tx_adapter_queue_del(id, eth_dev_id, queue,
1215 		ret);
1216 	return ret;
1217 }
1218 
1219 int
1220 rte_event_eth_tx_adapter_service_id_get(uint8_t id, uint32_t *service_id)
1221 {
1222 	TXA_CHECK_OR_ERR_RET(id);
1223 
1224 	return txa_service_id_get(id, service_id);
1225 }
1226 
1227 int
1228 rte_event_eth_tx_adapter_start(uint8_t id)
1229 {
1230 	int ret;
1231 
1232 	TXA_CHECK_OR_ERR_RET(id);
1233 
1234 	ret = txa_dev_start(id) ? txa_dev_start(id)(id, txa_evdev(id)) : 0;
1235 	if (ret == 0)
1236 		ret = txa_service_start(id);
1237 	rte_eventdev_trace_eth_tx_adapter_start(id, ret);
1238 	return ret;
1239 }
1240 
1241 int
1242 rte_event_eth_tx_adapter_stats_get(uint8_t id,
1243 				struct rte_event_eth_tx_adapter_stats *stats)
1244 {
1245 	int ret;
1246 
1247 	TXA_CHECK_OR_ERR_RET(id);
1248 
1249 	if (stats == NULL)
1250 		return -EINVAL;
1251 
1252 	*stats = (struct rte_event_eth_tx_adapter_stats){0};
1253 
1254 	ret = txa_dev_stats_get(id) ?
1255 			txa_dev_stats_get(id)(id, txa_evdev(id), stats) : 0;
1256 
1257 	if (ret == 0 && txa_service_id_get(id, NULL) != ESRCH) {
1258 		if (txa_dev_stats_get(id)) {
1259 			struct rte_event_eth_tx_adapter_stats service_stats;
1260 
1261 			ret = txa_service_stats_get(id, &service_stats);
1262 			if (ret == 0) {
1263 				stats->tx_retry += service_stats.tx_retry;
1264 				stats->tx_packets += service_stats.tx_packets;
1265 				stats->tx_dropped += service_stats.tx_dropped;
1266 			}
1267 		} else
1268 			ret = txa_service_stats_get(id, stats);
1269 	}
1270 
1271 	rte_eventdev_trace_eth_tx_adapter_stats_get(id, stats->tx_retry, stats->tx_packets,
1272 						    stats->tx_dropped, ret);
1273 
1274 	return ret;
1275 }
1276 
1277 int
1278 rte_event_eth_tx_adapter_stats_reset(uint8_t id)
1279 {
1280 	int ret;
1281 
1282 	TXA_CHECK_OR_ERR_RET(id);
1283 
1284 	ret = txa_dev_stats_reset(id) ?
1285 		txa_dev_stats_reset(id)(id, txa_evdev(id)) : 0;
1286 	if (ret == 0)
1287 		ret = txa_service_stats_reset(id);
1288 
1289 	rte_eventdev_trace_eth_tx_adapter_stats_reset(id, ret);
1290 
1291 	return ret;
1292 }
1293 
1294 int
1295 rte_event_eth_tx_adapter_stop(uint8_t id)
1296 {
1297 	int ret;
1298 
1299 	TXA_CHECK_OR_ERR_RET(id);
1300 
1301 	ret = txa_dev_stop(id) ? txa_dev_stop(id)(id,  txa_evdev(id)) : 0;
1302 	if (ret == 0)
1303 		ret = txa_service_stop(id);
1304 	rte_eventdev_trace_eth_tx_adapter_stop(id, ret);
1305 	return ret;
1306 }
1307 
1308 int
1309 rte_event_eth_tx_adapter_instance_get(uint16_t eth_dev_id,
1310 				      uint16_t tx_queue_id,
1311 				      uint8_t *txa_inst_id)
1312 {
1313 	uint8_t id;
1314 	int ret = -EINVAL;
1315 	uint32_t caps;
1316 	struct txa_service_data *txa;
1317 
1318 	if (txa_lookup())
1319 		return -ENOMEM;
1320 
1321 	if (eth_dev_id >= rte_eth_dev_count_avail()) {
1322 		RTE_EDEV_LOG_ERR("Invalid ethernet port id %u", eth_dev_id);
1323 		return -EINVAL;
1324 	}
1325 
1326 	if (tx_queue_id >= rte_eth_devices[eth_dev_id].data->nb_tx_queues) {
1327 		RTE_EDEV_LOG_ERR("Invalid tx queue id %u", tx_queue_id);
1328 		return -EINVAL;
1329 	}
1330 
1331 	if (txa_inst_id == NULL) {
1332 		RTE_EDEV_LOG_ERR("txa_instance_id cannot be NULL");
1333 		return -EINVAL;
1334 	}
1335 
1336 	/* Iterate through all Tx adapter instances */
1337 	for (id = 0; id < RTE_EVENT_ETH_TX_ADAPTER_MAX_INSTANCE; id++) {
1338 		txa = txa_service_id_to_data(id);
1339 		if (!txa)
1340 			continue;
1341 
1342 		caps = 0;
1343 		if (rte_event_eth_tx_adapter_caps_get(txa->eventdev_id,
1344 						      eth_dev_id,
1345 						      &caps))
1346 			continue;
1347 
1348 		if (caps & RTE_EVENT_ETH_TX_ADAPTER_CAP_INTERNAL_PORT) {
1349 			ret = txa_dev_instance_get(id) ?
1350 					txa_dev_instance_get(id)(eth_dev_id,
1351 								 tx_queue_id,
1352 								 txa_inst_id)
1353 							: -EINVAL;
1354 			if (ret == 0) {
1355 				rte_eventdev_trace_eth_tx_adapter_instance_get(eth_dev_id,
1356 								tx_queue_id, *txa_inst_id);
1357 				return ret;
1358 			}
1359 		} else {
1360 			struct rte_eth_dev *eth_dev;
1361 
1362 			eth_dev = &rte_eth_devices[eth_dev_id];
1363 
1364 			if (txa_service_is_queue_added(txa, eth_dev,
1365 						       tx_queue_id)) {
1366 				*txa_inst_id = txa->id;
1367 				rte_eventdev_trace_eth_tx_adapter_instance_get(eth_dev_id,
1368 								tx_queue_id, *txa_inst_id);
1369 				return 0;
1370 			}
1371 		}
1372 	}
1373 
1374 	return -EINVAL;
1375 }
1376 
1377 static inline int
1378 txa_sw_queue_start_state_set(uint16_t eth_dev_id, uint16_t tx_queue_id,
1379 			     bool start_state, struct txa_service_data *txa)
1380 {
1381 	struct txa_service_queue_info *tqi = NULL;
1382 
1383 	rte_spinlock_lock(&txa->tx_lock);
1384 	tqi = txa_service_queue(txa, eth_dev_id, tx_queue_id);
1385 	if (unlikely(tqi == NULL || !tqi->added)) {
1386 		rte_spinlock_unlock(&txa->tx_lock);
1387 		return -EINVAL;
1388 	}
1389 	if (start_state == false)
1390 		txa_txq_buffer_drain(tqi);
1391 
1392 	tqi->stopped = !start_state;
1393 	rte_spinlock_unlock(&txa->tx_lock);
1394 	return 0;
1395 }
1396 
1397 static int
1398 txa_queue_start_state_set(uint16_t eth_dev_id, uint16_t tx_queue_id,
1399 			  bool start_state)
1400 {
1401 	struct txa_service_data *txa;
1402 	uint8_t txa_inst_id;
1403 	int ret;
1404 	uint32_t caps = 0;
1405 
1406 	/* Below API already does validation of input parameters.
1407 	 * Hence skipping the validation here.
1408 	 */
1409 	ret = rte_event_eth_tx_adapter_instance_get(eth_dev_id,
1410 						    tx_queue_id,
1411 						    &txa_inst_id);
1412 	if (ret < 0)
1413 		return -EINVAL;
1414 
1415 	txa = txa_service_id_to_data(txa_inst_id);
1416 	ret = rte_event_eth_tx_adapter_caps_get(txa->eventdev_id,
1417 						eth_dev_id,
1418 						&caps);
1419 	if (ret < 0)
1420 		return -EINVAL;
1421 
1422 	if (caps & RTE_EVENT_ETH_TX_ADAPTER_CAP_INTERNAL_PORT) {
1423 		if (start_state == true) {
1424 			ret = txa_dev_queue_start(txa_inst_id) ?
1425 			      txa_dev_queue_start(txa_inst_id)(txa_inst_id,
1426 							       eth_dev_id,
1427 							       tx_queue_id) : 0;
1428 		} else {
1429 			ret = txa_dev_queue_stop(txa_inst_id) ?
1430 			      txa_dev_queue_stop(txa_inst_id)(txa_inst_id,
1431 							      eth_dev_id,
1432 							      tx_queue_id) : 0;
1433 		}
1434 		return ret;
1435 	}
1436 
1437 	return txa_sw_queue_start_state_set(eth_dev_id, tx_queue_id,
1438 					    start_state, txa);
1439 }
1440 
1441 int
1442 rte_event_eth_tx_adapter_queue_start(uint16_t eth_dev_id, uint16_t tx_queue_id)
1443 {
1444 	rte_eventdev_trace_eth_tx_adapter_queue_start(eth_dev_id, tx_queue_id);
1445 
1446 	return txa_queue_start_state_set(eth_dev_id, tx_queue_id, true);
1447 }
1448 
1449 int
1450 rte_event_eth_tx_adapter_queue_stop(uint16_t eth_dev_id, uint16_t tx_queue_id)
1451 {
1452 	rte_eventdev_trace_eth_tx_adapter_queue_stop(eth_dev_id, tx_queue_id);
1453 
1454 	return txa_queue_start_state_set(eth_dev_id, tx_queue_id, false);
1455 }
1456