xref: /dpdk/lib/port/rte_swx_port_ring.c (revision f8b0c950a58f62b9247832b0ff8e716fdabddfb6)
1 /* SPDX-License-Identifier: BSD-3-Clause
2  * Copyright(c) 2021 Intel Corporation
3  */
4 #include <string.h>
5 #include <stdint.h>
6 #include <stdlib.h>
7 
8 #include <rte_mbuf.h>
9 #include <rte_ring.h>
10 #include <rte_hexdump.h>
11 
12 #include "rte_swx_port_ring.h"
13 
/*
 * Debug tracing.
 *
 * TRACE_LEVEL defaults to 0 unless it has been defined before this point.
 * When it is non-zero, TRACE() forwards its arguments to printf(); otherwise
 * TRACE() expands to nothing and its arguments are never evaluated.
 */
#if !defined(TRACE_LEVEL)
#define TRACE_LEVEL 0
#endif

#if TRACE_LEVEL == 0
#define TRACE(...)
#else
#define TRACE(...) printf(__VA_ARGS__)
#endif
23 
24 /*
25  * Reader
26  */
27 struct reader {
28 	struct {
29 		struct rte_ring *ring;
30 		char *name;
31 		uint32_t burst_size;
32 	} params;
33 	struct rte_swx_port_in_stats stats;
34 	struct rte_mbuf **pkts;
35 	int n_pkts;
36 	int pos;
37 };
38 
39 static void *
reader_create(void * args)40 reader_create(void *args)
41 {
42 	struct rte_swx_port_ring_reader_params *params = args;
43 	struct rte_ring *ring;
44 	struct reader *p = NULL;
45 
46 	/* Check input parameters. */
47 	if (!params || !params->name || !params->burst_size)
48 		goto error;
49 
50 	ring = rte_ring_lookup(params->name);
51 	if (!ring)
52 		goto error;
53 
54 	/* Memory allocation. */
55 	p = calloc(1, sizeof(struct reader));
56 	if (!p)
57 		goto error;
58 
59 	p->params.name = strdup(params->name);
60 	if (!p->params.name)
61 		goto error;
62 
63 	p->pkts = calloc(params->burst_size, sizeof(struct rte_mbuf *));
64 	if (!p->pkts)
65 		goto error;
66 
67 	/* Initialization. */
68 	p->params.ring = ring;
69 	p->params.burst_size = params->burst_size;
70 
71 	return p;
72 
73 error:
74 	if (!p)
75 		return NULL;
76 
77 	free(p->pkts);
78 	free(p->params.name);
79 	free(p);
80 	return NULL;
81 }
82 
83 static int
reader_pkt_rx(void * port,struct rte_swx_pkt * pkt)84 reader_pkt_rx(void *port, struct rte_swx_pkt *pkt)
85 {
86 	struct reader *p = port;
87 	struct rte_mbuf *m;
88 
89 	if (p->pos == p->n_pkts) {
90 		int n_pkts;
91 
92 		n_pkts = rte_ring_sc_dequeue_burst(p->params.ring,
93 			(void **) p->pkts,
94 			p->params.burst_size,
95 			NULL);
96 		if (!n_pkts) {
97 			p->stats.n_empty++;
98 			return 0;
99 		}
100 
101 		TRACE("[Ring %s] %d packets in\n",
102 		      p->params.name,
103 		      n_pkts);
104 
105 		p->n_pkts = n_pkts;
106 		p->pos = 0;
107 	}
108 
109 	m = p->pkts[p->pos++];
110 	pkt->handle = m;
111 	pkt->pkt = m->buf_addr;
112 	pkt->offset = m->data_off;
113 	pkt->length = m->pkt_len;
114 
115 	TRACE("[Ring %s] Pkt %d (%u bytes at offset %u)\n",
116 	      (uint32_t)p->params.name,
117 	      p->pos - 1,
118 	      pkt->length,
119 	      pkt->offset);
120 	if (TRACE_LEVEL)
121 		rte_hexdump(stdout,
122 			    NULL,
123 			    &((uint8_t *)m->buf_addr)[m->data_off],
124 			    m->data_len);
125 
126 	p->stats.n_pkts++;
127 	p->stats.n_bytes += pkt->length;
128 
129 	return 1;
130 }
131 
132 static void
reader_free(void * port)133 reader_free(void *port)
134 {
135 	struct reader *p = port;
136 	int i;
137 
138 	if (!p)
139 		return;
140 
141 	for (i = 0; i < p->n_pkts; i++) {
142 		struct rte_mbuf *pkt = p->pkts[i];
143 
144 		rte_pktmbuf_free(pkt);
145 	}
146 
147 	free(p->pkts);
148 	free(p->params.name);
149 	free(p);
150 }
151 
152 static void
reader_stats_read(void * port,struct rte_swx_port_in_stats * stats)153 reader_stats_read(void *port, struct rte_swx_port_in_stats *stats)
154 {
155 	struct reader *p = port;
156 
157 	if (!stats)
158 		return;
159 
160 	memcpy(stats, &p->stats, sizeof(p->stats));
161 }
162 
163 /*
164  * Writer
165  */
166 struct writer {
167 	struct {
168 		struct rte_ring *ring;
169 		char *name;
170 		uint32_t burst_size;
171 	} params;
172 	struct rte_swx_port_out_stats stats;
173 
174 	struct rte_mbuf **pkts;
175 	int n_pkts;
176 	uint32_t n_bytes;
177 	int flush_flag;
178 };
179 
180 static void *
writer_create(void * args)181 writer_create(void *args)
182 {
183 	struct rte_swx_port_ring_writer_params *params = args;
184 	struct rte_ring *ring;
185 	struct writer *p = NULL;
186 
187 	/* Check input parameters. */
188 	if (!params || !params->name || !params->burst_size)
189 		goto error;
190 
191 	ring = rte_ring_lookup(params->name);
192 	if (!ring)
193 		goto error;
194 
195 	/* Memory allocation. */
196 	p = calloc(1, sizeof(struct writer));
197 	if (!p)
198 		goto error;
199 
200 	p->params.name = strdup(params->name);
201 	if (!p->params.name)
202 		goto error;
203 
204 	p->pkts = calloc(params->burst_size, sizeof(struct rte_mbuf *));
205 	if (!p->pkts)
206 		goto error;
207 
208 	/* Initialization. */
209 	p->params.ring = ring;
210 	p->params.burst_size = params->burst_size;
211 
212 	return p;
213 
214 error:
215 	if (!p)
216 		return NULL;
217 
218 	free(p->params.name);
219 	free(p->pkts);
220 	free(p);
221 	return NULL;
222 }
223 
224 static inline void
__writer_flush(struct writer * p)225 __writer_flush(struct writer *p)
226 {
227 	struct rte_mbuf **pkts = p->pkts;
228 	uint64_t n_pkts_total = p->stats.n_pkts;
229 	uint64_t n_bytes_total = p->stats.n_bytes;
230 	uint64_t n_pkts_drop_total = p->stats.n_pkts_drop;
231 	uint64_t n_bytes_drop_total = p->stats.n_bytes_drop;
232 	int n_pkts = p->n_pkts, n_pkts_drop, n_pkts_tx;
233 	uint32_t n_bytes = p->n_bytes, n_bytes_drop = 0;
234 
235 	/* Packet TX. */
236 	n_pkts_tx = rte_ring_sp_enqueue_burst(p->params.ring,
237 					      (void **)pkts,
238 					      n_pkts,
239 					      NULL);
240 
241 	/* Packet drop. */
242 	n_pkts_drop = n_pkts - n_pkts_tx;
243 
244 	for ( ; n_pkts_tx < n_pkts; n_pkts_tx++) {
245 		struct rte_mbuf *m = pkts[n_pkts_tx];
246 
247 		n_bytes_drop += m->pkt_len;
248 		rte_pktmbuf_free(m);
249 	}
250 
251 	/* Port update. */
252 	p->stats.n_pkts = n_pkts_total + n_pkts - n_pkts_drop;
253 	p->stats.n_bytes = n_bytes_total + n_bytes - n_bytes_drop;
254 	p->stats.n_pkts_drop = n_pkts_drop_total + n_pkts_drop;
255 	p->stats.n_bytes_drop = n_bytes_drop_total + n_bytes_drop;
256 	p->n_pkts = 0;
257 	p->n_bytes = 0;
258 	p->flush_flag = 0;
259 
260 	TRACE("[Ring %s] Buffered packets flushed: %d out, %d dropped\n",
261 	      p->params.name,
262 	      n_pkts - n_pkts_drop,
263 	      n_pkts_drop);
264 }
265 
266 static void
writer_pkt_tx(void * port,struct rte_swx_pkt * pkt)267 writer_pkt_tx(void *port, struct rte_swx_pkt *pkt)
268 {
269 	struct writer *p = port;
270 	int n_pkts = p->n_pkts;
271 	uint32_t n_bytes = p->n_bytes;
272 	struct rte_mbuf *m = pkt->handle;
273 	uint32_t pkt_length = pkt->length;
274 
275 	TRACE("[Ring %s] Pkt %d (%u bytes at offset %u)\n",
276 	      p->params.name,
277 	      p->n_pkts - 1,
278 	      pkt->length,
279 	      pkt->offset);
280 	if (TRACE_LEVEL)
281 		rte_hexdump(stdout, NULL, &pkt->pkt[pkt->offset], pkt->length);
282 
283 	m->data_len = (uint16_t)(pkt_length + m->data_len - m->pkt_len);
284 	m->pkt_len = pkt_length;
285 	m->data_off = (uint16_t)pkt->offset;
286 
287 	p->pkts[n_pkts++] = m;
288 	p->n_pkts = n_pkts;
289 	p->n_bytes = n_bytes + pkt_length;
290 
291 	if (n_pkts == (int)p->params.burst_size)
292 		__writer_flush(p);
293 }
294 
295 static void
writer_pkt_fast_clone_tx(void * port,struct rte_swx_pkt * pkt)296 writer_pkt_fast_clone_tx(void *port, struct rte_swx_pkt *pkt)
297 {
298 	struct writer *p = port;
299 	int n_pkts = p->n_pkts;
300 	uint32_t n_bytes = p->n_bytes;
301 	uint64_t n_pkts_clone = p->stats.n_pkts_clone;
302 	struct rte_mbuf *m = pkt->handle;
303 	uint32_t pkt_length = pkt->length;
304 
305 	TRACE("[Ring %s] Pkt %d (%u bytes at offset %u) (fast clone)\n",
306 	      p->params.name,
307 	      p->n_pkts - 1,
308 	      pkt->length,
309 	      pkt->offset);
310 	if (TRACE_LEVEL)
311 		rte_hexdump(stdout, NULL, &pkt->pkt[pkt->offset], pkt->length);
312 
313 	m->data_len = (uint16_t)(pkt_length + m->data_len - m->pkt_len);
314 	m->pkt_len = pkt_length;
315 	m->data_off = (uint16_t)pkt->offset;
316 	rte_pktmbuf_refcnt_update(m, 1);
317 
318 	p->pkts[n_pkts++] = m;
319 	p->n_pkts = n_pkts;
320 	p->n_bytes = n_bytes + pkt_length;
321 	p->stats.n_pkts_clone = n_pkts_clone + 1;
322 
323 	if (n_pkts == (int)p->params.burst_size)
324 		__writer_flush(p);
325 }
326 
327 static void
writer_pkt_clone_tx(void * port,struct rte_swx_pkt * pkt,uint32_t truncation_length)328 writer_pkt_clone_tx(void *port, struct rte_swx_pkt *pkt, uint32_t truncation_length)
329 {
330 	struct writer *p = port;
331 	int n_pkts = p->n_pkts;
332 	uint32_t n_bytes = p->n_bytes;
333 	uint64_t n_pkts_clone = p->stats.n_pkts_clone;
334 	struct rte_mbuf *m = pkt->handle, *m_clone;
335 	uint32_t pkt_length = pkt->length;
336 
337 	TRACE("[Ring %s] Pkt %d (%u bytes at offset %u) (clone)\n",
338 	      p->params.name,
339 	      p->n_pkts - 1,
340 	      pkt->length,
341 	      pkt->offset);
342 	if (TRACE_LEVEL)
343 		rte_hexdump(stdout, NULL, &pkt->pkt[pkt->offset], pkt->length);
344 
345 	m->data_len = (uint16_t)(pkt_length + m->data_len - m->pkt_len);
346 	m->pkt_len = pkt_length;
347 	m->data_off = (uint16_t)pkt->offset;
348 
349 	m_clone = rte_pktmbuf_copy(m, m->pool, 0, truncation_length);
350 	if (!m_clone) {
351 		p->stats.n_pkts_clone_err++;
352 		return;
353 	}
354 
355 	p->pkts[n_pkts++] = m_clone;
356 	p->n_pkts = n_pkts;
357 	p->n_bytes = n_bytes + pkt_length;
358 	p->stats.n_pkts_clone = n_pkts_clone + 1;
359 
360 	if (n_pkts == (int)p->params.burst_size)
361 		__writer_flush(p);
362 }
363 
364 static void
writer_flush(void * port)365 writer_flush(void *port)
366 {
367 	struct writer *p = port;
368 
369 	if (p->n_pkts && p->flush_flag)
370 		__writer_flush(p);
371 
372 	p->flush_flag = 1;
373 }
374 
375 static void
writer_free(void * port)376 writer_free(void *port)
377 {
378 	struct writer *p = port;
379 	int i;
380 
381 	if (!p)
382 		return;
383 
384 	for (i = 0; i < p->n_pkts; i++) {
385 		struct rte_mbuf *m = p->pkts[i];
386 
387 		rte_pktmbuf_free(m);
388 	}
389 
390 	free(p->pkts);
391 	free(p->params.name);
392 	free(port);
393 }
394 
395 static void
writer_stats_read(void * port,struct rte_swx_port_out_stats * stats)396 writer_stats_read(void *port, struct rte_swx_port_out_stats *stats)
397 {
398 	struct writer *p = port;
399 
400 	if (!stats)
401 		return;
402 
403 	memcpy(stats, &p->stats, sizeof(p->stats));
404 }
405 
406 /*
407  * Summary of port operations
408  */
409 struct rte_swx_port_in_ops rte_swx_port_ring_reader_ops = {
410 	.create = reader_create,
411 	.free = reader_free,
412 	.pkt_rx = reader_pkt_rx,
413 	.stats_read = reader_stats_read,
414 };
415 
416 struct rte_swx_port_out_ops rte_swx_port_ring_writer_ops = {
417 	.create = writer_create,
418 	.free = writer_free,
419 	.pkt_tx = writer_pkt_tx,
420 	.pkt_fast_clone_tx = writer_pkt_fast_clone_tx,
421 	.pkt_clone_tx = writer_pkt_clone_tx,
422 	.flush = writer_flush,
423 	.stats_read = writer_stats_read,
424 };
425