xref: /dpdk/lib/eventdev/rte_event_eth_tx_adapter.c (revision 515cd4a488b6a0c6e40d20e6b10d8e89657dc23f)
1 /* SPDX-License-Identifier: BSD-3-Clause
2  * Copyright(c) 2018 Intel Corporation.
3  */
4 #include <rte_spinlock.h>
5 #include <rte_service_component.h>
6 #include <ethdev_driver.h>
7 
8 #include "eventdev_pmd.h"
9 #include "eventdev_trace.h"
10 #include "rte_event_eth_tx_adapter.h"
11 
12 #define TXA_BATCH_SIZE		32
13 #define TXA_SERVICE_NAME_LEN	32
14 #define TXA_MEM_NAME_LEN	32
15 #define TXA_FLUSH_THRESHOLD	1024
16 #define TXA_RETRY_CNT		100
17 #define TXA_MAX_NB_TX		128
18 #define TXA_INVALID_DEV_ID	INT32_C(-1)
19 #define TXA_INVALID_SERVICE_ID	INT64_C(-1)
20 
21 #define TXA_ADAPTER_ARRAY "txa_adapter_array"
22 #define TXA_SERVICE_DATA_ARRAY "txa_service_data_array"
23 
24 #define txa_evdev(id) (&rte_eventdevs[txa_dev_id_array[(id)]])
25 
26 #define txa_dev_caps_get(id) txa_evdev((id))->dev_ops->eth_tx_adapter_caps_get
27 
28 #define txa_dev_adapter_create(t) txa_evdev(t)->dev_ops->eth_tx_adapter_create
29 
30 #define txa_dev_adapter_create_ext(t) \
31 				txa_evdev(t)->dev_ops->eth_tx_adapter_create
32 
33 #define txa_dev_adapter_free(t) txa_evdev(t)->dev_ops->eth_tx_adapter_free
34 
35 #define txa_dev_queue_add(id) txa_evdev(id)->dev_ops->eth_tx_adapter_queue_add
36 
37 #define txa_dev_queue_del(t) txa_evdev(t)->dev_ops->eth_tx_adapter_queue_del
38 
39 #define txa_dev_start(t) txa_evdev(t)->dev_ops->eth_tx_adapter_start
40 
41 #define txa_dev_stop(t) txa_evdev(t)->dev_ops->eth_tx_adapter_stop
42 
43 #define txa_dev_stats_reset(t) txa_evdev(t)->dev_ops->eth_tx_adapter_stats_reset
44 
45 #define txa_dev_stats_get(t) txa_evdev(t)->dev_ops->eth_tx_adapter_stats_get
46 
47 #define txa_dev_instance_get(id) \
48 			txa_evdev(id)->dev_ops->eth_tx_adapter_instance_get
49 
50 #define txa_dev_queue_start(id) \
51 			txa_evdev(id)->dev_ops->eth_tx_adapter_queue_start
52 
53 #define txa_dev_queue_stop(id) \
54 			txa_evdev(id)->dev_ops->eth_tx_adapter_queue_stop
55 
56 #define RTE_EVENT_ETH_TX_ADAPTER_ID_VALID_OR_ERR_RET(id, retval) \
57 do { \
58 	if (!txa_valid_id(id)) { \
59 		RTE_EDEV_LOG_ERR("Invalid eth Tx adapter id = %d", id); \
60 		return retval; \
61 	} \
62 } while (0)
63 
64 #define TXA_CHECK_OR_ERR_RET(id) \
65 do {\
66 	int ret; \
67 	RTE_EVENT_ETH_TX_ADAPTER_ID_VALID_OR_ERR_RET((id), -EINVAL); \
68 	ret = txa_init(); \
69 	if (ret != 0) \
70 		return ret; \
71 	if (!txa_adapter_exist((id))) \
72 		return -EINVAL; \
73 } while (0)
74 
75 #define TXA_CHECK_TXQ(dev, queue) \
76 do {\
77 	if ((dev)->data->nb_tx_queues == 0) { \
78 		RTE_EDEV_LOG_ERR("No tx queues configured"); \
79 		return -EINVAL; \
80 	} \
81 	if ((queue) != -1 && \
82 		(uint16_t)(queue) >= (dev)->data->nb_tx_queues) { \
83 		RTE_EDEV_LOG_ERR("Invalid tx queue_id %" PRIu16, \
84 				(uint16_t)(queue)); \
85 		return -EINVAL; \
86 	} \
87 } while (0)
88 
89 /* Tx retry callback structure */
90 struct txa_retry {
91 	/* Ethernet port id */
92 	uint16_t port_id;
93 	/* Tx queue */
94 	uint16_t tx_queue;
95 	/* Adapter ID */
96 	uint8_t id;
97 };
98 
99 /* Per queue structure */
100 struct txa_service_queue_info {
101 	/* Queue has been added */
102 	uint8_t added;
103 	/* Queue is stopped */
104 	bool stopped;
105 	/* Retry callback argument */
106 	struct txa_retry txa_retry;
107 	/* Tx buffer */
108 	struct rte_eth_dev_tx_buffer *tx_buf;
109 };
110 
111 /* PMD private structure */
112 struct txa_service_data {
113 	/* Max mbufs processed in any service function invocation */
114 	uint32_t max_nb_tx;
115 	/* Number of Tx queues in adapter */
116 	uint32_t nb_queues;
117 	/*  Synchronization with data path */
118 	rte_spinlock_t tx_lock;
119 	/* Event port ID */
120 	uint8_t port_id;
121 	/* Event device identifier */
122 	uint8_t eventdev_id;
123 	/* Highest port id supported + 1 */
124 	uint16_t dev_count;
125 	/* Loop count to flush Tx buffers */
126 	int loop_cnt;
127 	/* Per ethernet device structure */
128 	struct txa_service_ethdev *txa_ethdev;
129 	/* Statistics */
130 	struct rte_event_eth_tx_adapter_stats stats;
131 	/* Adapter Identifier */
132 	uint8_t id;
133 	/* Conf arg must be freed */
134 	uint8_t conf_free;
135 	/* Configuration callback */
136 	rte_event_eth_tx_adapter_conf_cb conf_cb;
137 	/* Configuration callback argument */
138 	void *conf_arg;
139 	/* socket id */
140 	int socket_id;
141 	/* Per adapter EAL service */
142 	int64_t service_id;
143 	/* Memory allocation name */
144 	char mem_name[TXA_MEM_NAME_LEN];
145 } __rte_cache_aligned;
146 
147 /* Per eth device structure */
148 struct txa_service_ethdev {
149 	/* Pointer to ethernet device */
150 	struct rte_eth_dev *dev;
151 	/* Number of queues added */
152 	uint16_t nb_queues;
153 	/* PMD specific queue data */
154 	void *queues;
155 };
156 
157 /* Array of adapter instances, initialized with event device id
158  * when adapter is created
159  */
160 static int *txa_dev_id_array;
161 
162 /* Array of pointers to service implementation data */
163 static struct txa_service_data **txa_service_data_array;
164 
165 static int32_t txa_service_func(void *args);
166 static int txa_service_adapter_create_ext(uint8_t id,
167 			struct rte_eventdev *dev,
168 			rte_event_eth_tx_adapter_conf_cb conf_cb,
169 			void *conf_arg);
170 static int txa_service_queue_del(uint8_t id,
171 				const struct rte_eth_dev *dev,
172 				int32_t tx_queue_id);
173 
174 static int
175 txa_adapter_exist(uint8_t id)
176 {
177 	return txa_dev_id_array[id] != TXA_INVALID_DEV_ID;
178 }
179 
180 static inline int
181 txa_valid_id(uint8_t id)
182 {
183 	return id < RTE_EVENT_ETH_TX_ADAPTER_MAX_INSTANCE;
184 }
185 
186 static void *
187 txa_memzone_array_get(const char *name, unsigned int elt_size, int nb_elems)
188 {
189 	const struct rte_memzone *mz;
190 	unsigned int sz;
191 
192 	sz = elt_size * nb_elems;
193 	sz = RTE_ALIGN(sz, RTE_CACHE_LINE_SIZE);
194 
195 	mz = rte_memzone_lookup(name);
196 	if (mz == NULL) {
197 		mz = rte_memzone_reserve_aligned(name, sz, rte_socket_id(), 0,
198 						 RTE_CACHE_LINE_SIZE);
199 		if (mz == NULL) {
200 			RTE_EDEV_LOG_ERR("failed to reserve memzone"
201 					" name = %s err = %"
202 					PRId32, name, rte_errno);
203 			return NULL;
204 		}
205 	}
206 
207 	return  mz->addr;
208 }
209 
210 static int
211 txa_lookup(void)
212 {
213 	const struct rte_memzone *mz;
214 
215 	if (txa_dev_id_array == NULL) {
216 		mz = rte_memzone_lookup(TXA_ADAPTER_ARRAY);
217 		if (mz == NULL)
218 			return -ENOMEM;
219 		txa_dev_id_array = mz->addr;
220 	}
221 
222 	if (txa_service_data_array == NULL) {
223 		mz = rte_memzone_lookup(TXA_SERVICE_DATA_ARRAY);
224 		if (mz == NULL)
225 			return -ENOMEM;
226 		txa_service_data_array = mz->addr;
227 	}
228 
229 	return 0;
230 }
231 
232 static int
233 txa_dev_id_array_init(void)
234 {
235 	if (txa_dev_id_array == NULL) {
236 		int i;
237 
238 		txa_dev_id_array = txa_memzone_array_get(TXA_ADAPTER_ARRAY,
239 					sizeof(int),
240 					RTE_EVENT_ETH_TX_ADAPTER_MAX_INSTANCE);
241 		if (txa_dev_id_array == NULL)
242 			return -ENOMEM;
243 
244 		for (i = 0; i < RTE_EVENT_ETH_TX_ADAPTER_MAX_INSTANCE; i++)
245 			txa_dev_id_array[i] = TXA_INVALID_DEV_ID;
246 	}
247 
248 	return 0;
249 }
250 
251 static int
252 txa_init(void)
253 {
254 	return txa_dev_id_array_init();
255 }
256 
257 static int
258 txa_service_data_init(void)
259 {
260 	if (txa_service_data_array == NULL) {
261 		int i;
262 
263 		txa_service_data_array =
264 				txa_memzone_array_get(TXA_SERVICE_DATA_ARRAY,
265 					sizeof(*txa_service_data_array),
266 					RTE_EVENT_ETH_TX_ADAPTER_MAX_INSTANCE);
267 		if (txa_service_data_array == NULL)
268 			return -ENOMEM;
269 
270 		/* Reset the txa service pointers */
271 		for (i = 0; i < RTE_EVENT_ETH_TX_ADAPTER_MAX_INSTANCE; i++)
272 			txa_service_data_array[i] = NULL;
273 	}
274 
275 	return 0;
276 }
277 
278 static inline struct txa_service_data *
279 txa_service_id_to_data(uint8_t id)
280 {
281 	return txa_service_data_array[id];
282 }
283 
284 static inline struct txa_service_queue_info *
285 txa_service_queue(struct txa_service_data *txa, uint16_t port_id,
286 		uint16_t tx_queue_id)
287 {
288 	struct txa_service_queue_info *tqi;
289 
290 	if (unlikely(txa->txa_ethdev == NULL || txa->dev_count < port_id + 1))
291 		return NULL;
292 
293 	tqi = txa->txa_ethdev[port_id].queues;
294 
295 	return likely(tqi != NULL) ? tqi + tx_queue_id : NULL;
296 }
297 
298 static int
299 txa_service_conf_cb(uint8_t __rte_unused id, uint8_t dev_id,
300 		struct rte_event_eth_tx_adapter_conf *conf, void *arg)
301 {
302 	int ret;
303 	struct rte_eventdev *dev;
304 	struct rte_event_port_conf *pc;
305 	struct rte_event_dev_config dev_conf;
306 	int started;
307 	uint8_t port_id;
308 
309 	pc = arg;
310 	dev = &rte_eventdevs[dev_id];
311 	dev_conf = dev->data->dev_conf;
312 
313 	started = dev->data->dev_started;
314 	if (started)
315 		rte_event_dev_stop(dev_id);
316 
317 	port_id = dev_conf.nb_event_ports;
318 	dev_conf.nb_event_ports += 1;
319 
320 	ret = rte_event_dev_configure(dev_id, &dev_conf);
321 	if (ret) {
322 		RTE_EDEV_LOG_ERR("failed to configure event dev %u",
323 						dev_id);
324 		if (started) {
325 			if (rte_event_dev_start(dev_id))
326 				return -EIO;
327 		}
328 		return ret;
329 	}
330 
331 	ret = rte_event_port_setup(dev_id, port_id, pc);
332 	if (ret) {
333 		RTE_EDEV_LOG_ERR("failed to setup event port %u\n",
334 					port_id);
335 		if (started) {
336 			if (rte_event_dev_start(dev_id))
337 				return -EIO;
338 		}
339 		return ret;
340 	}
341 
342 	conf->event_port_id = port_id;
343 	conf->max_nb_tx = TXA_MAX_NB_TX;
344 	if (started)
345 		ret = rte_event_dev_start(dev_id);
346 	return ret;
347 }
348 
349 static int
350 txa_service_ethdev_alloc(struct txa_service_data *txa)
351 {
352 	struct txa_service_ethdev *txa_ethdev;
353 	uint16_t i, dev_count;
354 
355 	dev_count = rte_eth_dev_count_avail();
356 	if (txa->txa_ethdev && dev_count == txa->dev_count)
357 		return 0;
358 
359 	txa_ethdev = rte_zmalloc_socket(txa->mem_name,
360 					dev_count * sizeof(*txa_ethdev),
361 					0,
362 					txa->socket_id);
363 	if (txa_ethdev == NULL) {
364 		RTE_EDEV_LOG_ERR("Failed to alloc txa::txa_ethdev ");
365 		return -ENOMEM;
366 	}
367 
368 	if (txa->dev_count)
369 		memcpy(txa_ethdev, txa->txa_ethdev,
370 			txa->dev_count * sizeof(*txa_ethdev));
371 
372 	RTE_ETH_FOREACH_DEV(i) {
373 		if (i == dev_count)
374 			break;
375 		txa_ethdev[i].dev = &rte_eth_devices[i];
376 	}
377 
378 	txa->txa_ethdev = txa_ethdev;
379 	txa->dev_count = dev_count;
380 	return 0;
381 }
382 
383 static int
384 txa_service_queue_array_alloc(struct txa_service_data *txa,
385 			uint16_t port_id)
386 {
387 	struct txa_service_queue_info *tqi;
388 	uint16_t nb_queue;
389 	int ret;
390 
391 	ret = txa_service_ethdev_alloc(txa);
392 	if (ret != 0)
393 		return ret;
394 
395 	if (txa->txa_ethdev[port_id].queues)
396 		return 0;
397 
398 	nb_queue = txa->txa_ethdev[port_id].dev->data->nb_tx_queues;
399 	tqi = rte_zmalloc_socket(txa->mem_name,
400 				nb_queue *
401 				sizeof(struct txa_service_queue_info), 0,
402 				txa->socket_id);
403 	if (tqi == NULL)
404 		return -ENOMEM;
405 	txa->txa_ethdev[port_id].queues = tqi;
406 	return 0;
407 }
408 
409 static void
410 txa_service_queue_array_free(struct txa_service_data *txa,
411 			uint16_t port_id)
412 {
413 	struct txa_service_ethdev *txa_ethdev;
414 	struct txa_service_queue_info *tqi;
415 
416 	txa_ethdev = &txa->txa_ethdev[port_id];
417 	if (txa->txa_ethdev == NULL || txa_ethdev->nb_queues != 0)
418 		return;
419 
420 	tqi = txa_ethdev->queues;
421 	txa_ethdev->queues = NULL;
422 	rte_free(tqi);
423 
424 	if (txa->nb_queues == 0) {
425 		rte_free(txa->txa_ethdev);
426 		txa->txa_ethdev = NULL;
427 	}
428 }
429 
430 static void
431 txa_service_unregister(struct txa_service_data *txa)
432 {
433 	if (txa->service_id != TXA_INVALID_SERVICE_ID) {
434 		rte_service_component_runstate_set(txa->service_id, 0);
435 		while (rte_service_may_be_active(txa->service_id))
436 			rte_pause();
437 		rte_service_component_unregister(txa->service_id);
438 	}
439 	txa->service_id = TXA_INVALID_SERVICE_ID;
440 }
441 
442 static int
443 txa_service_register(struct txa_service_data *txa)
444 {
445 	int ret;
446 	struct rte_service_spec service;
447 	struct rte_event_eth_tx_adapter_conf conf;
448 
449 	if (txa->service_id != TXA_INVALID_SERVICE_ID)
450 		return 0;
451 
452 	memset(&service, 0, sizeof(service));
453 	snprintf(service.name, TXA_SERVICE_NAME_LEN, "txa_%d", txa->id);
454 	service.socket_id = txa->socket_id;
455 	service.callback = txa_service_func;
456 	service.callback_userdata = txa;
457 	service.capabilities = RTE_SERVICE_CAP_MT_SAFE;
458 	ret = rte_service_component_register(&service,
459 					(uint32_t *)&txa->service_id);
460 	if (ret) {
461 		RTE_EDEV_LOG_ERR("failed to register service %s err = %"
462 				 PRId32, service.name, ret);
463 		return ret;
464 	}
465 
466 	ret = txa->conf_cb(txa->id, txa->eventdev_id, &conf, txa->conf_arg);
467 	if (ret) {
468 		txa_service_unregister(txa);
469 		return ret;
470 	}
471 
472 	rte_service_component_runstate_set(txa->service_id, 1);
473 	txa->port_id = conf.event_port_id;
474 	txa->max_nb_tx = conf.max_nb_tx;
475 	return 0;
476 }
477 
478 static struct rte_eth_dev_tx_buffer *
479 txa_service_tx_buf_alloc(struct txa_service_data *txa,
480 			const struct rte_eth_dev *dev)
481 {
482 	struct rte_eth_dev_tx_buffer *tb;
483 	uint16_t port_id;
484 
485 	port_id = dev->data->port_id;
486 	tb = rte_zmalloc_socket(txa->mem_name,
487 				RTE_ETH_TX_BUFFER_SIZE(TXA_BATCH_SIZE),
488 				0,
489 				rte_eth_dev_socket_id(port_id));
490 	if (tb == NULL)
491 		RTE_EDEV_LOG_ERR("Failed to allocate memory for tx buffer");
492 	return tb;
493 }
494 
495 static int
496 txa_service_is_queue_added(struct txa_service_data *txa,
497 			const struct rte_eth_dev *dev,
498 			uint16_t tx_queue_id)
499 {
500 	struct txa_service_queue_info *tqi;
501 
502 	tqi = txa_service_queue(txa, dev->data->port_id, tx_queue_id);
503 	return tqi && tqi->added;
504 }
505 
506 static int
507 txa_service_ctrl(uint8_t id, int start)
508 {
509 	int ret;
510 	struct txa_service_data *txa;
511 
512 	txa = txa_service_id_to_data(id);
513 	if (txa == NULL || txa->service_id == TXA_INVALID_SERVICE_ID)
514 		return 0;
515 
516 	rte_spinlock_lock(&txa->tx_lock);
517 	ret = rte_service_runstate_set(txa->service_id, start);
518 	rte_spinlock_unlock(&txa->tx_lock);
519 
520 	return ret;
521 }
522 
523 static void
524 txa_service_buffer_retry(struct rte_mbuf **pkts, uint16_t unsent,
525 			void *userdata)
526 {
527 	struct txa_retry *tr;
528 	struct txa_service_data *data;
529 	struct rte_event_eth_tx_adapter_stats *stats;
530 	uint16_t sent = 0;
531 	unsigned int retry = 0;
532 	uint16_t i, n;
533 
534 	tr = (struct txa_retry *)(uintptr_t)userdata;
535 	data = txa_service_id_to_data(tr->id);
536 	stats = &data->stats;
537 
538 	do {
539 		n = rte_eth_tx_burst(tr->port_id, tr->tx_queue,
540 			       &pkts[sent], unsent - sent);
541 
542 		sent += n;
543 	} while (sent != unsent && retry++ < TXA_RETRY_CNT);
544 
545 	for (i = sent; i < unsent; i++)
546 		rte_pktmbuf_free(pkts[i]);
547 
548 	stats->tx_retry += retry;
549 	stats->tx_packets += sent;
550 	stats->tx_dropped += unsent - sent;
551 }
552 
553 static uint16_t
554 txa_process_event_vector(struct txa_service_data *txa,
555 			 struct rte_event_vector *vec)
556 {
557 	struct txa_service_queue_info *tqi;
558 	uint16_t port, queue, nb_tx = 0;
559 	struct rte_mbuf **mbufs;
560 	int i;
561 
562 	mbufs = (struct rte_mbuf **)vec->mbufs;
563 	if (vec->attr_valid) {
564 		port = vec->port;
565 		queue = vec->queue;
566 		tqi = txa_service_queue(txa, port, queue);
567 		if (unlikely(tqi == NULL || !tqi->added || tqi->stopped)) {
568 			rte_pktmbuf_free_bulk(&mbufs[vec->elem_offset],
569 					      vec->nb_elem);
570 			rte_mempool_put(rte_mempool_from_obj(vec), vec);
571 			return 0;
572 		}
573 		for (i = 0; i < vec->nb_elem; i++) {
574 			nb_tx += rte_eth_tx_buffer(port, queue, tqi->tx_buf,
575 						   mbufs[i + vec->elem_offset]);
576 		}
577 	} else {
578 		for (i = vec->elem_offset; i < vec->elem_offset + vec->nb_elem;
579 		     i++) {
580 			port = mbufs[i]->port;
581 			queue = rte_event_eth_tx_adapter_txq_get(mbufs[i]);
582 			tqi = txa_service_queue(txa, port, queue);
583 			if (unlikely(tqi == NULL || !tqi->added ||
584 				     tqi->stopped)) {
585 				rte_pktmbuf_free(mbufs[i]);
586 				continue;
587 			}
588 			nb_tx += rte_eth_tx_buffer(port, queue, tqi->tx_buf,
589 						   mbufs[i]);
590 		}
591 	}
592 	rte_mempool_put(rte_mempool_from_obj(vec), vec);
593 
594 	return nb_tx;
595 }
596 
597 static void
598 txa_service_tx(struct txa_service_data *txa, struct rte_event *ev,
599 	uint32_t n)
600 {
601 	uint32_t i;
602 	uint16_t nb_tx;
603 	struct rte_event_eth_tx_adapter_stats *stats;
604 
605 	stats = &txa->stats;
606 
607 	nb_tx = 0;
608 	for (i = 0; i < n; i++) {
609 		uint16_t port;
610 		uint16_t queue;
611 		struct txa_service_queue_info *tqi;
612 
613 		if (!(ev[i].event_type & RTE_EVENT_TYPE_VECTOR)) {
614 			struct rte_mbuf *m;
615 
616 			m = ev[i].mbuf;
617 			port = m->port;
618 			queue = rte_event_eth_tx_adapter_txq_get(m);
619 
620 			tqi = txa_service_queue(txa, port, queue);
621 			if (unlikely(tqi == NULL || !tqi->added ||
622 				     tqi->stopped)) {
623 				rte_pktmbuf_free(m);
624 				continue;
625 			}
626 
627 			nb_tx += rte_eth_tx_buffer(port, queue, tqi->tx_buf, m);
628 		} else {
629 			nb_tx += txa_process_event_vector(txa, ev[i].vec);
630 		}
631 	}
632 
633 	stats->tx_packets += nb_tx;
634 }
635 
636 static int32_t
637 txa_service_func(void *args)
638 {
639 	struct txa_service_data *txa = args;
640 	uint8_t dev_id;
641 	uint8_t port;
642 	uint16_t n;
643 	uint32_t nb_tx, max_nb_tx;
644 	struct rte_event ev[TXA_BATCH_SIZE];
645 
646 	dev_id = txa->eventdev_id;
647 	max_nb_tx = txa->max_nb_tx;
648 	port = txa->port_id;
649 
650 	if (txa->nb_queues == 0)
651 		return 0;
652 
653 	if (!rte_spinlock_trylock(&txa->tx_lock))
654 		return 0;
655 
656 	for (nb_tx = 0; nb_tx < max_nb_tx; nb_tx += n) {
657 
658 		n = rte_event_dequeue_burst(dev_id, port, ev, RTE_DIM(ev), 0);
659 		if (!n)
660 			break;
661 		txa_service_tx(txa, ev, n);
662 	}
663 
664 	if ((txa->loop_cnt++ & (TXA_FLUSH_THRESHOLD - 1)) == 0) {
665 
666 		struct txa_service_ethdev *tdi;
667 		struct txa_service_queue_info *tqi;
668 		struct rte_eth_dev *dev;
669 		uint16_t i;
670 
671 		tdi = txa->txa_ethdev;
672 		nb_tx = 0;
673 
674 		RTE_ETH_FOREACH_DEV(i) {
675 			uint16_t q;
676 
677 			if (i == txa->dev_count)
678 				break;
679 
680 			dev = tdi[i].dev;
681 			if (tdi[i].nb_queues == 0)
682 				continue;
683 			for (q = 0; q < dev->data->nb_tx_queues; q++) {
684 
685 				tqi = txa_service_queue(txa, i, q);
686 				if (unlikely(tqi == NULL || !tqi->added ||
687 					     tqi->stopped))
688 					continue;
689 
690 				nb_tx += rte_eth_tx_buffer_flush(i, q,
691 							tqi->tx_buf);
692 			}
693 		}
694 
695 		txa->stats.tx_packets += nb_tx;
696 	}
697 	rte_spinlock_unlock(&txa->tx_lock);
698 	return 0;
699 }
700 
701 static int
702 txa_service_adapter_create(uint8_t id, struct rte_eventdev *dev,
703 			struct rte_event_port_conf *port_conf)
704 {
705 	struct txa_service_data *txa;
706 	struct rte_event_port_conf *cb_conf;
707 	int ret;
708 
709 	cb_conf = rte_malloc(NULL, sizeof(*cb_conf), 0);
710 	if (cb_conf == NULL)
711 		return -ENOMEM;
712 
713 	*cb_conf = *port_conf;
714 	ret = txa_service_adapter_create_ext(id, dev, txa_service_conf_cb,
715 					cb_conf);
716 	if (ret) {
717 		rte_free(cb_conf);
718 		return ret;
719 	}
720 
721 	txa = txa_service_id_to_data(id);
722 	txa->conf_free = 1;
723 	return ret;
724 }
725 
726 static int
727 txa_service_adapter_create_ext(uint8_t id, struct rte_eventdev *dev,
728 			rte_event_eth_tx_adapter_conf_cb conf_cb,
729 			void *conf_arg)
730 {
731 	struct txa_service_data *txa;
732 	int socket_id;
733 	char mem_name[TXA_SERVICE_NAME_LEN];
734 	int ret;
735 
736 	if (conf_cb == NULL)
737 		return -EINVAL;
738 
739 	socket_id = dev->data->socket_id;
740 	snprintf(mem_name, TXA_MEM_NAME_LEN,
741 		"rte_event_eth_txa_%d",
742 		id);
743 
744 	ret = txa_service_data_init();
745 	if (ret != 0)
746 		return ret;
747 
748 	txa = rte_zmalloc_socket(mem_name,
749 				sizeof(*txa),
750 				RTE_CACHE_LINE_SIZE, socket_id);
751 	if (txa == NULL) {
752 		RTE_EDEV_LOG_ERR("failed to get mem for tx adapter");
753 		return -ENOMEM;
754 	}
755 
756 	txa->id = id;
757 	txa->eventdev_id = dev->data->dev_id;
758 	txa->socket_id = socket_id;
759 	strncpy(txa->mem_name, mem_name, TXA_SERVICE_NAME_LEN);
760 	txa->conf_cb = conf_cb;
761 	txa->conf_arg = conf_arg;
762 	txa->service_id = TXA_INVALID_SERVICE_ID;
763 	rte_spinlock_init(&txa->tx_lock);
764 	txa_service_data_array[id] = txa;
765 
766 	return 0;
767 }
768 
769 static int
770 txa_service_event_port_get(uint8_t id, uint8_t *port)
771 {
772 	struct txa_service_data *txa;
773 
774 	txa = txa_service_id_to_data(id);
775 	if (txa->service_id == TXA_INVALID_SERVICE_ID)
776 		return -ENODEV;
777 
778 	*port = txa->port_id;
779 	return 0;
780 }
781 
782 static int
783 txa_service_adapter_free(uint8_t id)
784 {
785 	struct txa_service_data *txa;
786 
787 	txa = txa_service_id_to_data(id);
788 	if (txa->nb_queues) {
789 		RTE_EDEV_LOG_ERR("%" PRIu16 " Tx queues not deleted",
790 				txa->nb_queues);
791 		return -EBUSY;
792 	}
793 
794 	if (txa->conf_free)
795 		rte_free(txa->conf_arg);
796 	rte_free(txa);
797 	return 0;
798 }
799 
800 static int
801 txa_service_queue_add(uint8_t id,
802 		__rte_unused struct rte_eventdev *dev,
803 		const struct rte_eth_dev *eth_dev,
804 		int32_t tx_queue_id)
805 {
806 	struct txa_service_data *txa;
807 	struct txa_service_ethdev *tdi;
808 	struct txa_service_queue_info *tqi;
809 	struct rte_eth_dev_tx_buffer *tb;
810 	struct txa_retry *txa_retry;
811 	int ret = 0;
812 
813 	txa = txa_service_id_to_data(id);
814 
815 	if (tx_queue_id == -1) {
816 		int nb_queues;
817 		uint16_t i, j;
818 		uint16_t *qdone;
819 
820 		nb_queues = eth_dev->data->nb_tx_queues;
821 		if (txa->dev_count > eth_dev->data->port_id) {
822 			tdi = &txa->txa_ethdev[eth_dev->data->port_id];
823 			nb_queues -= tdi->nb_queues;
824 		}
825 
826 		qdone = rte_zmalloc(txa->mem_name,
827 				nb_queues * sizeof(*qdone), 0);
828 		if (qdone == NULL)
829 			return -ENOMEM;
830 		j = 0;
831 		for (i = 0; i < nb_queues; i++) {
832 			if (txa_service_is_queue_added(txa, eth_dev, i))
833 				continue;
834 			ret = txa_service_queue_add(id, dev, eth_dev, i);
835 			if (ret == 0)
836 				qdone[j++] = i;
837 			else
838 				break;
839 		}
840 
841 		if (i != nb_queues) {
842 			for (i = 0; i < j; i++)
843 				txa_service_queue_del(id, eth_dev, qdone[i]);
844 		}
845 		rte_free(qdone);
846 		return ret;
847 	}
848 
849 	ret = txa_service_register(txa);
850 	if (ret)
851 		return ret;
852 
853 	rte_spinlock_lock(&txa->tx_lock);
854 
855 	if (txa_service_is_queue_added(txa, eth_dev, tx_queue_id))
856 		goto ret_unlock;
857 
858 	ret = txa_service_queue_array_alloc(txa, eth_dev->data->port_id);
859 	if (ret)
860 		goto err_unlock;
861 
862 	tb = txa_service_tx_buf_alloc(txa, eth_dev);
863 	if (tb == NULL)
864 		goto err_unlock;
865 
866 	tdi = &txa->txa_ethdev[eth_dev->data->port_id];
867 	tqi = txa_service_queue(txa, eth_dev->data->port_id, tx_queue_id);
868 	if (tqi == NULL)
869 		goto err_unlock;
870 
871 	txa_retry = &tqi->txa_retry;
872 	txa_retry->id = txa->id;
873 	txa_retry->port_id = eth_dev->data->port_id;
874 	txa_retry->tx_queue = tx_queue_id;
875 
876 	rte_eth_tx_buffer_init(tb, TXA_BATCH_SIZE);
877 	rte_eth_tx_buffer_set_err_callback(tb,
878 		txa_service_buffer_retry, txa_retry);
879 
880 	tqi->tx_buf = tb;
881 	tqi->added = 1;
882 	tqi->stopped = false;
883 	tdi->nb_queues++;
884 	txa->nb_queues++;
885 
886 ret_unlock:
887 	rte_spinlock_unlock(&txa->tx_lock);
888 	return 0;
889 
890 err_unlock:
891 	if (txa->nb_queues == 0) {
892 		txa_service_queue_array_free(txa,
893 					eth_dev->data->port_id);
894 		txa_service_unregister(txa);
895 	}
896 
897 	rte_spinlock_unlock(&txa->tx_lock);
898 	return -1;
899 }
900 
901 static inline void
902 txa_txq_buffer_drain(struct txa_service_queue_info *tqi)
903 {
904 	struct rte_eth_dev_tx_buffer *b;
905 	uint16_t i;
906 
907 	b = tqi->tx_buf;
908 
909 	for (i = 0; i < b->length; i++)
910 		rte_pktmbuf_free(b->pkts[i]);
911 
912 	b->length = 0;
913 }
914 
915 static int
916 txa_service_queue_del(uint8_t id,
917 		const struct rte_eth_dev *dev,
918 		int32_t tx_queue_id)
919 {
920 	struct txa_service_data *txa;
921 	struct txa_service_queue_info *tqi;
922 	struct rte_eth_dev_tx_buffer *tb;
923 	uint16_t port_id;
924 
925 	txa = txa_service_id_to_data(id);
926 	port_id = dev->data->port_id;
927 
928 	if (tx_queue_id == -1) {
929 		uint16_t i, q, nb_queues;
930 		int ret = 0;
931 
932 		nb_queues = txa->txa_ethdev[port_id].nb_queues;
933 		if (nb_queues == 0)
934 			return 0;
935 
936 		i = 0;
937 		q = 0;
938 		tqi = txa->txa_ethdev[port_id].queues;
939 
940 		while (i < nb_queues) {
941 
942 			if (tqi[q].added) {
943 				ret = txa_service_queue_del(id, dev, q);
944 				if (ret != 0)
945 					break;
946 			}
947 			i++;
948 			q++;
949 		}
950 		return ret;
951 	}
952 
953 	txa = txa_service_id_to_data(id);
954 
955 	rte_spinlock_lock(&txa->tx_lock);
956 	tqi = txa_service_queue(txa, port_id, tx_queue_id);
957 	if (tqi == NULL || !tqi->added)
958 		goto ret_unlock;
959 
960 	/* Drain the buffered mbufs */
961 	txa_txq_buffer_drain(tqi);
962 	tb = tqi->tx_buf;
963 	tqi->added = 0;
964 	tqi->tx_buf = NULL;
965 	rte_free(tb);
966 	txa->nb_queues--;
967 	txa->txa_ethdev[port_id].nb_queues--;
968 
969 	txa_service_queue_array_free(txa, port_id);
970 
971 ret_unlock:
972 	rte_spinlock_unlock(&txa->tx_lock);
973 	return 0;
974 }
975 
976 static int
977 txa_service_id_get(uint8_t id, uint32_t *service_id)
978 {
979 	struct txa_service_data *txa;
980 
981 	txa = txa_service_id_to_data(id);
982 	if (txa->service_id == TXA_INVALID_SERVICE_ID)
983 		return -ESRCH;
984 
985 	if (service_id == NULL)
986 		return -EINVAL;
987 
988 	*service_id = txa->service_id;
989 	return 0;
990 }
991 
992 static int
993 txa_service_start(uint8_t id)
994 {
995 	return txa_service_ctrl(id, 1);
996 }
997 
998 static int
999 txa_service_stats_get(uint8_t id,
1000 		struct rte_event_eth_tx_adapter_stats *stats)
1001 {
1002 	struct txa_service_data *txa;
1003 
1004 	txa = txa_service_id_to_data(id);
1005 	*stats = txa->stats;
1006 	return 0;
1007 }
1008 
1009 static int
1010 txa_service_stats_reset(uint8_t id)
1011 {
1012 	struct txa_service_data *txa;
1013 
1014 	txa = txa_service_id_to_data(id);
1015 	memset(&txa->stats, 0, sizeof(txa->stats));
1016 	return 0;
1017 }
1018 
1019 static int
1020 txa_service_stop(uint8_t id)
1021 {
1022 	return txa_service_ctrl(id, 0);
1023 }
1024 
1025 
1026 int
1027 rte_event_eth_tx_adapter_create(uint8_t id, uint8_t dev_id,
1028 				struct rte_event_port_conf *port_conf)
1029 {
1030 	struct rte_eventdev *dev;
1031 	int ret;
1032 
1033 	if (port_conf == NULL)
1034 		return -EINVAL;
1035 
1036 	RTE_EVENT_ETH_TX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
1037 	RTE_EVENTDEV_VALID_DEVID_OR_ERR_RET(dev_id, -EINVAL);
1038 
1039 	dev = &rte_eventdevs[dev_id];
1040 
1041 	ret = txa_init();
1042 	if (ret != 0)
1043 		return ret;
1044 
1045 	if (txa_adapter_exist(id))
1046 		return -EEXIST;
1047 
1048 	txa_dev_id_array[id] = dev_id;
1049 	if (txa_dev_adapter_create(id))
1050 		ret = txa_dev_adapter_create(id)(id, dev);
1051 
1052 	if (ret != 0) {
1053 		txa_dev_id_array[id] = TXA_INVALID_DEV_ID;
1054 		return ret;
1055 	}
1056 
1057 	ret = txa_service_adapter_create(id, dev, port_conf);
1058 	if (ret != 0) {
1059 		if (txa_dev_adapter_free(id))
1060 			txa_dev_adapter_free(id)(id, dev);
1061 		txa_dev_id_array[id] = TXA_INVALID_DEV_ID;
1062 		return ret;
1063 	}
1064 	rte_eventdev_trace_eth_tx_adapter_create(id, dev_id, NULL, port_conf,
1065 		ret);
1066 	txa_dev_id_array[id] = dev_id;
1067 	return 0;
1068 }
1069 
1070 int
1071 rte_event_eth_tx_adapter_create_ext(uint8_t id, uint8_t dev_id,
1072 				rte_event_eth_tx_adapter_conf_cb conf_cb,
1073 				void *conf_arg)
1074 {
1075 	struct rte_eventdev *dev;
1076 	int ret;
1077 
1078 	RTE_EVENT_ETH_TX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
1079 	RTE_EVENTDEV_VALID_DEVID_OR_ERR_RET(dev_id, -EINVAL);
1080 
1081 	ret = txa_init();
1082 	if (ret != 0)
1083 		return ret;
1084 
1085 	if (txa_adapter_exist(id))
1086 		return -EINVAL;
1087 
1088 	dev = &rte_eventdevs[dev_id];
1089 
1090 	txa_dev_id_array[id] = dev_id;
1091 	if (txa_dev_adapter_create_ext(id))
1092 		ret = txa_dev_adapter_create_ext(id)(id, dev);
1093 
1094 	if (ret != 0) {
1095 		txa_dev_id_array[id] = TXA_INVALID_DEV_ID;
1096 		return ret;
1097 	}
1098 
1099 	ret = txa_service_adapter_create_ext(id, dev, conf_cb, conf_arg);
1100 	if (ret != 0) {
1101 		if (txa_dev_adapter_free(id))
1102 			txa_dev_adapter_free(id)(id, dev);
1103 		txa_dev_id_array[id] = TXA_INVALID_DEV_ID;
1104 		return ret;
1105 	}
1106 
1107 	rte_eventdev_trace_eth_tx_adapter_create(id, dev_id, conf_cb, conf_arg,
1108 		ret);
1109 	txa_dev_id_array[id] = dev_id;
1110 	return 0;
1111 }
1112 
1113 
1114 int
1115 rte_event_eth_tx_adapter_event_port_get(uint8_t id, uint8_t *event_port_id)
1116 {
1117 	TXA_CHECK_OR_ERR_RET(id);
1118 
1119 	return txa_service_event_port_get(id, event_port_id);
1120 }
1121 
1122 int
1123 rte_event_eth_tx_adapter_free(uint8_t id)
1124 {
1125 	int ret;
1126 
1127 	TXA_CHECK_OR_ERR_RET(id);
1128 
1129 	ret = txa_dev_adapter_free(id) ?
1130 		txa_dev_adapter_free(id)(id, txa_evdev(id)) :
1131 		0;
1132 
1133 	if (ret == 0)
1134 		ret = txa_service_adapter_free(id);
1135 	txa_dev_id_array[id] = TXA_INVALID_DEV_ID;
1136 
1137 	rte_eventdev_trace_eth_tx_adapter_free(id, ret);
1138 	return ret;
1139 }
1140 
1141 int
1142 rte_event_eth_tx_adapter_queue_add(uint8_t id,
1143 				uint16_t eth_dev_id,
1144 				int32_t queue)
1145 {
1146 	struct rte_eth_dev *eth_dev;
1147 	int ret;
1148 	uint32_t caps;
1149 
1150 	RTE_ETH_VALID_PORTID_OR_ERR_RET(eth_dev_id, -EINVAL);
1151 	TXA_CHECK_OR_ERR_RET(id);
1152 
1153 	eth_dev = &rte_eth_devices[eth_dev_id];
1154 	TXA_CHECK_TXQ(eth_dev, queue);
1155 
1156 	caps = 0;
1157 	if (txa_dev_caps_get(id))
1158 		txa_dev_caps_get(id)(txa_evdev(id), eth_dev, &caps);
1159 
1160 	if (caps & RTE_EVENT_ETH_TX_ADAPTER_CAP_INTERNAL_PORT)
1161 		ret =  txa_dev_queue_add(id) ?
1162 					txa_dev_queue_add(id)(id,
1163 							txa_evdev(id),
1164 							eth_dev,
1165 							queue) : 0;
1166 	else
1167 		ret = txa_service_queue_add(id, txa_evdev(id), eth_dev, queue);
1168 
1169 	rte_eventdev_trace_eth_tx_adapter_queue_add(id, eth_dev_id, queue,
1170 		ret);
1171 	return ret;
1172 }
1173 
1174 int
1175 rte_event_eth_tx_adapter_queue_del(uint8_t id,
1176 				uint16_t eth_dev_id,
1177 				int32_t queue)
1178 {
1179 	struct rte_eth_dev *eth_dev;
1180 	int ret;
1181 	uint32_t caps;
1182 
1183 	RTE_ETH_VALID_PORTID_OR_ERR_RET(eth_dev_id, -EINVAL);
1184 	TXA_CHECK_OR_ERR_RET(id);
1185 
1186 	eth_dev = &rte_eth_devices[eth_dev_id];
1187 
1188 	caps = 0;
1189 
1190 	if (txa_dev_caps_get(id))
1191 		txa_dev_caps_get(id)(txa_evdev(id), eth_dev, &caps);
1192 
1193 	if (caps & RTE_EVENT_ETH_TX_ADAPTER_CAP_INTERNAL_PORT)
1194 		ret =  txa_dev_queue_del(id) ?
1195 					txa_dev_queue_del(id)(id, txa_evdev(id),
1196 							eth_dev,
1197 							queue) : 0;
1198 	else
1199 		ret = txa_service_queue_del(id, eth_dev, queue);
1200 
1201 	rte_eventdev_trace_eth_tx_adapter_queue_del(id, eth_dev_id, queue,
1202 		ret);
1203 	return ret;
1204 }
1205 
1206 int
1207 rte_event_eth_tx_adapter_service_id_get(uint8_t id, uint32_t *service_id)
1208 {
1209 	TXA_CHECK_OR_ERR_RET(id);
1210 
1211 	return txa_service_id_get(id, service_id);
1212 }
1213 
1214 int
1215 rte_event_eth_tx_adapter_start(uint8_t id)
1216 {
1217 	int ret;
1218 
1219 	TXA_CHECK_OR_ERR_RET(id);
1220 
1221 	ret = txa_dev_start(id) ? txa_dev_start(id)(id, txa_evdev(id)) : 0;
1222 	if (ret == 0)
1223 		ret = txa_service_start(id);
1224 	rte_eventdev_trace_eth_tx_adapter_start(id, ret);
1225 	return ret;
1226 }
1227 
1228 int
1229 rte_event_eth_tx_adapter_stats_get(uint8_t id,
1230 				struct rte_event_eth_tx_adapter_stats *stats)
1231 {
1232 	int ret;
1233 
1234 	TXA_CHECK_OR_ERR_RET(id);
1235 
1236 	if (stats == NULL)
1237 		return -EINVAL;
1238 
1239 	*stats = (struct rte_event_eth_tx_adapter_stats){0};
1240 
1241 	ret = txa_dev_stats_get(id) ?
1242 			txa_dev_stats_get(id)(id, txa_evdev(id), stats) : 0;
1243 
1244 	if (ret == 0 && txa_service_id_get(id, NULL) != ESRCH) {
1245 		if (txa_dev_stats_get(id)) {
1246 			struct rte_event_eth_tx_adapter_stats service_stats;
1247 
1248 			ret = txa_service_stats_get(id, &service_stats);
1249 			if (ret == 0) {
1250 				stats->tx_retry += service_stats.tx_retry;
1251 				stats->tx_packets += service_stats.tx_packets;
1252 				stats->tx_dropped += service_stats.tx_dropped;
1253 			}
1254 		} else
1255 			ret = txa_service_stats_get(id, stats);
1256 	}
1257 
1258 	return ret;
1259 }
1260 
1261 int
1262 rte_event_eth_tx_adapter_stats_reset(uint8_t id)
1263 {
1264 	int ret;
1265 
1266 	TXA_CHECK_OR_ERR_RET(id);
1267 
1268 	ret = txa_dev_stats_reset(id) ?
1269 		txa_dev_stats_reset(id)(id, txa_evdev(id)) : 0;
1270 	if (ret == 0)
1271 		ret = txa_service_stats_reset(id);
1272 	return ret;
1273 }
1274 
1275 int
1276 rte_event_eth_tx_adapter_stop(uint8_t id)
1277 {
1278 	int ret;
1279 
1280 	TXA_CHECK_OR_ERR_RET(id);
1281 
1282 	ret = txa_dev_stop(id) ? txa_dev_stop(id)(id,  txa_evdev(id)) : 0;
1283 	if (ret == 0)
1284 		ret = txa_service_stop(id);
1285 	rte_eventdev_trace_eth_tx_adapter_stop(id, ret);
1286 	return ret;
1287 }
1288 
1289 int
1290 rte_event_eth_tx_adapter_instance_get(uint16_t eth_dev_id,
1291 				      uint16_t tx_queue_id,
1292 				      uint8_t *txa_inst_id)
1293 {
1294 	uint8_t id;
1295 	int ret = -EINVAL;
1296 	uint32_t caps;
1297 	struct txa_service_data *txa;
1298 
1299 	if (txa_lookup())
1300 		return -ENOMEM;
1301 
1302 	if (eth_dev_id >= rte_eth_dev_count_avail()) {
1303 		RTE_EDEV_LOG_ERR("Invalid ethernet port id %u", eth_dev_id);
1304 		return -EINVAL;
1305 	}
1306 
1307 	if (tx_queue_id >= rte_eth_devices[eth_dev_id].data->nb_tx_queues) {
1308 		RTE_EDEV_LOG_ERR("Invalid tx queue id %u", tx_queue_id);
1309 		return -EINVAL;
1310 	}
1311 
1312 	if (txa_inst_id == NULL) {
1313 		RTE_EDEV_LOG_ERR("txa_instance_id cannot be NULL");
1314 		return -EINVAL;
1315 	}
1316 
1317 	/* Iterate through all Tx adapter instances */
1318 	for (id = 0; id < RTE_EVENT_ETH_TX_ADAPTER_MAX_INSTANCE; id++) {
1319 		txa = txa_service_id_to_data(id);
1320 		if (!txa)
1321 			continue;
1322 
1323 		caps = 0;
1324 		if (rte_event_eth_tx_adapter_caps_get(txa->eventdev_id,
1325 						      eth_dev_id,
1326 						      &caps))
1327 			continue;
1328 
1329 		if (caps & RTE_EVENT_ETH_TX_ADAPTER_CAP_INTERNAL_PORT) {
1330 			ret = txa_dev_instance_get(id) ?
1331 					txa_dev_instance_get(id)(eth_dev_id,
1332 								 tx_queue_id,
1333 								 txa_inst_id)
1334 							: -EINVAL;
1335 			if (ret == 0)
1336 				return ret;
1337 		} else {
1338 			struct rte_eth_dev *eth_dev;
1339 
1340 			eth_dev = &rte_eth_devices[eth_dev_id];
1341 
1342 			if (txa_service_is_queue_added(txa, eth_dev,
1343 						       tx_queue_id)) {
1344 				*txa_inst_id = txa->id;
1345 				return 0;
1346 			}
1347 		}
1348 	}
1349 
1350 	return -EINVAL;
1351 }
1352 
1353 static inline int
1354 txa_sw_queue_start_state_set(uint16_t eth_dev_id, uint16_t tx_queue_id,
1355 			     bool start_state, struct txa_service_data *txa)
1356 {
1357 	struct txa_service_queue_info *tqi = NULL;
1358 
1359 	rte_spinlock_lock(&txa->tx_lock);
1360 	tqi = txa_service_queue(txa, eth_dev_id, tx_queue_id);
1361 	if (unlikely(tqi == NULL || !tqi->added)) {
1362 		rte_spinlock_unlock(&txa->tx_lock);
1363 		return -EINVAL;
1364 	}
1365 	if (start_state == false)
1366 		txa_txq_buffer_drain(tqi);
1367 
1368 	tqi->stopped = !start_state;
1369 	rte_spinlock_unlock(&txa->tx_lock);
1370 	return 0;
1371 }
1372 
1373 static int
1374 txa_queue_start_state_set(uint16_t eth_dev_id, uint16_t tx_queue_id,
1375 			  bool start_state)
1376 {
1377 	struct txa_service_data *txa;
1378 	uint8_t txa_inst_id;
1379 	int ret;
1380 	uint32_t caps = 0;
1381 
1382 	/* Below API already does validation of input parameters.
1383 	 * Hence skipping the validation here.
1384 	 */
1385 	ret = rte_event_eth_tx_adapter_instance_get(eth_dev_id,
1386 						    tx_queue_id,
1387 						    &txa_inst_id);
1388 	if (ret < 0)
1389 		return -EINVAL;
1390 
1391 	txa = txa_service_id_to_data(txa_inst_id);
1392 	ret = rte_event_eth_tx_adapter_caps_get(txa->eventdev_id,
1393 						eth_dev_id,
1394 						&caps);
1395 	if (ret < 0)
1396 		return -EINVAL;
1397 
1398 	if (caps & RTE_EVENT_ETH_TX_ADAPTER_CAP_INTERNAL_PORT) {
1399 		if (start_state == true) {
1400 			ret = txa_dev_queue_start(txa_inst_id) ?
1401 			      txa_dev_queue_start(txa_inst_id)(txa_inst_id,
1402 							       eth_dev_id,
1403 							       tx_queue_id) : 0;
1404 		} else {
1405 			ret = txa_dev_queue_stop(txa_inst_id) ?
1406 			      txa_dev_queue_stop(txa_inst_id)(txa_inst_id,
1407 							      eth_dev_id,
1408 							      tx_queue_id) : 0;
1409 		}
1410 		return ret;
1411 	}
1412 
1413 	return txa_sw_queue_start_state_set(eth_dev_id, tx_queue_id,
1414 					    start_state, txa);
1415 }
1416 
1417 int
1418 rte_event_eth_tx_adapter_queue_start(uint16_t eth_dev_id, uint16_t tx_queue_id)
1419 {
1420 	return txa_queue_start_state_set(eth_dev_id, tx_queue_id, true);
1421 }
1422 
1423 int
1424 rte_event_eth_tx_adapter_queue_stop(uint16_t eth_dev_id, uint16_t tx_queue_id)
1425 {
1426 	return txa_queue_start_state_set(eth_dev_id, tx_queue_id, false);
1427 }
1428