1 /* SPDX-License-Identifier: BSD-3-Clause
2 * Copyright(c) 2021 Intel Corporation
3 */
4 #include <string.h>
5 #include <stdint.h>
6 #include <stdlib.h>
7
8 #include <rte_mbuf.h>
9 #include <rte_ring.h>
10 #include <rte_hexdump.h>
11
12 #include "rte_swx_port_ring.h"
13
/*
 * Debug tracing switch. TRACE_LEVEL defaults to 0 (tracing off) unless it was
 * already defined, e.g. on the compiler command line.
 */
#ifndef TRACE_LEVEL
#define TRACE_LEVEL 0
#endif

/*
 * TRACE() forwards its arguments to printf() when TRACE_LEVEL is non-zero and
 * expands to nothing otherwise. In the disabled case the arguments are not
 * compiled at all, so mistakes inside TRACE() calls are only caught by a
 * build with tracing enabled.
 */
#if TRACE_LEVEL
#define TRACE(...) printf(__VA_ARGS__)
#else
#define TRACE(...)
#endif
23
24 /*
25 * Reader
26 */
/* Per-port state of a ring reader (input port). */
struct reader {
	/* Configuration captured by reader_create(). */
	struct {
		struct rte_ring *ring; /* Ring found by name via rte_ring_lookup(). */
		char *name; /* strdup() copy of the ring name; released in reader_free(). */
		uint32_t burst_size; /* Maximum number of packets requested per dequeue. */
	} params;
	struct rte_swx_port_in_stats stats; /* Counters copied out by reader_stats_read(). */
	struct rte_mbuf **pkts; /* Burst buffer, allocated with burst_size entries. */
	int n_pkts; /* Number of packets returned by the most recent dequeue. */
	int pos; /* Index in pkts of the next packet reader_pkt_rx() will deliver. */
};
38
39 static void *
reader_create(void * args)40 reader_create(void *args)
41 {
42 struct rte_swx_port_ring_reader_params *params = args;
43 struct rte_ring *ring;
44 struct reader *p = NULL;
45
46 /* Check input parameters. */
47 if (!params || !params->name || !params->burst_size)
48 goto error;
49
50 ring = rte_ring_lookup(params->name);
51 if (!ring)
52 goto error;
53
54 /* Memory allocation. */
55 p = calloc(1, sizeof(struct reader));
56 if (!p)
57 goto error;
58
59 p->params.name = strdup(params->name);
60 if (!p->params.name)
61 goto error;
62
63 p->pkts = calloc(params->burst_size, sizeof(struct rte_mbuf *));
64 if (!p->pkts)
65 goto error;
66
67 /* Initialization. */
68 p->params.ring = ring;
69 p->params.burst_size = params->burst_size;
70
71 return p;
72
73 error:
74 if (!p)
75 return NULL;
76
77 free(p->pkts);
78 free(p->params.name);
79 free(p);
80 return NULL;
81 }
82
83 static int
reader_pkt_rx(void * port,struct rte_swx_pkt * pkt)84 reader_pkt_rx(void *port, struct rte_swx_pkt *pkt)
85 {
86 struct reader *p = port;
87 struct rte_mbuf *m;
88
89 if (p->pos == p->n_pkts) {
90 int n_pkts;
91
92 n_pkts = rte_ring_sc_dequeue_burst(p->params.ring,
93 (void **) p->pkts,
94 p->params.burst_size,
95 NULL);
96 if (!n_pkts) {
97 p->stats.n_empty++;
98 return 0;
99 }
100
101 TRACE("[Ring %s] %d packets in\n",
102 p->params.name,
103 n_pkts);
104
105 p->n_pkts = n_pkts;
106 p->pos = 0;
107 }
108
109 m = p->pkts[p->pos++];
110 pkt->handle = m;
111 pkt->pkt = m->buf_addr;
112 pkt->offset = m->data_off;
113 pkt->length = m->pkt_len;
114
115 TRACE("[Ring %s] Pkt %d (%u bytes at offset %u)\n",
116 (uint32_t)p->params.name,
117 p->pos - 1,
118 pkt->length,
119 pkt->offset);
120 if (TRACE_LEVEL)
121 rte_hexdump(stdout,
122 NULL,
123 &((uint8_t *)m->buf_addr)[m->data_off],
124 m->data_len);
125
126 p->stats.n_pkts++;
127 p->stats.n_bytes += pkt->length;
128
129 return 1;
130 }
131
132 static void
reader_free(void * port)133 reader_free(void *port)
134 {
135 struct reader *p = port;
136 int i;
137
138 if (!p)
139 return;
140
141 for (i = 0; i < p->n_pkts; i++) {
142 struct rte_mbuf *pkt = p->pkts[i];
143
144 rte_pktmbuf_free(pkt);
145 }
146
147 free(p->pkts);
148 free(p->params.name);
149 free(p);
150 }
151
152 static void
reader_stats_read(void * port,struct rte_swx_port_in_stats * stats)153 reader_stats_read(void *port, struct rte_swx_port_in_stats *stats)
154 {
155 struct reader *p = port;
156
157 if (!stats)
158 return;
159
160 memcpy(stats, &p->stats, sizeof(p->stats));
161 }
162
163 /*
164 * Writer
165 */
/* Per-port state of a ring writer (output port). */
struct writer {
	/* Configuration captured by writer_create(). */
	struct {
		struct rte_ring *ring; /* Ring found by name via rte_ring_lookup(). */
		char *name; /* strdup() copy of the ring name; released in writer_free(). */
		uint32_t burst_size; /* Number of buffered packets that triggers a flush. */
	} params;
	struct rte_swx_port_out_stats stats; /* Counters copied out by writer_stats_read(). */

	struct rte_mbuf **pkts; /* Buffer of packets waiting to be enqueued; burst_size entries. */
	int n_pkts; /* Number of packets currently buffered in pkts. */
	uint32_t n_bytes; /* Byte count accumulated for the buffered packets. */
	/*
	 * Set by writer_flush() and cleared by __writer_flush(). writer_flush()
	 * only flushes a partial burst when the flag is already set, i.e. when no
	 * flush happened since the previous writer_flush() call.
	 */
	int flush_flag;
};
179
180 static void *
writer_create(void * args)181 writer_create(void *args)
182 {
183 struct rte_swx_port_ring_writer_params *params = args;
184 struct rte_ring *ring;
185 struct writer *p = NULL;
186
187 /* Check input parameters. */
188 if (!params || !params->name || !params->burst_size)
189 goto error;
190
191 ring = rte_ring_lookup(params->name);
192 if (!ring)
193 goto error;
194
195 /* Memory allocation. */
196 p = calloc(1, sizeof(struct writer));
197 if (!p)
198 goto error;
199
200 p->params.name = strdup(params->name);
201 if (!p->params.name)
202 goto error;
203
204 p->pkts = calloc(params->burst_size, sizeof(struct rte_mbuf *));
205 if (!p->pkts)
206 goto error;
207
208 /* Initialization. */
209 p->params.ring = ring;
210 p->params.burst_size = params->burst_size;
211
212 return p;
213
214 error:
215 if (!p)
216 return NULL;
217
218 free(p->params.name);
219 free(p->pkts);
220 free(p);
221 return NULL;
222 }
223
224 static inline void
__writer_flush(struct writer * p)225 __writer_flush(struct writer *p)
226 {
227 struct rte_mbuf **pkts = p->pkts;
228 uint64_t n_pkts_total = p->stats.n_pkts;
229 uint64_t n_bytes_total = p->stats.n_bytes;
230 uint64_t n_pkts_drop_total = p->stats.n_pkts_drop;
231 uint64_t n_bytes_drop_total = p->stats.n_bytes_drop;
232 int n_pkts = p->n_pkts, n_pkts_drop, n_pkts_tx;
233 uint32_t n_bytes = p->n_bytes, n_bytes_drop = 0;
234
235 /* Packet TX. */
236 n_pkts_tx = rte_ring_sp_enqueue_burst(p->params.ring,
237 (void **)pkts,
238 n_pkts,
239 NULL);
240
241 /* Packet drop. */
242 n_pkts_drop = n_pkts - n_pkts_tx;
243
244 for ( ; n_pkts_tx < n_pkts; n_pkts_tx++) {
245 struct rte_mbuf *m = pkts[n_pkts_tx];
246
247 n_bytes_drop += m->pkt_len;
248 rte_pktmbuf_free(m);
249 }
250
251 /* Port update. */
252 p->stats.n_pkts = n_pkts_total + n_pkts - n_pkts_drop;
253 p->stats.n_bytes = n_bytes_total + n_bytes - n_bytes_drop;
254 p->stats.n_pkts_drop = n_pkts_drop_total + n_pkts_drop;
255 p->stats.n_bytes_drop = n_bytes_drop_total + n_bytes_drop;
256 p->n_pkts = 0;
257 p->n_bytes = 0;
258 p->flush_flag = 0;
259
260 TRACE("[Ring %s] Buffered packets flushed: %d out, %d dropped\n",
261 p->params.name,
262 n_pkts - n_pkts_drop,
263 n_pkts_drop);
264 }
265
266 static void
writer_pkt_tx(void * port,struct rte_swx_pkt * pkt)267 writer_pkt_tx(void *port, struct rte_swx_pkt *pkt)
268 {
269 struct writer *p = port;
270 int n_pkts = p->n_pkts;
271 uint32_t n_bytes = p->n_bytes;
272 struct rte_mbuf *m = pkt->handle;
273 uint32_t pkt_length = pkt->length;
274
275 TRACE("[Ring %s] Pkt %d (%u bytes at offset %u)\n",
276 p->params.name,
277 p->n_pkts - 1,
278 pkt->length,
279 pkt->offset);
280 if (TRACE_LEVEL)
281 rte_hexdump(stdout, NULL, &pkt->pkt[pkt->offset], pkt->length);
282
283 m->data_len = (uint16_t)(pkt_length + m->data_len - m->pkt_len);
284 m->pkt_len = pkt_length;
285 m->data_off = (uint16_t)pkt->offset;
286
287 p->pkts[n_pkts++] = m;
288 p->n_pkts = n_pkts;
289 p->n_bytes = n_bytes + pkt_length;
290
291 if (n_pkts == (int)p->params.burst_size)
292 __writer_flush(p);
293 }
294
295 static void
writer_pkt_fast_clone_tx(void * port,struct rte_swx_pkt * pkt)296 writer_pkt_fast_clone_tx(void *port, struct rte_swx_pkt *pkt)
297 {
298 struct writer *p = port;
299 int n_pkts = p->n_pkts;
300 uint32_t n_bytes = p->n_bytes;
301 uint64_t n_pkts_clone = p->stats.n_pkts_clone;
302 struct rte_mbuf *m = pkt->handle;
303 uint32_t pkt_length = pkt->length;
304
305 TRACE("[Ring %s] Pkt %d (%u bytes at offset %u) (fast clone)\n",
306 p->params.name,
307 p->n_pkts - 1,
308 pkt->length,
309 pkt->offset);
310 if (TRACE_LEVEL)
311 rte_hexdump(stdout, NULL, &pkt->pkt[pkt->offset], pkt->length);
312
313 m->data_len = (uint16_t)(pkt_length + m->data_len - m->pkt_len);
314 m->pkt_len = pkt_length;
315 m->data_off = (uint16_t)pkt->offset;
316 rte_pktmbuf_refcnt_update(m, 1);
317
318 p->pkts[n_pkts++] = m;
319 p->n_pkts = n_pkts;
320 p->n_bytes = n_bytes + pkt_length;
321 p->stats.n_pkts_clone = n_pkts_clone + 1;
322
323 if (n_pkts == (int)p->params.burst_size)
324 __writer_flush(p);
325 }
326
327 static void
writer_pkt_clone_tx(void * port,struct rte_swx_pkt * pkt,uint32_t truncation_length)328 writer_pkt_clone_tx(void *port, struct rte_swx_pkt *pkt, uint32_t truncation_length)
329 {
330 struct writer *p = port;
331 int n_pkts = p->n_pkts;
332 uint32_t n_bytes = p->n_bytes;
333 uint64_t n_pkts_clone = p->stats.n_pkts_clone;
334 struct rte_mbuf *m = pkt->handle, *m_clone;
335 uint32_t pkt_length = pkt->length;
336
337 TRACE("[Ring %s] Pkt %d (%u bytes at offset %u) (clone)\n",
338 p->params.name,
339 p->n_pkts - 1,
340 pkt->length,
341 pkt->offset);
342 if (TRACE_LEVEL)
343 rte_hexdump(stdout, NULL, &pkt->pkt[pkt->offset], pkt->length);
344
345 m->data_len = (uint16_t)(pkt_length + m->data_len - m->pkt_len);
346 m->pkt_len = pkt_length;
347 m->data_off = (uint16_t)pkt->offset;
348
349 m_clone = rte_pktmbuf_copy(m, m->pool, 0, truncation_length);
350 if (!m_clone) {
351 p->stats.n_pkts_clone_err++;
352 return;
353 }
354
355 p->pkts[n_pkts++] = m_clone;
356 p->n_pkts = n_pkts;
357 p->n_bytes = n_bytes + pkt_length;
358 p->stats.n_pkts_clone = n_pkts_clone + 1;
359
360 if (n_pkts == (int)p->params.burst_size)
361 __writer_flush(p);
362 }
363
364 static void
writer_flush(void * port)365 writer_flush(void *port)
366 {
367 struct writer *p = port;
368
369 if (p->n_pkts && p->flush_flag)
370 __writer_flush(p);
371
372 p->flush_flag = 1;
373 }
374
375 static void
writer_free(void * port)376 writer_free(void *port)
377 {
378 struct writer *p = port;
379 int i;
380
381 if (!p)
382 return;
383
384 for (i = 0; i < p->n_pkts; i++) {
385 struct rte_mbuf *m = p->pkts[i];
386
387 rte_pktmbuf_free(m);
388 }
389
390 free(p->pkts);
391 free(p->params.name);
392 free(port);
393 }
394
395 static void
writer_stats_read(void * port,struct rte_swx_port_out_stats * stats)396 writer_stats_read(void *port, struct rte_swx_port_out_stats *stats)
397 {
398 struct writer *p = port;
399
400 if (!stats)
401 return;
402
403 memcpy(stats, &p->stats, sizeof(p->stats));
404 }
405
406 /*
407 * Summary of port operations
408 */
409 struct rte_swx_port_in_ops rte_swx_port_ring_reader_ops = {
410 .create = reader_create,
411 .free = reader_free,
412 .pkt_rx = reader_pkt_rx,
413 .stats_read = reader_stats_read,
414 };
415
416 struct rte_swx_port_out_ops rte_swx_port_ring_writer_ops = {
417 .create = writer_create,
418 .free = writer_free,
419 .pkt_tx = writer_pkt_tx,
420 .pkt_fast_clone_tx = writer_pkt_fast_clone_tx,
421 .pkt_clone_tx = writer_pkt_clone_tx,
422 .flush = writer_flush,
423 .stats_read = writer_stats_read,
424 };
425