xref: /dpdk/lib/eventdev/rte_event_eth_tx_adapter.c (revision 9ad3a41ab2a10db0059e1decdbf3ec038f348e08)
1 /* SPDX-License-Identifier: BSD-3-Clause
2  * Copyright(c) 2018 Intel Corporation.
3  */
4 #include <rte_spinlock.h>
5 #include <rte_service_component.h>
6 #include <ethdev_driver.h>
7 
8 #include "eventdev_pmd.h"
9 #include "eventdev_trace.h"
10 #include "rte_event_eth_tx_adapter.h"
11 
12 #define TXA_BATCH_SIZE		32
13 #define TXA_SERVICE_NAME_LEN	32
14 #define TXA_MEM_NAME_LEN	32
15 #define TXA_FLUSH_THRESHOLD	1024
16 #define TXA_RETRY_CNT		100
17 #define TXA_MAX_NB_TX		128
18 #define TXA_INVALID_DEV_ID	INT32_C(-1)
19 #define TXA_INVALID_SERVICE_ID	INT64_C(-1)
20 
21 #define txa_evdev(id) (&rte_eventdevs[txa_dev_id_array[(id)]])
22 
23 #define txa_dev_caps_get(id) txa_evdev((id))->dev_ops->eth_tx_adapter_caps_get
24 
25 #define txa_dev_adapter_create(t) txa_evdev(t)->dev_ops->eth_tx_adapter_create
26 
27 #define txa_dev_adapter_create_ext(t) \
28 				txa_evdev(t)->dev_ops->eth_tx_adapter_create
29 
30 #define txa_dev_adapter_free(t) txa_evdev(t)->dev_ops->eth_tx_adapter_free
31 
32 #define txa_dev_queue_add(id) txa_evdev(id)->dev_ops->eth_tx_adapter_queue_add
33 
34 #define txa_dev_queue_del(t) txa_evdev(t)->dev_ops->eth_tx_adapter_queue_del
35 
36 #define txa_dev_start(t) txa_evdev(t)->dev_ops->eth_tx_adapter_start
37 
38 #define txa_dev_stop(t) txa_evdev(t)->dev_ops->eth_tx_adapter_stop
39 
40 #define txa_dev_stats_reset(t) txa_evdev(t)->dev_ops->eth_tx_adapter_stats_reset
41 
42 #define txa_dev_stats_get(t) txa_evdev(t)->dev_ops->eth_tx_adapter_stats_get
43 
44 #define RTE_EVENT_ETH_TX_ADAPTER_ID_VALID_OR_ERR_RET(id, retval) \
45 do { \
46 	if (!txa_valid_id(id)) { \
47 		RTE_EDEV_LOG_ERR("Invalid eth Rx adapter id = %d", id); \
48 		return retval; \
49 	} \
50 } while (0)
51 
52 #define TXA_CHECK_OR_ERR_RET(id) \
53 do {\
54 	int ret; \
55 	RTE_EVENT_ETH_TX_ADAPTER_ID_VALID_OR_ERR_RET((id), -EINVAL); \
56 	ret = txa_init(); \
57 	if (ret != 0) \
58 		return ret; \
59 	if (!txa_adapter_exist((id))) \
60 		return -EINVAL; \
61 } while (0)
62 
63 #define TXA_CHECK_TXQ(dev, queue) \
64 do {\
65 	if ((dev)->data->nb_tx_queues == 0) { \
66 		RTE_EDEV_LOG_ERR("No tx queues configured"); \
67 		return -EINVAL; \
68 	} \
69 	if ((queue) != -1 && \
70 		(uint16_t)(queue) >= (dev)->data->nb_tx_queues) { \
71 		RTE_EDEV_LOG_ERR("Invalid tx queue_id %" PRIu16, \
72 				(uint16_t)(queue)); \
73 		return -EINVAL; \
74 	} \
75 } while (0)
76 
77 /* Tx retry callback structure */
78 struct txa_retry {
79 	/* Ethernet port id */
80 	uint16_t port_id;
81 	/* Tx queue */
82 	uint16_t tx_queue;
83 	/* Adapter ID */
84 	uint8_t id;
85 };
86 
87 /* Per queue structure */
88 struct txa_service_queue_info {
89 	/* Queue has been added */
90 	uint8_t added;
91 	/* Retry callback argument */
92 	struct txa_retry txa_retry;
93 	/* Tx buffer */
94 	struct rte_eth_dev_tx_buffer *tx_buf;
95 };
96 
97 /* PMD private structure */
98 struct txa_service_data {
99 	/* Max mbufs processed in any service function invocation */
100 	uint32_t max_nb_tx;
101 	/* Number of Tx queues in adapter */
102 	uint32_t nb_queues;
103 	/*  Synchronization with data path */
104 	rte_spinlock_t tx_lock;
105 	/* Event port ID */
106 	uint8_t port_id;
107 	/* Event device identifier */
108 	uint8_t eventdev_id;
109 	/* Highest port id supported + 1 */
110 	uint16_t dev_count;
111 	/* Loop count to flush Tx buffers */
112 	int loop_cnt;
113 	/* Per ethernet device structure */
114 	struct txa_service_ethdev *txa_ethdev;
115 	/* Statistics */
116 	struct rte_event_eth_tx_adapter_stats stats;
117 	/* Adapter Identifier */
118 	uint8_t id;
119 	/* Conf arg must be freed */
120 	uint8_t conf_free;
121 	/* Configuration callback */
122 	rte_event_eth_tx_adapter_conf_cb conf_cb;
123 	/* Configuration callback argument */
124 	void *conf_arg;
125 	/* socket id */
126 	int socket_id;
127 	/* Per adapter EAL service */
128 	int64_t service_id;
129 	/* Memory allocation name */
130 	char mem_name[TXA_MEM_NAME_LEN];
131 } __rte_cache_aligned;
132 
133 /* Per eth device structure */
134 struct txa_service_ethdev {
135 	/* Pointer to ethernet device */
136 	struct rte_eth_dev *dev;
137 	/* Number of queues added */
138 	uint16_t nb_queues;
139 	/* PMD specific queue data */
140 	void *queues;
141 };
142 
143 /* Array of adapter instances, initialized with event device id
144  * when adapter is created
145  */
146 static int *txa_dev_id_array;
147 
148 /* Array of pointers to service implementation data */
149 static struct txa_service_data **txa_service_data_array;
150 
151 static int32_t txa_service_func(void *args);
152 static int txa_service_adapter_create_ext(uint8_t id,
153 			struct rte_eventdev *dev,
154 			rte_event_eth_tx_adapter_conf_cb conf_cb,
155 			void *conf_arg);
156 static int txa_service_queue_del(uint8_t id,
157 				const struct rte_eth_dev *dev,
158 				int32_t tx_queue_id);
159 
160 static int
161 txa_adapter_exist(uint8_t id)
162 {
163 	return txa_dev_id_array[id] != TXA_INVALID_DEV_ID;
164 }
165 
166 static inline int
167 txa_valid_id(uint8_t id)
168 {
169 	return id < RTE_EVENT_ETH_TX_ADAPTER_MAX_INSTANCE;
170 }
171 
172 static void *
173 txa_memzone_array_get(const char *name, unsigned int elt_size, int nb_elems)
174 {
175 	const struct rte_memzone *mz;
176 	unsigned int sz;
177 
178 	sz = elt_size * nb_elems;
179 	sz = RTE_ALIGN(sz, RTE_CACHE_LINE_SIZE);
180 
181 	mz = rte_memzone_lookup(name);
182 	if (mz == NULL) {
183 		mz = rte_memzone_reserve_aligned(name, sz, rte_socket_id(), 0,
184 						 RTE_CACHE_LINE_SIZE);
185 		if (mz == NULL) {
186 			RTE_EDEV_LOG_ERR("failed to reserve memzone"
187 					" name = %s err = %"
188 					PRId32, name, rte_errno);
189 			return NULL;
190 		}
191 	}
192 
193 	return  mz->addr;
194 }
195 
196 static int
197 txa_dev_id_array_init(void)
198 {
199 	if (txa_dev_id_array == NULL) {
200 		int i;
201 
202 		txa_dev_id_array = txa_memzone_array_get("txa_adapter_array",
203 					sizeof(int),
204 					RTE_EVENT_ETH_TX_ADAPTER_MAX_INSTANCE);
205 		if (txa_dev_id_array == NULL)
206 			return -ENOMEM;
207 
208 		for (i = 0; i < RTE_EVENT_ETH_TX_ADAPTER_MAX_INSTANCE; i++)
209 			txa_dev_id_array[i] = TXA_INVALID_DEV_ID;
210 	}
211 
212 	return 0;
213 }
214 
215 static int
216 txa_init(void)
217 {
218 	return txa_dev_id_array_init();
219 }
220 
221 static int
222 txa_service_data_init(void)
223 {
224 	if (txa_service_data_array == NULL) {
225 		txa_service_data_array =
226 				txa_memzone_array_get("txa_service_data_array",
227 					sizeof(int),
228 					RTE_EVENT_ETH_TX_ADAPTER_MAX_INSTANCE);
229 		if (txa_service_data_array == NULL)
230 			return -ENOMEM;
231 	}
232 
233 	return 0;
234 }
235 
236 static inline struct txa_service_data *
237 txa_service_id_to_data(uint8_t id)
238 {
239 	return txa_service_data_array[id];
240 }
241 
242 static inline struct txa_service_queue_info *
243 txa_service_queue(struct txa_service_data *txa, uint16_t port_id,
244 		uint16_t tx_queue_id)
245 {
246 	struct txa_service_queue_info *tqi;
247 
248 	if (unlikely(txa->txa_ethdev == NULL || txa->dev_count < port_id + 1))
249 		return NULL;
250 
251 	tqi = txa->txa_ethdev[port_id].queues;
252 
253 	return likely(tqi != NULL) ? tqi + tx_queue_id : NULL;
254 }
255 
256 static int
257 txa_service_conf_cb(uint8_t __rte_unused id, uint8_t dev_id,
258 		struct rte_event_eth_tx_adapter_conf *conf, void *arg)
259 {
260 	int ret;
261 	struct rte_eventdev *dev;
262 	struct rte_event_port_conf *pc;
263 	struct rte_event_dev_config dev_conf;
264 	int started;
265 	uint8_t port_id;
266 
267 	pc = arg;
268 	dev = &rte_eventdevs[dev_id];
269 	dev_conf = dev->data->dev_conf;
270 
271 	started = dev->data->dev_started;
272 	if (started)
273 		rte_event_dev_stop(dev_id);
274 
275 	port_id = dev_conf.nb_event_ports;
276 	dev_conf.nb_event_ports += 1;
277 
278 	ret = rte_event_dev_configure(dev_id, &dev_conf);
279 	if (ret) {
280 		RTE_EDEV_LOG_ERR("failed to configure event dev %u",
281 						dev_id);
282 		if (started) {
283 			if (rte_event_dev_start(dev_id))
284 				return -EIO;
285 		}
286 		return ret;
287 	}
288 
289 	ret = rte_event_port_setup(dev_id, port_id, pc);
290 	if (ret) {
291 		RTE_EDEV_LOG_ERR("failed to setup event port %u\n",
292 					port_id);
293 		if (started) {
294 			if (rte_event_dev_start(dev_id))
295 				return -EIO;
296 		}
297 		return ret;
298 	}
299 
300 	conf->event_port_id = port_id;
301 	conf->max_nb_tx = TXA_MAX_NB_TX;
302 	if (started)
303 		ret = rte_event_dev_start(dev_id);
304 	return ret;
305 }
306 
307 static int
308 txa_service_ethdev_alloc(struct txa_service_data *txa)
309 {
310 	struct txa_service_ethdev *txa_ethdev;
311 	uint16_t i, dev_count;
312 
313 	dev_count = rte_eth_dev_count_avail();
314 	if (txa->txa_ethdev && dev_count == txa->dev_count)
315 		return 0;
316 
317 	txa_ethdev = rte_zmalloc_socket(txa->mem_name,
318 					dev_count * sizeof(*txa_ethdev),
319 					0,
320 					txa->socket_id);
321 	if (txa_ethdev == NULL) {
322 		RTE_EDEV_LOG_ERR("Failed to alloc txa::txa_ethdev ");
323 		return -ENOMEM;
324 	}
325 
326 	if (txa->dev_count)
327 		memcpy(txa_ethdev, txa->txa_ethdev,
328 			txa->dev_count * sizeof(*txa_ethdev));
329 
330 	RTE_ETH_FOREACH_DEV(i) {
331 		if (i == dev_count)
332 			break;
333 		txa_ethdev[i].dev = &rte_eth_devices[i];
334 	}
335 
336 	txa->txa_ethdev = txa_ethdev;
337 	txa->dev_count = dev_count;
338 	return 0;
339 }
340 
341 static int
342 txa_service_queue_array_alloc(struct txa_service_data *txa,
343 			uint16_t port_id)
344 {
345 	struct txa_service_queue_info *tqi;
346 	uint16_t nb_queue;
347 	int ret;
348 
349 	ret = txa_service_ethdev_alloc(txa);
350 	if (ret != 0)
351 		return ret;
352 
353 	if (txa->txa_ethdev[port_id].queues)
354 		return 0;
355 
356 	nb_queue = txa->txa_ethdev[port_id].dev->data->nb_tx_queues;
357 	tqi = rte_zmalloc_socket(txa->mem_name,
358 				nb_queue *
359 				sizeof(struct txa_service_queue_info), 0,
360 				txa->socket_id);
361 	if (tqi == NULL)
362 		return -ENOMEM;
363 	txa->txa_ethdev[port_id].queues = tqi;
364 	return 0;
365 }
366 
367 static void
368 txa_service_queue_array_free(struct txa_service_data *txa,
369 			uint16_t port_id)
370 {
371 	struct txa_service_ethdev *txa_ethdev;
372 	struct txa_service_queue_info *tqi;
373 
374 	txa_ethdev = &txa->txa_ethdev[port_id];
375 	if (txa->txa_ethdev == NULL || txa_ethdev->nb_queues != 0)
376 		return;
377 
378 	tqi = txa_ethdev->queues;
379 	txa_ethdev->queues = NULL;
380 	rte_free(tqi);
381 
382 	if (txa->nb_queues == 0) {
383 		rte_free(txa->txa_ethdev);
384 		txa->txa_ethdev = NULL;
385 	}
386 }
387 
388 static void
389 txa_service_unregister(struct txa_service_data *txa)
390 {
391 	if (txa->service_id != TXA_INVALID_SERVICE_ID) {
392 		rte_service_component_runstate_set(txa->service_id, 0);
393 		while (rte_service_may_be_active(txa->service_id))
394 			rte_pause();
395 		rte_service_component_unregister(txa->service_id);
396 	}
397 	txa->service_id = TXA_INVALID_SERVICE_ID;
398 }
399 
400 static int
401 txa_service_register(struct txa_service_data *txa)
402 {
403 	int ret;
404 	struct rte_service_spec service;
405 	struct rte_event_eth_tx_adapter_conf conf;
406 
407 	if (txa->service_id != TXA_INVALID_SERVICE_ID)
408 		return 0;
409 
410 	memset(&service, 0, sizeof(service));
411 	snprintf(service.name, TXA_SERVICE_NAME_LEN, "txa_%d", txa->id);
412 	service.socket_id = txa->socket_id;
413 	service.callback = txa_service_func;
414 	service.callback_userdata = txa;
415 	service.capabilities = RTE_SERVICE_CAP_MT_SAFE;
416 	ret = rte_service_component_register(&service,
417 					(uint32_t *)&txa->service_id);
418 	if (ret) {
419 		RTE_EDEV_LOG_ERR("failed to register service %s err = %"
420 				 PRId32, service.name, ret);
421 		return ret;
422 	}
423 
424 	ret = txa->conf_cb(txa->id, txa->eventdev_id, &conf, txa->conf_arg);
425 	if (ret) {
426 		txa_service_unregister(txa);
427 		return ret;
428 	}
429 
430 	rte_service_component_runstate_set(txa->service_id, 1);
431 	txa->port_id = conf.event_port_id;
432 	txa->max_nb_tx = conf.max_nb_tx;
433 	return 0;
434 }
435 
436 static struct rte_eth_dev_tx_buffer *
437 txa_service_tx_buf_alloc(struct txa_service_data *txa,
438 			const struct rte_eth_dev *dev)
439 {
440 	struct rte_eth_dev_tx_buffer *tb;
441 	uint16_t port_id;
442 
443 	port_id = dev->data->port_id;
444 	tb = rte_zmalloc_socket(txa->mem_name,
445 				RTE_ETH_TX_BUFFER_SIZE(TXA_BATCH_SIZE),
446 				0,
447 				rte_eth_dev_socket_id(port_id));
448 	if (tb == NULL)
449 		RTE_EDEV_LOG_ERR("Failed to allocate memory for tx buffer");
450 	return tb;
451 }
452 
453 static int
454 txa_service_is_queue_added(struct txa_service_data *txa,
455 			const struct rte_eth_dev *dev,
456 			uint16_t tx_queue_id)
457 {
458 	struct txa_service_queue_info *tqi;
459 
460 	tqi = txa_service_queue(txa, dev->data->port_id, tx_queue_id);
461 	return tqi && tqi->added;
462 }
463 
464 static int
465 txa_service_ctrl(uint8_t id, int start)
466 {
467 	int ret;
468 	struct txa_service_data *txa;
469 
470 	txa = txa_service_id_to_data(id);
471 	if (txa->service_id == TXA_INVALID_SERVICE_ID)
472 		return 0;
473 
474 	ret = rte_service_runstate_set(txa->service_id, start);
475 	if (ret == 0 && !start) {
476 		while (rte_service_may_be_active(txa->service_id))
477 			rte_pause();
478 	}
479 	return ret;
480 }
481 
482 static void
483 txa_service_buffer_retry(struct rte_mbuf **pkts, uint16_t unsent,
484 			void *userdata)
485 {
486 	struct txa_retry *tr;
487 	struct txa_service_data *data;
488 	struct rte_event_eth_tx_adapter_stats *stats;
489 	uint16_t sent = 0;
490 	unsigned int retry = 0;
491 	uint16_t i, n;
492 
493 	tr = (struct txa_retry *)(uintptr_t)userdata;
494 	data = txa_service_id_to_data(tr->id);
495 	stats = &data->stats;
496 
497 	do {
498 		n = rte_eth_tx_burst(tr->port_id, tr->tx_queue,
499 			       &pkts[sent], unsent - sent);
500 
501 		sent += n;
502 	} while (sent != unsent && retry++ < TXA_RETRY_CNT);
503 
504 	for (i = sent; i < unsent; i++)
505 		rte_pktmbuf_free(pkts[i]);
506 
507 	stats->tx_retry += retry;
508 	stats->tx_packets += sent;
509 	stats->tx_dropped += unsent - sent;
510 }
511 
512 static uint16_t
513 txa_process_event_vector(struct txa_service_data *txa,
514 			 struct rte_event_vector *vec)
515 {
516 	struct txa_service_queue_info *tqi;
517 	uint16_t port, queue, nb_tx = 0;
518 	struct rte_mbuf **mbufs;
519 	int i;
520 
521 	mbufs = (struct rte_mbuf **)vec->mbufs;
522 	if (vec->attr_valid) {
523 		port = vec->port;
524 		queue = vec->queue;
525 		tqi = txa_service_queue(txa, port, queue);
526 		if (unlikely(tqi == NULL || !tqi->added)) {
527 			rte_pktmbuf_free_bulk(mbufs, vec->nb_elem);
528 			rte_mempool_put(rte_mempool_from_obj(vec), vec);
529 			return 0;
530 		}
531 		for (i = 0; i < vec->nb_elem; i++) {
532 			nb_tx += rte_eth_tx_buffer(port, queue, tqi->tx_buf,
533 						   mbufs[i]);
534 		}
535 	} else {
536 		for (i = 0; i < vec->nb_elem; i++) {
537 			port = mbufs[i]->port;
538 			queue = rte_event_eth_tx_adapter_txq_get(mbufs[i]);
539 			tqi = txa_service_queue(txa, port, queue);
540 			if (unlikely(tqi == NULL || !tqi->added)) {
541 				rte_pktmbuf_free(mbufs[i]);
542 				continue;
543 			}
544 			nb_tx += rte_eth_tx_buffer(port, queue, tqi->tx_buf,
545 						   mbufs[i]);
546 		}
547 	}
548 	rte_mempool_put(rte_mempool_from_obj(vec), vec);
549 
550 	return nb_tx;
551 }
552 
553 static void
554 txa_service_tx(struct txa_service_data *txa, struct rte_event *ev,
555 	uint32_t n)
556 {
557 	uint32_t i;
558 	uint16_t nb_tx;
559 	struct rte_event_eth_tx_adapter_stats *stats;
560 
561 	stats = &txa->stats;
562 
563 	nb_tx = 0;
564 	for (i = 0; i < n; i++) {
565 		uint16_t port;
566 		uint16_t queue;
567 		struct txa_service_queue_info *tqi;
568 
569 		if (!(ev[i].event_type & RTE_EVENT_TYPE_VECTOR)) {
570 			struct rte_mbuf *m;
571 
572 			m = ev[i].mbuf;
573 			port = m->port;
574 			queue = rte_event_eth_tx_adapter_txq_get(m);
575 
576 			tqi = txa_service_queue(txa, port, queue);
577 			if (unlikely(tqi == NULL || !tqi->added)) {
578 				rte_pktmbuf_free(m);
579 				continue;
580 			}
581 
582 			nb_tx += rte_eth_tx_buffer(port, queue, tqi->tx_buf, m);
583 		} else {
584 			nb_tx += txa_process_event_vector(txa, ev[i].vec);
585 		}
586 	}
587 
588 	stats->tx_packets += nb_tx;
589 }
590 
591 static int32_t
592 txa_service_func(void *args)
593 {
594 	struct txa_service_data *txa = args;
595 	uint8_t dev_id;
596 	uint8_t port;
597 	uint16_t n;
598 	uint32_t nb_tx, max_nb_tx;
599 	struct rte_event ev[TXA_BATCH_SIZE];
600 
601 	dev_id = txa->eventdev_id;
602 	max_nb_tx = txa->max_nb_tx;
603 	port = txa->port_id;
604 
605 	if (txa->nb_queues == 0)
606 		return 0;
607 
608 	if (!rte_spinlock_trylock(&txa->tx_lock))
609 		return 0;
610 
611 	for (nb_tx = 0; nb_tx < max_nb_tx; nb_tx += n) {
612 
613 		n = rte_event_dequeue_burst(dev_id, port, ev, RTE_DIM(ev), 0);
614 		if (!n)
615 			break;
616 		txa_service_tx(txa, ev, n);
617 	}
618 
619 	if ((txa->loop_cnt++ & (TXA_FLUSH_THRESHOLD - 1)) == 0) {
620 
621 		struct txa_service_ethdev *tdi;
622 		struct txa_service_queue_info *tqi;
623 		struct rte_eth_dev *dev;
624 		uint16_t i;
625 
626 		tdi = txa->txa_ethdev;
627 		nb_tx = 0;
628 
629 		RTE_ETH_FOREACH_DEV(i) {
630 			uint16_t q;
631 
632 			if (i == txa->dev_count)
633 				break;
634 
635 			dev = tdi[i].dev;
636 			if (tdi[i].nb_queues == 0)
637 				continue;
638 			for (q = 0; q < dev->data->nb_tx_queues; q++) {
639 
640 				tqi = txa_service_queue(txa, i, q);
641 				if (unlikely(tqi == NULL || !tqi->added))
642 					continue;
643 
644 				nb_tx += rte_eth_tx_buffer_flush(i, q,
645 							tqi->tx_buf);
646 			}
647 		}
648 
649 		txa->stats.tx_packets += nb_tx;
650 	}
651 	rte_spinlock_unlock(&txa->tx_lock);
652 	return 0;
653 }
654 
655 static int
656 txa_service_adapter_create(uint8_t id, struct rte_eventdev *dev,
657 			struct rte_event_port_conf *port_conf)
658 {
659 	struct txa_service_data *txa;
660 	struct rte_event_port_conf *cb_conf;
661 	int ret;
662 
663 	cb_conf = rte_malloc(NULL, sizeof(*cb_conf), 0);
664 	if (cb_conf == NULL)
665 		return -ENOMEM;
666 
667 	*cb_conf = *port_conf;
668 	ret = txa_service_adapter_create_ext(id, dev, txa_service_conf_cb,
669 					cb_conf);
670 	if (ret) {
671 		rte_free(cb_conf);
672 		return ret;
673 	}
674 
675 	txa = txa_service_id_to_data(id);
676 	txa->conf_free = 1;
677 	return ret;
678 }
679 
680 static int
681 txa_service_adapter_create_ext(uint8_t id, struct rte_eventdev *dev,
682 			rte_event_eth_tx_adapter_conf_cb conf_cb,
683 			void *conf_arg)
684 {
685 	struct txa_service_data *txa;
686 	int socket_id;
687 	char mem_name[TXA_SERVICE_NAME_LEN];
688 	int ret;
689 
690 	if (conf_cb == NULL)
691 		return -EINVAL;
692 
693 	socket_id = dev->data->socket_id;
694 	snprintf(mem_name, TXA_MEM_NAME_LEN,
695 		"rte_event_eth_txa_%d",
696 		id);
697 
698 	ret = txa_service_data_init();
699 	if (ret != 0)
700 		return ret;
701 
702 	txa = rte_zmalloc_socket(mem_name,
703 				sizeof(*txa),
704 				RTE_CACHE_LINE_SIZE, socket_id);
705 	if (txa == NULL) {
706 		RTE_EDEV_LOG_ERR("failed to get mem for tx adapter");
707 		return -ENOMEM;
708 	}
709 
710 	txa->id = id;
711 	txa->eventdev_id = dev->data->dev_id;
712 	txa->socket_id = socket_id;
713 	strncpy(txa->mem_name, mem_name, TXA_SERVICE_NAME_LEN);
714 	txa->conf_cb = conf_cb;
715 	txa->conf_arg = conf_arg;
716 	txa->service_id = TXA_INVALID_SERVICE_ID;
717 	rte_spinlock_init(&txa->tx_lock);
718 	txa_service_data_array[id] = txa;
719 
720 	return 0;
721 }
722 
723 static int
724 txa_service_event_port_get(uint8_t id, uint8_t *port)
725 {
726 	struct txa_service_data *txa;
727 
728 	txa = txa_service_id_to_data(id);
729 	if (txa->service_id == TXA_INVALID_SERVICE_ID)
730 		return -ENODEV;
731 
732 	*port = txa->port_id;
733 	return 0;
734 }
735 
736 static int
737 txa_service_adapter_free(uint8_t id)
738 {
739 	struct txa_service_data *txa;
740 
741 	txa = txa_service_id_to_data(id);
742 	if (txa->nb_queues) {
743 		RTE_EDEV_LOG_ERR("%" PRIu16 " Tx queues not deleted",
744 				txa->nb_queues);
745 		return -EBUSY;
746 	}
747 
748 	if (txa->conf_free)
749 		rte_free(txa->conf_arg);
750 	rte_free(txa);
751 	return 0;
752 }
753 
754 static int
755 txa_service_queue_add(uint8_t id,
756 		__rte_unused struct rte_eventdev *dev,
757 		const struct rte_eth_dev *eth_dev,
758 		int32_t tx_queue_id)
759 {
760 	struct txa_service_data *txa;
761 	struct txa_service_ethdev *tdi;
762 	struct txa_service_queue_info *tqi;
763 	struct rte_eth_dev_tx_buffer *tb;
764 	struct txa_retry *txa_retry;
765 	int ret = 0;
766 
767 	txa = txa_service_id_to_data(id);
768 
769 	if (tx_queue_id == -1) {
770 		int nb_queues;
771 		uint16_t i, j;
772 		uint16_t *qdone;
773 
774 		nb_queues = eth_dev->data->nb_tx_queues;
775 		if (txa->dev_count > eth_dev->data->port_id) {
776 			tdi = &txa->txa_ethdev[eth_dev->data->port_id];
777 			nb_queues -= tdi->nb_queues;
778 		}
779 
780 		qdone = rte_zmalloc(txa->mem_name,
781 				nb_queues * sizeof(*qdone), 0);
782 		if (qdone == NULL)
783 			return -ENOMEM;
784 		j = 0;
785 		for (i = 0; i < nb_queues; i++) {
786 			if (txa_service_is_queue_added(txa, eth_dev, i))
787 				continue;
788 			ret = txa_service_queue_add(id, dev, eth_dev, i);
789 			if (ret == 0)
790 				qdone[j++] = i;
791 			else
792 				break;
793 		}
794 
795 		if (i != nb_queues) {
796 			for (i = 0; i < j; i++)
797 				txa_service_queue_del(id, eth_dev, qdone[i]);
798 		}
799 		rte_free(qdone);
800 		return ret;
801 	}
802 
803 	ret = txa_service_register(txa);
804 	if (ret)
805 		return ret;
806 
807 	rte_spinlock_lock(&txa->tx_lock);
808 
809 	if (txa_service_is_queue_added(txa, eth_dev, tx_queue_id))
810 		goto ret_unlock;
811 
812 	ret = txa_service_queue_array_alloc(txa, eth_dev->data->port_id);
813 	if (ret)
814 		goto err_unlock;
815 
816 	tb = txa_service_tx_buf_alloc(txa, eth_dev);
817 	if (tb == NULL)
818 		goto err_unlock;
819 
820 	tdi = &txa->txa_ethdev[eth_dev->data->port_id];
821 	tqi = txa_service_queue(txa, eth_dev->data->port_id, tx_queue_id);
822 	if (tqi == NULL)
823 		goto err_unlock;
824 
825 	txa_retry = &tqi->txa_retry;
826 	txa_retry->id = txa->id;
827 	txa_retry->port_id = eth_dev->data->port_id;
828 	txa_retry->tx_queue = tx_queue_id;
829 
830 	rte_eth_tx_buffer_init(tb, TXA_BATCH_SIZE);
831 	rte_eth_tx_buffer_set_err_callback(tb,
832 		txa_service_buffer_retry, txa_retry);
833 
834 	tqi->tx_buf = tb;
835 	tqi->added = 1;
836 	tdi->nb_queues++;
837 	txa->nb_queues++;
838 
839 ret_unlock:
840 	rte_spinlock_unlock(&txa->tx_lock);
841 	return 0;
842 
843 err_unlock:
844 	if (txa->nb_queues == 0) {
845 		txa_service_queue_array_free(txa,
846 					eth_dev->data->port_id);
847 		txa_service_unregister(txa);
848 	}
849 
850 	rte_spinlock_unlock(&txa->tx_lock);
851 	return -1;
852 }
853 
854 static int
855 txa_service_queue_del(uint8_t id,
856 		const struct rte_eth_dev *dev,
857 		int32_t tx_queue_id)
858 {
859 	struct txa_service_data *txa;
860 	struct txa_service_queue_info *tqi;
861 	struct rte_eth_dev_tx_buffer *tb;
862 	uint16_t port_id;
863 
864 	txa = txa_service_id_to_data(id);
865 	port_id = dev->data->port_id;
866 
867 	if (tx_queue_id == -1) {
868 		uint16_t i, q, nb_queues;
869 		int ret = 0;
870 
871 		nb_queues = txa->txa_ethdev[port_id].nb_queues;
872 		if (nb_queues == 0)
873 			return 0;
874 
875 		i = 0;
876 		q = 0;
877 		tqi = txa->txa_ethdev[port_id].queues;
878 
879 		while (i < nb_queues) {
880 
881 			if (tqi[q].added) {
882 				ret = txa_service_queue_del(id, dev, q);
883 				if (ret != 0)
884 					break;
885 			}
886 			i++;
887 			q++;
888 		}
889 		return ret;
890 	}
891 
892 	txa = txa_service_id_to_data(id);
893 
894 	tqi = txa_service_queue(txa, port_id, tx_queue_id);
895 	if (tqi == NULL || !tqi->added)
896 		return 0;
897 
898 	tb = tqi->tx_buf;
899 	tqi->added = 0;
900 	tqi->tx_buf = NULL;
901 	rte_free(tb);
902 	txa->nb_queues--;
903 	txa->txa_ethdev[port_id].nb_queues--;
904 
905 	txa_service_queue_array_free(txa, port_id);
906 	return 0;
907 }
908 
909 static int
910 txa_service_id_get(uint8_t id, uint32_t *service_id)
911 {
912 	struct txa_service_data *txa;
913 
914 	txa = txa_service_id_to_data(id);
915 	if (txa->service_id == TXA_INVALID_SERVICE_ID)
916 		return -ESRCH;
917 
918 	if (service_id == NULL)
919 		return -EINVAL;
920 
921 	*service_id = txa->service_id;
922 	return 0;
923 }
924 
925 static int
926 txa_service_start(uint8_t id)
927 {
928 	return txa_service_ctrl(id, 1);
929 }
930 
931 static int
932 txa_service_stats_get(uint8_t id,
933 		struct rte_event_eth_tx_adapter_stats *stats)
934 {
935 	struct txa_service_data *txa;
936 
937 	txa = txa_service_id_to_data(id);
938 	*stats = txa->stats;
939 	return 0;
940 }
941 
942 static int
943 txa_service_stats_reset(uint8_t id)
944 {
945 	struct txa_service_data *txa;
946 
947 	txa = txa_service_id_to_data(id);
948 	memset(&txa->stats, 0, sizeof(txa->stats));
949 	return 0;
950 }
951 
952 static int
953 txa_service_stop(uint8_t id)
954 {
955 	return txa_service_ctrl(id, 0);
956 }
957 
958 
959 int
960 rte_event_eth_tx_adapter_create(uint8_t id, uint8_t dev_id,
961 				struct rte_event_port_conf *port_conf)
962 {
963 	struct rte_eventdev *dev;
964 	int ret;
965 
966 	if (port_conf == NULL)
967 		return -EINVAL;
968 
969 	RTE_EVENT_ETH_TX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
970 	RTE_EVENTDEV_VALID_DEVID_OR_ERR_RET(dev_id, -EINVAL);
971 
972 	dev = &rte_eventdevs[dev_id];
973 
974 	ret = txa_init();
975 	if (ret != 0)
976 		return ret;
977 
978 	if (txa_adapter_exist(id))
979 		return -EEXIST;
980 
981 	txa_dev_id_array[id] = dev_id;
982 	if (txa_dev_adapter_create(id))
983 		ret = txa_dev_adapter_create(id)(id, dev);
984 
985 	if (ret != 0) {
986 		txa_dev_id_array[id] = TXA_INVALID_DEV_ID;
987 		return ret;
988 	}
989 
990 	ret = txa_service_adapter_create(id, dev, port_conf);
991 	if (ret != 0) {
992 		if (txa_dev_adapter_free(id))
993 			txa_dev_adapter_free(id)(id, dev);
994 		txa_dev_id_array[id] = TXA_INVALID_DEV_ID;
995 		return ret;
996 	}
997 	rte_eventdev_trace_eth_tx_adapter_create(id, dev_id, NULL, port_conf,
998 		ret);
999 	txa_dev_id_array[id] = dev_id;
1000 	return 0;
1001 }
1002 
1003 int
1004 rte_event_eth_tx_adapter_create_ext(uint8_t id, uint8_t dev_id,
1005 				rte_event_eth_tx_adapter_conf_cb conf_cb,
1006 				void *conf_arg)
1007 {
1008 	struct rte_eventdev *dev;
1009 	int ret;
1010 
1011 	RTE_EVENT_ETH_TX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
1012 	RTE_EVENTDEV_VALID_DEVID_OR_ERR_RET(dev_id, -EINVAL);
1013 
1014 	ret = txa_init();
1015 	if (ret != 0)
1016 		return ret;
1017 
1018 	if (txa_adapter_exist(id))
1019 		return -EINVAL;
1020 
1021 	dev = &rte_eventdevs[dev_id];
1022 
1023 	txa_dev_id_array[id] = dev_id;
1024 	if (txa_dev_adapter_create_ext(id))
1025 		ret = txa_dev_adapter_create_ext(id)(id, dev);
1026 
1027 	if (ret != 0) {
1028 		txa_dev_id_array[id] = TXA_INVALID_DEV_ID;
1029 		return ret;
1030 	}
1031 
1032 	ret = txa_service_adapter_create_ext(id, dev, conf_cb, conf_arg);
1033 	if (ret != 0) {
1034 		if (txa_dev_adapter_free(id))
1035 			txa_dev_adapter_free(id)(id, dev);
1036 		txa_dev_id_array[id] = TXA_INVALID_DEV_ID;
1037 		return ret;
1038 	}
1039 
1040 	rte_eventdev_trace_eth_tx_adapter_create(id, dev_id, conf_cb, conf_arg,
1041 		ret);
1042 	txa_dev_id_array[id] = dev_id;
1043 	return 0;
1044 }
1045 
1046 
1047 int
1048 rte_event_eth_tx_adapter_event_port_get(uint8_t id, uint8_t *event_port_id)
1049 {
1050 	TXA_CHECK_OR_ERR_RET(id);
1051 
1052 	return txa_service_event_port_get(id, event_port_id);
1053 }
1054 
1055 int
1056 rte_event_eth_tx_adapter_free(uint8_t id)
1057 {
1058 	int ret;
1059 
1060 	TXA_CHECK_OR_ERR_RET(id);
1061 
1062 	ret = txa_dev_adapter_free(id) ?
1063 		txa_dev_adapter_free(id)(id, txa_evdev(id)) :
1064 		0;
1065 
1066 	if (ret == 0)
1067 		ret = txa_service_adapter_free(id);
1068 	txa_dev_id_array[id] = TXA_INVALID_DEV_ID;
1069 
1070 	rte_eventdev_trace_eth_tx_adapter_free(id, ret);
1071 	return ret;
1072 }
1073 
1074 int
1075 rte_event_eth_tx_adapter_queue_add(uint8_t id,
1076 				uint16_t eth_dev_id,
1077 				int32_t queue)
1078 {
1079 	struct rte_eth_dev *eth_dev;
1080 	int ret;
1081 	uint32_t caps;
1082 
1083 	RTE_ETH_VALID_PORTID_OR_ERR_RET(eth_dev_id, -EINVAL);
1084 	TXA_CHECK_OR_ERR_RET(id);
1085 
1086 	eth_dev = &rte_eth_devices[eth_dev_id];
1087 	TXA_CHECK_TXQ(eth_dev, queue);
1088 
1089 	caps = 0;
1090 	if (txa_dev_caps_get(id))
1091 		txa_dev_caps_get(id)(txa_evdev(id), eth_dev, &caps);
1092 
1093 	if (caps & RTE_EVENT_ETH_TX_ADAPTER_CAP_INTERNAL_PORT)
1094 		ret =  txa_dev_queue_add(id) ?
1095 					txa_dev_queue_add(id)(id,
1096 							txa_evdev(id),
1097 							eth_dev,
1098 							queue) : 0;
1099 	else
1100 		ret = txa_service_queue_add(id, txa_evdev(id), eth_dev, queue);
1101 
1102 	rte_eventdev_trace_eth_tx_adapter_queue_add(id, eth_dev_id, queue,
1103 		ret);
1104 	return ret;
1105 }
1106 
1107 int
1108 rte_event_eth_tx_adapter_queue_del(uint8_t id,
1109 				uint16_t eth_dev_id,
1110 				int32_t queue)
1111 {
1112 	struct rte_eth_dev *eth_dev;
1113 	int ret;
1114 	uint32_t caps;
1115 
1116 	RTE_ETH_VALID_PORTID_OR_ERR_RET(eth_dev_id, -EINVAL);
1117 	TXA_CHECK_OR_ERR_RET(id);
1118 
1119 	eth_dev = &rte_eth_devices[eth_dev_id];
1120 
1121 	caps = 0;
1122 
1123 	if (txa_dev_caps_get(id))
1124 		txa_dev_caps_get(id)(txa_evdev(id), eth_dev, &caps);
1125 
1126 	if (caps & RTE_EVENT_ETH_TX_ADAPTER_CAP_INTERNAL_PORT)
1127 		ret =  txa_dev_queue_del(id) ?
1128 					txa_dev_queue_del(id)(id, txa_evdev(id),
1129 							eth_dev,
1130 							queue) : 0;
1131 	else
1132 		ret = txa_service_queue_del(id, eth_dev, queue);
1133 
1134 	rte_eventdev_trace_eth_tx_adapter_queue_del(id, eth_dev_id, queue,
1135 		ret);
1136 	return ret;
1137 }
1138 
1139 int
1140 rte_event_eth_tx_adapter_service_id_get(uint8_t id, uint32_t *service_id)
1141 {
1142 	TXA_CHECK_OR_ERR_RET(id);
1143 
1144 	return txa_service_id_get(id, service_id);
1145 }
1146 
1147 int
1148 rte_event_eth_tx_adapter_start(uint8_t id)
1149 {
1150 	int ret;
1151 
1152 	TXA_CHECK_OR_ERR_RET(id);
1153 
1154 	ret = txa_dev_start(id) ? txa_dev_start(id)(id, txa_evdev(id)) : 0;
1155 	if (ret == 0)
1156 		ret = txa_service_start(id);
1157 	rte_eventdev_trace_eth_tx_adapter_start(id, ret);
1158 	return ret;
1159 }
1160 
1161 int
1162 rte_event_eth_tx_adapter_stats_get(uint8_t id,
1163 				struct rte_event_eth_tx_adapter_stats *stats)
1164 {
1165 	int ret;
1166 
1167 	TXA_CHECK_OR_ERR_RET(id);
1168 
1169 	if (stats == NULL)
1170 		return -EINVAL;
1171 
1172 	*stats = (struct rte_event_eth_tx_adapter_stats){0};
1173 
1174 	ret = txa_dev_stats_get(id) ?
1175 			txa_dev_stats_get(id)(id, txa_evdev(id), stats) : 0;
1176 
1177 	if (ret == 0 && txa_service_id_get(id, NULL) != ESRCH) {
1178 		if (txa_dev_stats_get(id)) {
1179 			struct rte_event_eth_tx_adapter_stats service_stats;
1180 
1181 			ret = txa_service_stats_get(id, &service_stats);
1182 			if (ret == 0) {
1183 				stats->tx_retry += service_stats.tx_retry;
1184 				stats->tx_packets += service_stats.tx_packets;
1185 				stats->tx_dropped += service_stats.tx_dropped;
1186 			}
1187 		} else
1188 			ret = txa_service_stats_get(id, stats);
1189 	}
1190 
1191 	return ret;
1192 }
1193 
1194 int
1195 rte_event_eth_tx_adapter_stats_reset(uint8_t id)
1196 {
1197 	int ret;
1198 
1199 	TXA_CHECK_OR_ERR_RET(id);
1200 
1201 	ret = txa_dev_stats_reset(id) ?
1202 		txa_dev_stats_reset(id)(id, txa_evdev(id)) : 0;
1203 	if (ret == 0)
1204 		ret = txa_service_stats_reset(id);
1205 	return ret;
1206 }
1207 
1208 int
1209 rte_event_eth_tx_adapter_stop(uint8_t id)
1210 {
1211 	int ret;
1212 
1213 	TXA_CHECK_OR_ERR_RET(id);
1214 
1215 	ret = txa_dev_stop(id) ? txa_dev_stop(id)(id,  txa_evdev(id)) : 0;
1216 	if (ret == 0)
1217 		ret = txa_service_stop(id);
1218 	rte_eventdev_trace_eth_tx_adapter_stop(id, ret);
1219 	return ret;
1220 }
1221