xref: /dpdk/lib/port/rte_swx_port_ethdev.c (revision 45b3fcbfd15348468c428b9db2691f71208a9507)
1 /* SPDX-License-Identifier: BSD-3-Clause
2  * Copyright(c) 2020 Intel Corporation
3  */
4 #include <string.h>
5 #include <stdlib.h>
6 #include <stdio.h>
7 #include <stdint.h>
8 
9 #include <rte_mbuf.h>
10 #include <rte_ethdev.h>
11 #include <rte_hexdump.h>
12 
13 #include "rte_swx_port_ethdev.h"
14 
/*
 * Parameter/allocation guard for functions that return a pointer: when the
 * condition does not hold, the enclosing function returns NULL immediately.
 */
#define CHECK(condition)                                                       \
do {                                                                           \
	if (condition)                                                         \
		break;                                                         \
	return NULL;                                                           \
} while (0)
20 
/* Tracing is compiled out unless the build defines a non-zero TRACE_LEVEL. */
#ifndef TRACE_LEVEL
#define TRACE_LEVEL 0
#endif

/* TRACE() forwards to printf() when tracing is on and expands to nothing otherwise. */
#if !TRACE_LEVEL
#define TRACE(...)
#else
#define TRACE(...) printf(__VA_ARGS__)
#endif
30 
31 /*
32  * Port ETHDEV Reader
33  */
34 struct reader {
35 	struct {
36 		uint16_t port_id;
37 		uint16_t queue_id;
38 		uint32_t burst_size;
39 	} params;
40 	struct rte_swx_port_in_stats stats;
41 	struct rte_mbuf **pkts;
42 	int n_pkts;
43 	int pos;
44 };
45 
46 static void *
reader_create(void * args)47 reader_create(void *args)
48 {
49 	struct rte_eth_dev_info info;
50 	struct rte_swx_port_ethdev_reader_params *params = args;
51 	struct reader *p;
52 	int status;
53 	uint16_t port_id;
54 
55 	/* Check input parameters. */
56 	CHECK(params);
57 
58 	CHECK(params->dev_name);
59 	status = rte_eth_dev_get_port_by_name(params->dev_name, &port_id);
60 	CHECK(!status);
61 
62 	status = rte_eth_dev_info_get(port_id, &info);
63 	CHECK((status == -ENOTSUP) || (params->queue_id < info.nb_rx_queues));
64 
65 	CHECK(params->burst_size);
66 
67 	/* Memory allocation. */
68 	p = calloc(1, sizeof(struct reader));
69 	CHECK(p);
70 
71 	p->pkts = calloc(params->burst_size, sizeof(struct rte_mbuf *));
72 	if (!p->pkts) {
73 		free(p);
74 		CHECK(0);
75 	}
76 
77 	/* Initialization. */
78 	p->params.port_id = port_id;
79 	p->params.queue_id = params->queue_id;
80 	p->params.burst_size = params->burst_size;
81 
82 	return p;
83 }
84 
85 static int
reader_pkt_rx(void * port,struct rte_swx_pkt * pkt)86 reader_pkt_rx(void *port, struct rte_swx_pkt *pkt)
87 {
88 	struct reader *p = port;
89 	struct rte_mbuf *m;
90 
91 	if (p->pos == p->n_pkts) {
92 		int n_pkts;
93 
94 		n_pkts = rte_eth_rx_burst(p->params.port_id,
95 					  p->params.queue_id,
96 					  p->pkts,
97 					  p->params.burst_size);
98 		if (!n_pkts) {
99 			p->stats.n_empty++;
100 			return 0;
101 		}
102 
103 		TRACE("[Ethdev RX port %u queue %u] %d packets in\n",
104 		      (uint32_t)p->params.port_id,
105 		      (uint32_t)p->params.queue_id,
106 		      n_pkts);
107 
108 		p->n_pkts = n_pkts;
109 		p->pos = 0;
110 	}
111 
112 	m = p->pkts[p->pos++];
113 	pkt->handle = m;
114 	pkt->pkt = m->buf_addr;
115 	pkt->offset = m->data_off;
116 	pkt->length = m->pkt_len;
117 
118 	TRACE("[Ethdev RX port %u queue %u] Pkt %d (%u bytes at offset %u)\n",
119 	      (uint32_t)p->params.port_id,
120 	      (uint32_t)p->params.queue_id,
121 	      p->pos - 1,
122 	      pkt->length,
123 	      pkt->offset);
124 	if (TRACE_LEVEL)
125 		rte_hexdump(stdout,
126 			    NULL,
127 			    &((uint8_t *)m->buf_addr)[m->data_off],
128 			    m->data_len);
129 
130 	p->stats.n_pkts++;
131 	p->stats.n_bytes += pkt->length;
132 
133 	return 1;
134 }
135 
136 static void
reader_free(void * port)137 reader_free(void *port)
138 {
139 	struct reader *p = port;
140 	int i;
141 
142 	if (!p)
143 		return;
144 
145 	for (i = 0; i < p->n_pkts; i++) {
146 		struct rte_mbuf *pkt = p->pkts[i];
147 
148 		rte_pktmbuf_free(pkt);
149 	}
150 
151 	free(p->pkts);
152 	free(p);
153 }
154 
155 static void
reader_stats_read(void * port,struct rte_swx_port_in_stats * stats)156 reader_stats_read(void *port, struct rte_swx_port_in_stats *stats)
157 {
158 	struct reader *p = port;
159 
160 	memcpy(stats, &p->stats, sizeof(p->stats));
161 }
162 
163 /*
164  * Port ETHDEV Writer
165  */
166 struct writer {
167 	struct {
168 		uint16_t port_id;
169 		uint16_t queue_id;
170 		uint32_t burst_size;
171 	} params;
172 	struct rte_swx_port_out_stats stats;
173 
174 	struct rte_mbuf **pkts;
175 	int n_pkts;
176 	uint32_t n_bytes;
177 	int flush_flag;
178 };
179 
180 static void *
writer_create(void * args)181 writer_create(void *args)
182 {
183 	struct rte_eth_dev_info info;
184 	struct rte_swx_port_ethdev_writer_params *params = args;
185 	struct writer *p;
186 	int status;
187 	uint16_t port_id;
188 
189 	/* Check input parameters. */
190 	CHECK(params);
191 
192 	CHECK(params->dev_name);
193 	status = rte_eth_dev_get_port_by_name(params->dev_name, &port_id);
194 	CHECK(!status);
195 
196 	status = rte_eth_dev_info_get(port_id, &info);
197 	CHECK((status == -ENOTSUP) || (params->queue_id < info.nb_tx_queues));
198 
199 	CHECK(params->burst_size);
200 
201 	/* Memory allocation. */
202 	p = calloc(1, sizeof(struct writer));
203 	CHECK(p);
204 
205 	p->pkts = calloc(params->burst_size, sizeof(struct rte_mbuf *));
206 	if (!p->pkts) {
207 		free(p);
208 		CHECK(0);
209 	}
210 
211 	/* Initialization. */
212 	p->params.port_id = port_id;
213 	p->params.queue_id = params->queue_id;
214 	p->params.burst_size = params->burst_size;
215 
216 	return p;
217 }
218 
219 static inline void
__writer_flush(struct writer * p)220 __writer_flush(struct writer *p)
221 {
222 	struct rte_mbuf **pkts = p->pkts;
223 	uint64_t n_pkts_total = p->stats.n_pkts;
224 	uint64_t n_bytes_total = p->stats.n_bytes;
225 	uint64_t n_pkts_drop_total = p->stats.n_pkts_drop;
226 	uint64_t n_bytes_drop_total = p->stats.n_bytes_drop;
227 	int n_pkts = p->n_pkts, n_pkts_drop, n_pkts_tx;
228 	uint32_t n_bytes = p->n_bytes, n_bytes_drop = 0;
229 
230 	/* Packet TX. */
231 	n_pkts_tx = rte_eth_tx_burst(p->params.port_id,
232 				     p->params.queue_id,
233 				     pkts,
234 				     n_pkts);
235 
236 	/* Packet drop. */
237 	n_pkts_drop = n_pkts - n_pkts_tx;
238 
239 	for ( ; n_pkts_tx < n_pkts; n_pkts_tx++) {
240 		struct rte_mbuf *m = pkts[n_pkts_tx];
241 
242 		n_bytes_drop += m->pkt_len;
243 		rte_pktmbuf_free(m);
244 	}
245 
246 	/* Port update. */
247 	p->stats.n_pkts = n_pkts_total + n_pkts - n_pkts_drop;
248 	p->stats.n_bytes = n_bytes_total + n_bytes - n_bytes_drop;
249 	p->stats.n_pkts_drop = n_pkts_drop_total + n_pkts_drop;
250 	p->stats.n_bytes_drop = n_bytes_drop_total + n_bytes_drop;
251 	p->n_pkts = 0;
252 	p->n_bytes = 0;
253 	p->flush_flag = 0;
254 
255 	TRACE("[Ethdev TX port %u queue %u] Buffered packets flushed: %d out, %d dropped\n",
256 	      (uint32_t)p->params.port_id,
257 	      (uint32_t)p->params.queue_id,
258 	      n_pkts - n_pkts_drop,
259 	      n_pkts_drop);
260 }
261 
262 static void
writer_pkt_tx(void * port,struct rte_swx_pkt * pkt)263 writer_pkt_tx(void *port, struct rte_swx_pkt *pkt)
264 {
265 	struct writer *p = port;
266 	int n_pkts = p->n_pkts;
267 	uint32_t n_bytes = p->n_bytes;
268 	struct rte_mbuf *m = pkt->handle;
269 	uint32_t pkt_length = pkt->length;
270 
271 	TRACE("[Ethdev TX port %u queue %u] Pkt %d (%u bytes at offset %u)\n",
272 	      (uint32_t)p->params.port_id,
273 	      (uint32_t)p->params.queue_id,
274 	      p->n_pkts - 1,
275 	      pkt->length,
276 	      pkt->offset);
277 	if (TRACE_LEVEL)
278 		rte_hexdump(stdout, NULL, &pkt->pkt[pkt->offset], pkt->length);
279 
280 	m->data_len = (uint16_t)(pkt_length + m->data_len - m->pkt_len);
281 	m->pkt_len = pkt_length;
282 	m->data_off = (uint16_t)pkt->offset;
283 
284 	p->pkts[n_pkts++] = m;
285 	p->n_pkts = n_pkts;
286 	p->n_bytes = n_bytes + pkt_length;
287 
288 	if (n_pkts == (int)p->params.burst_size)
289 		__writer_flush(p);
290 }
291 
292 static void
writer_pkt_fast_clone_tx(void * port,struct rte_swx_pkt * pkt)293 writer_pkt_fast_clone_tx(void *port, struct rte_swx_pkt *pkt)
294 {
295 	struct writer *p = port;
296 	int n_pkts = p->n_pkts;
297 	uint32_t n_bytes = p->n_bytes;
298 	uint64_t n_pkts_clone = p->stats.n_pkts_clone;
299 	struct rte_mbuf *m = pkt->handle;
300 	uint32_t pkt_length = pkt->length;
301 
302 	TRACE("[Ethdev TX port %u queue %u] Pkt %d (%u bytes at offset %u) (fast clone)\n",
303 	      (uint32_t)p->params.port_id,
304 	      (uint32_t)p->params.queue_id,
305 	      p->n_pkts - 1,
306 	      pkt->length,
307 	      pkt->offset);
308 	if (TRACE_LEVEL)
309 		rte_hexdump(stdout, NULL, &pkt->pkt[pkt->offset], pkt->length);
310 
311 	m->data_len = (uint16_t)(pkt_length + m->data_len - m->pkt_len);
312 	m->pkt_len = pkt_length;
313 	m->data_off = (uint16_t)pkt->offset;
314 	rte_pktmbuf_refcnt_update(m, 1);
315 
316 	p->pkts[n_pkts++] = m;
317 	p->n_pkts = n_pkts;
318 	p->n_bytes = n_bytes + pkt_length;
319 	p->stats.n_pkts_clone = n_pkts_clone + 1;
320 
321 	if (n_pkts == (int)p->params.burst_size)
322 		__writer_flush(p);
323 }
324 
325 static void
writer_pkt_clone_tx(void * port,struct rte_swx_pkt * pkt,uint32_t truncation_length)326 writer_pkt_clone_tx(void *port, struct rte_swx_pkt *pkt, uint32_t truncation_length)
327 {
328 	struct writer *p = port;
329 	int n_pkts = p->n_pkts;
330 	uint32_t n_bytes = p->n_bytes;
331 	uint64_t n_pkts_clone = p->stats.n_pkts_clone;
332 	struct rte_mbuf *m = pkt->handle, *m_clone;
333 	uint32_t pkt_length = pkt->length;
334 
335 	TRACE("[Ethdev TX port %u queue %u] Pkt %d (%u bytes at offset %u) (clone)\n",
336 	      (uint32_t)p->params.port_id,
337 	      (uint32_t)p->params.queue_id,
338 	      p->n_pkts - 1,
339 	      pkt->length,
340 	      pkt->offset);
341 	if (TRACE_LEVEL)
342 		rte_hexdump(stdout, NULL, &pkt->pkt[pkt->offset], pkt->length);
343 
344 	m->data_len = (uint16_t)(pkt_length + m->data_len - m->pkt_len);
345 	m->pkt_len = pkt_length;
346 	m->data_off = (uint16_t)pkt->offset;
347 
348 	m_clone = rte_pktmbuf_copy(m, m->pool, 0, truncation_length);
349 	if (!m_clone) {
350 		p->stats.n_pkts_clone_err++;
351 		return;
352 	}
353 
354 	p->pkts[n_pkts++] = m_clone;
355 	p->n_pkts = n_pkts;
356 	p->n_bytes = n_bytes + pkt_length;
357 	p->stats.n_pkts_clone = n_pkts_clone + 1;
358 
359 	if (n_pkts == (int)p->params.burst_size)
360 		__writer_flush(p);
361 }
362 
363 static void
writer_flush(void * port)364 writer_flush(void *port)
365 {
366 	struct writer *p = port;
367 
368 	if (p->n_pkts && p->flush_flag)
369 		__writer_flush(p);
370 
371 	p->flush_flag = 1;
372 }
373 
374 static void
writer_free(void * port)375 writer_free(void *port)
376 {
377 	struct writer *p = port;
378 	int i;
379 
380 	if (!p)
381 		return;
382 
383 	for (i = 0; i < p->n_pkts; i++) {
384 		struct rte_mbuf *m = p->pkts[i];
385 
386 		rte_pktmbuf_free(m);
387 	}
388 
389 	free(p->pkts);
390 	free(port);
391 }
392 
393 static void
writer_stats_read(void * port,struct rte_swx_port_out_stats * stats)394 writer_stats_read(void *port, struct rte_swx_port_out_stats *stats)
395 {
396 	struct writer *p = port;
397 
398 	memcpy(stats, &p->stats, sizeof(p->stats));
399 }
400 
401 /*
402  * Summary of port operations
403  */
404 struct rte_swx_port_in_ops rte_swx_port_ethdev_reader_ops = {
405 	.create = reader_create,
406 	.free = reader_free,
407 	.pkt_rx = reader_pkt_rx,
408 	.stats_read = reader_stats_read,
409 };
410 
411 struct rte_swx_port_out_ops rte_swx_port_ethdev_writer_ops = {
412 	.create = writer_create,
413 	.free = writer_free,
414 	.pkt_tx = writer_pkt_tx,
415 	.pkt_fast_clone_tx = writer_pkt_fast_clone_tx,
416 	.pkt_clone_tx = writer_pkt_clone_tx,
417 	.flush = writer_flush,
418 	.stats_read = writer_stats_read,
419 };
420