xref: /dpdk/lib/eventdev/rte_event_eth_tx_adapter.c (revision e5fb1a9698e7111473ca0980fdf6c0edb7acdf91)
1 /* SPDX-License-Identifier: BSD-3-Clause
2  * Copyright(c) 2018 Intel Corporation.
3  */
4 #include <rte_spinlock.h>
5 #include <rte_service_component.h>
6 #include <rte_ethdev.h>
7 
8 #include "eventdev_pmd.h"
9 #include "rte_eventdev_trace.h"
10 #include "rte_event_eth_tx_adapter.h"
11 
12 #define TXA_BATCH_SIZE		32
13 #define TXA_SERVICE_NAME_LEN	32
14 #define TXA_MEM_NAME_LEN	32
15 #define TXA_FLUSH_THRESHOLD	1024
16 #define TXA_RETRY_CNT		100
17 #define TXA_MAX_NB_TX		128
18 #define TXA_INVALID_DEV_ID	INT32_C(-1)
19 #define TXA_INVALID_SERVICE_ID	INT64_C(-1)
20 
21 #define txa_evdev(id) (&rte_eventdevs[txa_dev_id_array[(id)]])
22 
23 #define txa_dev_caps_get(id) txa_evdev((id))->dev_ops->eth_tx_adapter_caps_get
24 
25 #define txa_dev_adapter_create(t) txa_evdev(t)->dev_ops->eth_tx_adapter_create
26 
27 #define txa_dev_adapter_create_ext(t) \
28 				txa_evdev(t)->dev_ops->eth_tx_adapter_create
29 
30 #define txa_dev_adapter_free(t) txa_evdev(t)->dev_ops->eth_tx_adapter_free
31 
32 #define txa_dev_queue_add(id) txa_evdev(id)->dev_ops->eth_tx_adapter_queue_add
33 
34 #define txa_dev_queue_del(t) txa_evdev(t)->dev_ops->eth_tx_adapter_queue_del
35 
36 #define txa_dev_start(t) txa_evdev(t)->dev_ops->eth_tx_adapter_start
37 
38 #define txa_dev_stop(t) txa_evdev(t)->dev_ops->eth_tx_adapter_stop
39 
40 #define txa_dev_stats_reset(t) txa_evdev(t)->dev_ops->eth_tx_adapter_stats_reset
41 
42 #define txa_dev_stats_get(t) txa_evdev(t)->dev_ops->eth_tx_adapter_stats_get
43 
44 #define RTE_EVENT_ETH_TX_ADAPTER_ID_VALID_OR_ERR_RET(id, retval) \
45 do { \
46 	if (!txa_valid_id(id)) { \
47 		RTE_EDEV_LOG_ERR("Invalid eth Rx adapter id = %d", id); \
48 		return retval; \
49 	} \
50 } while (0)
51 
52 #define TXA_CHECK_OR_ERR_RET(id) \
53 do {\
54 	int ret; \
55 	RTE_EVENT_ETH_TX_ADAPTER_ID_VALID_OR_ERR_RET((id), -EINVAL); \
56 	ret = txa_init(); \
57 	if (ret != 0) \
58 		return ret; \
59 	if (!txa_adapter_exist((id))) \
60 		return -EINVAL; \
61 } while (0)
62 
63 #define TXA_CHECK_TXQ(dev, queue) \
64 do {\
65 	if ((dev)->data->nb_tx_queues == 0) { \
66 		RTE_EDEV_LOG_ERR("No tx queues configured"); \
67 		return -EINVAL; \
68 	} \
69 	if ((queue) != -1 && \
70 		(uint16_t)(queue) >= (dev)->data->nb_tx_queues) { \
71 		RTE_EDEV_LOG_ERR("Invalid tx queue_id %" PRIu16, \
72 				(uint16_t)(queue)); \
73 		return -EINVAL; \
74 	} \
75 } while (0)
76 
77 /* Tx retry callback structure */
78 struct txa_retry {
79 	/* Ethernet port id */
80 	uint16_t port_id;
81 	/* Tx queue */
82 	uint16_t tx_queue;
83 	/* Adapter ID */
84 	uint8_t id;
85 };
86 
87 /* Per queue structure */
88 struct txa_service_queue_info {
89 	/* Queue has been added */
90 	uint8_t added;
91 	/* Retry callback argument */
92 	struct txa_retry txa_retry;
93 	/* Tx buffer */
94 	struct rte_eth_dev_tx_buffer *tx_buf;
95 };
96 
97 /* PMD private structure */
98 struct txa_service_data {
99 	/* Max mbufs processed in any service function invocation */
100 	uint32_t max_nb_tx;
101 	/* Number of Tx queues in adapter */
102 	uint32_t nb_queues;
103 	/*  Synchronization with data path */
104 	rte_spinlock_t tx_lock;
105 	/* Event port ID */
106 	uint8_t port_id;
107 	/* Event device identifier */
108 	uint8_t eventdev_id;
109 	/* Highest port id supported + 1 */
110 	uint16_t dev_count;
111 	/* Loop count to flush Tx buffers */
112 	int loop_cnt;
113 	/* Per ethernet device structure */
114 	struct txa_service_ethdev *txa_ethdev;
115 	/* Statistics */
116 	struct rte_event_eth_tx_adapter_stats stats;
117 	/* Adapter Identifier */
118 	uint8_t id;
119 	/* Conf arg must be freed */
120 	uint8_t conf_free;
121 	/* Configuration callback */
122 	rte_event_eth_tx_adapter_conf_cb conf_cb;
123 	/* Configuration callback argument */
124 	void *conf_arg;
125 	/* socket id */
126 	int socket_id;
127 	/* Per adapter EAL service */
128 	int64_t service_id;
129 	/* Memory allocation name */
130 	char mem_name[TXA_MEM_NAME_LEN];
131 } __rte_cache_aligned;
132 
133 /* Per eth device structure */
134 struct txa_service_ethdev {
135 	/* Pointer to ethernet device */
136 	struct rte_eth_dev *dev;
137 	/* Number of queues added */
138 	uint16_t nb_queues;
139 	/* PMD specific queue data */
140 	void *queues;
141 };
142 
143 /* Array of adapter instances, initialized with event device id
144  * when adapter is created
145  */
146 static int *txa_dev_id_array;
147 
148 /* Array of pointers to service implementation data */
149 static struct txa_service_data **txa_service_data_array;
150 
151 static int32_t txa_service_func(void *args);
152 static int txa_service_adapter_create_ext(uint8_t id,
153 			struct rte_eventdev *dev,
154 			rte_event_eth_tx_adapter_conf_cb conf_cb,
155 			void *conf_arg);
156 static int txa_service_queue_del(uint8_t id,
157 				const struct rte_eth_dev *dev,
158 				int32_t tx_queue_id);
159 
160 static int
161 txa_adapter_exist(uint8_t id)
162 {
163 	return txa_dev_id_array[id] != TXA_INVALID_DEV_ID;
164 }
165 
166 static inline int
167 txa_valid_id(uint8_t id)
168 {
169 	return id < RTE_EVENT_ETH_TX_ADAPTER_MAX_INSTANCE;
170 }
171 
172 static void *
173 txa_memzone_array_get(const char *name, unsigned int elt_size, int nb_elems)
174 {
175 	const struct rte_memzone *mz;
176 	unsigned int sz;
177 
178 	sz = elt_size * nb_elems;
179 	sz = RTE_ALIGN(sz, RTE_CACHE_LINE_SIZE);
180 
181 	mz = rte_memzone_lookup(name);
182 	if (mz == NULL) {
183 		mz = rte_memzone_reserve_aligned(name, sz, rte_socket_id(), 0,
184 						 RTE_CACHE_LINE_SIZE);
185 		if (mz == NULL) {
186 			RTE_EDEV_LOG_ERR("failed to reserve memzone"
187 					" name = %s err = %"
188 					PRId32, name, rte_errno);
189 			return NULL;
190 		}
191 	}
192 
193 	return  mz->addr;
194 }
195 
196 static int
197 txa_dev_id_array_init(void)
198 {
199 	if (txa_dev_id_array == NULL) {
200 		int i;
201 
202 		txa_dev_id_array = txa_memzone_array_get("txa_adapter_array",
203 					sizeof(int),
204 					RTE_EVENT_ETH_TX_ADAPTER_MAX_INSTANCE);
205 		if (txa_dev_id_array == NULL)
206 			return -ENOMEM;
207 
208 		for (i = 0; i < RTE_EVENT_ETH_TX_ADAPTER_MAX_INSTANCE; i++)
209 			txa_dev_id_array[i] = TXA_INVALID_DEV_ID;
210 	}
211 
212 	return 0;
213 }
214 
215 static int
216 txa_init(void)
217 {
218 	return txa_dev_id_array_init();
219 }
220 
221 static int
222 txa_service_data_init(void)
223 {
224 	if (txa_service_data_array == NULL) {
225 		txa_service_data_array =
226 				txa_memzone_array_get("txa_service_data_array",
227 					sizeof(int),
228 					RTE_EVENT_ETH_TX_ADAPTER_MAX_INSTANCE);
229 		if (txa_service_data_array == NULL)
230 			return -ENOMEM;
231 	}
232 
233 	return 0;
234 }
235 
236 static inline struct txa_service_data *
237 txa_service_id_to_data(uint8_t id)
238 {
239 	return txa_service_data_array[id];
240 }
241 
242 static inline struct txa_service_queue_info *
243 txa_service_queue(struct txa_service_data *txa, uint16_t port_id,
244 		uint16_t tx_queue_id)
245 {
246 	struct txa_service_queue_info *tqi;
247 
248 	if (unlikely(txa->txa_ethdev == NULL || txa->dev_count < port_id + 1))
249 		return NULL;
250 
251 	tqi = txa->txa_ethdev[port_id].queues;
252 
253 	return likely(tqi != NULL) ? tqi + tx_queue_id : NULL;
254 }
255 
256 static int
257 txa_service_conf_cb(uint8_t __rte_unused id, uint8_t dev_id,
258 		struct rte_event_eth_tx_adapter_conf *conf, void *arg)
259 {
260 	int ret;
261 	struct rte_eventdev *dev;
262 	struct rte_event_port_conf *pc;
263 	struct rte_event_dev_config dev_conf;
264 	int started;
265 	uint8_t port_id;
266 
267 	pc = arg;
268 	dev = &rte_eventdevs[dev_id];
269 	dev_conf = dev->data->dev_conf;
270 
271 	started = dev->data->dev_started;
272 	if (started)
273 		rte_event_dev_stop(dev_id);
274 
275 	port_id = dev_conf.nb_event_ports;
276 	dev_conf.nb_event_ports += 1;
277 
278 	ret = rte_event_dev_configure(dev_id, &dev_conf);
279 	if (ret) {
280 		RTE_EDEV_LOG_ERR("failed to configure event dev %u",
281 						dev_id);
282 		if (started) {
283 			if (rte_event_dev_start(dev_id))
284 				return -EIO;
285 		}
286 		return ret;
287 	}
288 
289 	pc->event_port_cfg = 0;
290 	ret = rte_event_port_setup(dev_id, port_id, pc);
291 	if (ret) {
292 		RTE_EDEV_LOG_ERR("failed to setup event port %u\n",
293 					port_id);
294 		if (started) {
295 			if (rte_event_dev_start(dev_id))
296 				return -EIO;
297 		}
298 		return ret;
299 	}
300 
301 	conf->event_port_id = port_id;
302 	conf->max_nb_tx = TXA_MAX_NB_TX;
303 	if (started)
304 		ret = rte_event_dev_start(dev_id);
305 	return ret;
306 }
307 
308 static int
309 txa_service_ethdev_alloc(struct txa_service_data *txa)
310 {
311 	struct txa_service_ethdev *txa_ethdev;
312 	uint16_t i, dev_count;
313 
314 	dev_count = rte_eth_dev_count_avail();
315 	if (txa->txa_ethdev && dev_count == txa->dev_count)
316 		return 0;
317 
318 	txa_ethdev = rte_zmalloc_socket(txa->mem_name,
319 					dev_count * sizeof(*txa_ethdev),
320 					0,
321 					txa->socket_id);
322 	if (txa_ethdev == NULL) {
323 		RTE_EDEV_LOG_ERR("Failed to alloc txa::txa_ethdev ");
324 		return -ENOMEM;
325 	}
326 
327 	if (txa->dev_count)
328 		memcpy(txa_ethdev, txa->txa_ethdev,
329 			txa->dev_count * sizeof(*txa_ethdev));
330 
331 	RTE_ETH_FOREACH_DEV(i) {
332 		if (i == dev_count)
333 			break;
334 		txa_ethdev[i].dev = &rte_eth_devices[i];
335 	}
336 
337 	txa->txa_ethdev = txa_ethdev;
338 	txa->dev_count = dev_count;
339 	return 0;
340 }
341 
342 static int
343 txa_service_queue_array_alloc(struct txa_service_data *txa,
344 			uint16_t port_id)
345 {
346 	struct txa_service_queue_info *tqi;
347 	uint16_t nb_queue;
348 	int ret;
349 
350 	ret = txa_service_ethdev_alloc(txa);
351 	if (ret != 0)
352 		return ret;
353 
354 	if (txa->txa_ethdev[port_id].queues)
355 		return 0;
356 
357 	nb_queue = txa->txa_ethdev[port_id].dev->data->nb_tx_queues;
358 	tqi = rte_zmalloc_socket(txa->mem_name,
359 				nb_queue *
360 				sizeof(struct txa_service_queue_info), 0,
361 				txa->socket_id);
362 	if (tqi == NULL)
363 		return -ENOMEM;
364 	txa->txa_ethdev[port_id].queues = tqi;
365 	return 0;
366 }
367 
368 static void
369 txa_service_queue_array_free(struct txa_service_data *txa,
370 			uint16_t port_id)
371 {
372 	struct txa_service_ethdev *txa_ethdev;
373 	struct txa_service_queue_info *tqi;
374 
375 	txa_ethdev = &txa->txa_ethdev[port_id];
376 	if (txa->txa_ethdev == NULL || txa_ethdev->nb_queues != 0)
377 		return;
378 
379 	tqi = txa_ethdev->queues;
380 	txa_ethdev->queues = NULL;
381 	rte_free(tqi);
382 
383 	if (txa->nb_queues == 0) {
384 		rte_free(txa->txa_ethdev);
385 		txa->txa_ethdev = NULL;
386 	}
387 }
388 
389 static void
390 txa_service_unregister(struct txa_service_data *txa)
391 {
392 	if (txa->service_id != TXA_INVALID_SERVICE_ID) {
393 		rte_service_component_runstate_set(txa->service_id, 0);
394 		while (rte_service_may_be_active(txa->service_id))
395 			rte_pause();
396 		rte_service_component_unregister(txa->service_id);
397 	}
398 	txa->service_id = TXA_INVALID_SERVICE_ID;
399 }
400 
401 static int
402 txa_service_register(struct txa_service_data *txa)
403 {
404 	int ret;
405 	struct rte_service_spec service;
406 	struct rte_event_eth_tx_adapter_conf conf;
407 
408 	if (txa->service_id != TXA_INVALID_SERVICE_ID)
409 		return 0;
410 
411 	memset(&service, 0, sizeof(service));
412 	snprintf(service.name, TXA_SERVICE_NAME_LEN, "txa_%d", txa->id);
413 	service.socket_id = txa->socket_id;
414 	service.callback = txa_service_func;
415 	service.callback_userdata = txa;
416 	service.capabilities = RTE_SERVICE_CAP_MT_SAFE;
417 	ret = rte_service_component_register(&service,
418 					(uint32_t *)&txa->service_id);
419 	if (ret) {
420 		RTE_EDEV_LOG_ERR("failed to register service %s err = %"
421 				 PRId32, service.name, ret);
422 		return ret;
423 	}
424 
425 	ret = txa->conf_cb(txa->id, txa->eventdev_id, &conf, txa->conf_arg);
426 	if (ret) {
427 		txa_service_unregister(txa);
428 		return ret;
429 	}
430 
431 	rte_service_component_runstate_set(txa->service_id, 1);
432 	txa->port_id = conf.event_port_id;
433 	txa->max_nb_tx = conf.max_nb_tx;
434 	return 0;
435 }
436 
437 static struct rte_eth_dev_tx_buffer *
438 txa_service_tx_buf_alloc(struct txa_service_data *txa,
439 			const struct rte_eth_dev *dev)
440 {
441 	struct rte_eth_dev_tx_buffer *tb;
442 	uint16_t port_id;
443 
444 	port_id = dev->data->port_id;
445 	tb = rte_zmalloc_socket(txa->mem_name,
446 				RTE_ETH_TX_BUFFER_SIZE(TXA_BATCH_SIZE),
447 				0,
448 				rte_eth_dev_socket_id(port_id));
449 	if (tb == NULL)
450 		RTE_EDEV_LOG_ERR("Failed to allocate memory for tx buffer");
451 	return tb;
452 }
453 
454 static int
455 txa_service_is_queue_added(struct txa_service_data *txa,
456 			const struct rte_eth_dev *dev,
457 			uint16_t tx_queue_id)
458 {
459 	struct txa_service_queue_info *tqi;
460 
461 	tqi = txa_service_queue(txa, dev->data->port_id, tx_queue_id);
462 	return tqi && tqi->added;
463 }
464 
465 static int
466 txa_service_ctrl(uint8_t id, int start)
467 {
468 	int ret;
469 	struct txa_service_data *txa;
470 
471 	txa = txa_service_id_to_data(id);
472 	if (txa->service_id == TXA_INVALID_SERVICE_ID)
473 		return 0;
474 
475 	ret = rte_service_runstate_set(txa->service_id, start);
476 	if (ret == 0 && !start) {
477 		while (rte_service_may_be_active(txa->service_id))
478 			rte_pause();
479 	}
480 	return ret;
481 }
482 
483 static void
484 txa_service_buffer_retry(struct rte_mbuf **pkts, uint16_t unsent,
485 			void *userdata)
486 {
487 	struct txa_retry *tr;
488 	struct txa_service_data *data;
489 	struct rte_event_eth_tx_adapter_stats *stats;
490 	uint16_t sent = 0;
491 	unsigned int retry = 0;
492 	uint16_t i, n;
493 
494 	tr = (struct txa_retry *)(uintptr_t)userdata;
495 	data = txa_service_id_to_data(tr->id);
496 	stats = &data->stats;
497 
498 	do {
499 		n = rte_eth_tx_burst(tr->port_id, tr->tx_queue,
500 			       &pkts[sent], unsent - sent);
501 
502 		sent += n;
503 	} while (sent != unsent && retry++ < TXA_RETRY_CNT);
504 
505 	for (i = sent; i < unsent; i++)
506 		rte_pktmbuf_free(pkts[i]);
507 
508 	stats->tx_retry += retry;
509 	stats->tx_packets += sent;
510 	stats->tx_dropped += unsent - sent;
511 }
512 
513 static uint16_t
514 txa_process_event_vector(struct txa_service_data *txa,
515 			 struct rte_event_vector *vec)
516 {
517 	struct txa_service_queue_info *tqi;
518 	uint16_t port, queue, nb_tx = 0;
519 	struct rte_mbuf **mbufs;
520 	int i;
521 
522 	mbufs = (struct rte_mbuf **)vec->mbufs;
523 	if (vec->attr_valid) {
524 		port = vec->port;
525 		queue = vec->queue;
526 		tqi = txa_service_queue(txa, port, queue);
527 		if (unlikely(tqi == NULL || !tqi->added)) {
528 			rte_pktmbuf_free_bulk(mbufs, vec->nb_elem);
529 			rte_mempool_put(rte_mempool_from_obj(vec), vec);
530 			return 0;
531 		}
532 		for (i = 0; i < vec->nb_elem; i++) {
533 			nb_tx += rte_eth_tx_buffer(port, queue, tqi->tx_buf,
534 						   mbufs[i]);
535 		}
536 	} else {
537 		for (i = 0; i < vec->nb_elem; i++) {
538 			port = mbufs[i]->port;
539 			queue = rte_event_eth_tx_adapter_txq_get(mbufs[i]);
540 			tqi = txa_service_queue(txa, port, queue);
541 			if (unlikely(tqi == NULL || !tqi->added)) {
542 				rte_pktmbuf_free(mbufs[i]);
543 				continue;
544 			}
545 			nb_tx += rte_eth_tx_buffer(port, queue, tqi->tx_buf,
546 						   mbufs[i]);
547 		}
548 	}
549 	rte_mempool_put(rte_mempool_from_obj(vec), vec);
550 
551 	return nb_tx;
552 }
553 
554 static void
555 txa_service_tx(struct txa_service_data *txa, struct rte_event *ev,
556 	uint32_t n)
557 {
558 	uint32_t i;
559 	uint16_t nb_tx;
560 	struct rte_event_eth_tx_adapter_stats *stats;
561 
562 	stats = &txa->stats;
563 
564 	nb_tx = 0;
565 	for (i = 0; i < n; i++) {
566 		uint16_t port;
567 		uint16_t queue;
568 		struct txa_service_queue_info *tqi;
569 
570 		if (!(ev[i].event_type & RTE_EVENT_TYPE_VECTOR)) {
571 			struct rte_mbuf *m;
572 
573 			m = ev[i].mbuf;
574 			port = m->port;
575 			queue = rte_event_eth_tx_adapter_txq_get(m);
576 
577 			tqi = txa_service_queue(txa, port, queue);
578 			if (unlikely(tqi == NULL || !tqi->added)) {
579 				rte_pktmbuf_free(m);
580 				continue;
581 			}
582 
583 			nb_tx += rte_eth_tx_buffer(port, queue, tqi->tx_buf, m);
584 		} else {
585 			nb_tx += txa_process_event_vector(txa, ev[i].vec);
586 		}
587 	}
588 
589 	stats->tx_packets += nb_tx;
590 }
591 
592 static int32_t
593 txa_service_func(void *args)
594 {
595 	struct txa_service_data *txa = args;
596 	uint8_t dev_id;
597 	uint8_t port;
598 	uint16_t n;
599 	uint32_t nb_tx, max_nb_tx;
600 	struct rte_event ev[TXA_BATCH_SIZE];
601 
602 	dev_id = txa->eventdev_id;
603 	max_nb_tx = txa->max_nb_tx;
604 	port = txa->port_id;
605 
606 	if (txa->nb_queues == 0)
607 		return 0;
608 
609 	if (!rte_spinlock_trylock(&txa->tx_lock))
610 		return 0;
611 
612 	for (nb_tx = 0; nb_tx < max_nb_tx; nb_tx += n) {
613 
614 		n = rte_event_dequeue_burst(dev_id, port, ev, RTE_DIM(ev), 0);
615 		if (!n)
616 			break;
617 		txa_service_tx(txa, ev, n);
618 	}
619 
620 	if ((txa->loop_cnt++ & (TXA_FLUSH_THRESHOLD - 1)) == 0) {
621 
622 		struct txa_service_ethdev *tdi;
623 		struct txa_service_queue_info *tqi;
624 		struct rte_eth_dev *dev;
625 		uint16_t i;
626 
627 		tdi = txa->txa_ethdev;
628 		nb_tx = 0;
629 
630 		RTE_ETH_FOREACH_DEV(i) {
631 			uint16_t q;
632 
633 			if (i == txa->dev_count)
634 				break;
635 
636 			dev = tdi[i].dev;
637 			if (tdi[i].nb_queues == 0)
638 				continue;
639 			for (q = 0; q < dev->data->nb_tx_queues; q++) {
640 
641 				tqi = txa_service_queue(txa, i, q);
642 				if (unlikely(tqi == NULL || !tqi->added))
643 					continue;
644 
645 				nb_tx += rte_eth_tx_buffer_flush(i, q,
646 							tqi->tx_buf);
647 			}
648 		}
649 
650 		txa->stats.tx_packets += nb_tx;
651 	}
652 	rte_spinlock_unlock(&txa->tx_lock);
653 	return 0;
654 }
655 
656 static int
657 txa_service_adapter_create(uint8_t id, struct rte_eventdev *dev,
658 			struct rte_event_port_conf *port_conf)
659 {
660 	struct txa_service_data *txa;
661 	struct rte_event_port_conf *cb_conf;
662 	int ret;
663 
664 	cb_conf = rte_malloc(NULL, sizeof(*cb_conf), 0);
665 	if (cb_conf == NULL)
666 		return -ENOMEM;
667 
668 	*cb_conf = *port_conf;
669 	ret = txa_service_adapter_create_ext(id, dev, txa_service_conf_cb,
670 					cb_conf);
671 	if (ret) {
672 		rte_free(cb_conf);
673 		return ret;
674 	}
675 
676 	txa = txa_service_id_to_data(id);
677 	txa->conf_free = 1;
678 	return ret;
679 }
680 
681 static int
682 txa_service_adapter_create_ext(uint8_t id, struct rte_eventdev *dev,
683 			rte_event_eth_tx_adapter_conf_cb conf_cb,
684 			void *conf_arg)
685 {
686 	struct txa_service_data *txa;
687 	int socket_id;
688 	char mem_name[TXA_SERVICE_NAME_LEN];
689 	int ret;
690 
691 	if (conf_cb == NULL)
692 		return -EINVAL;
693 
694 	socket_id = dev->data->socket_id;
695 	snprintf(mem_name, TXA_MEM_NAME_LEN,
696 		"rte_event_eth_txa_%d",
697 		id);
698 
699 	ret = txa_service_data_init();
700 	if (ret != 0)
701 		return ret;
702 
703 	txa = rte_zmalloc_socket(mem_name,
704 				sizeof(*txa),
705 				RTE_CACHE_LINE_SIZE, socket_id);
706 	if (txa == NULL) {
707 		RTE_EDEV_LOG_ERR("failed to get mem for tx adapter");
708 		return -ENOMEM;
709 	}
710 
711 	txa->id = id;
712 	txa->eventdev_id = dev->data->dev_id;
713 	txa->socket_id = socket_id;
714 	strncpy(txa->mem_name, mem_name, TXA_SERVICE_NAME_LEN);
715 	txa->conf_cb = conf_cb;
716 	txa->conf_arg = conf_arg;
717 	txa->service_id = TXA_INVALID_SERVICE_ID;
718 	rte_spinlock_init(&txa->tx_lock);
719 	txa_service_data_array[id] = txa;
720 
721 	return 0;
722 }
723 
724 static int
725 txa_service_event_port_get(uint8_t id, uint8_t *port)
726 {
727 	struct txa_service_data *txa;
728 
729 	txa = txa_service_id_to_data(id);
730 	if (txa->service_id == TXA_INVALID_SERVICE_ID)
731 		return -ENODEV;
732 
733 	*port = txa->port_id;
734 	return 0;
735 }
736 
737 static int
738 txa_service_adapter_free(uint8_t id)
739 {
740 	struct txa_service_data *txa;
741 
742 	txa = txa_service_id_to_data(id);
743 	if (txa->nb_queues) {
744 		RTE_EDEV_LOG_ERR("%" PRIu16 " Tx queues not deleted",
745 				txa->nb_queues);
746 		return -EBUSY;
747 	}
748 
749 	if (txa->conf_free)
750 		rte_free(txa->conf_arg);
751 	rte_free(txa);
752 	return 0;
753 }
754 
755 static int
756 txa_service_queue_add(uint8_t id,
757 		__rte_unused struct rte_eventdev *dev,
758 		const struct rte_eth_dev *eth_dev,
759 		int32_t tx_queue_id)
760 {
761 	struct txa_service_data *txa;
762 	struct txa_service_ethdev *tdi;
763 	struct txa_service_queue_info *tqi;
764 	struct rte_eth_dev_tx_buffer *tb;
765 	struct txa_retry *txa_retry;
766 	int ret = 0;
767 
768 	txa = txa_service_id_to_data(id);
769 
770 	if (tx_queue_id == -1) {
771 		int nb_queues;
772 		uint16_t i, j;
773 		uint16_t *qdone;
774 
775 		nb_queues = eth_dev->data->nb_tx_queues;
776 		if (txa->dev_count > eth_dev->data->port_id) {
777 			tdi = &txa->txa_ethdev[eth_dev->data->port_id];
778 			nb_queues -= tdi->nb_queues;
779 		}
780 
781 		qdone = rte_zmalloc(txa->mem_name,
782 				nb_queues * sizeof(*qdone), 0);
783 		if (qdone == NULL)
784 			return -ENOMEM;
785 		j = 0;
786 		for (i = 0; i < nb_queues; i++) {
787 			if (txa_service_is_queue_added(txa, eth_dev, i))
788 				continue;
789 			ret = txa_service_queue_add(id, dev, eth_dev, i);
790 			if (ret == 0)
791 				qdone[j++] = i;
792 			else
793 				break;
794 		}
795 
796 		if (i != nb_queues) {
797 			for (i = 0; i < j; i++)
798 				txa_service_queue_del(id, eth_dev, qdone[i]);
799 		}
800 		rte_free(qdone);
801 		return ret;
802 	}
803 
804 	ret = txa_service_register(txa);
805 	if (ret)
806 		return ret;
807 
808 	rte_spinlock_lock(&txa->tx_lock);
809 
810 	if (txa_service_is_queue_added(txa, eth_dev, tx_queue_id)) {
811 		rte_spinlock_unlock(&txa->tx_lock);
812 		return 0;
813 	}
814 
815 	ret = txa_service_queue_array_alloc(txa, eth_dev->data->port_id);
816 	if (ret)
817 		goto err_unlock;
818 
819 	tb = txa_service_tx_buf_alloc(txa, eth_dev);
820 	if (tb == NULL)
821 		goto err_unlock;
822 
823 	tdi = &txa->txa_ethdev[eth_dev->data->port_id];
824 	tqi = txa_service_queue(txa, eth_dev->data->port_id, tx_queue_id);
825 
826 	txa_retry = &tqi->txa_retry;
827 	txa_retry->id = txa->id;
828 	txa_retry->port_id = eth_dev->data->port_id;
829 	txa_retry->tx_queue = tx_queue_id;
830 
831 	rte_eth_tx_buffer_init(tb, TXA_BATCH_SIZE);
832 	rte_eth_tx_buffer_set_err_callback(tb,
833 		txa_service_buffer_retry, txa_retry);
834 
835 	tqi->tx_buf = tb;
836 	tqi->added = 1;
837 	tdi->nb_queues++;
838 	txa->nb_queues++;
839 
840 err_unlock:
841 	if (txa->nb_queues == 0) {
842 		txa_service_queue_array_free(txa,
843 					eth_dev->data->port_id);
844 		txa_service_unregister(txa);
845 	}
846 
847 	rte_spinlock_unlock(&txa->tx_lock);
848 	return 0;
849 }
850 
851 static int
852 txa_service_queue_del(uint8_t id,
853 		const struct rte_eth_dev *dev,
854 		int32_t tx_queue_id)
855 {
856 	struct txa_service_data *txa;
857 	struct txa_service_queue_info *tqi;
858 	struct rte_eth_dev_tx_buffer *tb;
859 	uint16_t port_id;
860 
861 	txa = txa_service_id_to_data(id);
862 	port_id = dev->data->port_id;
863 
864 	if (tx_queue_id == -1) {
865 		uint16_t i, q, nb_queues;
866 		int ret = 0;
867 
868 		nb_queues = txa->nb_queues;
869 		if (nb_queues == 0)
870 			return 0;
871 
872 		i = 0;
873 		q = 0;
874 		tqi = txa->txa_ethdev[port_id].queues;
875 
876 		while (i < nb_queues) {
877 
878 			if (tqi[q].added) {
879 				ret = txa_service_queue_del(id, dev, q);
880 				if (ret != 0)
881 					break;
882 			}
883 			i++;
884 			q++;
885 		}
886 		return ret;
887 	}
888 
889 	txa = txa_service_id_to_data(id);
890 
891 	tqi = txa_service_queue(txa, port_id, tx_queue_id);
892 	if (tqi == NULL || !tqi->added)
893 		return 0;
894 
895 	tb = tqi->tx_buf;
896 	tqi->added = 0;
897 	tqi->tx_buf = NULL;
898 	rte_free(tb);
899 	txa->nb_queues--;
900 	txa->txa_ethdev[port_id].nb_queues--;
901 
902 	txa_service_queue_array_free(txa, port_id);
903 	return 0;
904 }
905 
906 static int
907 txa_service_id_get(uint8_t id, uint32_t *service_id)
908 {
909 	struct txa_service_data *txa;
910 
911 	txa = txa_service_id_to_data(id);
912 	if (txa->service_id == TXA_INVALID_SERVICE_ID)
913 		return -ESRCH;
914 
915 	if (service_id == NULL)
916 		return -EINVAL;
917 
918 	*service_id = txa->service_id;
919 	return 0;
920 }
921 
922 static int
923 txa_service_start(uint8_t id)
924 {
925 	return txa_service_ctrl(id, 1);
926 }
927 
928 static int
929 txa_service_stats_get(uint8_t id,
930 		struct rte_event_eth_tx_adapter_stats *stats)
931 {
932 	struct txa_service_data *txa;
933 
934 	txa = txa_service_id_to_data(id);
935 	*stats = txa->stats;
936 	return 0;
937 }
938 
939 static int
940 txa_service_stats_reset(uint8_t id)
941 {
942 	struct txa_service_data *txa;
943 
944 	txa = txa_service_id_to_data(id);
945 	memset(&txa->stats, 0, sizeof(txa->stats));
946 	return 0;
947 }
948 
949 static int
950 txa_service_stop(uint8_t id)
951 {
952 	return txa_service_ctrl(id, 0);
953 }
954 
955 
956 int
957 rte_event_eth_tx_adapter_create(uint8_t id, uint8_t dev_id,
958 				struct rte_event_port_conf *port_conf)
959 {
960 	struct rte_eventdev *dev;
961 	int ret;
962 
963 	if (port_conf == NULL)
964 		return -EINVAL;
965 
966 	RTE_EVENT_ETH_TX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
967 	RTE_EVENTDEV_VALID_DEVID_OR_ERR_RET(dev_id, -EINVAL);
968 
969 	dev = &rte_eventdevs[dev_id];
970 
971 	ret = txa_init();
972 	if (ret != 0)
973 		return ret;
974 
975 	if (txa_adapter_exist(id))
976 		return -EEXIST;
977 
978 	txa_dev_id_array[id] = dev_id;
979 	if (txa_dev_adapter_create(id))
980 		ret = txa_dev_adapter_create(id)(id, dev);
981 
982 	if (ret != 0) {
983 		txa_dev_id_array[id] = TXA_INVALID_DEV_ID;
984 		return ret;
985 	}
986 
987 	ret = txa_service_adapter_create(id, dev, port_conf);
988 	if (ret != 0) {
989 		if (txa_dev_adapter_free(id))
990 			txa_dev_adapter_free(id)(id, dev);
991 		txa_dev_id_array[id] = TXA_INVALID_DEV_ID;
992 		return ret;
993 	}
994 	rte_eventdev_trace_eth_tx_adapter_create(id, dev_id, NULL, port_conf,
995 		ret);
996 	txa_dev_id_array[id] = dev_id;
997 	return 0;
998 }
999 
1000 int
1001 rte_event_eth_tx_adapter_create_ext(uint8_t id, uint8_t dev_id,
1002 				rte_event_eth_tx_adapter_conf_cb conf_cb,
1003 				void *conf_arg)
1004 {
1005 	struct rte_eventdev *dev;
1006 	int ret;
1007 
1008 	RTE_EVENT_ETH_TX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
1009 	RTE_EVENTDEV_VALID_DEVID_OR_ERR_RET(dev_id, -EINVAL);
1010 
1011 	ret = txa_init();
1012 	if (ret != 0)
1013 		return ret;
1014 
1015 	if (txa_adapter_exist(id))
1016 		return -EINVAL;
1017 
1018 	dev = &rte_eventdevs[dev_id];
1019 
1020 	txa_dev_id_array[id] = dev_id;
1021 	if (txa_dev_adapter_create_ext(id))
1022 		ret = txa_dev_adapter_create_ext(id)(id, dev);
1023 
1024 	if (ret != 0) {
1025 		txa_dev_id_array[id] = TXA_INVALID_DEV_ID;
1026 		return ret;
1027 	}
1028 
1029 	ret = txa_service_adapter_create_ext(id, dev, conf_cb, conf_arg);
1030 	if (ret != 0) {
1031 		if (txa_dev_adapter_free(id))
1032 			txa_dev_adapter_free(id)(id, dev);
1033 		txa_dev_id_array[id] = TXA_INVALID_DEV_ID;
1034 		return ret;
1035 	}
1036 
1037 	rte_eventdev_trace_eth_tx_adapter_create(id, dev_id, conf_cb, conf_arg,
1038 		ret);
1039 	txa_dev_id_array[id] = dev_id;
1040 	return 0;
1041 }
1042 
1043 
1044 int
1045 rte_event_eth_tx_adapter_event_port_get(uint8_t id, uint8_t *event_port_id)
1046 {
1047 	TXA_CHECK_OR_ERR_RET(id);
1048 
1049 	return txa_service_event_port_get(id, event_port_id);
1050 }
1051 
1052 int
1053 rte_event_eth_tx_adapter_free(uint8_t id)
1054 {
1055 	int ret;
1056 
1057 	TXA_CHECK_OR_ERR_RET(id);
1058 
1059 	ret = txa_dev_adapter_free(id) ?
1060 		txa_dev_adapter_free(id)(id, txa_evdev(id)) :
1061 		0;
1062 
1063 	if (ret == 0)
1064 		ret = txa_service_adapter_free(id);
1065 	txa_dev_id_array[id] = TXA_INVALID_DEV_ID;
1066 
1067 	rte_eventdev_trace_eth_tx_adapter_free(id, ret);
1068 	return ret;
1069 }
1070 
1071 int
1072 rte_event_eth_tx_adapter_queue_add(uint8_t id,
1073 				uint16_t eth_dev_id,
1074 				int32_t queue)
1075 {
1076 	struct rte_eth_dev *eth_dev;
1077 	int ret;
1078 	uint32_t caps;
1079 
1080 	RTE_ETH_VALID_PORTID_OR_ERR_RET(eth_dev_id, -EINVAL);
1081 	TXA_CHECK_OR_ERR_RET(id);
1082 
1083 	eth_dev = &rte_eth_devices[eth_dev_id];
1084 	TXA_CHECK_TXQ(eth_dev, queue);
1085 
1086 	caps = 0;
1087 	if (txa_dev_caps_get(id))
1088 		txa_dev_caps_get(id)(txa_evdev(id), eth_dev, &caps);
1089 
1090 	if (caps & RTE_EVENT_ETH_TX_ADAPTER_CAP_INTERNAL_PORT)
1091 		ret =  txa_dev_queue_add(id) ?
1092 					txa_dev_queue_add(id)(id,
1093 							txa_evdev(id),
1094 							eth_dev,
1095 							queue) : 0;
1096 	else
1097 		ret = txa_service_queue_add(id, txa_evdev(id), eth_dev, queue);
1098 
1099 	rte_eventdev_trace_eth_tx_adapter_queue_add(id, eth_dev_id, queue,
1100 		ret);
1101 	return ret;
1102 }
1103 
1104 int
1105 rte_event_eth_tx_adapter_queue_del(uint8_t id,
1106 				uint16_t eth_dev_id,
1107 				int32_t queue)
1108 {
1109 	struct rte_eth_dev *eth_dev;
1110 	int ret;
1111 	uint32_t caps;
1112 
1113 	RTE_ETH_VALID_PORTID_OR_ERR_RET(eth_dev_id, -EINVAL);
1114 	TXA_CHECK_OR_ERR_RET(id);
1115 
1116 	eth_dev = &rte_eth_devices[eth_dev_id];
1117 
1118 	caps = 0;
1119 
1120 	if (txa_dev_caps_get(id))
1121 		txa_dev_caps_get(id)(txa_evdev(id), eth_dev, &caps);
1122 
1123 	if (caps & RTE_EVENT_ETH_TX_ADAPTER_CAP_INTERNAL_PORT)
1124 		ret =  txa_dev_queue_del(id) ?
1125 					txa_dev_queue_del(id)(id, txa_evdev(id),
1126 							eth_dev,
1127 							queue) : 0;
1128 	else
1129 		ret = txa_service_queue_del(id, eth_dev, queue);
1130 
1131 	rte_eventdev_trace_eth_tx_adapter_queue_del(id, eth_dev_id, queue,
1132 		ret);
1133 	return ret;
1134 }
1135 
1136 int
1137 rte_event_eth_tx_adapter_service_id_get(uint8_t id, uint32_t *service_id)
1138 {
1139 	TXA_CHECK_OR_ERR_RET(id);
1140 
1141 	return txa_service_id_get(id, service_id);
1142 }
1143 
1144 int
1145 rte_event_eth_tx_adapter_start(uint8_t id)
1146 {
1147 	int ret;
1148 
1149 	TXA_CHECK_OR_ERR_RET(id);
1150 
1151 	ret = txa_dev_start(id) ? txa_dev_start(id)(id, txa_evdev(id)) : 0;
1152 	if (ret == 0)
1153 		ret = txa_service_start(id);
1154 	rte_eventdev_trace_eth_tx_adapter_start(id, ret);
1155 	return ret;
1156 }
1157 
1158 int
1159 rte_event_eth_tx_adapter_stats_get(uint8_t id,
1160 				struct rte_event_eth_tx_adapter_stats *stats)
1161 {
1162 	int ret;
1163 
1164 	TXA_CHECK_OR_ERR_RET(id);
1165 
1166 	if (stats == NULL)
1167 		return -EINVAL;
1168 
1169 	*stats = (struct rte_event_eth_tx_adapter_stats){0};
1170 
1171 	ret = txa_dev_stats_get(id) ?
1172 			txa_dev_stats_get(id)(id, txa_evdev(id), stats) : 0;
1173 
1174 	if (ret == 0 && txa_service_id_get(id, NULL) != ESRCH) {
1175 		if (txa_dev_stats_get(id)) {
1176 			struct rte_event_eth_tx_adapter_stats service_stats;
1177 
1178 			ret = txa_service_stats_get(id, &service_stats);
1179 			if (ret == 0) {
1180 				stats->tx_retry += service_stats.tx_retry;
1181 				stats->tx_packets += service_stats.tx_packets;
1182 				stats->tx_dropped += service_stats.tx_dropped;
1183 			}
1184 		} else
1185 			ret = txa_service_stats_get(id, stats);
1186 	}
1187 
1188 	return ret;
1189 }
1190 
1191 int
1192 rte_event_eth_tx_adapter_stats_reset(uint8_t id)
1193 {
1194 	int ret;
1195 
1196 	TXA_CHECK_OR_ERR_RET(id);
1197 
1198 	ret = txa_dev_stats_reset(id) ?
1199 		txa_dev_stats_reset(id)(id, txa_evdev(id)) : 0;
1200 	if (ret == 0)
1201 		ret = txa_service_stats_reset(id);
1202 	return ret;
1203 }
1204 
1205 int
1206 rte_event_eth_tx_adapter_stop(uint8_t id)
1207 {
1208 	int ret;
1209 
1210 	TXA_CHECK_OR_ERR_RET(id);
1211 
1212 	ret = txa_dev_stop(id) ? txa_dev_stop(id)(id,  txa_evdev(id)) : 0;
1213 	if (ret == 0)
1214 		ret = txa_service_stop(id);
1215 	rte_eventdev_trace_eth_tx_adapter_stop(id, ret);
1216 	return ret;
1217 }
1218