xref: /dpdk/lib/eventdev/rte_event_eth_tx_adapter.c (revision f9dfb59edbccae50e7c5508348aa2b4b84413048)
1 /* SPDX-License-Identifier: BSD-3-Clause
2  * Copyright(c) 2018 Intel Corporation.
3  */
4 #include <rte_spinlock.h>
5 #include <rte_service_component.h>
6 #include <ethdev_driver.h>
7 
8 #include "eventdev_pmd.h"
9 #include "eventdev_trace.h"
10 #include "rte_event_eth_tx_adapter.h"
11 
12 #define TXA_BATCH_SIZE		32
13 #define TXA_SERVICE_NAME_LEN	32
14 #define TXA_MEM_NAME_LEN	32
15 #define TXA_FLUSH_THRESHOLD	1024
16 #define TXA_RETRY_CNT		100
17 #define TXA_MAX_NB_TX		128
18 #define TXA_INVALID_DEV_ID	INT32_C(-1)
19 #define TXA_INVALID_SERVICE_ID	INT64_C(-1)
20 
21 #define TXA_ADAPTER_ARRAY "txa_adapter_array"
22 #define TXA_SERVICE_DATA_ARRAY "txa_service_data_array"
23 
24 #define txa_evdev(id) (&rte_eventdevs[txa_dev_id_array[(id)]])
25 
26 #define txa_dev_caps_get(id) txa_evdev((id))->dev_ops->eth_tx_adapter_caps_get
27 
28 #define txa_dev_adapter_create(t) txa_evdev(t)->dev_ops->eth_tx_adapter_create
29 
30 #define txa_dev_adapter_create_ext(t) \
31 				txa_evdev(t)->dev_ops->eth_tx_adapter_create
32 
33 #define txa_dev_adapter_free(t) txa_evdev(t)->dev_ops->eth_tx_adapter_free
34 
35 #define txa_dev_queue_add(id) txa_evdev(id)->dev_ops->eth_tx_adapter_queue_add
36 
37 #define txa_dev_queue_del(t) txa_evdev(t)->dev_ops->eth_tx_adapter_queue_del
38 
39 #define txa_dev_start(t) txa_evdev(t)->dev_ops->eth_tx_adapter_start
40 
41 #define txa_dev_stop(t) txa_evdev(t)->dev_ops->eth_tx_adapter_stop
42 
43 #define txa_dev_stats_reset(t) txa_evdev(t)->dev_ops->eth_tx_adapter_stats_reset
44 
45 #define txa_dev_stats_get(t) txa_evdev(t)->dev_ops->eth_tx_adapter_stats_get
46 
47 #define txa_dev_instance_get(id) \
48 			txa_evdev(id)->dev_ops->eth_tx_adapter_instance_get
49 
50 #define txa_dev_queue_start(id) \
51 			txa_evdev(id)->dev_ops->eth_tx_adapter_queue_start
52 
53 #define txa_dev_queue_stop(id) \
54 			txa_evdev(id)->dev_ops->eth_tx_adapter_queue_stop
55 
56 #define RTE_EVENT_ETH_TX_ADAPTER_ID_VALID_OR_ERR_RET(id, retval) \
57 do { \
58 	if (!txa_valid_id(id)) { \
59 		RTE_EDEV_LOG_ERR("Invalid eth Tx adapter id = %d", id); \
60 		return retval; \
61 	} \
62 } while (0)
63 
64 #define TXA_CHECK_OR_ERR_RET(id) \
65 do {\
66 	int ret; \
67 	RTE_EVENT_ETH_TX_ADAPTER_ID_VALID_OR_ERR_RET((id), -EINVAL); \
68 	ret = txa_init(); \
69 	if (ret != 0) \
70 		return ret; \
71 	if (!txa_adapter_exist((id))) \
72 		return -EINVAL; \
73 } while (0)
74 
75 #define TXA_CHECK_TXQ(dev, queue) \
76 do {\
77 	if ((dev)->data->nb_tx_queues == 0) { \
78 		RTE_EDEV_LOG_ERR("No tx queues configured"); \
79 		return -EINVAL; \
80 	} \
81 	if ((queue) != -1 && \
82 		(uint16_t)(queue) >= (dev)->data->nb_tx_queues) { \
83 		RTE_EDEV_LOG_ERR("Invalid tx queue_id %" PRIu16, \
84 				(uint16_t)(queue)); \
85 		return -EINVAL; \
86 	} \
87 } while (0)
88 
89 /* Tx retry callback structure */
90 struct txa_retry {
91 	/* Ethernet port id */
92 	uint16_t port_id;
93 	/* Tx queue */
94 	uint16_t tx_queue;
95 	/* Adapter ID */
96 	uint8_t id;
97 };
98 
99 /* Per queue structure */
100 struct txa_service_queue_info {
101 	/* Queue has been added */
102 	uint8_t added;
103 	/* Queue is stopped */
104 	bool stopped;
105 	/* Retry callback argument */
106 	struct txa_retry txa_retry;
107 	/* Tx buffer */
108 	struct rte_eth_dev_tx_buffer *tx_buf;
109 };
110 
111 /* PMD private structure */
112 struct txa_service_data {
113 	/* Max mbufs processed in any service function invocation */
114 	uint32_t max_nb_tx;
115 	/* Number of Tx queues in adapter */
116 	uint32_t nb_queues;
117 	/*  Synchronization with data path */
118 	rte_spinlock_t tx_lock;
119 	/* Event port ID */
120 	uint8_t port_id;
121 	/* Event device identifier */
122 	uint8_t eventdev_id;
123 	/* Highest port id supported + 1 */
124 	uint16_t dev_count;
125 	/* Loop count to flush Tx buffers */
126 	int loop_cnt;
127 	/* Per ethernet device structure */
128 	struct txa_service_ethdev *txa_ethdev;
129 	/* Statistics */
130 	struct rte_event_eth_tx_adapter_stats stats;
131 	/* Adapter Identifier */
132 	uint8_t id;
133 	/* Conf arg must be freed */
134 	uint8_t conf_free;
135 	/* Configuration callback */
136 	rte_event_eth_tx_adapter_conf_cb conf_cb;
137 	/* Configuration callback argument */
138 	void *conf_arg;
139 	/* socket id */
140 	int socket_id;
141 	/* Per adapter EAL service */
142 	int64_t service_id;
143 	/* Memory allocation name */
144 	char mem_name[TXA_MEM_NAME_LEN];
145 } __rte_cache_aligned;
146 
147 /* Per eth device structure */
148 struct txa_service_ethdev {
149 	/* Pointer to ethernet device */
150 	struct rte_eth_dev *dev;
151 	/* Number of queues added */
152 	uint16_t nb_queues;
153 	/* PMD specific queue data */
154 	void *queues;
155 };
156 
157 /* Array of adapter instances, initialized with event device id
158  * when adapter is created
159  */
160 static int *txa_dev_id_array;
161 
162 /* Array of pointers to service implementation data */
163 static struct txa_service_data **txa_service_data_array;
164 
165 static int32_t txa_service_func(void *args);
166 static int txa_service_adapter_create_ext(uint8_t id,
167 			struct rte_eventdev *dev,
168 			rte_event_eth_tx_adapter_conf_cb conf_cb,
169 			void *conf_arg);
170 static int txa_service_queue_del(uint8_t id,
171 				const struct rte_eth_dev *dev,
172 				int32_t tx_queue_id);
173 
174 static int
175 txa_adapter_exist(uint8_t id)
176 {
177 	return txa_dev_id_array[id] != TXA_INVALID_DEV_ID;
178 }
179 
180 static inline int
181 txa_valid_id(uint8_t id)
182 {
183 	return id < RTE_EVENT_ETH_TX_ADAPTER_MAX_INSTANCE;
184 }
185 
186 static void *
187 txa_memzone_array_get(const char *name, unsigned int elt_size, int nb_elems)
188 {
189 	const struct rte_memzone *mz;
190 	unsigned int sz;
191 
192 	sz = elt_size * nb_elems;
193 	sz = RTE_ALIGN(sz, RTE_CACHE_LINE_SIZE);
194 
195 	mz = rte_memzone_lookup(name);
196 	if (mz == NULL) {
197 		mz = rte_memzone_reserve_aligned(name, sz, rte_socket_id(), 0,
198 						 RTE_CACHE_LINE_SIZE);
199 		if (mz == NULL) {
200 			RTE_EDEV_LOG_ERR("failed to reserve memzone"
201 					" name = %s err = %"
202 					PRId32, name, rte_errno);
203 			return NULL;
204 		}
205 	}
206 
207 	return  mz->addr;
208 }
209 
210 static int
211 txa_lookup(void)
212 {
213 	const struct rte_memzone *mz;
214 
215 	if (txa_dev_id_array == NULL) {
216 		mz = rte_memzone_lookup(TXA_ADAPTER_ARRAY);
217 		if (mz == NULL)
218 			return -ENOMEM;
219 		txa_dev_id_array = mz->addr;
220 	}
221 
222 	if (txa_service_data_array == NULL) {
223 		mz = rte_memzone_lookup(TXA_SERVICE_DATA_ARRAY);
224 		if (mz == NULL)
225 			return -ENOMEM;
226 		txa_service_data_array = mz->addr;
227 	}
228 
229 	return 0;
230 }
231 
232 static int
233 txa_dev_id_array_init(void)
234 {
235 	if (txa_dev_id_array == NULL) {
236 		int i;
237 
238 		txa_dev_id_array = txa_memzone_array_get(TXA_ADAPTER_ARRAY,
239 					sizeof(int),
240 					RTE_EVENT_ETH_TX_ADAPTER_MAX_INSTANCE);
241 		if (txa_dev_id_array == NULL)
242 			return -ENOMEM;
243 
244 		for (i = 0; i < RTE_EVENT_ETH_TX_ADAPTER_MAX_INSTANCE; i++)
245 			txa_dev_id_array[i] = TXA_INVALID_DEV_ID;
246 	}
247 
248 	return 0;
249 }
250 
251 static int
252 txa_init(void)
253 {
254 	return txa_dev_id_array_init();
255 }
256 
257 static int
258 txa_service_data_init(void)
259 {
260 	if (txa_service_data_array == NULL) {
261 		int i;
262 
263 		txa_service_data_array =
264 				txa_memzone_array_get(TXA_SERVICE_DATA_ARRAY,
265 					sizeof(*txa_service_data_array),
266 					RTE_EVENT_ETH_TX_ADAPTER_MAX_INSTANCE);
267 		if (txa_service_data_array == NULL)
268 			return -ENOMEM;
269 
270 		/* Reset the txa service pointers */
271 		for (i = 0; i < RTE_EVENT_ETH_TX_ADAPTER_MAX_INSTANCE; i++)
272 			txa_service_data_array[i] = NULL;
273 	}
274 
275 	return 0;
276 }
277 
278 static inline struct txa_service_data *
279 txa_service_id_to_data(uint8_t id)
280 {
281 	return txa_service_data_array[id];
282 }
283 
284 static inline struct txa_service_queue_info *
285 txa_service_queue(struct txa_service_data *txa, uint16_t port_id,
286 		uint16_t tx_queue_id)
287 {
288 	struct txa_service_queue_info *tqi;
289 
290 	if (unlikely(txa->txa_ethdev == NULL || txa->dev_count < port_id + 1))
291 		return NULL;
292 
293 	tqi = txa->txa_ethdev[port_id].queues;
294 
295 	return likely(tqi != NULL) ? tqi + tx_queue_id : NULL;
296 }
297 
298 static int
299 txa_service_conf_cb(uint8_t __rte_unused id, uint8_t dev_id,
300 		struct rte_event_eth_tx_adapter_conf *conf, void *arg)
301 {
302 	int ret;
303 	struct rte_eventdev *dev;
304 	struct rte_event_port_conf *pc;
305 	struct rte_event_dev_config dev_conf;
306 	int started;
307 	uint8_t port_id;
308 
309 	pc = arg;
310 	dev = &rte_eventdevs[dev_id];
311 	dev_conf = dev->data->dev_conf;
312 
313 	started = dev->data->dev_started;
314 	if (started)
315 		rte_event_dev_stop(dev_id);
316 
317 	port_id = dev_conf.nb_event_ports;
318 	dev_conf.nb_event_ports += 1;
319 
320 	ret = rte_event_dev_configure(dev_id, &dev_conf);
321 	if (ret) {
322 		RTE_EDEV_LOG_ERR("failed to configure event dev %u",
323 						dev_id);
324 		if (started) {
325 			if (rte_event_dev_start(dev_id))
326 				return -EIO;
327 		}
328 		return ret;
329 	}
330 
331 	ret = rte_event_port_setup(dev_id, port_id, pc);
332 	if (ret) {
333 		RTE_EDEV_LOG_ERR("failed to setup event port %u\n",
334 					port_id);
335 		if (started) {
336 			if (rte_event_dev_start(dev_id))
337 				return -EIO;
338 		}
339 		return ret;
340 	}
341 
342 	conf->event_port_id = port_id;
343 	conf->max_nb_tx = TXA_MAX_NB_TX;
344 	if (started)
345 		ret = rte_event_dev_start(dev_id);
346 	return ret;
347 }
348 
349 static int
350 txa_service_ethdev_alloc(struct txa_service_data *txa)
351 {
352 	struct txa_service_ethdev *txa_ethdev;
353 	uint16_t i, dev_count;
354 
355 	dev_count = rte_eth_dev_count_avail();
356 	if (txa->txa_ethdev && dev_count == txa->dev_count)
357 		return 0;
358 
359 	txa_ethdev = rte_zmalloc_socket(txa->mem_name,
360 					dev_count * sizeof(*txa_ethdev),
361 					0,
362 					txa->socket_id);
363 	if (txa_ethdev == NULL) {
364 		RTE_EDEV_LOG_ERR("Failed to alloc txa::txa_ethdev ");
365 		return -ENOMEM;
366 	}
367 
368 	if (txa->dev_count)
369 		memcpy(txa_ethdev, txa->txa_ethdev,
370 			txa->dev_count * sizeof(*txa_ethdev));
371 
372 	RTE_ETH_FOREACH_DEV(i) {
373 		if (i == dev_count)
374 			break;
375 		txa_ethdev[i].dev = &rte_eth_devices[i];
376 	}
377 
378 	txa->txa_ethdev = txa_ethdev;
379 	txa->dev_count = dev_count;
380 	return 0;
381 }
382 
383 static int
384 txa_service_queue_array_alloc(struct txa_service_data *txa,
385 			uint16_t port_id)
386 {
387 	struct txa_service_queue_info *tqi;
388 	uint16_t nb_queue;
389 	int ret;
390 
391 	ret = txa_service_ethdev_alloc(txa);
392 	if (ret != 0)
393 		return ret;
394 
395 	if (txa->txa_ethdev[port_id].queues)
396 		return 0;
397 
398 	nb_queue = txa->txa_ethdev[port_id].dev->data->nb_tx_queues;
399 	tqi = rte_zmalloc_socket(txa->mem_name,
400 				nb_queue *
401 				sizeof(struct txa_service_queue_info), 0,
402 				txa->socket_id);
403 	if (tqi == NULL)
404 		return -ENOMEM;
405 	txa->txa_ethdev[port_id].queues = tqi;
406 	return 0;
407 }
408 
409 static void
410 txa_service_queue_array_free(struct txa_service_data *txa,
411 			uint16_t port_id)
412 {
413 	struct txa_service_ethdev *txa_ethdev;
414 	struct txa_service_queue_info *tqi;
415 
416 	txa_ethdev = &txa->txa_ethdev[port_id];
417 	if (txa->txa_ethdev == NULL || txa_ethdev->nb_queues != 0)
418 		return;
419 
420 	tqi = txa_ethdev->queues;
421 	txa_ethdev->queues = NULL;
422 	rte_free(tqi);
423 
424 	if (txa->nb_queues == 0) {
425 		rte_free(txa->txa_ethdev);
426 		txa->txa_ethdev = NULL;
427 	}
428 }
429 
430 static void
431 txa_service_unregister(struct txa_service_data *txa)
432 {
433 	if (txa->service_id != TXA_INVALID_SERVICE_ID) {
434 		rte_service_component_runstate_set(txa->service_id, 0);
435 		while (rte_service_may_be_active(txa->service_id))
436 			rte_pause();
437 		rte_service_component_unregister(txa->service_id);
438 	}
439 	txa->service_id = TXA_INVALID_SERVICE_ID;
440 }
441 
442 static int
443 txa_service_register(struct txa_service_data *txa)
444 {
445 	int ret;
446 	struct rte_service_spec service;
447 	struct rte_event_eth_tx_adapter_conf conf;
448 
449 	if (txa->service_id != TXA_INVALID_SERVICE_ID)
450 		return 0;
451 
452 	memset(&service, 0, sizeof(service));
453 	snprintf(service.name, TXA_SERVICE_NAME_LEN, "txa_%d", txa->id);
454 	service.socket_id = txa->socket_id;
455 	service.callback = txa_service_func;
456 	service.callback_userdata = txa;
457 	service.capabilities = RTE_SERVICE_CAP_MT_SAFE;
458 	ret = rte_service_component_register(&service,
459 					(uint32_t *)&txa->service_id);
460 	if (ret) {
461 		RTE_EDEV_LOG_ERR("failed to register service %s err = %"
462 				 PRId32, service.name, ret);
463 		return ret;
464 	}
465 
466 	ret = txa->conf_cb(txa->id, txa->eventdev_id, &conf, txa->conf_arg);
467 	if (ret) {
468 		txa_service_unregister(txa);
469 		return ret;
470 	}
471 
472 	rte_service_component_runstate_set(txa->service_id, 1);
473 	txa->port_id = conf.event_port_id;
474 	txa->max_nb_tx = conf.max_nb_tx;
475 	return 0;
476 }
477 
478 static struct rte_eth_dev_tx_buffer *
479 txa_service_tx_buf_alloc(struct txa_service_data *txa,
480 			const struct rte_eth_dev *dev)
481 {
482 	struct rte_eth_dev_tx_buffer *tb;
483 	uint16_t port_id;
484 
485 	port_id = dev->data->port_id;
486 	tb = rte_zmalloc_socket(txa->mem_name,
487 				RTE_ETH_TX_BUFFER_SIZE(TXA_BATCH_SIZE),
488 				0,
489 				rte_eth_dev_socket_id(port_id));
490 	if (tb == NULL)
491 		RTE_EDEV_LOG_ERR("Failed to allocate memory for tx buffer");
492 	return tb;
493 }
494 
495 static int
496 txa_service_is_queue_added(struct txa_service_data *txa,
497 			const struct rte_eth_dev *dev,
498 			uint16_t tx_queue_id)
499 {
500 	struct txa_service_queue_info *tqi;
501 
502 	tqi = txa_service_queue(txa, dev->data->port_id, tx_queue_id);
503 	return tqi && tqi->added;
504 }
505 
506 static int
507 txa_service_ctrl(uint8_t id, int start)
508 {
509 	int ret;
510 	struct txa_service_data *txa;
511 
512 	txa = txa_service_id_to_data(id);
513 	if (txa == NULL || txa->service_id == TXA_INVALID_SERVICE_ID)
514 		return 0;
515 
516 	rte_spinlock_lock(&txa->tx_lock);
517 	ret = rte_service_runstate_set(txa->service_id, start);
518 	rte_spinlock_unlock(&txa->tx_lock);
519 
520 	return ret;
521 }
522 
523 static void
524 txa_service_buffer_retry(struct rte_mbuf **pkts, uint16_t unsent,
525 			void *userdata)
526 {
527 	struct txa_retry *tr;
528 	struct txa_service_data *data;
529 	struct rte_event_eth_tx_adapter_stats *stats;
530 	uint16_t sent = 0;
531 	unsigned int retry = 0;
532 	uint16_t i, n;
533 
534 	tr = (struct txa_retry *)(uintptr_t)userdata;
535 	data = txa_service_id_to_data(tr->id);
536 	stats = &data->stats;
537 
538 	do {
539 		n = rte_eth_tx_burst(tr->port_id, tr->tx_queue,
540 			       &pkts[sent], unsent - sent);
541 
542 		sent += n;
543 	} while (sent != unsent && retry++ < TXA_RETRY_CNT);
544 
545 	for (i = sent; i < unsent; i++)
546 		rte_pktmbuf_free(pkts[i]);
547 
548 	stats->tx_retry += retry;
549 	stats->tx_packets += sent;
550 	stats->tx_dropped += unsent - sent;
551 }
552 
553 static uint16_t
554 txa_process_event_vector(struct txa_service_data *txa,
555 			 struct rte_event_vector *vec)
556 {
557 	struct txa_service_queue_info *tqi;
558 	uint16_t port, queue, nb_tx = 0;
559 	struct rte_mbuf **mbufs;
560 	int i;
561 
562 	mbufs = (struct rte_mbuf **)vec->mbufs;
563 	if (vec->attr_valid) {
564 		port = vec->port;
565 		queue = vec->queue;
566 		tqi = txa_service_queue(txa, port, queue);
567 		if (unlikely(tqi == NULL || !tqi->added || tqi->stopped)) {
568 			rte_pktmbuf_free_bulk(&mbufs[vec->elem_offset],
569 					      vec->nb_elem);
570 			rte_mempool_put(rte_mempool_from_obj(vec), vec);
571 			return 0;
572 		}
573 		for (i = 0; i < vec->nb_elem; i++) {
574 			nb_tx += rte_eth_tx_buffer(port, queue, tqi->tx_buf,
575 						   mbufs[i + vec->elem_offset]);
576 		}
577 	} else {
578 		for (i = vec->elem_offset; i < vec->elem_offset + vec->nb_elem;
579 		     i++) {
580 			port = mbufs[i]->port;
581 			queue = rte_event_eth_tx_adapter_txq_get(mbufs[i]);
582 			tqi = txa_service_queue(txa, port, queue);
583 			if (unlikely(tqi == NULL || !tqi->added ||
584 				     tqi->stopped)) {
585 				rte_pktmbuf_free(mbufs[i]);
586 				continue;
587 			}
588 			nb_tx += rte_eth_tx_buffer(port, queue, tqi->tx_buf,
589 						   mbufs[i]);
590 		}
591 	}
592 	rte_mempool_put(rte_mempool_from_obj(vec), vec);
593 
594 	return nb_tx;
595 }
596 
597 static void
598 txa_service_tx(struct txa_service_data *txa, struct rte_event *ev,
599 	uint32_t n)
600 {
601 	uint32_t i;
602 	uint16_t nb_tx;
603 	struct rte_event_eth_tx_adapter_stats *stats;
604 
605 	stats = &txa->stats;
606 
607 	nb_tx = 0;
608 	for (i = 0; i < n; i++) {
609 		uint16_t port;
610 		uint16_t queue;
611 		struct txa_service_queue_info *tqi;
612 
613 		if (!(ev[i].event_type & RTE_EVENT_TYPE_VECTOR)) {
614 			struct rte_mbuf *m;
615 
616 			m = ev[i].mbuf;
617 			port = m->port;
618 			queue = rte_event_eth_tx_adapter_txq_get(m);
619 
620 			tqi = txa_service_queue(txa, port, queue);
621 			if (unlikely(tqi == NULL || !tqi->added ||
622 				     tqi->stopped)) {
623 				rte_pktmbuf_free(m);
624 				continue;
625 			}
626 
627 			nb_tx += rte_eth_tx_buffer(port, queue, tqi->tx_buf, m);
628 		} else {
629 			nb_tx += txa_process_event_vector(txa, ev[i].vec);
630 		}
631 	}
632 
633 	stats->tx_packets += nb_tx;
634 }
635 
636 static int32_t
637 txa_service_func(void *args)
638 {
639 	struct txa_service_data *txa = args;
640 	uint8_t dev_id;
641 	uint8_t port;
642 	int ret = -EAGAIN;
643 	uint16_t n;
644 	uint32_t nb_tx, max_nb_tx;
645 	struct rte_event ev[TXA_BATCH_SIZE];
646 
647 	dev_id = txa->eventdev_id;
648 	max_nb_tx = txa->max_nb_tx;
649 	port = txa->port_id;
650 
651 	if (txa->nb_queues == 0)
652 		return ret;
653 
654 	if (!rte_spinlock_trylock(&txa->tx_lock))
655 		return ret;
656 
657 	for (nb_tx = 0; nb_tx < max_nb_tx; nb_tx += n) {
658 
659 		n = rte_event_dequeue_burst(dev_id, port, ev, RTE_DIM(ev), 0);
660 		if (!n)
661 			break;
662 		txa_service_tx(txa, ev, n);
663 		ret = 0;
664 	}
665 
666 	if ((txa->loop_cnt++ & (TXA_FLUSH_THRESHOLD - 1)) == 0) {
667 
668 		struct txa_service_ethdev *tdi;
669 		struct txa_service_queue_info *tqi;
670 		struct rte_eth_dev *dev;
671 		uint16_t i;
672 
673 		tdi = txa->txa_ethdev;
674 		nb_tx = 0;
675 
676 		RTE_ETH_FOREACH_DEV(i) {
677 			uint16_t q;
678 
679 			if (i == txa->dev_count)
680 				break;
681 
682 			dev = tdi[i].dev;
683 			if (tdi[i].nb_queues == 0)
684 				continue;
685 			for (q = 0; q < dev->data->nb_tx_queues; q++) {
686 
687 				tqi = txa_service_queue(txa, i, q);
688 				if (unlikely(tqi == NULL || !tqi->added ||
689 					     tqi->stopped))
690 					continue;
691 
692 				nb_tx += rte_eth_tx_buffer_flush(i, q,
693 							tqi->tx_buf);
694 			}
695 		}
696 
697 		if (likely(nb_tx > 0)) {
698 			txa->stats.tx_packets += nb_tx;
699 			ret = 0;
700 		}
701 	}
702 	rte_spinlock_unlock(&txa->tx_lock);
703 	return ret;
704 }
705 
706 static int
707 txa_service_adapter_create(uint8_t id, struct rte_eventdev *dev,
708 			struct rte_event_port_conf *port_conf)
709 {
710 	struct txa_service_data *txa;
711 	struct rte_event_port_conf *cb_conf;
712 	int ret;
713 
714 	cb_conf = rte_malloc(NULL, sizeof(*cb_conf), 0);
715 	if (cb_conf == NULL)
716 		return -ENOMEM;
717 
718 	*cb_conf = *port_conf;
719 	ret = txa_service_adapter_create_ext(id, dev, txa_service_conf_cb,
720 					cb_conf);
721 	if (ret) {
722 		rte_free(cb_conf);
723 		return ret;
724 	}
725 
726 	txa = txa_service_id_to_data(id);
727 	txa->conf_free = 1;
728 	return ret;
729 }
730 
731 static int
732 txa_service_adapter_create_ext(uint8_t id, struct rte_eventdev *dev,
733 			rte_event_eth_tx_adapter_conf_cb conf_cb,
734 			void *conf_arg)
735 {
736 	struct txa_service_data *txa;
737 	int socket_id;
738 	char mem_name[TXA_SERVICE_NAME_LEN];
739 	int ret;
740 
741 	if (conf_cb == NULL)
742 		return -EINVAL;
743 
744 	socket_id = dev->data->socket_id;
745 	snprintf(mem_name, TXA_MEM_NAME_LEN,
746 		"rte_event_eth_txa_%d",
747 		id);
748 
749 	ret = txa_service_data_init();
750 	if (ret != 0)
751 		return ret;
752 
753 	txa = rte_zmalloc_socket(mem_name,
754 				sizeof(*txa),
755 				RTE_CACHE_LINE_SIZE, socket_id);
756 	if (txa == NULL) {
757 		RTE_EDEV_LOG_ERR("failed to get mem for tx adapter");
758 		return -ENOMEM;
759 	}
760 
761 	txa->id = id;
762 	txa->eventdev_id = dev->data->dev_id;
763 	txa->socket_id = socket_id;
764 	strncpy(txa->mem_name, mem_name, TXA_SERVICE_NAME_LEN);
765 	txa->conf_cb = conf_cb;
766 	txa->conf_arg = conf_arg;
767 	txa->service_id = TXA_INVALID_SERVICE_ID;
768 	rte_spinlock_init(&txa->tx_lock);
769 	txa_service_data_array[id] = txa;
770 
771 	return 0;
772 }
773 
774 static int
775 txa_service_event_port_get(uint8_t id, uint8_t *port)
776 {
777 	struct txa_service_data *txa;
778 
779 	txa = txa_service_id_to_data(id);
780 	if (txa->service_id == TXA_INVALID_SERVICE_ID)
781 		return -ENODEV;
782 
783 	*port = txa->port_id;
784 	return 0;
785 }
786 
787 static int
788 txa_service_adapter_free(uint8_t id)
789 {
790 	struct txa_service_data *txa;
791 
792 	txa = txa_service_id_to_data(id);
793 	if (txa->nb_queues) {
794 		RTE_EDEV_LOG_ERR("%" PRIu16 " Tx queues not deleted",
795 				txa->nb_queues);
796 		return -EBUSY;
797 	}
798 
799 	if (txa->conf_free)
800 		rte_free(txa->conf_arg);
801 	rte_free(txa);
802 	return 0;
803 }
804 
805 static int
806 txa_service_queue_add(uint8_t id,
807 		__rte_unused struct rte_eventdev *dev,
808 		const struct rte_eth_dev *eth_dev,
809 		int32_t tx_queue_id)
810 {
811 	struct txa_service_data *txa;
812 	struct txa_service_ethdev *tdi;
813 	struct txa_service_queue_info *tqi;
814 	struct rte_eth_dev_tx_buffer *tb;
815 	struct txa_retry *txa_retry;
816 	int ret = 0;
817 
818 	txa = txa_service_id_to_data(id);
819 
820 	if (tx_queue_id == -1) {
821 		int nb_queues;
822 		uint16_t i, j;
823 		uint16_t *qdone;
824 
825 		nb_queues = eth_dev->data->nb_tx_queues;
826 		if (txa->dev_count > eth_dev->data->port_id) {
827 			tdi = &txa->txa_ethdev[eth_dev->data->port_id];
828 			nb_queues -= tdi->nb_queues;
829 		}
830 
831 		qdone = rte_zmalloc(txa->mem_name,
832 				nb_queues * sizeof(*qdone), 0);
833 		if (qdone == NULL)
834 			return -ENOMEM;
835 		j = 0;
836 		for (i = 0; i < nb_queues; i++) {
837 			if (txa_service_is_queue_added(txa, eth_dev, i))
838 				continue;
839 			ret = txa_service_queue_add(id, dev, eth_dev, i);
840 			if (ret == 0)
841 				qdone[j++] = i;
842 			else
843 				break;
844 		}
845 
846 		if (i != nb_queues) {
847 			for (i = 0; i < j; i++)
848 				txa_service_queue_del(id, eth_dev, qdone[i]);
849 		}
850 		rte_free(qdone);
851 		return ret;
852 	}
853 
854 	ret = txa_service_register(txa);
855 	if (ret)
856 		return ret;
857 
858 	rte_spinlock_lock(&txa->tx_lock);
859 
860 	if (txa_service_is_queue_added(txa, eth_dev, tx_queue_id))
861 		goto ret_unlock;
862 
863 	ret = txa_service_queue_array_alloc(txa, eth_dev->data->port_id);
864 	if (ret)
865 		goto err_unlock;
866 
867 	tb = txa_service_tx_buf_alloc(txa, eth_dev);
868 	if (tb == NULL)
869 		goto err_unlock;
870 
871 	tdi = &txa->txa_ethdev[eth_dev->data->port_id];
872 	tqi = txa_service_queue(txa, eth_dev->data->port_id, tx_queue_id);
873 	if (tqi == NULL)
874 		goto err_unlock;
875 
876 	txa_retry = &tqi->txa_retry;
877 	txa_retry->id = txa->id;
878 	txa_retry->port_id = eth_dev->data->port_id;
879 	txa_retry->tx_queue = tx_queue_id;
880 
881 	rte_eth_tx_buffer_init(tb, TXA_BATCH_SIZE);
882 	rte_eth_tx_buffer_set_err_callback(tb,
883 		txa_service_buffer_retry, txa_retry);
884 
885 	tqi->tx_buf = tb;
886 	tqi->added = 1;
887 	tqi->stopped = false;
888 	tdi->nb_queues++;
889 	txa->nb_queues++;
890 
891 ret_unlock:
892 	rte_spinlock_unlock(&txa->tx_lock);
893 	return 0;
894 
895 err_unlock:
896 	if (txa->nb_queues == 0) {
897 		txa_service_queue_array_free(txa,
898 					eth_dev->data->port_id);
899 		txa_service_unregister(txa);
900 	}
901 
902 	rte_spinlock_unlock(&txa->tx_lock);
903 	return -1;
904 }
905 
906 static inline void
907 txa_txq_buffer_drain(struct txa_service_queue_info *tqi)
908 {
909 	struct rte_eth_dev_tx_buffer *b;
910 	uint16_t i;
911 
912 	b = tqi->tx_buf;
913 
914 	for (i = 0; i < b->length; i++)
915 		rte_pktmbuf_free(b->pkts[i]);
916 
917 	b->length = 0;
918 }
919 
920 static int
921 txa_service_queue_del(uint8_t id,
922 		const struct rte_eth_dev *dev,
923 		int32_t tx_queue_id)
924 {
925 	struct txa_service_data *txa;
926 	struct txa_service_queue_info *tqi;
927 	struct rte_eth_dev_tx_buffer *tb;
928 	uint16_t port_id;
929 
930 	txa = txa_service_id_to_data(id);
931 	port_id = dev->data->port_id;
932 
933 	if (tx_queue_id == -1) {
934 		uint16_t i, q, nb_queues;
935 		int ret = 0;
936 
937 		if (txa->txa_ethdev == NULL)
938 			return 0;
939 		nb_queues = txa->txa_ethdev[port_id].nb_queues;
940 		if (nb_queues == 0)
941 			return 0;
942 
943 		i = 0;
944 		q = 0;
945 		tqi = txa->txa_ethdev[port_id].queues;
946 
947 		while (i < nb_queues) {
948 
949 			if (tqi[q].added) {
950 				ret = txa_service_queue_del(id, dev, q);
951 				i++;
952 				if (ret != 0)
953 					break;
954 			}
955 			q++;
956 		}
957 		return ret;
958 	}
959 
960 	txa = txa_service_id_to_data(id);
961 
962 	rte_spinlock_lock(&txa->tx_lock);
963 	tqi = txa_service_queue(txa, port_id, tx_queue_id);
964 	if (tqi == NULL || !tqi->added)
965 		goto ret_unlock;
966 
967 	/* Drain the buffered mbufs */
968 	txa_txq_buffer_drain(tqi);
969 	tb = tqi->tx_buf;
970 	tqi->added = 0;
971 	tqi->tx_buf = NULL;
972 	rte_free(tb);
973 	txa->nb_queues--;
974 	txa->txa_ethdev[port_id].nb_queues--;
975 
976 	txa_service_queue_array_free(txa, port_id);
977 
978 ret_unlock:
979 	rte_spinlock_unlock(&txa->tx_lock);
980 	return 0;
981 }
982 
983 static int
984 txa_service_id_get(uint8_t id, uint32_t *service_id)
985 {
986 	struct txa_service_data *txa;
987 
988 	txa = txa_service_id_to_data(id);
989 	if (txa->service_id == TXA_INVALID_SERVICE_ID)
990 		return -ESRCH;
991 
992 	if (service_id == NULL)
993 		return -EINVAL;
994 
995 	*service_id = txa->service_id;
996 	return 0;
997 }
998 
999 static int
1000 txa_service_start(uint8_t id)
1001 {
1002 	return txa_service_ctrl(id, 1);
1003 }
1004 
1005 static int
1006 txa_service_stats_get(uint8_t id,
1007 		struct rte_event_eth_tx_adapter_stats *stats)
1008 {
1009 	struct txa_service_data *txa;
1010 
1011 	txa = txa_service_id_to_data(id);
1012 	*stats = txa->stats;
1013 	return 0;
1014 }
1015 
1016 static int
1017 txa_service_stats_reset(uint8_t id)
1018 {
1019 	struct txa_service_data *txa;
1020 
1021 	txa = txa_service_id_to_data(id);
1022 	memset(&txa->stats, 0, sizeof(txa->stats));
1023 	return 0;
1024 }
1025 
1026 static int
1027 txa_service_stop(uint8_t id)
1028 {
1029 	return txa_service_ctrl(id, 0);
1030 }
1031 
1032 
1033 int
1034 rte_event_eth_tx_adapter_create(uint8_t id, uint8_t dev_id,
1035 				struct rte_event_port_conf *port_conf)
1036 {
1037 	struct rte_eventdev *dev;
1038 	int ret;
1039 
1040 	if (port_conf == NULL)
1041 		return -EINVAL;
1042 
1043 	RTE_EVENT_ETH_TX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
1044 	RTE_EVENTDEV_VALID_DEVID_OR_ERR_RET(dev_id, -EINVAL);
1045 
1046 	dev = &rte_eventdevs[dev_id];
1047 
1048 	ret = txa_init();
1049 	if (ret != 0)
1050 		return ret;
1051 
1052 	if (txa_adapter_exist(id))
1053 		return -EEXIST;
1054 
1055 	txa_dev_id_array[id] = dev_id;
1056 	if (txa_dev_adapter_create(id))
1057 		ret = txa_dev_adapter_create(id)(id, dev);
1058 
1059 	if (ret != 0) {
1060 		txa_dev_id_array[id] = TXA_INVALID_DEV_ID;
1061 		return ret;
1062 	}
1063 
1064 	ret = txa_service_adapter_create(id, dev, port_conf);
1065 	if (ret != 0) {
1066 		if (txa_dev_adapter_free(id))
1067 			txa_dev_adapter_free(id)(id, dev);
1068 		txa_dev_id_array[id] = TXA_INVALID_DEV_ID;
1069 		return ret;
1070 	}
1071 	rte_eventdev_trace_eth_tx_adapter_create(id, dev_id, NULL, port_conf,
1072 		ret);
1073 	txa_dev_id_array[id] = dev_id;
1074 	return 0;
1075 }
1076 
1077 int
1078 rte_event_eth_tx_adapter_create_ext(uint8_t id, uint8_t dev_id,
1079 				rte_event_eth_tx_adapter_conf_cb conf_cb,
1080 				void *conf_arg)
1081 {
1082 	struct rte_eventdev *dev;
1083 	int ret;
1084 
1085 	RTE_EVENT_ETH_TX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
1086 	RTE_EVENTDEV_VALID_DEVID_OR_ERR_RET(dev_id, -EINVAL);
1087 
1088 	ret = txa_init();
1089 	if (ret != 0)
1090 		return ret;
1091 
1092 	if (txa_adapter_exist(id))
1093 		return -EINVAL;
1094 
1095 	dev = &rte_eventdevs[dev_id];
1096 
1097 	txa_dev_id_array[id] = dev_id;
1098 	if (txa_dev_adapter_create_ext(id))
1099 		ret = txa_dev_adapter_create_ext(id)(id, dev);
1100 
1101 	if (ret != 0) {
1102 		txa_dev_id_array[id] = TXA_INVALID_DEV_ID;
1103 		return ret;
1104 	}
1105 
1106 	ret = txa_service_adapter_create_ext(id, dev, conf_cb, conf_arg);
1107 	if (ret != 0) {
1108 		if (txa_dev_adapter_free(id))
1109 			txa_dev_adapter_free(id)(id, dev);
1110 		txa_dev_id_array[id] = TXA_INVALID_DEV_ID;
1111 		return ret;
1112 	}
1113 
1114 	rte_eventdev_trace_eth_tx_adapter_create(id, dev_id, conf_cb, conf_arg,
1115 		ret);
1116 	txa_dev_id_array[id] = dev_id;
1117 	return 0;
1118 }
1119 
1120 
1121 int
1122 rte_event_eth_tx_adapter_event_port_get(uint8_t id, uint8_t *event_port_id)
1123 {
1124 	TXA_CHECK_OR_ERR_RET(id);
1125 
1126 	return txa_service_event_port_get(id, event_port_id);
1127 }
1128 
1129 int
1130 rte_event_eth_tx_adapter_free(uint8_t id)
1131 {
1132 	int ret;
1133 
1134 	TXA_CHECK_OR_ERR_RET(id);
1135 
1136 	ret = txa_dev_adapter_free(id) ?
1137 		txa_dev_adapter_free(id)(id, txa_evdev(id)) :
1138 		0;
1139 
1140 	if (ret == 0)
1141 		ret = txa_service_adapter_free(id);
1142 	txa_dev_id_array[id] = TXA_INVALID_DEV_ID;
1143 
1144 	rte_eventdev_trace_eth_tx_adapter_free(id, ret);
1145 	return ret;
1146 }
1147 
1148 int
1149 rte_event_eth_tx_adapter_queue_add(uint8_t id,
1150 				uint16_t eth_dev_id,
1151 				int32_t queue)
1152 {
1153 	struct rte_eth_dev *eth_dev;
1154 	int ret;
1155 	uint32_t caps;
1156 
1157 	RTE_ETH_VALID_PORTID_OR_ERR_RET(eth_dev_id, -EINVAL);
1158 	TXA_CHECK_OR_ERR_RET(id);
1159 
1160 	eth_dev = &rte_eth_devices[eth_dev_id];
1161 	TXA_CHECK_TXQ(eth_dev, queue);
1162 
1163 	caps = 0;
1164 	if (txa_dev_caps_get(id))
1165 		txa_dev_caps_get(id)(txa_evdev(id), eth_dev, &caps);
1166 
1167 	if (caps & RTE_EVENT_ETH_TX_ADAPTER_CAP_INTERNAL_PORT)
1168 		ret =  txa_dev_queue_add(id) ?
1169 					txa_dev_queue_add(id)(id,
1170 							txa_evdev(id),
1171 							eth_dev,
1172 							queue) : 0;
1173 	else
1174 		ret = txa_service_queue_add(id, txa_evdev(id), eth_dev, queue);
1175 
1176 	rte_eventdev_trace_eth_tx_adapter_queue_add(id, eth_dev_id, queue,
1177 		ret);
1178 	return ret;
1179 }
1180 
1181 int
1182 rte_event_eth_tx_adapter_queue_del(uint8_t id,
1183 				uint16_t eth_dev_id,
1184 				int32_t queue)
1185 {
1186 	struct rte_eth_dev *eth_dev;
1187 	int ret;
1188 	uint32_t caps;
1189 
1190 	RTE_ETH_VALID_PORTID_OR_ERR_RET(eth_dev_id, -EINVAL);
1191 	TXA_CHECK_OR_ERR_RET(id);
1192 
1193 	eth_dev = &rte_eth_devices[eth_dev_id];
1194 
1195 	caps = 0;
1196 
1197 	if (txa_dev_caps_get(id))
1198 		txa_dev_caps_get(id)(txa_evdev(id), eth_dev, &caps);
1199 
1200 	if (caps & RTE_EVENT_ETH_TX_ADAPTER_CAP_INTERNAL_PORT)
1201 		ret =  txa_dev_queue_del(id) ?
1202 					txa_dev_queue_del(id)(id, txa_evdev(id),
1203 							eth_dev,
1204 							queue) : 0;
1205 	else
1206 		ret = txa_service_queue_del(id, eth_dev, queue);
1207 
1208 	rte_eventdev_trace_eth_tx_adapter_queue_del(id, eth_dev_id, queue,
1209 		ret);
1210 	return ret;
1211 }
1212 
1213 int
1214 rte_event_eth_tx_adapter_service_id_get(uint8_t id, uint32_t *service_id)
1215 {
1216 	TXA_CHECK_OR_ERR_RET(id);
1217 
1218 	return txa_service_id_get(id, service_id);
1219 }
1220 
1221 int
1222 rte_event_eth_tx_adapter_start(uint8_t id)
1223 {
1224 	int ret;
1225 
1226 	TXA_CHECK_OR_ERR_RET(id);
1227 
1228 	ret = txa_dev_start(id) ? txa_dev_start(id)(id, txa_evdev(id)) : 0;
1229 	if (ret == 0)
1230 		ret = txa_service_start(id);
1231 	rte_eventdev_trace_eth_tx_adapter_start(id, ret);
1232 	return ret;
1233 }
1234 
1235 int
1236 rte_event_eth_tx_adapter_stats_get(uint8_t id,
1237 				struct rte_event_eth_tx_adapter_stats *stats)
1238 {
1239 	int ret;
1240 
1241 	TXA_CHECK_OR_ERR_RET(id);
1242 
1243 	if (stats == NULL)
1244 		return -EINVAL;
1245 
1246 	*stats = (struct rte_event_eth_tx_adapter_stats){0};
1247 
1248 	ret = txa_dev_stats_get(id) ?
1249 			txa_dev_stats_get(id)(id, txa_evdev(id), stats) : 0;
1250 
1251 	if (ret == 0 && txa_service_id_get(id, NULL) != ESRCH) {
1252 		if (txa_dev_stats_get(id)) {
1253 			struct rte_event_eth_tx_adapter_stats service_stats;
1254 
1255 			ret = txa_service_stats_get(id, &service_stats);
1256 			if (ret == 0) {
1257 				stats->tx_retry += service_stats.tx_retry;
1258 				stats->tx_packets += service_stats.tx_packets;
1259 				stats->tx_dropped += service_stats.tx_dropped;
1260 			}
1261 		} else
1262 			ret = txa_service_stats_get(id, stats);
1263 	}
1264 
1265 	return ret;
1266 }
1267 
1268 int
1269 rte_event_eth_tx_adapter_stats_reset(uint8_t id)
1270 {
1271 	int ret;
1272 
1273 	TXA_CHECK_OR_ERR_RET(id);
1274 
1275 	ret = txa_dev_stats_reset(id) ?
1276 		txa_dev_stats_reset(id)(id, txa_evdev(id)) : 0;
1277 	if (ret == 0)
1278 		ret = txa_service_stats_reset(id);
1279 	return ret;
1280 }
1281 
1282 int
1283 rte_event_eth_tx_adapter_stop(uint8_t id)
1284 {
1285 	int ret;
1286 
1287 	TXA_CHECK_OR_ERR_RET(id);
1288 
1289 	ret = txa_dev_stop(id) ? txa_dev_stop(id)(id,  txa_evdev(id)) : 0;
1290 	if (ret == 0)
1291 		ret = txa_service_stop(id);
1292 	rte_eventdev_trace_eth_tx_adapter_stop(id, ret);
1293 	return ret;
1294 }
1295 
1296 int
1297 rte_event_eth_tx_adapter_instance_get(uint16_t eth_dev_id,
1298 				      uint16_t tx_queue_id,
1299 				      uint8_t *txa_inst_id)
1300 {
1301 	uint8_t id;
1302 	int ret = -EINVAL;
1303 	uint32_t caps;
1304 	struct txa_service_data *txa;
1305 
1306 	if (txa_lookup())
1307 		return -ENOMEM;
1308 
1309 	if (eth_dev_id >= rte_eth_dev_count_avail()) {
1310 		RTE_EDEV_LOG_ERR("Invalid ethernet port id %u", eth_dev_id);
1311 		return -EINVAL;
1312 	}
1313 
1314 	if (tx_queue_id >= rte_eth_devices[eth_dev_id].data->nb_tx_queues) {
1315 		RTE_EDEV_LOG_ERR("Invalid tx queue id %u", tx_queue_id);
1316 		return -EINVAL;
1317 	}
1318 
1319 	if (txa_inst_id == NULL) {
1320 		RTE_EDEV_LOG_ERR("txa_instance_id cannot be NULL");
1321 		return -EINVAL;
1322 	}
1323 
1324 	/* Iterate through all Tx adapter instances */
1325 	for (id = 0; id < RTE_EVENT_ETH_TX_ADAPTER_MAX_INSTANCE; id++) {
1326 		txa = txa_service_id_to_data(id);
1327 		if (!txa)
1328 			continue;
1329 
1330 		caps = 0;
1331 		if (rte_event_eth_tx_adapter_caps_get(txa->eventdev_id,
1332 						      eth_dev_id,
1333 						      &caps))
1334 			continue;
1335 
1336 		if (caps & RTE_EVENT_ETH_TX_ADAPTER_CAP_INTERNAL_PORT) {
1337 			ret = txa_dev_instance_get(id) ?
1338 					txa_dev_instance_get(id)(eth_dev_id,
1339 								 tx_queue_id,
1340 								 txa_inst_id)
1341 							: -EINVAL;
1342 			if (ret == 0)
1343 				return ret;
1344 		} else {
1345 			struct rte_eth_dev *eth_dev;
1346 
1347 			eth_dev = &rte_eth_devices[eth_dev_id];
1348 
1349 			if (txa_service_is_queue_added(txa, eth_dev,
1350 						       tx_queue_id)) {
1351 				*txa_inst_id = txa->id;
1352 				return 0;
1353 			}
1354 		}
1355 	}
1356 
1357 	return -EINVAL;
1358 }
1359 
1360 static inline int
1361 txa_sw_queue_start_state_set(uint16_t eth_dev_id, uint16_t tx_queue_id,
1362 			     bool start_state, struct txa_service_data *txa)
1363 {
1364 	struct txa_service_queue_info *tqi = NULL;
1365 
1366 	rte_spinlock_lock(&txa->tx_lock);
1367 	tqi = txa_service_queue(txa, eth_dev_id, tx_queue_id);
1368 	if (unlikely(tqi == NULL || !tqi->added)) {
1369 		rte_spinlock_unlock(&txa->tx_lock);
1370 		return -EINVAL;
1371 	}
1372 	if (start_state == false)
1373 		txa_txq_buffer_drain(tqi);
1374 
1375 	tqi->stopped = !start_state;
1376 	rte_spinlock_unlock(&txa->tx_lock);
1377 	return 0;
1378 }
1379 
1380 static int
1381 txa_queue_start_state_set(uint16_t eth_dev_id, uint16_t tx_queue_id,
1382 			  bool start_state)
1383 {
1384 	struct txa_service_data *txa;
1385 	uint8_t txa_inst_id;
1386 	int ret;
1387 	uint32_t caps = 0;
1388 
1389 	/* Below API already does validation of input parameters.
1390 	 * Hence skipping the validation here.
1391 	 */
1392 	ret = rte_event_eth_tx_adapter_instance_get(eth_dev_id,
1393 						    tx_queue_id,
1394 						    &txa_inst_id);
1395 	if (ret < 0)
1396 		return -EINVAL;
1397 
1398 	txa = txa_service_id_to_data(txa_inst_id);
1399 	ret = rte_event_eth_tx_adapter_caps_get(txa->eventdev_id,
1400 						eth_dev_id,
1401 						&caps);
1402 	if (ret < 0)
1403 		return -EINVAL;
1404 
1405 	if (caps & RTE_EVENT_ETH_TX_ADAPTER_CAP_INTERNAL_PORT) {
1406 		if (start_state == true) {
1407 			ret = txa_dev_queue_start(txa_inst_id) ?
1408 			      txa_dev_queue_start(txa_inst_id)(txa_inst_id,
1409 							       eth_dev_id,
1410 							       tx_queue_id) : 0;
1411 		} else {
1412 			ret = txa_dev_queue_stop(txa_inst_id) ?
1413 			      txa_dev_queue_stop(txa_inst_id)(txa_inst_id,
1414 							      eth_dev_id,
1415 							      tx_queue_id) : 0;
1416 		}
1417 		return ret;
1418 	}
1419 
1420 	return txa_sw_queue_start_state_set(eth_dev_id, tx_queue_id,
1421 					    start_state, txa);
1422 }
1423 
1424 int
1425 rte_event_eth_tx_adapter_queue_start(uint16_t eth_dev_id, uint16_t tx_queue_id)
1426 {
1427 	return txa_queue_start_state_set(eth_dev_id, tx_queue_id, true);
1428 }
1429 
1430 int
1431 rte_event_eth_tx_adapter_queue_stop(uint16_t eth_dev_id, uint16_t tx_queue_id)
1432 {
1433 	return txa_queue_start_state_set(eth_dev_id, tx_queue_id, false);
1434 }
1435