xref: /dpdk/lib/eventdev/rte_event_eth_tx_adapter.c (revision 3a80d7fb2ecdd6e8e48e56e3726b26980fa2a089)
1 /* SPDX-License-Identifier: BSD-3-Clause
2  * Copyright(c) 2018 Intel Corporation.
3  */
4 #include <rte_spinlock.h>
5 #include <rte_service_component.h>
6 #include <ethdev_driver.h>
7 
8 #include "eventdev_pmd.h"
9 #include "eventdev_trace.h"
10 #include "rte_event_eth_tx_adapter.h"
11 
12 #define TXA_BATCH_SIZE		32
13 #define TXA_SERVICE_NAME_LEN	32
14 #define TXA_MEM_NAME_LEN	32
15 #define TXA_FLUSH_THRESHOLD	1024
16 #define TXA_RETRY_CNT		100
17 #define TXA_MAX_NB_TX		128
18 #define TXA_INVALID_DEV_ID	INT32_C(-1)
19 #define TXA_INVALID_SERVICE_ID	INT64_C(-1)
20 
21 #define TXA_ADAPTER_ARRAY "txa_adapter_array"
22 #define TXA_SERVICE_DATA_ARRAY "txa_service_data_array"
23 
24 #define txa_evdev(id) (&rte_eventdevs[txa_dev_id_array[(id)]])
25 
26 #define txa_dev_caps_get(id) txa_evdev((id))->dev_ops->eth_tx_adapter_caps_get
27 
28 #define txa_dev_adapter_create(t) txa_evdev(t)->dev_ops->eth_tx_adapter_create
29 
30 #define txa_dev_adapter_create_ext(t) \
31 				txa_evdev(t)->dev_ops->eth_tx_adapter_create
32 
33 #define txa_dev_adapter_free(t) txa_evdev(t)->dev_ops->eth_tx_adapter_free
34 
35 #define txa_dev_queue_add(id) txa_evdev(id)->dev_ops->eth_tx_adapter_queue_add
36 
37 #define txa_dev_queue_del(t) txa_evdev(t)->dev_ops->eth_tx_adapter_queue_del
38 
39 #define txa_dev_start(t) txa_evdev(t)->dev_ops->eth_tx_adapter_start
40 
41 #define txa_dev_stop(t) txa_evdev(t)->dev_ops->eth_tx_adapter_stop
42 
43 #define txa_dev_stats_reset(t) txa_evdev(t)->dev_ops->eth_tx_adapter_stats_reset
44 
45 #define txa_dev_stats_get(t) txa_evdev(t)->dev_ops->eth_tx_adapter_stats_get
46 
47 #define txa_dev_instance_get(id) \
48 			txa_evdev(id)->dev_ops->eth_tx_adapter_instance_get
49 
50 #define txa_dev_queue_start(id) \
51 			txa_evdev(id)->dev_ops->eth_tx_adapter_queue_start
52 
53 #define txa_dev_queue_stop(id) \
54 			txa_evdev(id)->dev_ops->eth_tx_adapter_queue_stop
55 
56 #define RTE_EVENT_ETH_TX_ADAPTER_ID_VALID_OR_ERR_RET(id, retval) \
57 do { \
58 	if (!txa_valid_id(id)) { \
59 		RTE_EDEV_LOG_ERR("Invalid eth Tx adapter id = %d", id); \
60 		return retval; \
61 	} \
62 } while (0)
63 
64 #define TXA_CHECK_OR_ERR_RET(id) \
65 do {\
66 	int ret; \
67 	RTE_EVENT_ETH_TX_ADAPTER_ID_VALID_OR_ERR_RET((id), -EINVAL); \
68 	ret = txa_init(); \
69 	if (ret != 0) \
70 		return ret; \
71 	if (!txa_adapter_exist((id))) \
72 		return -EINVAL; \
73 } while (0)
74 
75 #define TXA_CHECK_TXQ(dev, queue) \
76 do {\
77 	if ((dev)->data->nb_tx_queues == 0) { \
78 		RTE_EDEV_LOG_ERR("No tx queues configured"); \
79 		return -EINVAL; \
80 	} \
81 	if ((queue) != -1 && \
82 		(uint16_t)(queue) >= (dev)->data->nb_tx_queues) { \
83 		RTE_EDEV_LOG_ERR("Invalid tx queue_id %" PRIu16, \
84 				(uint16_t)(queue)); \
85 		return -EINVAL; \
86 	} \
87 } while (0)
88 
89 /* Tx retry callback structure */
90 struct txa_retry {
91 	/* Ethernet port id */
92 	uint16_t port_id;
93 	/* Tx queue */
94 	uint16_t tx_queue;
95 	/* Adapter ID */
96 	uint8_t id;
97 };
98 
99 /* Per queue structure */
100 struct txa_service_queue_info {
101 	/* Queue has been added */
102 	uint8_t added;
103 	/* Queue is stopped */
104 	bool stopped;
105 	/* Retry callback argument */
106 	struct txa_retry txa_retry;
107 	/* Tx buffer */
108 	struct rte_eth_dev_tx_buffer *tx_buf;
109 };
110 
111 /* PMD private structure */
112 struct txa_service_data {
113 	/* Max mbufs processed in any service function invocation */
114 	uint32_t max_nb_tx;
115 	/* Number of Tx queues in adapter */
116 	uint32_t nb_queues;
117 	/*  Synchronization with data path */
118 	rte_spinlock_t tx_lock;
119 	/* Event port ID */
120 	uint8_t port_id;
121 	/* Event device identifier */
122 	uint8_t eventdev_id;
123 	/* Highest port id supported + 1 */
124 	uint16_t dev_count;
125 	/* Loop count to flush Tx buffers */
126 	int loop_cnt;
127 	/* Per ethernet device structure */
128 	struct txa_service_ethdev *txa_ethdev;
129 	/* Statistics */
130 	struct rte_event_eth_tx_adapter_stats stats;
131 	/* Adapter Identifier */
132 	uint8_t id;
133 	/* Conf arg must be freed */
134 	uint8_t conf_free;
135 	/* Configuration callback */
136 	rte_event_eth_tx_adapter_conf_cb conf_cb;
137 	/* Configuration callback argument */
138 	void *conf_arg;
139 	/* socket id */
140 	int socket_id;
141 	/* Per adapter EAL service */
142 	int64_t service_id;
143 	/* Memory allocation name */
144 	char mem_name[TXA_MEM_NAME_LEN];
145 } __rte_cache_aligned;
146 
147 /* Per eth device structure */
148 struct txa_service_ethdev {
149 	/* Pointer to ethernet device */
150 	struct rte_eth_dev *dev;
151 	/* Number of queues added */
152 	uint16_t nb_queues;
153 	/* PMD specific queue data */
154 	void *queues;
155 };
156 
157 /* Array of adapter instances, initialized with event device id
158  * when adapter is created
159  */
160 static int *txa_dev_id_array;
161 
162 /* Array of pointers to service implementation data */
163 static struct txa_service_data **txa_service_data_array;
164 
165 static int32_t txa_service_func(void *args);
166 static int txa_service_adapter_create_ext(uint8_t id,
167 			struct rte_eventdev *dev,
168 			rte_event_eth_tx_adapter_conf_cb conf_cb,
169 			void *conf_arg);
170 static int txa_service_queue_del(uint8_t id,
171 				const struct rte_eth_dev *dev,
172 				int32_t tx_queue_id);
173 
174 static int
175 txa_adapter_exist(uint8_t id)
176 {
177 	return txa_dev_id_array[id] != TXA_INVALID_DEV_ID;
178 }
179 
180 static inline int
181 txa_valid_id(uint8_t id)
182 {
183 	return id < RTE_EVENT_ETH_TX_ADAPTER_MAX_INSTANCE;
184 }
185 
186 static void *
187 txa_memzone_array_get(const char *name, unsigned int elt_size, int nb_elems)
188 {
189 	const struct rte_memzone *mz;
190 	unsigned int sz;
191 
192 	sz = elt_size * nb_elems;
193 	sz = RTE_ALIGN(sz, RTE_CACHE_LINE_SIZE);
194 
195 	mz = rte_memzone_lookup(name);
196 	if (mz == NULL) {
197 		mz = rte_memzone_reserve_aligned(name, sz, rte_socket_id(), 0,
198 						 RTE_CACHE_LINE_SIZE);
199 		if (mz == NULL) {
200 			RTE_EDEV_LOG_ERR("failed to reserve memzone"
201 					" name = %s err = %"
202 					PRId32, name, rte_errno);
203 			return NULL;
204 		}
205 	}
206 
207 	return  mz->addr;
208 }
209 
210 static int
211 txa_lookup(void)
212 {
213 	const struct rte_memzone *mz;
214 
215 	if (txa_dev_id_array == NULL) {
216 		mz = rte_memzone_lookup(TXA_ADAPTER_ARRAY);
217 		if (mz == NULL)
218 			return -ENOMEM;
219 		txa_dev_id_array = mz->addr;
220 	}
221 
222 	if (txa_service_data_array == NULL) {
223 		mz = rte_memzone_lookup(TXA_SERVICE_DATA_ARRAY);
224 		if (mz == NULL)
225 			return -ENOMEM;
226 		txa_service_data_array = mz->addr;
227 	}
228 
229 	return 0;
230 }
231 
232 static int
233 txa_dev_id_array_init(void)
234 {
235 	if (txa_dev_id_array == NULL) {
236 		int i;
237 
238 		txa_dev_id_array = txa_memzone_array_get(TXA_ADAPTER_ARRAY,
239 					sizeof(int),
240 					RTE_EVENT_ETH_TX_ADAPTER_MAX_INSTANCE);
241 		if (txa_dev_id_array == NULL)
242 			return -ENOMEM;
243 
244 		for (i = 0; i < RTE_EVENT_ETH_TX_ADAPTER_MAX_INSTANCE; i++)
245 			txa_dev_id_array[i] = TXA_INVALID_DEV_ID;
246 	}
247 
248 	return 0;
249 }
250 
251 static int
252 txa_init(void)
253 {
254 	return txa_dev_id_array_init();
255 }
256 
257 static int
258 txa_service_data_init(void)
259 {
260 	if (txa_service_data_array == NULL) {
261 		int i;
262 
263 		txa_service_data_array =
264 				txa_memzone_array_get(TXA_SERVICE_DATA_ARRAY,
265 					sizeof(*txa_service_data_array),
266 					RTE_EVENT_ETH_TX_ADAPTER_MAX_INSTANCE);
267 		if (txa_service_data_array == NULL)
268 			return -ENOMEM;
269 
270 		/* Reset the txa service pointers */
271 		for (i = 0; i < RTE_EVENT_ETH_TX_ADAPTER_MAX_INSTANCE; i++)
272 			txa_service_data_array[i] = NULL;
273 	}
274 
275 	return 0;
276 }
277 
278 static inline struct txa_service_data *
279 txa_service_id_to_data(uint8_t id)
280 {
281 	return txa_service_data_array[id];
282 }
283 
284 static inline struct txa_service_queue_info *
285 txa_service_queue(struct txa_service_data *txa, uint16_t port_id,
286 		uint16_t tx_queue_id)
287 {
288 	struct txa_service_queue_info *tqi;
289 
290 	if (unlikely(txa->txa_ethdev == NULL || txa->dev_count < port_id + 1))
291 		return NULL;
292 
293 	tqi = txa->txa_ethdev[port_id].queues;
294 
295 	return likely(tqi != NULL) ? tqi + tx_queue_id : NULL;
296 }
297 
298 static int
299 txa_service_conf_cb(uint8_t __rte_unused id, uint8_t dev_id,
300 		struct rte_event_eth_tx_adapter_conf *conf, void *arg)
301 {
302 	int ret;
303 	struct rte_eventdev *dev;
304 	struct rte_event_port_conf *pc;
305 	struct rte_event_dev_config dev_conf;
306 	int started;
307 	uint8_t port_id;
308 
309 	pc = arg;
310 	dev = &rte_eventdevs[dev_id];
311 	dev_conf = dev->data->dev_conf;
312 
313 	started = dev->data->dev_started;
314 	if (started)
315 		rte_event_dev_stop(dev_id);
316 
317 	port_id = dev_conf.nb_event_ports;
318 	dev_conf.nb_event_ports += 1;
319 	if (pc->event_port_cfg & RTE_EVENT_PORT_CFG_SINGLE_LINK)
320 		dev_conf.nb_single_link_event_port_queues += 1;
321 
322 	ret = rte_event_dev_configure(dev_id, &dev_conf);
323 	if (ret) {
324 		RTE_EDEV_LOG_ERR("failed to configure event dev %u",
325 						dev_id);
326 		if (started) {
327 			if (rte_event_dev_start(dev_id))
328 				return -EIO;
329 		}
330 		return ret;
331 	}
332 
333 	ret = rte_event_port_setup(dev_id, port_id, pc);
334 	if (ret) {
335 		RTE_EDEV_LOG_ERR("failed to setup event port %u\n",
336 					port_id);
337 		if (started) {
338 			if (rte_event_dev_start(dev_id))
339 				return -EIO;
340 		}
341 		return ret;
342 	}
343 
344 	conf->event_port_id = port_id;
345 	conf->max_nb_tx = TXA_MAX_NB_TX;
346 	if (started)
347 		ret = rte_event_dev_start(dev_id);
348 	return ret;
349 }
350 
351 static int
352 txa_service_ethdev_alloc(struct txa_service_data *txa)
353 {
354 	struct txa_service_ethdev *txa_ethdev;
355 	uint16_t i, dev_count;
356 
357 	dev_count = rte_eth_dev_count_avail();
358 	if (txa->txa_ethdev && dev_count == txa->dev_count)
359 		return 0;
360 
361 	txa_ethdev = rte_zmalloc_socket(txa->mem_name,
362 					dev_count * sizeof(*txa_ethdev),
363 					0,
364 					txa->socket_id);
365 	if (txa_ethdev == NULL) {
366 		RTE_EDEV_LOG_ERR("Failed to alloc txa::txa_ethdev ");
367 		return -ENOMEM;
368 	}
369 
370 	if (txa->dev_count)
371 		memcpy(txa_ethdev, txa->txa_ethdev,
372 			txa->dev_count * sizeof(*txa_ethdev));
373 
374 	RTE_ETH_FOREACH_DEV(i) {
375 		if (i == dev_count)
376 			break;
377 		txa_ethdev[i].dev = &rte_eth_devices[i];
378 	}
379 
380 	txa->txa_ethdev = txa_ethdev;
381 	txa->dev_count = dev_count;
382 	return 0;
383 }
384 
385 static int
386 txa_service_queue_array_alloc(struct txa_service_data *txa,
387 			uint16_t port_id)
388 {
389 	struct txa_service_queue_info *tqi;
390 	uint16_t nb_queue;
391 	int ret;
392 
393 	ret = txa_service_ethdev_alloc(txa);
394 	if (ret != 0)
395 		return ret;
396 
397 	if (txa->txa_ethdev[port_id].queues)
398 		return 0;
399 
400 	nb_queue = txa->txa_ethdev[port_id].dev->data->nb_tx_queues;
401 	tqi = rte_zmalloc_socket(txa->mem_name,
402 				nb_queue *
403 				sizeof(struct txa_service_queue_info), 0,
404 				txa->socket_id);
405 	if (tqi == NULL)
406 		return -ENOMEM;
407 	txa->txa_ethdev[port_id].queues = tqi;
408 	return 0;
409 }
410 
411 static void
412 txa_service_queue_array_free(struct txa_service_data *txa,
413 			uint16_t port_id)
414 {
415 	struct txa_service_ethdev *txa_ethdev;
416 	struct txa_service_queue_info *tqi;
417 
418 	txa_ethdev = &txa->txa_ethdev[port_id];
419 	if (txa->txa_ethdev == NULL || txa_ethdev->nb_queues != 0)
420 		return;
421 
422 	tqi = txa_ethdev->queues;
423 	txa_ethdev->queues = NULL;
424 	rte_free(tqi);
425 
426 	if (txa->nb_queues == 0) {
427 		rte_free(txa->txa_ethdev);
428 		txa->txa_ethdev = NULL;
429 	}
430 }
431 
432 static void
433 txa_service_unregister(struct txa_service_data *txa)
434 {
435 	if (txa->service_id != TXA_INVALID_SERVICE_ID) {
436 		rte_service_component_runstate_set(txa->service_id, 0);
437 		while (rte_service_may_be_active(txa->service_id))
438 			rte_pause();
439 		rte_service_component_unregister(txa->service_id);
440 	}
441 	txa->service_id = TXA_INVALID_SERVICE_ID;
442 }
443 
444 static int
445 txa_service_register(struct txa_service_data *txa)
446 {
447 	int ret;
448 	struct rte_service_spec service;
449 	struct rte_event_eth_tx_adapter_conf conf;
450 
451 	if (txa->service_id != TXA_INVALID_SERVICE_ID)
452 		return 0;
453 
454 	memset(&service, 0, sizeof(service));
455 	snprintf(service.name, TXA_SERVICE_NAME_LEN, "txa_%d", txa->id);
456 	service.socket_id = txa->socket_id;
457 	service.callback = txa_service_func;
458 	service.callback_userdata = txa;
459 	service.capabilities = RTE_SERVICE_CAP_MT_SAFE;
460 	ret = rte_service_component_register(&service,
461 					(uint32_t *)&txa->service_id);
462 	if (ret) {
463 		RTE_EDEV_LOG_ERR("failed to register service %s err = %"
464 				 PRId32, service.name, ret);
465 		return ret;
466 	}
467 
468 	ret = txa->conf_cb(txa->id, txa->eventdev_id, &conf, txa->conf_arg);
469 	if (ret) {
470 		txa_service_unregister(txa);
471 		return ret;
472 	}
473 
474 	rte_service_component_runstate_set(txa->service_id, 1);
475 	txa->port_id = conf.event_port_id;
476 	txa->max_nb_tx = conf.max_nb_tx;
477 	return 0;
478 }
479 
480 static struct rte_eth_dev_tx_buffer *
481 txa_service_tx_buf_alloc(struct txa_service_data *txa,
482 			const struct rte_eth_dev *dev)
483 {
484 	struct rte_eth_dev_tx_buffer *tb;
485 	uint16_t port_id;
486 
487 	port_id = dev->data->port_id;
488 	tb = rte_zmalloc_socket(txa->mem_name,
489 				RTE_ETH_TX_BUFFER_SIZE(TXA_BATCH_SIZE),
490 				0,
491 				rte_eth_dev_socket_id(port_id));
492 	if (tb == NULL)
493 		RTE_EDEV_LOG_ERR("Failed to allocate memory for tx buffer");
494 	return tb;
495 }
496 
497 static int
498 txa_service_is_queue_added(struct txa_service_data *txa,
499 			const struct rte_eth_dev *dev,
500 			uint16_t tx_queue_id)
501 {
502 	struct txa_service_queue_info *tqi;
503 
504 	tqi = txa_service_queue(txa, dev->data->port_id, tx_queue_id);
505 	return tqi && tqi->added;
506 }
507 
508 static int
509 txa_service_ctrl(uint8_t id, int start)
510 {
511 	int ret;
512 	struct txa_service_data *txa;
513 
514 	txa = txa_service_id_to_data(id);
515 	if (txa == NULL || txa->service_id == TXA_INVALID_SERVICE_ID)
516 		return 0;
517 
518 	rte_spinlock_lock(&txa->tx_lock);
519 	ret = rte_service_runstate_set(txa->service_id, start);
520 	rte_spinlock_unlock(&txa->tx_lock);
521 
522 	return ret;
523 }
524 
525 static void
526 txa_service_buffer_retry(struct rte_mbuf **pkts, uint16_t unsent,
527 			void *userdata)
528 {
529 	struct txa_retry *tr;
530 	struct txa_service_data *data;
531 	struct rte_event_eth_tx_adapter_stats *stats;
532 	uint16_t sent = 0;
533 	unsigned int retry = 0;
534 	uint16_t i, n;
535 
536 	tr = (struct txa_retry *)(uintptr_t)userdata;
537 	data = txa_service_id_to_data(tr->id);
538 	stats = &data->stats;
539 
540 	do {
541 		n = rte_eth_tx_burst(tr->port_id, tr->tx_queue,
542 			       &pkts[sent], unsent - sent);
543 
544 		sent += n;
545 	} while (sent != unsent && retry++ < TXA_RETRY_CNT);
546 
547 	for (i = sent; i < unsent; i++)
548 		rte_pktmbuf_free(pkts[i]);
549 
550 	stats->tx_retry += retry;
551 	stats->tx_packets += sent;
552 	stats->tx_dropped += unsent - sent;
553 }
554 
555 static uint16_t
556 txa_process_event_vector(struct txa_service_data *txa,
557 			 struct rte_event_vector *vec)
558 {
559 	struct txa_service_queue_info *tqi;
560 	uint16_t port, queue, nb_tx = 0;
561 	struct rte_mbuf **mbufs;
562 	int i;
563 
564 	mbufs = (struct rte_mbuf **)vec->mbufs;
565 	if (vec->attr_valid) {
566 		port = vec->port;
567 		queue = vec->queue;
568 		tqi = txa_service_queue(txa, port, queue);
569 		if (unlikely(tqi == NULL || !tqi->added || tqi->stopped)) {
570 			rte_pktmbuf_free_bulk(&mbufs[vec->elem_offset],
571 					      vec->nb_elem);
572 			rte_mempool_put(rte_mempool_from_obj(vec), vec);
573 			return 0;
574 		}
575 		for (i = 0; i < vec->nb_elem; i++) {
576 			nb_tx += rte_eth_tx_buffer(port, queue, tqi->tx_buf,
577 						   mbufs[i + vec->elem_offset]);
578 		}
579 	} else {
580 		for (i = vec->elem_offset; i < vec->elem_offset + vec->nb_elem;
581 		     i++) {
582 			port = mbufs[i]->port;
583 			queue = rte_event_eth_tx_adapter_txq_get(mbufs[i]);
584 			tqi = txa_service_queue(txa, port, queue);
585 			if (unlikely(tqi == NULL || !tqi->added ||
586 				     tqi->stopped)) {
587 				rte_pktmbuf_free(mbufs[i]);
588 				continue;
589 			}
590 			nb_tx += rte_eth_tx_buffer(port, queue, tqi->tx_buf,
591 						   mbufs[i]);
592 		}
593 	}
594 	rte_mempool_put(rte_mempool_from_obj(vec), vec);
595 
596 	return nb_tx;
597 }
598 
599 static void
600 txa_service_tx(struct txa_service_data *txa, struct rte_event *ev,
601 	uint32_t n)
602 {
603 	uint32_t i;
604 	uint16_t nb_tx;
605 	struct rte_event_eth_tx_adapter_stats *stats;
606 
607 	stats = &txa->stats;
608 
609 	nb_tx = 0;
610 	for (i = 0; i < n; i++) {
611 		uint16_t port;
612 		uint16_t queue;
613 		struct txa_service_queue_info *tqi;
614 
615 		if (!(ev[i].event_type & RTE_EVENT_TYPE_VECTOR)) {
616 			struct rte_mbuf *m;
617 
618 			m = ev[i].mbuf;
619 			port = m->port;
620 			queue = rte_event_eth_tx_adapter_txq_get(m);
621 
622 			tqi = txa_service_queue(txa, port, queue);
623 			if (unlikely(tqi == NULL || !tqi->added ||
624 				     tqi->stopped)) {
625 				rte_pktmbuf_free(m);
626 				continue;
627 			}
628 
629 			nb_tx += rte_eth_tx_buffer(port, queue, tqi->tx_buf, m);
630 		} else {
631 			nb_tx += txa_process_event_vector(txa, ev[i].vec);
632 		}
633 	}
634 
635 	stats->tx_packets += nb_tx;
636 }
637 
638 static int32_t
639 txa_service_func(void *args)
640 {
641 	struct txa_service_data *txa = args;
642 	uint8_t dev_id;
643 	uint8_t port;
644 	int ret = -EAGAIN;
645 	uint16_t n;
646 	uint32_t nb_tx, max_nb_tx;
647 	struct rte_event ev[TXA_BATCH_SIZE];
648 
649 	dev_id = txa->eventdev_id;
650 	max_nb_tx = txa->max_nb_tx;
651 	port = txa->port_id;
652 
653 	if (txa->nb_queues == 0)
654 		return ret;
655 
656 	if (!rte_spinlock_trylock(&txa->tx_lock))
657 		return ret;
658 
659 	for (nb_tx = 0; nb_tx < max_nb_tx; nb_tx += n) {
660 
661 		n = rte_event_dequeue_burst(dev_id, port, ev, RTE_DIM(ev), 0);
662 		if (!n)
663 			break;
664 		txa_service_tx(txa, ev, n);
665 		ret = 0;
666 	}
667 
668 	if ((txa->loop_cnt++ & (TXA_FLUSH_THRESHOLD - 1)) == 0) {
669 
670 		struct txa_service_ethdev *tdi;
671 		struct txa_service_queue_info *tqi;
672 		struct rte_eth_dev *dev;
673 		uint16_t i;
674 
675 		tdi = txa->txa_ethdev;
676 		nb_tx = 0;
677 
678 		RTE_ETH_FOREACH_DEV(i) {
679 			uint16_t q;
680 
681 			if (i >= txa->dev_count)
682 				break;
683 
684 			dev = tdi[i].dev;
685 			if (tdi[i].nb_queues == 0)
686 				continue;
687 			for (q = 0; q < dev->data->nb_tx_queues; q++) {
688 
689 				tqi = txa_service_queue(txa, i, q);
690 				if (unlikely(tqi == NULL || !tqi->added ||
691 					     tqi->stopped))
692 					continue;
693 
694 				nb_tx += rte_eth_tx_buffer_flush(i, q,
695 							tqi->tx_buf);
696 			}
697 		}
698 
699 		if (likely(nb_tx > 0)) {
700 			txa->stats.tx_packets += nb_tx;
701 			ret = 0;
702 		}
703 	}
704 	rte_spinlock_unlock(&txa->tx_lock);
705 	return ret;
706 }
707 
708 static int
709 txa_service_adapter_create(uint8_t id, struct rte_eventdev *dev,
710 			struct rte_event_port_conf *port_conf)
711 {
712 	struct txa_service_data *txa;
713 	struct rte_event_port_conf *cb_conf;
714 	int ret;
715 
716 	cb_conf = rte_malloc(NULL, sizeof(*cb_conf), 0);
717 	if (cb_conf == NULL)
718 		return -ENOMEM;
719 
720 	*cb_conf = *port_conf;
721 	ret = txa_service_adapter_create_ext(id, dev, txa_service_conf_cb,
722 					cb_conf);
723 	if (ret) {
724 		rte_free(cb_conf);
725 		return ret;
726 	}
727 
728 	txa = txa_service_id_to_data(id);
729 	txa->conf_free = 1;
730 	return ret;
731 }
732 
733 static int
734 txa_service_adapter_create_ext(uint8_t id, struct rte_eventdev *dev,
735 			rte_event_eth_tx_adapter_conf_cb conf_cb,
736 			void *conf_arg)
737 {
738 	struct txa_service_data *txa;
739 	int socket_id;
740 	char mem_name[TXA_SERVICE_NAME_LEN];
741 	int ret;
742 
743 	if (conf_cb == NULL)
744 		return -EINVAL;
745 
746 	socket_id = dev->data->socket_id;
747 	snprintf(mem_name, TXA_MEM_NAME_LEN,
748 		"rte_event_eth_txa_%d",
749 		id);
750 
751 	ret = txa_service_data_init();
752 	if (ret != 0)
753 		return ret;
754 
755 	txa = rte_zmalloc_socket(mem_name,
756 				sizeof(*txa),
757 				RTE_CACHE_LINE_SIZE, socket_id);
758 	if (txa == NULL) {
759 		RTE_EDEV_LOG_ERR("failed to get mem for tx adapter");
760 		return -ENOMEM;
761 	}
762 
763 	txa->id = id;
764 	txa->eventdev_id = dev->data->dev_id;
765 	txa->socket_id = socket_id;
766 	strncpy(txa->mem_name, mem_name, TXA_SERVICE_NAME_LEN);
767 	txa->conf_cb = conf_cb;
768 	txa->conf_arg = conf_arg;
769 	txa->service_id = TXA_INVALID_SERVICE_ID;
770 	rte_spinlock_init(&txa->tx_lock);
771 	txa_service_data_array[id] = txa;
772 
773 	return 0;
774 }
775 
776 static int
777 txa_service_event_port_get(uint8_t id, uint8_t *port)
778 {
779 	struct txa_service_data *txa;
780 
781 	txa = txa_service_id_to_data(id);
782 	if (txa->service_id == TXA_INVALID_SERVICE_ID)
783 		return -ENODEV;
784 
785 	*port = txa->port_id;
786 	return 0;
787 }
788 
789 static int
790 txa_service_adapter_free(uint8_t id)
791 {
792 	struct txa_service_data *txa;
793 
794 	txa = txa_service_id_to_data(id);
795 	if (txa->nb_queues) {
796 		RTE_EDEV_LOG_ERR("%" PRIu16 " Tx queues not deleted",
797 				txa->nb_queues);
798 		return -EBUSY;
799 	}
800 
801 	if (txa->conf_free)
802 		rte_free(txa->conf_arg);
803 	rte_free(txa);
804 	return 0;
805 }
806 
807 static int
808 txa_service_queue_add(uint8_t id,
809 		__rte_unused struct rte_eventdev *dev,
810 		const struct rte_eth_dev *eth_dev,
811 		int32_t tx_queue_id)
812 {
813 	struct txa_service_data *txa;
814 	struct txa_service_ethdev *tdi;
815 	struct txa_service_queue_info *tqi;
816 	struct rte_eth_dev_tx_buffer *tb;
817 	struct txa_retry *txa_retry;
818 	int ret = 0;
819 
820 	txa = txa_service_id_to_data(id);
821 
822 	if (tx_queue_id == -1) {
823 		int nb_queues;
824 		uint16_t i, j;
825 		uint16_t *qdone;
826 
827 		nb_queues = eth_dev->data->nb_tx_queues;
828 		if (txa->dev_count > eth_dev->data->port_id) {
829 			tdi = &txa->txa_ethdev[eth_dev->data->port_id];
830 			nb_queues -= tdi->nb_queues;
831 		}
832 
833 		qdone = rte_zmalloc(txa->mem_name,
834 				nb_queues * sizeof(*qdone), 0);
835 		if (qdone == NULL)
836 			return -ENOMEM;
837 		j = 0;
838 		for (i = 0; i < nb_queues; i++) {
839 			if (txa_service_is_queue_added(txa, eth_dev, i))
840 				continue;
841 			ret = txa_service_queue_add(id, dev, eth_dev, i);
842 			if (ret == 0)
843 				qdone[j++] = i;
844 			else
845 				break;
846 		}
847 
848 		if (i != nb_queues) {
849 			for (i = 0; i < j; i++)
850 				txa_service_queue_del(id, eth_dev, qdone[i]);
851 		}
852 		rte_free(qdone);
853 		return ret;
854 	}
855 
856 	ret = txa_service_register(txa);
857 	if (ret)
858 		return ret;
859 
860 	rte_spinlock_lock(&txa->tx_lock);
861 
862 	if (txa_service_is_queue_added(txa, eth_dev, tx_queue_id))
863 		goto ret_unlock;
864 
865 	ret = txa_service_queue_array_alloc(txa, eth_dev->data->port_id);
866 	if (ret)
867 		goto err_unlock;
868 
869 	tb = txa_service_tx_buf_alloc(txa, eth_dev);
870 	if (tb == NULL)
871 		goto err_unlock;
872 
873 	tdi = &txa->txa_ethdev[eth_dev->data->port_id];
874 	tqi = txa_service_queue(txa, eth_dev->data->port_id, tx_queue_id);
875 	if (tqi == NULL)
876 		goto err_unlock;
877 
878 	txa_retry = &tqi->txa_retry;
879 	txa_retry->id = txa->id;
880 	txa_retry->port_id = eth_dev->data->port_id;
881 	txa_retry->tx_queue = tx_queue_id;
882 
883 	rte_eth_tx_buffer_init(tb, TXA_BATCH_SIZE);
884 	rte_eth_tx_buffer_set_err_callback(tb,
885 		txa_service_buffer_retry, txa_retry);
886 
887 	tqi->tx_buf = tb;
888 	tqi->added = 1;
889 	tqi->stopped = false;
890 	tdi->nb_queues++;
891 	txa->nb_queues++;
892 
893 ret_unlock:
894 	rte_spinlock_unlock(&txa->tx_lock);
895 	return 0;
896 
897 err_unlock:
898 	if (txa->nb_queues == 0) {
899 		txa_service_queue_array_free(txa,
900 					eth_dev->data->port_id);
901 		txa_service_unregister(txa);
902 	}
903 
904 	rte_spinlock_unlock(&txa->tx_lock);
905 	return -1;
906 }
907 
908 static inline void
909 txa_txq_buffer_drain(struct txa_service_queue_info *tqi)
910 {
911 	struct rte_eth_dev_tx_buffer *b;
912 	uint16_t i;
913 
914 	b = tqi->tx_buf;
915 
916 	for (i = 0; i < b->length; i++)
917 		rte_pktmbuf_free(b->pkts[i]);
918 
919 	b->length = 0;
920 }
921 
922 static int
923 txa_service_queue_del(uint8_t id,
924 		const struct rte_eth_dev *dev,
925 		int32_t tx_queue_id)
926 {
927 	struct txa_service_data *txa;
928 	struct txa_service_queue_info *tqi;
929 	struct rte_eth_dev_tx_buffer *tb;
930 	uint16_t port_id;
931 
932 	txa = txa_service_id_to_data(id);
933 	port_id = dev->data->port_id;
934 
935 	if (tx_queue_id == -1) {
936 		uint16_t i, q, nb_queues;
937 		int ret = 0;
938 
939 		if (txa->txa_ethdev == NULL)
940 			return 0;
941 		nb_queues = txa->txa_ethdev[port_id].nb_queues;
942 		if (nb_queues == 0)
943 			return 0;
944 
945 		i = 0;
946 		q = 0;
947 		tqi = txa->txa_ethdev[port_id].queues;
948 
949 		while (i < nb_queues) {
950 
951 			if (tqi[q].added) {
952 				ret = txa_service_queue_del(id, dev, q);
953 				i++;
954 				if (ret != 0)
955 					break;
956 			}
957 			q++;
958 		}
959 		return ret;
960 	}
961 
962 	txa = txa_service_id_to_data(id);
963 
964 	rte_spinlock_lock(&txa->tx_lock);
965 	tqi = txa_service_queue(txa, port_id, tx_queue_id);
966 	if (tqi == NULL || !tqi->added)
967 		goto ret_unlock;
968 
969 	/* Drain the buffered mbufs */
970 	txa_txq_buffer_drain(tqi);
971 	tb = tqi->tx_buf;
972 	tqi->added = 0;
973 	tqi->tx_buf = NULL;
974 	rte_free(tb);
975 	txa->nb_queues--;
976 	txa->txa_ethdev[port_id].nb_queues--;
977 
978 	txa_service_queue_array_free(txa, port_id);
979 
980 ret_unlock:
981 	rte_spinlock_unlock(&txa->tx_lock);
982 	return 0;
983 }
984 
985 static int
986 txa_service_id_get(uint8_t id, uint32_t *service_id)
987 {
988 	struct txa_service_data *txa;
989 
990 	txa = txa_service_id_to_data(id);
991 	if (txa->service_id == TXA_INVALID_SERVICE_ID)
992 		return -ESRCH;
993 
994 	if (service_id == NULL)
995 		return -EINVAL;
996 
997 	*service_id = txa->service_id;
998 	return 0;
999 }
1000 
1001 static int
1002 txa_service_start(uint8_t id)
1003 {
1004 	return txa_service_ctrl(id, 1);
1005 }
1006 
1007 static int
1008 txa_service_stats_get(uint8_t id,
1009 		struct rte_event_eth_tx_adapter_stats *stats)
1010 {
1011 	struct txa_service_data *txa;
1012 
1013 	txa = txa_service_id_to_data(id);
1014 	*stats = txa->stats;
1015 	return 0;
1016 }
1017 
1018 static int
1019 txa_service_stats_reset(uint8_t id)
1020 {
1021 	struct txa_service_data *txa;
1022 
1023 	txa = txa_service_id_to_data(id);
1024 	memset(&txa->stats, 0, sizeof(txa->stats));
1025 	return 0;
1026 }
1027 
1028 static int
1029 txa_service_stop(uint8_t id)
1030 {
1031 	return txa_service_ctrl(id, 0);
1032 }
1033 
1034 
1035 int
1036 rte_event_eth_tx_adapter_create(uint8_t id, uint8_t dev_id,
1037 				struct rte_event_port_conf *port_conf)
1038 {
1039 	struct rte_eventdev *dev;
1040 	int ret;
1041 
1042 	if (port_conf == NULL)
1043 		return -EINVAL;
1044 
1045 	RTE_EVENT_ETH_TX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
1046 	RTE_EVENTDEV_VALID_DEVID_OR_ERR_RET(dev_id, -EINVAL);
1047 
1048 	dev = &rte_eventdevs[dev_id];
1049 
1050 	ret = txa_init();
1051 	if (ret != 0)
1052 		return ret;
1053 
1054 	if (txa_adapter_exist(id))
1055 		return -EEXIST;
1056 
1057 	txa_dev_id_array[id] = dev_id;
1058 	if (txa_dev_adapter_create(id))
1059 		ret = txa_dev_adapter_create(id)(id, dev);
1060 
1061 	if (ret != 0) {
1062 		txa_dev_id_array[id] = TXA_INVALID_DEV_ID;
1063 		return ret;
1064 	}
1065 
1066 	ret = txa_service_adapter_create(id, dev, port_conf);
1067 	if (ret != 0) {
1068 		if (txa_dev_adapter_free(id))
1069 			txa_dev_adapter_free(id)(id, dev);
1070 		txa_dev_id_array[id] = TXA_INVALID_DEV_ID;
1071 		return ret;
1072 	}
1073 	rte_eventdev_trace_eth_tx_adapter_create(id, dev_id, NULL, port_conf,
1074 		ret);
1075 	txa_dev_id_array[id] = dev_id;
1076 	return 0;
1077 }
1078 
1079 int
1080 rte_event_eth_tx_adapter_create_ext(uint8_t id, uint8_t dev_id,
1081 				rte_event_eth_tx_adapter_conf_cb conf_cb,
1082 				void *conf_arg)
1083 {
1084 	struct rte_eventdev *dev;
1085 	int ret;
1086 
1087 	RTE_EVENT_ETH_TX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
1088 	RTE_EVENTDEV_VALID_DEVID_OR_ERR_RET(dev_id, -EINVAL);
1089 
1090 	ret = txa_init();
1091 	if (ret != 0)
1092 		return ret;
1093 
1094 	if (txa_adapter_exist(id))
1095 		return -EINVAL;
1096 
1097 	dev = &rte_eventdevs[dev_id];
1098 
1099 	txa_dev_id_array[id] = dev_id;
1100 	if (txa_dev_adapter_create_ext(id))
1101 		ret = txa_dev_adapter_create_ext(id)(id, dev);
1102 
1103 	if (ret != 0) {
1104 		txa_dev_id_array[id] = TXA_INVALID_DEV_ID;
1105 		return ret;
1106 	}
1107 
1108 	ret = txa_service_adapter_create_ext(id, dev, conf_cb, conf_arg);
1109 	if (ret != 0) {
1110 		if (txa_dev_adapter_free(id))
1111 			txa_dev_adapter_free(id)(id, dev);
1112 		txa_dev_id_array[id] = TXA_INVALID_DEV_ID;
1113 		return ret;
1114 	}
1115 
1116 	rte_eventdev_trace_eth_tx_adapter_create(id, dev_id, conf_cb, conf_arg,
1117 		ret);
1118 	txa_dev_id_array[id] = dev_id;
1119 	return 0;
1120 }
1121 
1122 
1123 int
1124 rte_event_eth_tx_adapter_event_port_get(uint8_t id, uint8_t *event_port_id)
1125 {
1126 	TXA_CHECK_OR_ERR_RET(id);
1127 
1128 	return txa_service_event_port_get(id, event_port_id);
1129 }
1130 
1131 int
1132 rte_event_eth_tx_adapter_free(uint8_t id)
1133 {
1134 	int ret;
1135 
1136 	TXA_CHECK_OR_ERR_RET(id);
1137 
1138 	ret = txa_dev_adapter_free(id) ?
1139 		txa_dev_adapter_free(id)(id, txa_evdev(id)) :
1140 		0;
1141 
1142 	if (ret == 0)
1143 		ret = txa_service_adapter_free(id);
1144 	txa_dev_id_array[id] = TXA_INVALID_DEV_ID;
1145 
1146 	rte_eventdev_trace_eth_tx_adapter_free(id, ret);
1147 	return ret;
1148 }
1149 
1150 int
1151 rte_event_eth_tx_adapter_queue_add(uint8_t id,
1152 				uint16_t eth_dev_id,
1153 				int32_t queue)
1154 {
1155 	struct rte_eth_dev *eth_dev;
1156 	int ret;
1157 	uint32_t caps;
1158 
1159 	RTE_ETH_VALID_PORTID_OR_ERR_RET(eth_dev_id, -EINVAL);
1160 	TXA_CHECK_OR_ERR_RET(id);
1161 
1162 	eth_dev = &rte_eth_devices[eth_dev_id];
1163 	TXA_CHECK_TXQ(eth_dev, queue);
1164 
1165 	caps = 0;
1166 	if (txa_dev_caps_get(id))
1167 		txa_dev_caps_get(id)(txa_evdev(id), eth_dev, &caps);
1168 
1169 	if (caps & RTE_EVENT_ETH_TX_ADAPTER_CAP_INTERNAL_PORT)
1170 		ret =  txa_dev_queue_add(id) ?
1171 					txa_dev_queue_add(id)(id,
1172 							txa_evdev(id),
1173 							eth_dev,
1174 							queue) : 0;
1175 	else
1176 		ret = txa_service_queue_add(id, txa_evdev(id), eth_dev, queue);
1177 
1178 	rte_eventdev_trace_eth_tx_adapter_queue_add(id, eth_dev_id, queue,
1179 		ret);
1180 	return ret;
1181 }
1182 
1183 int
1184 rte_event_eth_tx_adapter_queue_del(uint8_t id,
1185 				uint16_t eth_dev_id,
1186 				int32_t queue)
1187 {
1188 	struct rte_eth_dev *eth_dev;
1189 	int ret;
1190 	uint32_t caps;
1191 
1192 	RTE_ETH_VALID_PORTID_OR_ERR_RET(eth_dev_id, -EINVAL);
1193 	TXA_CHECK_OR_ERR_RET(id);
1194 
1195 	eth_dev = &rte_eth_devices[eth_dev_id];
1196 
1197 	caps = 0;
1198 
1199 	if (txa_dev_caps_get(id))
1200 		txa_dev_caps_get(id)(txa_evdev(id), eth_dev, &caps);
1201 
1202 	if (caps & RTE_EVENT_ETH_TX_ADAPTER_CAP_INTERNAL_PORT)
1203 		ret =  txa_dev_queue_del(id) ?
1204 					txa_dev_queue_del(id)(id, txa_evdev(id),
1205 							eth_dev,
1206 							queue) : 0;
1207 	else
1208 		ret = txa_service_queue_del(id, eth_dev, queue);
1209 
1210 	rte_eventdev_trace_eth_tx_adapter_queue_del(id, eth_dev_id, queue,
1211 		ret);
1212 	return ret;
1213 }
1214 
1215 int
1216 rte_event_eth_tx_adapter_service_id_get(uint8_t id, uint32_t *service_id)
1217 {
1218 	TXA_CHECK_OR_ERR_RET(id);
1219 
1220 	return txa_service_id_get(id, service_id);
1221 }
1222 
1223 int
1224 rte_event_eth_tx_adapter_start(uint8_t id)
1225 {
1226 	int ret;
1227 
1228 	TXA_CHECK_OR_ERR_RET(id);
1229 
1230 	ret = txa_dev_start(id) ? txa_dev_start(id)(id, txa_evdev(id)) : 0;
1231 	if (ret == 0)
1232 		ret = txa_service_start(id);
1233 	rte_eventdev_trace_eth_tx_adapter_start(id, ret);
1234 	return ret;
1235 }
1236 
1237 int
1238 rte_event_eth_tx_adapter_stats_get(uint8_t id,
1239 				struct rte_event_eth_tx_adapter_stats *stats)
1240 {
1241 	int ret;
1242 
1243 	TXA_CHECK_OR_ERR_RET(id);
1244 
1245 	if (stats == NULL)
1246 		return -EINVAL;
1247 
1248 	*stats = (struct rte_event_eth_tx_adapter_stats){0};
1249 
1250 	ret = txa_dev_stats_get(id) ?
1251 			txa_dev_stats_get(id)(id, txa_evdev(id), stats) : 0;
1252 
1253 	if (ret == 0 && txa_service_id_get(id, NULL) != ESRCH) {
1254 		if (txa_dev_stats_get(id)) {
1255 			struct rte_event_eth_tx_adapter_stats service_stats;
1256 
1257 			ret = txa_service_stats_get(id, &service_stats);
1258 			if (ret == 0) {
1259 				stats->tx_retry += service_stats.tx_retry;
1260 				stats->tx_packets += service_stats.tx_packets;
1261 				stats->tx_dropped += service_stats.tx_dropped;
1262 			}
1263 		} else
1264 			ret = txa_service_stats_get(id, stats);
1265 	}
1266 
1267 	return ret;
1268 }
1269 
1270 int
1271 rte_event_eth_tx_adapter_stats_reset(uint8_t id)
1272 {
1273 	int ret;
1274 
1275 	TXA_CHECK_OR_ERR_RET(id);
1276 
1277 	ret = txa_dev_stats_reset(id) ?
1278 		txa_dev_stats_reset(id)(id, txa_evdev(id)) : 0;
1279 	if (ret == 0)
1280 		ret = txa_service_stats_reset(id);
1281 	return ret;
1282 }
1283 
1284 int
1285 rte_event_eth_tx_adapter_stop(uint8_t id)
1286 {
1287 	int ret;
1288 
1289 	TXA_CHECK_OR_ERR_RET(id);
1290 
1291 	ret = txa_dev_stop(id) ? txa_dev_stop(id)(id,  txa_evdev(id)) : 0;
1292 	if (ret == 0)
1293 		ret = txa_service_stop(id);
1294 	rte_eventdev_trace_eth_tx_adapter_stop(id, ret);
1295 	return ret;
1296 }
1297 
1298 int
1299 rte_event_eth_tx_adapter_instance_get(uint16_t eth_dev_id,
1300 				      uint16_t tx_queue_id,
1301 				      uint8_t *txa_inst_id)
1302 {
1303 	uint8_t id;
1304 	int ret = -EINVAL;
1305 	uint32_t caps;
1306 	struct txa_service_data *txa;
1307 
1308 	if (txa_lookup())
1309 		return -ENOMEM;
1310 
1311 	if (eth_dev_id >= rte_eth_dev_count_avail()) {
1312 		RTE_EDEV_LOG_ERR("Invalid ethernet port id %u", eth_dev_id);
1313 		return -EINVAL;
1314 	}
1315 
1316 	if (tx_queue_id >= rte_eth_devices[eth_dev_id].data->nb_tx_queues) {
1317 		RTE_EDEV_LOG_ERR("Invalid tx queue id %u", tx_queue_id);
1318 		return -EINVAL;
1319 	}
1320 
1321 	if (txa_inst_id == NULL) {
1322 		RTE_EDEV_LOG_ERR("txa_instance_id cannot be NULL");
1323 		return -EINVAL;
1324 	}
1325 
1326 	/* Iterate through all Tx adapter instances */
1327 	for (id = 0; id < RTE_EVENT_ETH_TX_ADAPTER_MAX_INSTANCE; id++) {
1328 		txa = txa_service_id_to_data(id);
1329 		if (!txa)
1330 			continue;
1331 
1332 		caps = 0;
1333 		if (rte_event_eth_tx_adapter_caps_get(txa->eventdev_id,
1334 						      eth_dev_id,
1335 						      &caps))
1336 			continue;
1337 
1338 		if (caps & RTE_EVENT_ETH_TX_ADAPTER_CAP_INTERNAL_PORT) {
1339 			ret = txa_dev_instance_get(id) ?
1340 					txa_dev_instance_get(id)(eth_dev_id,
1341 								 tx_queue_id,
1342 								 txa_inst_id)
1343 							: -EINVAL;
1344 			if (ret == 0)
1345 				return ret;
1346 		} else {
1347 			struct rte_eth_dev *eth_dev;
1348 
1349 			eth_dev = &rte_eth_devices[eth_dev_id];
1350 
1351 			if (txa_service_is_queue_added(txa, eth_dev,
1352 						       tx_queue_id)) {
1353 				*txa_inst_id = txa->id;
1354 				return 0;
1355 			}
1356 		}
1357 	}
1358 
1359 	return -EINVAL;
1360 }
1361 
1362 static inline int
1363 txa_sw_queue_start_state_set(uint16_t eth_dev_id, uint16_t tx_queue_id,
1364 			     bool start_state, struct txa_service_data *txa)
1365 {
1366 	struct txa_service_queue_info *tqi = NULL;
1367 
1368 	rte_spinlock_lock(&txa->tx_lock);
1369 	tqi = txa_service_queue(txa, eth_dev_id, tx_queue_id);
1370 	if (unlikely(tqi == NULL || !tqi->added)) {
1371 		rte_spinlock_unlock(&txa->tx_lock);
1372 		return -EINVAL;
1373 	}
1374 	if (start_state == false)
1375 		txa_txq_buffer_drain(tqi);
1376 
1377 	tqi->stopped = !start_state;
1378 	rte_spinlock_unlock(&txa->tx_lock);
1379 	return 0;
1380 }
1381 
1382 static int
1383 txa_queue_start_state_set(uint16_t eth_dev_id, uint16_t tx_queue_id,
1384 			  bool start_state)
1385 {
1386 	struct txa_service_data *txa;
1387 	uint8_t txa_inst_id;
1388 	int ret;
1389 	uint32_t caps = 0;
1390 
1391 	/* Below API already does validation of input parameters.
1392 	 * Hence skipping the validation here.
1393 	 */
1394 	ret = rte_event_eth_tx_adapter_instance_get(eth_dev_id,
1395 						    tx_queue_id,
1396 						    &txa_inst_id);
1397 	if (ret < 0)
1398 		return -EINVAL;
1399 
1400 	txa = txa_service_id_to_data(txa_inst_id);
1401 	ret = rte_event_eth_tx_adapter_caps_get(txa->eventdev_id,
1402 						eth_dev_id,
1403 						&caps);
1404 	if (ret < 0)
1405 		return -EINVAL;
1406 
1407 	if (caps & RTE_EVENT_ETH_TX_ADAPTER_CAP_INTERNAL_PORT) {
1408 		if (start_state == true) {
1409 			ret = txa_dev_queue_start(txa_inst_id) ?
1410 			      txa_dev_queue_start(txa_inst_id)(txa_inst_id,
1411 							       eth_dev_id,
1412 							       tx_queue_id) : 0;
1413 		} else {
1414 			ret = txa_dev_queue_stop(txa_inst_id) ?
1415 			      txa_dev_queue_stop(txa_inst_id)(txa_inst_id,
1416 							      eth_dev_id,
1417 							      tx_queue_id) : 0;
1418 		}
1419 		return ret;
1420 	}
1421 
1422 	return txa_sw_queue_start_state_set(eth_dev_id, tx_queue_id,
1423 					    start_state, txa);
1424 }
1425 
1426 int
1427 rte_event_eth_tx_adapter_queue_start(uint16_t eth_dev_id, uint16_t tx_queue_id)
1428 {
1429 	return txa_queue_start_state_set(eth_dev_id, tx_queue_id, true);
1430 }
1431 
1432 int
1433 rte_event_eth_tx_adapter_queue_stop(uint16_t eth_dev_id, uint16_t tx_queue_id)
1434 {
1435 	return txa_queue_start_state_set(eth_dev_id, tx_queue_id, false);
1436 }
1437