xref: /dpdk/lib/port/rte_swx_port_ring.c (revision 42a8fc7daa46256d150278fc9a7a846e27945a0c)
1 /* SPDX-License-Identifier: BSD-3-Clause
2  * Copyright(c) 2021 Intel Corporation
3  */
4 #include <string.h>
5 #include <stdint.h>
6 
7 #include <rte_mbuf.h>
8 #include <rte_ring.h>
9 #include <rte_hexdump.h>
10 
11 #include "rte_swx_port_ring.h"
12 
13 #ifndef TRACE_LEVEL
14 #define TRACE_LEVEL 0
15 #endif
16 
17 #if TRACE_LEVEL
18 #define TRACE(...) printf(__VA_ARGS__)
19 #else
20 #define TRACE(...)
21 #endif
22 
23 /*
24  * Reader
25  */
26 struct reader {
27 	struct {
28 		struct rte_ring *ring;
29 		char *name;
30 		uint32_t burst_size;
31 	} params;
32 	struct rte_swx_port_in_stats stats;
33 	struct rte_mbuf **pkts;
34 	int n_pkts;
35 	int pos;
36 };
37 
38 static void *
39 reader_create(void *args)
40 {
41 	struct rte_swx_port_ring_reader_params *params = args;
42 	struct rte_ring *ring;
43 	struct reader *p = NULL;
44 
45 	/* Check input parameters. */
46 	if (!params || !params->name || !params->burst_size)
47 		goto error;
48 
49 	ring = rte_ring_lookup(params->name);
50 	if (!ring)
51 		goto error;
52 
53 	/* Memory allocation. */
54 	p = calloc(1, sizeof(struct reader));
55 	if (!p)
56 		goto error;
57 
58 	p->params.name = strdup(params->name);
59 	if (!p->params.name)
60 		goto error;
61 
62 	p->pkts = calloc(params->burst_size, sizeof(struct rte_mbuf *));
63 	if (!p->pkts)
64 		goto error;
65 
66 	/* Initialization. */
67 	p->params.ring = ring;
68 	p->params.burst_size = params->burst_size;
69 
70 	return p;
71 
72 error:
73 	if (!p)
74 		return NULL;
75 
76 	free(p->pkts);
77 	free(p->params.name);
78 	free(p);
79 	return NULL;
80 }
81 
82 static int
83 reader_pkt_rx(void *port, struct rte_swx_pkt *pkt)
84 {
85 	struct reader *p = port;
86 	struct rte_mbuf *m;
87 
88 	if (p->pos == p->n_pkts) {
89 		int n_pkts;
90 
91 		n_pkts = rte_ring_sc_dequeue_burst(p->params.ring,
92 			(void **) p->pkts,
93 			p->params.burst_size,
94 			NULL);
95 		if (!n_pkts) {
96 			p->stats.n_empty++;
97 			return 0;
98 		}
99 
100 		TRACE("[Ring %s] %d packets in\n",
101 		      p->params.name,
102 		      n_pkts);
103 
104 		p->n_pkts = n_pkts;
105 		p->pos = 0;
106 	}
107 
108 	m = p->pkts[p->pos++];
109 	pkt->handle = m;
110 	pkt->pkt = m->buf_addr;
111 	pkt->offset = m->data_off;
112 	pkt->length = m->pkt_len;
113 
114 	TRACE("[Ring %s] Pkt %d (%u bytes at offset %u)\n",
115 	      (uint32_t)p->params.name,
116 	      p->pos - 1,
117 	      pkt->length,
118 	      pkt->offset);
119 	if (TRACE_LEVEL)
120 		rte_hexdump(stdout,
121 			    NULL,
122 			    &((uint8_t *)m->buf_addr)[m->data_off],
123 			    m->data_len);
124 
125 	p->stats.n_pkts++;
126 	p->stats.n_bytes += pkt->length;
127 
128 	return 1;
129 }
130 
131 static void
132 reader_free(void *port)
133 {
134 	struct reader *p = port;
135 	int i;
136 
137 	if (!p)
138 		return;
139 
140 	for (i = 0; i < p->n_pkts; i++) {
141 		struct rte_mbuf *pkt = p->pkts[i];
142 
143 		rte_pktmbuf_free(pkt);
144 	}
145 
146 	free(p->pkts);
147 	free(p->params.name);
148 	free(p);
149 }
150 
151 static void
152 reader_stats_read(void *port, struct rte_swx_port_in_stats *stats)
153 {
154 	struct reader *p = port;
155 
156 	if (!stats)
157 		return;
158 
159 	memcpy(stats, &p->stats, sizeof(p->stats));
160 }
161 
162 /*
163  * Writer
164  */
165 struct writer {
166 	struct {
167 		struct rte_ring *ring;
168 		char *name;
169 		uint32_t burst_size;
170 	} params;
171 	struct rte_swx_port_out_stats stats;
172 
173 	struct rte_mbuf **pkts;
174 	int n_pkts;
175 };
176 
177 static void *
178 writer_create(void *args)
179 {
180 	struct rte_swx_port_ring_writer_params *params = args;
181 	struct rte_ring *ring;
182 	struct writer *p = NULL;
183 
184 	/* Check input parameters. */
185 	if (!params || !params->name || !params->burst_size)
186 		goto error;
187 
188 	ring = rte_ring_lookup(params->name);
189 	if (!ring)
190 		goto error;
191 
192 	/* Memory allocation. */
193 	p = calloc(1, sizeof(struct writer));
194 	if (!p)
195 		goto error;
196 
197 	p->params.name = strdup(params->name);
198 	if (!p->params.name)
199 		goto error;
200 
201 	p->pkts = calloc(params->burst_size, sizeof(struct rte_mbuf *));
202 	if (!p->pkts)
203 		goto error;
204 
205 	/* Initialization. */
206 	p->params.ring = ring;
207 	p->params.burst_size = params->burst_size;
208 
209 	return p;
210 
211 error:
212 	if (!p)
213 		return NULL;
214 
215 	free(p->params.name);
216 	free(p->pkts);
217 	free(p);
218 	return NULL;
219 }
220 
221 static void
222 __writer_flush(struct writer *p)
223 {
224 	int n_pkts;
225 
226 	for (n_pkts = 0; ; ) {
227 		n_pkts += rte_ring_sp_enqueue_burst(p->params.ring,
228 						    (void **)p->pkts + n_pkts,
229 						    p->n_pkts - n_pkts,
230 						    NULL);
231 
232 		TRACE("[Ring %s] %d packets out\n", p->params.name, n_pkts);
233 
234 		if (n_pkts == p->n_pkts)
235 			break;
236 	}
237 
238 	p->n_pkts = 0;
239 }
240 
241 static void
242 writer_pkt_tx(void *port, struct rte_swx_pkt *pkt)
243 {
244 	struct writer *p = port;
245 	struct rte_mbuf *m = pkt->handle;
246 
247 	TRACE("[Ring %s] Pkt %d (%u bytes at offset %u)\n",
248 	      p->params.name,
249 	      p->n_pkts - 1,
250 	      pkt->length,
251 	      pkt->offset);
252 	if (TRACE_LEVEL)
253 		rte_hexdump(stdout, NULL, &pkt->pkt[pkt->offset], pkt->length);
254 
255 	m->data_len = (uint16_t)(pkt->length + m->data_len - m->pkt_len);
256 	m->pkt_len = pkt->length;
257 	m->data_off = (uint16_t)pkt->offset;
258 
259 	p->stats.n_pkts++;
260 	p->stats.n_bytes += pkt->length;
261 
262 	p->pkts[p->n_pkts++] = m;
263 	if (p->n_pkts ==  (int)p->params.burst_size)
264 		__writer_flush(p);
265 }
266 
267 static void
268 writer_pkt_fast_clone_tx(void *port, struct rte_swx_pkt *pkt)
269 {
270 	struct writer *p = port;
271 	struct rte_mbuf *m = pkt->handle;
272 
273 	TRACE("[Ring %s] Pkt %d (%u bytes at offset %u) (fast clone)\n",
274 	      p->params.name,
275 	      p->n_pkts - 1,
276 	      pkt->length,
277 	      pkt->offset);
278 	if (TRACE_LEVEL)
279 		rte_hexdump(stdout, NULL, &pkt->pkt[pkt->offset], pkt->length);
280 
281 	m->data_len = (uint16_t)(pkt->length + m->data_len - m->pkt_len);
282 	m->pkt_len = pkt->length;
283 	m->data_off = (uint16_t)pkt->offset;
284 	rte_pktmbuf_refcnt_update(m, 1);
285 
286 	p->stats.n_pkts++;
287 	p->stats.n_bytes += pkt->length;
288 	p->stats.n_pkts_clone++;
289 
290 	p->pkts[p->n_pkts++] = m;
291 	if (p->n_pkts == (int)p->params.burst_size)
292 		__writer_flush(p);
293 }
294 
295 static void
296 writer_pkt_clone_tx(void *port, struct rte_swx_pkt *pkt, uint32_t truncation_length)
297 {
298 	struct writer *p = port;
299 	struct rte_mbuf *m = pkt->handle, *m_clone;
300 
301 	TRACE("[Ring %s] Pkt %d (%u bytes at offset %u) (clone)\n",
302 	      p->params.name,
303 	      p->n_pkts - 1,
304 	      pkt->length,
305 	      pkt->offset);
306 	if (TRACE_LEVEL)
307 		rte_hexdump(stdout, NULL, &pkt->pkt[pkt->offset], pkt->length);
308 
309 	m->data_len = (uint16_t)(pkt->length + m->data_len - m->pkt_len);
310 	m->pkt_len = pkt->length;
311 	m->data_off = (uint16_t)pkt->offset;
312 
313 	m_clone = rte_pktmbuf_copy(m, m->pool, 0, truncation_length);
314 	if (!m_clone) {
315 		p->stats.n_pkts_clone_err++;
316 		return;
317 	}
318 
319 	p->stats.n_pkts++;
320 	p->stats.n_bytes += pkt->length;
321 	p->stats.n_pkts_clone++;
322 
323 	p->pkts[p->n_pkts++] = m_clone;
324 	if (p->n_pkts == (int)p->params.burst_size)
325 		__writer_flush(p);
326 }
327 
328 static void
329 writer_flush(void *port)
330 {
331 	struct writer *p = port;
332 
333 	if (p->n_pkts)
334 		__writer_flush(p);
335 }
336 
337 static void
338 writer_free(void *port)
339 {
340 	struct writer *p = port;
341 
342 	if (!p)
343 		return;
344 
345 	writer_flush(p);
346 	free(p->pkts);
347 	free(p->params.name);
348 	free(port);
349 }
350 
351 static void
352 writer_stats_read(void *port, struct rte_swx_port_out_stats *stats)
353 {
354 	struct writer *p = port;
355 
356 	if (!stats)
357 		return;
358 
359 	memcpy(stats, &p->stats, sizeof(p->stats));
360 }
361 
362 /*
363  * Summary of port operations
364  */
365 struct rte_swx_port_in_ops rte_swx_port_ring_reader_ops = {
366 	.create = reader_create,
367 	.free = reader_free,
368 	.pkt_rx = reader_pkt_rx,
369 	.stats_read = reader_stats_read,
370 };
371 
372 struct rte_swx_port_out_ops rte_swx_port_ring_writer_ops = {
373 	.create = writer_create,
374 	.free = writer_free,
375 	.pkt_tx = writer_pkt_tx,
376 	.pkt_fast_clone_tx = writer_pkt_fast_clone_tx,
377 	.pkt_clone_tx = writer_pkt_clone_tx,
378 	.flush = writer_flush,
379 	.stats_read = writer_stats_read,
380 };
381