xref: /dpdk/lib/eventdev/rte_event_eth_tx_adapter.c (revision 87d396163c005deb8d9f72ec0977f19e5edd8f47)
1 /* SPDX-License-Identifier: BSD-3-Clause
2  * Copyright(c) 2018 Intel Corporation.
3  */
4 #include <rte_spinlock.h>
5 #include <rte_service_component.h>
6 #include <ethdev_driver.h>
7 
8 #include "eventdev_pmd.h"
9 #include "eventdev_trace.h"
10 #include "rte_event_eth_tx_adapter.h"
11 
12 #define TXA_BATCH_SIZE		32
13 #define TXA_SERVICE_NAME_LEN	32
14 #define TXA_MEM_NAME_LEN	32
15 #define TXA_FLUSH_THRESHOLD	1024
16 #define TXA_RETRY_CNT		100
17 #define TXA_MAX_NB_TX		128
18 #define TXA_INVALID_DEV_ID	INT32_C(-1)
19 #define TXA_INVALID_SERVICE_ID	INT64_C(-1)
20 
21 #define TXA_ADAPTER_ARRAY "txa_adapter_array"
22 #define TXA_SERVICE_DATA_ARRAY "txa_service_data_array"
23 
24 #define txa_evdev(id) (&rte_eventdevs[txa_dev_id_array[(id)]])
25 
26 #define txa_dev_caps_get(id) txa_evdev((id))->dev_ops->eth_tx_adapter_caps_get
27 
28 #define txa_dev_adapter_create(t) txa_evdev(t)->dev_ops->eth_tx_adapter_create
29 
30 #define txa_dev_adapter_create_ext(t) \
31 				txa_evdev(t)->dev_ops->eth_tx_adapter_create
32 
33 #define txa_dev_adapter_free(t) txa_evdev(t)->dev_ops->eth_tx_adapter_free
34 
35 #define txa_dev_queue_add(id) txa_evdev(id)->dev_ops->eth_tx_adapter_queue_add
36 
37 #define txa_dev_queue_del(t) txa_evdev(t)->dev_ops->eth_tx_adapter_queue_del
38 
39 #define txa_dev_start(t) txa_evdev(t)->dev_ops->eth_tx_adapter_start
40 
41 #define txa_dev_stop(t) txa_evdev(t)->dev_ops->eth_tx_adapter_stop
42 
43 #define txa_dev_stats_reset(t) txa_evdev(t)->dev_ops->eth_tx_adapter_stats_reset
44 
45 #define txa_dev_stats_get(t) txa_evdev(t)->dev_ops->eth_tx_adapter_stats_get
46 
47 #define txa_dev_instance_get(id) \
48 			txa_evdev(id)->dev_ops->eth_tx_adapter_instance_get
49 
50 #define RTE_EVENT_ETH_TX_ADAPTER_ID_VALID_OR_ERR_RET(id, retval) \
51 do { \
52 	if (!txa_valid_id(id)) { \
53 		RTE_EDEV_LOG_ERR("Invalid eth Tx adapter id = %d", id); \
54 		return retval; \
55 	} \
56 } while (0)
57 
58 #define TXA_CHECK_OR_ERR_RET(id) \
59 do {\
60 	int ret; \
61 	RTE_EVENT_ETH_TX_ADAPTER_ID_VALID_OR_ERR_RET((id), -EINVAL); \
62 	ret = txa_init(); \
63 	if (ret != 0) \
64 		return ret; \
65 	if (!txa_adapter_exist((id))) \
66 		return -EINVAL; \
67 } while (0)
68 
69 #define TXA_CHECK_TXQ(dev, queue) \
70 do {\
71 	if ((dev)->data->nb_tx_queues == 0) { \
72 		RTE_EDEV_LOG_ERR("No tx queues configured"); \
73 		return -EINVAL; \
74 	} \
75 	if ((queue) != -1 && \
76 		(uint16_t)(queue) >= (dev)->data->nb_tx_queues) { \
77 		RTE_EDEV_LOG_ERR("Invalid tx queue_id %" PRIu16, \
78 				(uint16_t)(queue)); \
79 		return -EINVAL; \
80 	} \
81 } while (0)
82 
83 /* Tx retry callback structure */
84 struct txa_retry {
85 	/* Ethernet port id */
86 	uint16_t port_id;
87 	/* Tx queue */
88 	uint16_t tx_queue;
89 	/* Adapter ID */
90 	uint8_t id;
91 };
92 
93 /* Per queue structure */
94 struct txa_service_queue_info {
95 	/* Queue has been added */
96 	uint8_t added;
97 	/* Retry callback argument */
98 	struct txa_retry txa_retry;
99 	/* Tx buffer */
100 	struct rte_eth_dev_tx_buffer *tx_buf;
101 };
102 
103 /* PMD private structure */
104 struct txa_service_data {
105 	/* Max mbufs processed in any service function invocation */
106 	uint32_t max_nb_tx;
107 	/* Number of Tx queues in adapter */
108 	uint32_t nb_queues;
109 	/*  Synchronization with data path */
110 	rte_spinlock_t tx_lock;
111 	/* Event port ID */
112 	uint8_t port_id;
113 	/* Event device identifier */
114 	uint8_t eventdev_id;
115 	/* Highest port id supported + 1 */
116 	uint16_t dev_count;
117 	/* Loop count to flush Tx buffers */
118 	int loop_cnt;
119 	/* Per ethernet device structure */
120 	struct txa_service_ethdev *txa_ethdev;
121 	/* Statistics */
122 	struct rte_event_eth_tx_adapter_stats stats;
123 	/* Adapter Identifier */
124 	uint8_t id;
125 	/* Conf arg must be freed */
126 	uint8_t conf_free;
127 	/* Configuration callback */
128 	rte_event_eth_tx_adapter_conf_cb conf_cb;
129 	/* Configuration callback argument */
130 	void *conf_arg;
131 	/* socket id */
132 	int socket_id;
133 	/* Per adapter EAL service */
134 	int64_t service_id;
135 	/* Memory allocation name */
136 	char mem_name[TXA_MEM_NAME_LEN];
137 } __rte_cache_aligned;
138 
139 /* Per eth device structure */
140 struct txa_service_ethdev {
141 	/* Pointer to ethernet device */
142 	struct rte_eth_dev *dev;
143 	/* Number of queues added */
144 	uint16_t nb_queues;
145 	/* PMD specific queue data */
146 	void *queues;
147 };
148 
149 /* Array of adapter instances, initialized with event device id
150  * when adapter is created
151  */
152 static int *txa_dev_id_array;
153 
154 /* Array of pointers to service implementation data */
155 static struct txa_service_data **txa_service_data_array;
156 
157 static int32_t txa_service_func(void *args);
158 static int txa_service_adapter_create_ext(uint8_t id,
159 			struct rte_eventdev *dev,
160 			rte_event_eth_tx_adapter_conf_cb conf_cb,
161 			void *conf_arg);
162 static int txa_service_queue_del(uint8_t id,
163 				const struct rte_eth_dev *dev,
164 				int32_t tx_queue_id);
165 
166 static int
167 txa_adapter_exist(uint8_t id)
168 {
169 	return txa_dev_id_array[id] != TXA_INVALID_DEV_ID;
170 }
171 
172 static inline int
173 txa_valid_id(uint8_t id)
174 {
175 	return id < RTE_EVENT_ETH_TX_ADAPTER_MAX_INSTANCE;
176 }
177 
178 static void *
179 txa_memzone_array_get(const char *name, unsigned int elt_size, int nb_elems)
180 {
181 	const struct rte_memzone *mz;
182 	unsigned int sz;
183 
184 	sz = elt_size * nb_elems;
185 	sz = RTE_ALIGN(sz, RTE_CACHE_LINE_SIZE);
186 
187 	mz = rte_memzone_lookup(name);
188 	if (mz == NULL) {
189 		mz = rte_memzone_reserve_aligned(name, sz, rte_socket_id(), 0,
190 						 RTE_CACHE_LINE_SIZE);
191 		if (mz == NULL) {
192 			RTE_EDEV_LOG_ERR("failed to reserve memzone"
193 					" name = %s err = %"
194 					PRId32, name, rte_errno);
195 			return NULL;
196 		}
197 	}
198 
199 	return  mz->addr;
200 }
201 
202 static int
203 txa_lookup(void)
204 {
205 	const struct rte_memzone *mz;
206 
207 	if (txa_dev_id_array == NULL) {
208 		mz = rte_memzone_lookup(TXA_ADAPTER_ARRAY);
209 		if (mz == NULL)
210 			return -ENOMEM;
211 		txa_dev_id_array = mz->addr;
212 	}
213 
214 	if (txa_service_data_array == NULL) {
215 		mz = rte_memzone_lookup(TXA_SERVICE_DATA_ARRAY);
216 		if (mz == NULL)
217 			return -ENOMEM;
218 		txa_service_data_array = mz->addr;
219 	}
220 
221 	return 0;
222 }
223 
224 static int
225 txa_dev_id_array_init(void)
226 {
227 	if (txa_dev_id_array == NULL) {
228 		int i;
229 
230 		txa_dev_id_array = txa_memzone_array_get(TXA_ADAPTER_ARRAY,
231 					sizeof(int),
232 					RTE_EVENT_ETH_TX_ADAPTER_MAX_INSTANCE);
233 		if (txa_dev_id_array == NULL)
234 			return -ENOMEM;
235 
236 		for (i = 0; i < RTE_EVENT_ETH_TX_ADAPTER_MAX_INSTANCE; i++)
237 			txa_dev_id_array[i] = TXA_INVALID_DEV_ID;
238 	}
239 
240 	return 0;
241 }
242 
243 static int
244 txa_init(void)
245 {
246 	return txa_dev_id_array_init();
247 }
248 
249 static int
250 txa_service_data_init(void)
251 {
252 	if (txa_service_data_array == NULL) {
253 		int i;
254 
255 		txa_service_data_array =
256 				txa_memzone_array_get(TXA_SERVICE_DATA_ARRAY,
257 					sizeof(*txa_service_data_array),
258 					RTE_EVENT_ETH_TX_ADAPTER_MAX_INSTANCE);
259 		if (txa_service_data_array == NULL)
260 			return -ENOMEM;
261 
262 		/* Reset the txa service pointers */
263 		for (i = 0; i < RTE_EVENT_ETH_TX_ADAPTER_MAX_INSTANCE; i++)
264 			txa_service_data_array[i] = NULL;
265 	}
266 
267 	return 0;
268 }
269 
270 static inline struct txa_service_data *
271 txa_service_id_to_data(uint8_t id)
272 {
273 	return txa_service_data_array[id];
274 }
275 
276 static inline struct txa_service_queue_info *
277 txa_service_queue(struct txa_service_data *txa, uint16_t port_id,
278 		uint16_t tx_queue_id)
279 {
280 	struct txa_service_queue_info *tqi;
281 
282 	if (unlikely(txa->txa_ethdev == NULL || txa->dev_count < port_id + 1))
283 		return NULL;
284 
285 	tqi = txa->txa_ethdev[port_id].queues;
286 
287 	return likely(tqi != NULL) ? tqi + tx_queue_id : NULL;
288 }
289 
290 static int
291 txa_service_conf_cb(uint8_t __rte_unused id, uint8_t dev_id,
292 		struct rte_event_eth_tx_adapter_conf *conf, void *arg)
293 {
294 	int ret;
295 	struct rte_eventdev *dev;
296 	struct rte_event_port_conf *pc;
297 	struct rte_event_dev_config dev_conf;
298 	int started;
299 	uint8_t port_id;
300 
301 	pc = arg;
302 	dev = &rte_eventdevs[dev_id];
303 	dev_conf = dev->data->dev_conf;
304 
305 	started = dev->data->dev_started;
306 	if (started)
307 		rte_event_dev_stop(dev_id);
308 
309 	port_id = dev_conf.nb_event_ports;
310 	dev_conf.nb_event_ports += 1;
311 
312 	ret = rte_event_dev_configure(dev_id, &dev_conf);
313 	if (ret) {
314 		RTE_EDEV_LOG_ERR("failed to configure event dev %u",
315 						dev_id);
316 		if (started) {
317 			if (rte_event_dev_start(dev_id))
318 				return -EIO;
319 		}
320 		return ret;
321 	}
322 
323 	ret = rte_event_port_setup(dev_id, port_id, pc);
324 	if (ret) {
325 		RTE_EDEV_LOG_ERR("failed to setup event port %u\n",
326 					port_id);
327 		if (started) {
328 			if (rte_event_dev_start(dev_id))
329 				return -EIO;
330 		}
331 		return ret;
332 	}
333 
334 	conf->event_port_id = port_id;
335 	conf->max_nb_tx = TXA_MAX_NB_TX;
336 	if (started)
337 		ret = rte_event_dev_start(dev_id);
338 	return ret;
339 }
340 
341 static int
342 txa_service_ethdev_alloc(struct txa_service_data *txa)
343 {
344 	struct txa_service_ethdev *txa_ethdev;
345 	uint16_t i, dev_count;
346 
347 	dev_count = rte_eth_dev_count_avail();
348 	if (txa->txa_ethdev && dev_count == txa->dev_count)
349 		return 0;
350 
351 	txa_ethdev = rte_zmalloc_socket(txa->mem_name,
352 					dev_count * sizeof(*txa_ethdev),
353 					0,
354 					txa->socket_id);
355 	if (txa_ethdev == NULL) {
356 		RTE_EDEV_LOG_ERR("Failed to alloc txa::txa_ethdev ");
357 		return -ENOMEM;
358 	}
359 
360 	if (txa->dev_count)
361 		memcpy(txa_ethdev, txa->txa_ethdev,
362 			txa->dev_count * sizeof(*txa_ethdev));
363 
364 	RTE_ETH_FOREACH_DEV(i) {
365 		if (i == dev_count)
366 			break;
367 		txa_ethdev[i].dev = &rte_eth_devices[i];
368 	}
369 
370 	txa->txa_ethdev = txa_ethdev;
371 	txa->dev_count = dev_count;
372 	return 0;
373 }
374 
375 static int
376 txa_service_queue_array_alloc(struct txa_service_data *txa,
377 			uint16_t port_id)
378 {
379 	struct txa_service_queue_info *tqi;
380 	uint16_t nb_queue;
381 	int ret;
382 
383 	ret = txa_service_ethdev_alloc(txa);
384 	if (ret != 0)
385 		return ret;
386 
387 	if (txa->txa_ethdev[port_id].queues)
388 		return 0;
389 
390 	nb_queue = txa->txa_ethdev[port_id].dev->data->nb_tx_queues;
391 	tqi = rte_zmalloc_socket(txa->mem_name,
392 				nb_queue *
393 				sizeof(struct txa_service_queue_info), 0,
394 				txa->socket_id);
395 	if (tqi == NULL)
396 		return -ENOMEM;
397 	txa->txa_ethdev[port_id].queues = tqi;
398 	return 0;
399 }
400 
401 static void
402 txa_service_queue_array_free(struct txa_service_data *txa,
403 			uint16_t port_id)
404 {
405 	struct txa_service_ethdev *txa_ethdev;
406 	struct txa_service_queue_info *tqi;
407 
408 	txa_ethdev = &txa->txa_ethdev[port_id];
409 	if (txa->txa_ethdev == NULL || txa_ethdev->nb_queues != 0)
410 		return;
411 
412 	tqi = txa_ethdev->queues;
413 	txa_ethdev->queues = NULL;
414 	rte_free(tqi);
415 
416 	if (txa->nb_queues == 0) {
417 		rte_free(txa->txa_ethdev);
418 		txa->txa_ethdev = NULL;
419 	}
420 }
421 
422 static void
423 txa_service_unregister(struct txa_service_data *txa)
424 {
425 	if (txa->service_id != TXA_INVALID_SERVICE_ID) {
426 		rte_service_component_runstate_set(txa->service_id, 0);
427 		while (rte_service_may_be_active(txa->service_id))
428 			rte_pause();
429 		rte_service_component_unregister(txa->service_id);
430 	}
431 	txa->service_id = TXA_INVALID_SERVICE_ID;
432 }
433 
434 static int
435 txa_service_register(struct txa_service_data *txa)
436 {
437 	int ret;
438 	struct rte_service_spec service;
439 	struct rte_event_eth_tx_adapter_conf conf;
440 
441 	if (txa->service_id != TXA_INVALID_SERVICE_ID)
442 		return 0;
443 
444 	memset(&service, 0, sizeof(service));
445 	snprintf(service.name, TXA_SERVICE_NAME_LEN, "txa_%d", txa->id);
446 	service.socket_id = txa->socket_id;
447 	service.callback = txa_service_func;
448 	service.callback_userdata = txa;
449 	service.capabilities = RTE_SERVICE_CAP_MT_SAFE;
450 	ret = rte_service_component_register(&service,
451 					(uint32_t *)&txa->service_id);
452 	if (ret) {
453 		RTE_EDEV_LOG_ERR("failed to register service %s err = %"
454 				 PRId32, service.name, ret);
455 		return ret;
456 	}
457 
458 	ret = txa->conf_cb(txa->id, txa->eventdev_id, &conf, txa->conf_arg);
459 	if (ret) {
460 		txa_service_unregister(txa);
461 		return ret;
462 	}
463 
464 	rte_service_component_runstate_set(txa->service_id, 1);
465 	txa->port_id = conf.event_port_id;
466 	txa->max_nb_tx = conf.max_nb_tx;
467 	return 0;
468 }
469 
470 static struct rte_eth_dev_tx_buffer *
471 txa_service_tx_buf_alloc(struct txa_service_data *txa,
472 			const struct rte_eth_dev *dev)
473 {
474 	struct rte_eth_dev_tx_buffer *tb;
475 	uint16_t port_id;
476 
477 	port_id = dev->data->port_id;
478 	tb = rte_zmalloc_socket(txa->mem_name,
479 				RTE_ETH_TX_BUFFER_SIZE(TXA_BATCH_SIZE),
480 				0,
481 				rte_eth_dev_socket_id(port_id));
482 	if (tb == NULL)
483 		RTE_EDEV_LOG_ERR("Failed to allocate memory for tx buffer");
484 	return tb;
485 }
486 
487 static int
488 txa_service_is_queue_added(struct txa_service_data *txa,
489 			const struct rte_eth_dev *dev,
490 			uint16_t tx_queue_id)
491 {
492 	struct txa_service_queue_info *tqi;
493 
494 	tqi = txa_service_queue(txa, dev->data->port_id, tx_queue_id);
495 	return tqi && tqi->added;
496 }
497 
498 static int
499 txa_service_ctrl(uint8_t id, int start)
500 {
501 	int ret;
502 	struct txa_service_data *txa;
503 
504 	txa = txa_service_id_to_data(id);
505 	if (txa == NULL || txa->service_id == TXA_INVALID_SERVICE_ID)
506 		return 0;
507 
508 	rte_spinlock_lock(&txa->tx_lock);
509 	ret = rte_service_runstate_set(txa->service_id, start);
510 	rte_spinlock_unlock(&txa->tx_lock);
511 
512 	return ret;
513 }
514 
515 static void
516 txa_service_buffer_retry(struct rte_mbuf **pkts, uint16_t unsent,
517 			void *userdata)
518 {
519 	struct txa_retry *tr;
520 	struct txa_service_data *data;
521 	struct rte_event_eth_tx_adapter_stats *stats;
522 	uint16_t sent = 0;
523 	unsigned int retry = 0;
524 	uint16_t i, n;
525 
526 	tr = (struct txa_retry *)(uintptr_t)userdata;
527 	data = txa_service_id_to_data(tr->id);
528 	stats = &data->stats;
529 
530 	do {
531 		n = rte_eth_tx_burst(tr->port_id, tr->tx_queue,
532 			       &pkts[sent], unsent - sent);
533 
534 		sent += n;
535 	} while (sent != unsent && retry++ < TXA_RETRY_CNT);
536 
537 	for (i = sent; i < unsent; i++)
538 		rte_pktmbuf_free(pkts[i]);
539 
540 	stats->tx_retry += retry;
541 	stats->tx_packets += sent;
542 	stats->tx_dropped += unsent - sent;
543 }
544 
545 static uint16_t
546 txa_process_event_vector(struct txa_service_data *txa,
547 			 struct rte_event_vector *vec)
548 {
549 	struct txa_service_queue_info *tqi;
550 	uint16_t port, queue, nb_tx = 0;
551 	struct rte_mbuf **mbufs;
552 	int i;
553 
554 	mbufs = (struct rte_mbuf **)vec->mbufs;
555 	if (vec->attr_valid) {
556 		port = vec->port;
557 		queue = vec->queue;
558 		tqi = txa_service_queue(txa, port, queue);
559 		if (unlikely(tqi == NULL || !tqi->added)) {
560 			rte_pktmbuf_free_bulk(mbufs, vec->nb_elem);
561 			rte_mempool_put(rte_mempool_from_obj(vec), vec);
562 			return 0;
563 		}
564 		for (i = 0; i < vec->nb_elem; i++) {
565 			nb_tx += rte_eth_tx_buffer(port, queue, tqi->tx_buf,
566 						   mbufs[i]);
567 		}
568 	} else {
569 		for (i = 0; i < vec->nb_elem; i++) {
570 			port = mbufs[i]->port;
571 			queue = rte_event_eth_tx_adapter_txq_get(mbufs[i]);
572 			tqi = txa_service_queue(txa, port, queue);
573 			if (unlikely(tqi == NULL || !tqi->added)) {
574 				rte_pktmbuf_free(mbufs[i]);
575 				continue;
576 			}
577 			nb_tx += rte_eth_tx_buffer(port, queue, tqi->tx_buf,
578 						   mbufs[i]);
579 		}
580 	}
581 	rte_mempool_put(rte_mempool_from_obj(vec), vec);
582 
583 	return nb_tx;
584 }
585 
586 static void
587 txa_service_tx(struct txa_service_data *txa, struct rte_event *ev,
588 	uint32_t n)
589 {
590 	uint32_t i;
591 	uint16_t nb_tx;
592 	struct rte_event_eth_tx_adapter_stats *stats;
593 
594 	stats = &txa->stats;
595 
596 	nb_tx = 0;
597 	for (i = 0; i < n; i++) {
598 		uint16_t port;
599 		uint16_t queue;
600 		struct txa_service_queue_info *tqi;
601 
602 		if (!(ev[i].event_type & RTE_EVENT_TYPE_VECTOR)) {
603 			struct rte_mbuf *m;
604 
605 			m = ev[i].mbuf;
606 			port = m->port;
607 			queue = rte_event_eth_tx_adapter_txq_get(m);
608 
609 			tqi = txa_service_queue(txa, port, queue);
610 			if (unlikely(tqi == NULL || !tqi->added)) {
611 				rte_pktmbuf_free(m);
612 				continue;
613 			}
614 
615 			nb_tx += rte_eth_tx_buffer(port, queue, tqi->tx_buf, m);
616 		} else {
617 			nb_tx += txa_process_event_vector(txa, ev[i].vec);
618 		}
619 	}
620 
621 	stats->tx_packets += nb_tx;
622 }
623 
624 static int32_t
625 txa_service_func(void *args)
626 {
627 	struct txa_service_data *txa = args;
628 	uint8_t dev_id;
629 	uint8_t port;
630 	uint16_t n;
631 	uint32_t nb_tx, max_nb_tx;
632 	struct rte_event ev[TXA_BATCH_SIZE];
633 
634 	dev_id = txa->eventdev_id;
635 	max_nb_tx = txa->max_nb_tx;
636 	port = txa->port_id;
637 
638 	if (txa->nb_queues == 0)
639 		return 0;
640 
641 	if (!rte_spinlock_trylock(&txa->tx_lock))
642 		return 0;
643 
644 	for (nb_tx = 0; nb_tx < max_nb_tx; nb_tx += n) {
645 
646 		n = rte_event_dequeue_burst(dev_id, port, ev, RTE_DIM(ev), 0);
647 		if (!n)
648 			break;
649 		txa_service_tx(txa, ev, n);
650 	}
651 
652 	if ((txa->loop_cnt++ & (TXA_FLUSH_THRESHOLD - 1)) == 0) {
653 
654 		struct txa_service_ethdev *tdi;
655 		struct txa_service_queue_info *tqi;
656 		struct rte_eth_dev *dev;
657 		uint16_t i;
658 
659 		tdi = txa->txa_ethdev;
660 		nb_tx = 0;
661 
662 		RTE_ETH_FOREACH_DEV(i) {
663 			uint16_t q;
664 
665 			if (i == txa->dev_count)
666 				break;
667 
668 			dev = tdi[i].dev;
669 			if (tdi[i].nb_queues == 0)
670 				continue;
671 			for (q = 0; q < dev->data->nb_tx_queues; q++) {
672 
673 				tqi = txa_service_queue(txa, i, q);
674 				if (unlikely(tqi == NULL || !tqi->added))
675 					continue;
676 
677 				nb_tx += rte_eth_tx_buffer_flush(i, q,
678 							tqi->tx_buf);
679 			}
680 		}
681 
682 		txa->stats.tx_packets += nb_tx;
683 	}
684 	rte_spinlock_unlock(&txa->tx_lock);
685 	return 0;
686 }
687 
688 static int
689 txa_service_adapter_create(uint8_t id, struct rte_eventdev *dev,
690 			struct rte_event_port_conf *port_conf)
691 {
692 	struct txa_service_data *txa;
693 	struct rte_event_port_conf *cb_conf;
694 	int ret;
695 
696 	cb_conf = rte_malloc(NULL, sizeof(*cb_conf), 0);
697 	if (cb_conf == NULL)
698 		return -ENOMEM;
699 
700 	*cb_conf = *port_conf;
701 	ret = txa_service_adapter_create_ext(id, dev, txa_service_conf_cb,
702 					cb_conf);
703 	if (ret) {
704 		rte_free(cb_conf);
705 		return ret;
706 	}
707 
708 	txa = txa_service_id_to_data(id);
709 	txa->conf_free = 1;
710 	return ret;
711 }
712 
713 static int
714 txa_service_adapter_create_ext(uint8_t id, struct rte_eventdev *dev,
715 			rte_event_eth_tx_adapter_conf_cb conf_cb,
716 			void *conf_arg)
717 {
718 	struct txa_service_data *txa;
719 	int socket_id;
720 	char mem_name[TXA_SERVICE_NAME_LEN];
721 	int ret;
722 
723 	if (conf_cb == NULL)
724 		return -EINVAL;
725 
726 	socket_id = dev->data->socket_id;
727 	snprintf(mem_name, TXA_MEM_NAME_LEN,
728 		"rte_event_eth_txa_%d",
729 		id);
730 
731 	ret = txa_service_data_init();
732 	if (ret != 0)
733 		return ret;
734 
735 	txa = rte_zmalloc_socket(mem_name,
736 				sizeof(*txa),
737 				RTE_CACHE_LINE_SIZE, socket_id);
738 	if (txa == NULL) {
739 		RTE_EDEV_LOG_ERR("failed to get mem for tx adapter");
740 		return -ENOMEM;
741 	}
742 
743 	txa->id = id;
744 	txa->eventdev_id = dev->data->dev_id;
745 	txa->socket_id = socket_id;
746 	strncpy(txa->mem_name, mem_name, TXA_SERVICE_NAME_LEN);
747 	txa->conf_cb = conf_cb;
748 	txa->conf_arg = conf_arg;
749 	txa->service_id = TXA_INVALID_SERVICE_ID;
750 	rte_spinlock_init(&txa->tx_lock);
751 	txa_service_data_array[id] = txa;
752 
753 	return 0;
754 }
755 
756 static int
757 txa_service_event_port_get(uint8_t id, uint8_t *port)
758 {
759 	struct txa_service_data *txa;
760 
761 	txa = txa_service_id_to_data(id);
762 	if (txa->service_id == TXA_INVALID_SERVICE_ID)
763 		return -ENODEV;
764 
765 	*port = txa->port_id;
766 	return 0;
767 }
768 
769 static int
770 txa_service_adapter_free(uint8_t id)
771 {
772 	struct txa_service_data *txa;
773 
774 	txa = txa_service_id_to_data(id);
775 	if (txa->nb_queues) {
776 		RTE_EDEV_LOG_ERR("%" PRIu16 " Tx queues not deleted",
777 				txa->nb_queues);
778 		return -EBUSY;
779 	}
780 
781 	if (txa->conf_free)
782 		rte_free(txa->conf_arg);
783 	rte_free(txa);
784 	return 0;
785 }
786 
787 static int
788 txa_service_queue_add(uint8_t id,
789 		__rte_unused struct rte_eventdev *dev,
790 		const struct rte_eth_dev *eth_dev,
791 		int32_t tx_queue_id)
792 {
793 	struct txa_service_data *txa;
794 	struct txa_service_ethdev *tdi;
795 	struct txa_service_queue_info *tqi;
796 	struct rte_eth_dev_tx_buffer *tb;
797 	struct txa_retry *txa_retry;
798 	int ret = 0;
799 
800 	txa = txa_service_id_to_data(id);
801 
802 	if (tx_queue_id == -1) {
803 		int nb_queues;
804 		uint16_t i, j;
805 		uint16_t *qdone;
806 
807 		nb_queues = eth_dev->data->nb_tx_queues;
808 		if (txa->dev_count > eth_dev->data->port_id) {
809 			tdi = &txa->txa_ethdev[eth_dev->data->port_id];
810 			nb_queues -= tdi->nb_queues;
811 		}
812 
813 		qdone = rte_zmalloc(txa->mem_name,
814 				nb_queues * sizeof(*qdone), 0);
815 		if (qdone == NULL)
816 			return -ENOMEM;
817 		j = 0;
818 		for (i = 0; i < nb_queues; i++) {
819 			if (txa_service_is_queue_added(txa, eth_dev, i))
820 				continue;
821 			ret = txa_service_queue_add(id, dev, eth_dev, i);
822 			if (ret == 0)
823 				qdone[j++] = i;
824 			else
825 				break;
826 		}
827 
828 		if (i != nb_queues) {
829 			for (i = 0; i < j; i++)
830 				txa_service_queue_del(id, eth_dev, qdone[i]);
831 		}
832 		rte_free(qdone);
833 		return ret;
834 	}
835 
836 	ret = txa_service_register(txa);
837 	if (ret)
838 		return ret;
839 
840 	rte_spinlock_lock(&txa->tx_lock);
841 
842 	if (txa_service_is_queue_added(txa, eth_dev, tx_queue_id))
843 		goto ret_unlock;
844 
845 	ret = txa_service_queue_array_alloc(txa, eth_dev->data->port_id);
846 	if (ret)
847 		goto err_unlock;
848 
849 	tb = txa_service_tx_buf_alloc(txa, eth_dev);
850 	if (tb == NULL)
851 		goto err_unlock;
852 
853 	tdi = &txa->txa_ethdev[eth_dev->data->port_id];
854 	tqi = txa_service_queue(txa, eth_dev->data->port_id, tx_queue_id);
855 	if (tqi == NULL)
856 		goto err_unlock;
857 
858 	txa_retry = &tqi->txa_retry;
859 	txa_retry->id = txa->id;
860 	txa_retry->port_id = eth_dev->data->port_id;
861 	txa_retry->tx_queue = tx_queue_id;
862 
863 	rte_eth_tx_buffer_init(tb, TXA_BATCH_SIZE);
864 	rte_eth_tx_buffer_set_err_callback(tb,
865 		txa_service_buffer_retry, txa_retry);
866 
867 	tqi->tx_buf = tb;
868 	tqi->added = 1;
869 	tdi->nb_queues++;
870 	txa->nb_queues++;
871 
872 ret_unlock:
873 	rte_spinlock_unlock(&txa->tx_lock);
874 	return 0;
875 
876 err_unlock:
877 	if (txa->nb_queues == 0) {
878 		txa_service_queue_array_free(txa,
879 					eth_dev->data->port_id);
880 		txa_service_unregister(txa);
881 	}
882 
883 	rte_spinlock_unlock(&txa->tx_lock);
884 	return -1;
885 }
886 
887 static int
888 txa_service_queue_del(uint8_t id,
889 		const struct rte_eth_dev *dev,
890 		int32_t tx_queue_id)
891 {
892 	struct txa_service_data *txa;
893 	struct txa_service_queue_info *tqi;
894 	struct rte_eth_dev_tx_buffer *tb;
895 	uint16_t port_id;
896 
897 	txa = txa_service_id_to_data(id);
898 	port_id = dev->data->port_id;
899 
900 	if (tx_queue_id == -1) {
901 		uint16_t i, q, nb_queues;
902 		int ret = 0;
903 
904 		nb_queues = txa->txa_ethdev[port_id].nb_queues;
905 		if (nb_queues == 0)
906 			return 0;
907 
908 		i = 0;
909 		q = 0;
910 		tqi = txa->txa_ethdev[port_id].queues;
911 
912 		while (i < nb_queues) {
913 
914 			if (tqi[q].added) {
915 				ret = txa_service_queue_del(id, dev, q);
916 				if (ret != 0)
917 					break;
918 			}
919 			i++;
920 			q++;
921 		}
922 		return ret;
923 	}
924 
925 	txa = txa_service_id_to_data(id);
926 
927 	rte_spinlock_lock(&txa->tx_lock);
928 	tqi = txa_service_queue(txa, port_id, tx_queue_id);
929 	if (tqi == NULL || !tqi->added)
930 		goto ret_unlock;
931 
932 	tb = tqi->tx_buf;
933 	tqi->added = 0;
934 	tqi->tx_buf = NULL;
935 	rte_free(tb);
936 	txa->nb_queues--;
937 	txa->txa_ethdev[port_id].nb_queues--;
938 
939 	txa_service_queue_array_free(txa, port_id);
940 
941 ret_unlock:
942 	rte_spinlock_unlock(&txa->tx_lock);
943 	return 0;
944 }
945 
946 static int
947 txa_service_id_get(uint8_t id, uint32_t *service_id)
948 {
949 	struct txa_service_data *txa;
950 
951 	txa = txa_service_id_to_data(id);
952 	if (txa->service_id == TXA_INVALID_SERVICE_ID)
953 		return -ESRCH;
954 
955 	if (service_id == NULL)
956 		return -EINVAL;
957 
958 	*service_id = txa->service_id;
959 	return 0;
960 }
961 
962 static int
963 txa_service_start(uint8_t id)
964 {
965 	return txa_service_ctrl(id, 1);
966 }
967 
968 static int
969 txa_service_stats_get(uint8_t id,
970 		struct rte_event_eth_tx_adapter_stats *stats)
971 {
972 	struct txa_service_data *txa;
973 
974 	txa = txa_service_id_to_data(id);
975 	*stats = txa->stats;
976 	return 0;
977 }
978 
979 static int
980 txa_service_stats_reset(uint8_t id)
981 {
982 	struct txa_service_data *txa;
983 
984 	txa = txa_service_id_to_data(id);
985 	memset(&txa->stats, 0, sizeof(txa->stats));
986 	return 0;
987 }
988 
989 static int
990 txa_service_stop(uint8_t id)
991 {
992 	return txa_service_ctrl(id, 0);
993 }
994 
995 
996 int
997 rte_event_eth_tx_adapter_create(uint8_t id, uint8_t dev_id,
998 				struct rte_event_port_conf *port_conf)
999 {
1000 	struct rte_eventdev *dev;
1001 	int ret;
1002 
1003 	if (port_conf == NULL)
1004 		return -EINVAL;
1005 
1006 	RTE_EVENT_ETH_TX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
1007 	RTE_EVENTDEV_VALID_DEVID_OR_ERR_RET(dev_id, -EINVAL);
1008 
1009 	dev = &rte_eventdevs[dev_id];
1010 
1011 	ret = txa_init();
1012 	if (ret != 0)
1013 		return ret;
1014 
1015 	if (txa_adapter_exist(id))
1016 		return -EEXIST;
1017 
1018 	txa_dev_id_array[id] = dev_id;
1019 	if (txa_dev_adapter_create(id))
1020 		ret = txa_dev_adapter_create(id)(id, dev);
1021 
1022 	if (ret != 0) {
1023 		txa_dev_id_array[id] = TXA_INVALID_DEV_ID;
1024 		return ret;
1025 	}
1026 
1027 	ret = txa_service_adapter_create(id, dev, port_conf);
1028 	if (ret != 0) {
1029 		if (txa_dev_adapter_free(id))
1030 			txa_dev_adapter_free(id)(id, dev);
1031 		txa_dev_id_array[id] = TXA_INVALID_DEV_ID;
1032 		return ret;
1033 	}
1034 	rte_eventdev_trace_eth_tx_adapter_create(id, dev_id, NULL, port_conf,
1035 		ret);
1036 	txa_dev_id_array[id] = dev_id;
1037 	return 0;
1038 }
1039 
1040 int
1041 rte_event_eth_tx_adapter_create_ext(uint8_t id, uint8_t dev_id,
1042 				rte_event_eth_tx_adapter_conf_cb conf_cb,
1043 				void *conf_arg)
1044 {
1045 	struct rte_eventdev *dev;
1046 	int ret;
1047 
1048 	RTE_EVENT_ETH_TX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
1049 	RTE_EVENTDEV_VALID_DEVID_OR_ERR_RET(dev_id, -EINVAL);
1050 
1051 	ret = txa_init();
1052 	if (ret != 0)
1053 		return ret;
1054 
1055 	if (txa_adapter_exist(id))
1056 		return -EINVAL;
1057 
1058 	dev = &rte_eventdevs[dev_id];
1059 
1060 	txa_dev_id_array[id] = dev_id;
1061 	if (txa_dev_adapter_create_ext(id))
1062 		ret = txa_dev_adapter_create_ext(id)(id, dev);
1063 
1064 	if (ret != 0) {
1065 		txa_dev_id_array[id] = TXA_INVALID_DEV_ID;
1066 		return ret;
1067 	}
1068 
1069 	ret = txa_service_adapter_create_ext(id, dev, conf_cb, conf_arg);
1070 	if (ret != 0) {
1071 		if (txa_dev_adapter_free(id))
1072 			txa_dev_adapter_free(id)(id, dev);
1073 		txa_dev_id_array[id] = TXA_INVALID_DEV_ID;
1074 		return ret;
1075 	}
1076 
1077 	rte_eventdev_trace_eth_tx_adapter_create(id, dev_id, conf_cb, conf_arg,
1078 		ret);
1079 	txa_dev_id_array[id] = dev_id;
1080 	return 0;
1081 }
1082 
1083 
1084 int
1085 rte_event_eth_tx_adapter_event_port_get(uint8_t id, uint8_t *event_port_id)
1086 {
1087 	TXA_CHECK_OR_ERR_RET(id);
1088 
1089 	return txa_service_event_port_get(id, event_port_id);
1090 }
1091 
1092 int
1093 rte_event_eth_tx_adapter_free(uint8_t id)
1094 {
1095 	int ret;
1096 
1097 	TXA_CHECK_OR_ERR_RET(id);
1098 
1099 	ret = txa_dev_adapter_free(id) ?
1100 		txa_dev_adapter_free(id)(id, txa_evdev(id)) :
1101 		0;
1102 
1103 	if (ret == 0)
1104 		ret = txa_service_adapter_free(id);
1105 	txa_dev_id_array[id] = TXA_INVALID_DEV_ID;
1106 
1107 	rte_eventdev_trace_eth_tx_adapter_free(id, ret);
1108 	return ret;
1109 }
1110 
1111 int
1112 rte_event_eth_tx_adapter_queue_add(uint8_t id,
1113 				uint16_t eth_dev_id,
1114 				int32_t queue)
1115 {
1116 	struct rte_eth_dev *eth_dev;
1117 	int ret;
1118 	uint32_t caps;
1119 
1120 	RTE_ETH_VALID_PORTID_OR_ERR_RET(eth_dev_id, -EINVAL);
1121 	TXA_CHECK_OR_ERR_RET(id);
1122 
1123 	eth_dev = &rte_eth_devices[eth_dev_id];
1124 	TXA_CHECK_TXQ(eth_dev, queue);
1125 
1126 	caps = 0;
1127 	if (txa_dev_caps_get(id))
1128 		txa_dev_caps_get(id)(txa_evdev(id), eth_dev, &caps);
1129 
1130 	if (caps & RTE_EVENT_ETH_TX_ADAPTER_CAP_INTERNAL_PORT)
1131 		ret =  txa_dev_queue_add(id) ?
1132 					txa_dev_queue_add(id)(id,
1133 							txa_evdev(id),
1134 							eth_dev,
1135 							queue) : 0;
1136 	else
1137 		ret = txa_service_queue_add(id, txa_evdev(id), eth_dev, queue);
1138 
1139 	rte_eventdev_trace_eth_tx_adapter_queue_add(id, eth_dev_id, queue,
1140 		ret);
1141 	return ret;
1142 }
1143 
1144 int
1145 rte_event_eth_tx_adapter_queue_del(uint8_t id,
1146 				uint16_t eth_dev_id,
1147 				int32_t queue)
1148 {
1149 	struct rte_eth_dev *eth_dev;
1150 	int ret;
1151 	uint32_t caps;
1152 
1153 	RTE_ETH_VALID_PORTID_OR_ERR_RET(eth_dev_id, -EINVAL);
1154 	TXA_CHECK_OR_ERR_RET(id);
1155 
1156 	eth_dev = &rte_eth_devices[eth_dev_id];
1157 
1158 	caps = 0;
1159 
1160 	if (txa_dev_caps_get(id))
1161 		txa_dev_caps_get(id)(txa_evdev(id), eth_dev, &caps);
1162 
1163 	if (caps & RTE_EVENT_ETH_TX_ADAPTER_CAP_INTERNAL_PORT)
1164 		ret =  txa_dev_queue_del(id) ?
1165 					txa_dev_queue_del(id)(id, txa_evdev(id),
1166 							eth_dev,
1167 							queue) : 0;
1168 	else
1169 		ret = txa_service_queue_del(id, eth_dev, queue);
1170 
1171 	rte_eventdev_trace_eth_tx_adapter_queue_del(id, eth_dev_id, queue,
1172 		ret);
1173 	return ret;
1174 }
1175 
1176 int
1177 rte_event_eth_tx_adapter_service_id_get(uint8_t id, uint32_t *service_id)
1178 {
1179 	TXA_CHECK_OR_ERR_RET(id);
1180 
1181 	return txa_service_id_get(id, service_id);
1182 }
1183 
1184 int
1185 rte_event_eth_tx_adapter_start(uint8_t id)
1186 {
1187 	int ret;
1188 
1189 	TXA_CHECK_OR_ERR_RET(id);
1190 
1191 	ret = txa_dev_start(id) ? txa_dev_start(id)(id, txa_evdev(id)) : 0;
1192 	if (ret == 0)
1193 		ret = txa_service_start(id);
1194 	rte_eventdev_trace_eth_tx_adapter_start(id, ret);
1195 	return ret;
1196 }
1197 
1198 int
1199 rte_event_eth_tx_adapter_stats_get(uint8_t id,
1200 				struct rte_event_eth_tx_adapter_stats *stats)
1201 {
1202 	int ret;
1203 
1204 	TXA_CHECK_OR_ERR_RET(id);
1205 
1206 	if (stats == NULL)
1207 		return -EINVAL;
1208 
1209 	*stats = (struct rte_event_eth_tx_adapter_stats){0};
1210 
1211 	ret = txa_dev_stats_get(id) ?
1212 			txa_dev_stats_get(id)(id, txa_evdev(id), stats) : 0;
1213 
1214 	if (ret == 0 && txa_service_id_get(id, NULL) != ESRCH) {
1215 		if (txa_dev_stats_get(id)) {
1216 			struct rte_event_eth_tx_adapter_stats service_stats;
1217 
1218 			ret = txa_service_stats_get(id, &service_stats);
1219 			if (ret == 0) {
1220 				stats->tx_retry += service_stats.tx_retry;
1221 				stats->tx_packets += service_stats.tx_packets;
1222 				stats->tx_dropped += service_stats.tx_dropped;
1223 			}
1224 		} else
1225 			ret = txa_service_stats_get(id, stats);
1226 	}
1227 
1228 	return ret;
1229 }
1230 
1231 int
1232 rte_event_eth_tx_adapter_stats_reset(uint8_t id)
1233 {
1234 	int ret;
1235 
1236 	TXA_CHECK_OR_ERR_RET(id);
1237 
1238 	ret = txa_dev_stats_reset(id) ?
1239 		txa_dev_stats_reset(id)(id, txa_evdev(id)) : 0;
1240 	if (ret == 0)
1241 		ret = txa_service_stats_reset(id);
1242 	return ret;
1243 }
1244 
1245 int
1246 rte_event_eth_tx_adapter_stop(uint8_t id)
1247 {
1248 	int ret;
1249 
1250 	TXA_CHECK_OR_ERR_RET(id);
1251 
1252 	ret = txa_dev_stop(id) ? txa_dev_stop(id)(id,  txa_evdev(id)) : 0;
1253 	if (ret == 0)
1254 		ret = txa_service_stop(id);
1255 	rte_eventdev_trace_eth_tx_adapter_stop(id, ret);
1256 	return ret;
1257 }
1258 
1259 int
1260 rte_event_eth_tx_adapter_instance_get(uint16_t eth_dev_id,
1261 				      uint16_t tx_queue_id,
1262 				      uint8_t *txa_inst_id)
1263 {
1264 	uint8_t id;
1265 	int ret = -EINVAL;
1266 	uint32_t caps;
1267 	struct txa_service_data *txa;
1268 
1269 	if (txa_lookup())
1270 		return -ENOMEM;
1271 
1272 	if (eth_dev_id >= rte_eth_dev_count_avail()) {
1273 		RTE_EDEV_LOG_ERR("Invalid ethernet port id %u", eth_dev_id);
1274 		return -EINVAL;
1275 	}
1276 
1277 	if (tx_queue_id >= rte_eth_devices[eth_dev_id].data->nb_tx_queues) {
1278 		RTE_EDEV_LOG_ERR("Invalid tx queue id %u", tx_queue_id);
1279 		return -EINVAL;
1280 	}
1281 
1282 	if (txa_inst_id == NULL) {
1283 		RTE_EDEV_LOG_ERR("txa_instance_id cannot be NULL");
1284 		return -EINVAL;
1285 	}
1286 
1287 	/* Iterate through all Tx adapter instances */
1288 	for (id = 0; id < RTE_EVENT_ETH_TX_ADAPTER_MAX_INSTANCE; id++) {
1289 		txa = txa_service_id_to_data(id);
1290 		if (!txa)
1291 			continue;
1292 
1293 		caps = 0;
1294 		if (rte_event_eth_tx_adapter_caps_get(txa->eventdev_id,
1295 						      eth_dev_id,
1296 						      &caps))
1297 			continue;
1298 
1299 		if (caps & RTE_EVENT_ETH_TX_ADAPTER_CAP_INTERNAL_PORT) {
1300 			ret = txa_dev_instance_get(id) ?
1301 					txa_dev_instance_get(id)(eth_dev_id,
1302 								 tx_queue_id,
1303 								 txa_inst_id)
1304 							: -EINVAL;
1305 			if (ret == 0)
1306 				return ret;
1307 		} else {
1308 			struct rte_eth_dev *eth_dev;
1309 
1310 			eth_dev = &rte_eth_devices[eth_dev_id];
1311 
1312 			if (txa_service_is_queue_added(txa, eth_dev,
1313 						       tx_queue_id)) {
1314 				*txa_inst_id = txa->id;
1315 				return 0;
1316 			}
1317 		}
1318 	}
1319 
1320 	return -EINVAL;
1321 }
1322