/* SPDX-License-Identifier: BSD-3-Clause
 * Copyright(c) 2019 Intel Corporation
 */

#include <string.h>
#include <stdint.h>

#include <rte_mbuf.h>
#include <rte_malloc.h>

#include "rte_port_eventdev.h"

#include "port_log.h"

/*
 * Port EVENTDEV Reader
 */
#ifdef RTE_PORT_STATS_COLLECT

#define RTE_PORT_EVENTDEV_READER_STATS_PKTS_IN_ADD(port, val) \
	do {port->stats.n_pkts_in += val;} while (0)
#define RTE_PORT_EVENTDEV_READER_STATS_PKTS_DROP_ADD(port, val) \
	do {port->stats.n_pkts_drop += val;} while (0)

#else

#define RTE_PORT_EVENTDEV_READER_STATS_PKTS_IN_ADD(port, val)
#define RTE_PORT_EVENTDEV_READER_STATS_PKTS_DROP_ADD(port, val)

#endif

struct rte_port_eventdev_reader {
	struct rte_port_in_stats stats;

	uint8_t eventdev_id;
	uint16_t port_id;

	struct rte_event ev[RTE_PORT_IN_BURST_SIZE_MAX];
};
static void *
rte_port_eventdev_reader_create(void *params, int socket_id)
{
	struct rte_port_eventdev_reader_params *conf = params;
	struct rte_port_eventdev_reader *port;

	/* Check input parameters */
	if (conf == NULL) {
		PORT_LOG(ERR, "%s: params is NULL", __func__);
		return NULL;
	}

	/* Memory allocation */
	port = rte_zmalloc_socket("PORT", sizeof(*port),
		RTE_CACHE_LINE_SIZE, socket_id);
	if (port == NULL) {
		PORT_LOG(ERR, "%s: Failed to allocate port", __func__);
		return NULL;
	}

	/* Initialization */
	port->eventdev_id = conf->eventdev_id;
	port->port_id = conf->port_id;

	return port;
}

static int
rte_port_eventdev_reader_rx(void *port, struct rte_mbuf **pkts, uint32_t n_pkts)
{
	struct rte_port_eventdev_reader *p = port;
	uint16_t rx_evts_cnt, i;

	rx_evts_cnt = rte_event_dequeue_burst(p->eventdev_id, p->port_id,
		p->ev, n_pkts, 0);

	for (i = 0; i < rx_evts_cnt; i++)
		pkts[i] = p->ev[i].mbuf;

	RTE_PORT_EVENTDEV_READER_STATS_PKTS_IN_ADD(p, rx_evts_cnt);

	return rx_evts_cnt;
}

static int
rte_port_eventdev_reader_free(void *port)
{
	if (port == NULL) {
		PORT_LOG(ERR, "%s: port is NULL", __func__);
		return -EINVAL;
	}

	rte_free(port);

	return 0;
}

static int rte_port_eventdev_reader_stats_read(void *port,
	struct rte_port_in_stats *stats, int clear)
{
	struct rte_port_eventdev_reader *p = port;

	if (stats != NULL)
		memcpy(stats, &p->stats, sizeof(p->stats));

	if (clear)
		memset(&p->stats, 0, sizeof(p->stats));

	return 0;
}

/*
 * Port EVENTDEV Writer
 */
#ifdef RTE_PORT_STATS_COLLECT

#define RTE_PORT_EVENTDEV_WRITER_STATS_PKTS_IN_ADD(port, val) \
	do {port->stats.n_pkts_in += val;} while (0)
#define RTE_PORT_EVENTDEV_WRITER_STATS_PKTS_DROP_ADD(port, val) \
	do {port->stats.n_pkts_drop += val;} while (0)

#else

#define RTE_PORT_EVENTDEV_WRITER_STATS_PKTS_IN_ADD(port, val)
#define RTE_PORT_EVENTDEV_WRITER_STATS_PKTS_DROP_ADD(port, val)

#endif

struct rte_port_eventdev_writer {
	struct rte_port_out_stats stats;

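	/*
	 * Event buffer sized for the worst case in tx_bulk(): a full burst of
	 * RTE_PORT_IN_BURST_SIZE_MAX packets may be appended on top of up to
	 * (enq_burst_sz - 1) events already buffered before a flush.
	 */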
	struct rte_event ev[2 * RTE_PORT_IN_BURST_SIZE_MAX];

	uint32_t enq_burst_sz;
	uint32_t enq_buf_count;
	uint64_t bsz_mask;

	uint8_t eventdev_id;
	uint8_t port_id;
	uint8_t queue_id;
	uint8_t sched_type;
	uint8_t evt_op;
};

static void *
rte_port_eventdev_writer_create(void *params, int socket_id)
{
	struct rte_port_eventdev_writer_params *conf = params;
	struct rte_port_eventdev_writer *port;
	unsigned int i;

	/* Check input parameters */
	if ((conf == NULL) ||
		(conf->enq_burst_sz == 0) ||
		(conf->enq_burst_sz > RTE_PORT_IN_BURST_SIZE_MAX) ||
		(!rte_is_power_of_2(conf->enq_burst_sz))) {
		PORT_LOG(ERR, "%s: Invalid input parameters", __func__);
		return NULL;
	}

	/* Memory allocation */
	port = rte_zmalloc_socket("PORT", sizeof(*port),
		RTE_CACHE_LINE_SIZE, socket_id);
	if (port == NULL) {
		PORT_LOG(ERR, "%s: Failed to allocate port", __func__);
		return NULL;
	}

	/* Initialization */
	port->enq_burst_sz = conf->enq_burst_sz;
	port->enq_buf_count = 0;
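	/* bsz_mask has only bit (enq_burst_sz - 1) set; tx_bulk() tests it
	 * against pkts_mask to detect a full burst.
	 */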
	port->bsz_mask = 1LLU << (conf->enq_burst_sz - 1);

	port->eventdev_id = conf->eventdev_id;
	port->port_id = conf->port_id;
	port->queue_id = conf->queue_id;
	port->sched_type = conf->sched_type;
	port->evt_op = conf->evt_op;
	memset(&port->ev, 0, sizeof(port->ev));

	for (i = 0; i < RTE_DIM(port->ev); i++) {
		port->ev[i].queue_id = port->queue_id;
		port->ev[i].sched_type = port->sched_type;
		port->ev[i].op = port->evt_op;
	}

	return port;
}

static inline void
send_burst(struct rte_port_eventdev_writer *p)
{
	uint32_t nb_enq;

	nb_enq = rte_event_enqueue_burst(p->eventdev_id, p->port_id,
		p->ev, p->enq_buf_count);

	RTE_PORT_EVENTDEV_WRITER_STATS_PKTS_DROP_ADD(p, p->enq_buf_count -
		nb_enq);

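	/* Events the device did not accept are dropped: free their mbufs. */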
	for (; nb_enq < p->enq_buf_count; nb_enq++)
		rte_pktmbuf_free(p->ev[nb_enq].mbuf);

	p->enq_buf_count = 0;
}

static int
rte_port_eventdev_writer_tx(void *port, struct rte_mbuf *pkt)
{
	struct rte_port_eventdev_writer *p = port;

	p->ev[p->enq_buf_count++].mbuf = pkt;
	RTE_PORT_EVENTDEV_WRITER_STATS_PKTS_IN_ADD(p, 1);
	if (p->enq_buf_count >= p->enq_burst_sz)
		send_burst(p);

	return 0;
}

static int
rte_port_eventdev_writer_tx_bulk(void *port,
	struct rte_mbuf **pkts,
	uint64_t pkts_mask)
{
	struct rte_port_eventdev_writer *p = port;
	uint64_t bsz_mask = p->bsz_mask;
	uint32_t enq_buf_count = p->enq_buf_count;
	uint64_t expr = (pkts_mask & (pkts_mask + 1)) |
		((pkts_mask & bsz_mask) ^ bsz_mask);

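	/*
	 * expr == 0 iff pkts_mask is a contiguous run of low bits
	 * (pkts_mask + 1 is a power of 2) that covers at least enq_burst_sz
	 * packets (bit enq_burst_sz - 1 is set), i.e. a full burst that can be
	 * enqueued directly; otherwise buffer the packets one by one.
	 */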
	if (expr == 0) {
		uint64_t n_pkts = rte_popcount64(pkts_mask);
		uint32_t i, n_enq_ok;

		if (enq_buf_count)
			send_burst(p);

		RTE_PORT_EVENTDEV_WRITER_STATS_PKTS_IN_ADD(p, n_pkts);

		struct rte_event events[2 * RTE_PORT_IN_BURST_SIZE_MAX] = {};
		for (i = 0; i < n_pkts; i++) {
			events[i].mbuf = pkts[i];
			events[i].queue_id = p->queue_id;
			events[i].sched_type = p->sched_type;
			events[i].op = p->evt_op;
		}

		n_enq_ok = rte_event_enqueue_burst(p->eventdev_id, p->port_id,
			events, n_pkts);

		RTE_PORT_EVENTDEV_WRITER_STATS_PKTS_DROP_ADD(p,
			n_pkts - n_enq_ok);
		for (; n_enq_ok < n_pkts; n_enq_ok++)
			rte_pktmbuf_free(pkts[n_enq_ok]);

	} else {
		for (; pkts_mask;) {
			uint32_t pkt_index = rte_ctz64(pkts_mask);
			uint64_t pkt_mask = 1LLU << pkt_index;

			p->ev[enq_buf_count++].mbuf = pkts[pkt_index];

			RTE_PORT_EVENTDEV_WRITER_STATS_PKTS_IN_ADD(p, 1);
			pkts_mask &= ~pkt_mask;
		}

		p->enq_buf_count = enq_buf_count;
		if (enq_buf_count >= p->enq_burst_sz)
			send_burst(p);
	}

	return 0;
}

static int
rte_port_eventdev_writer_flush(void *port)
{
	struct rte_port_eventdev_writer *p = port;

	if (p->enq_buf_count > 0)
		send_burst(p);

	return 0;
}

static int
rte_port_eventdev_writer_free(void *port)
{
	if (port == NULL) {
		PORT_LOG(ERR, "%s: Port is NULL", __func__);
		return -EINVAL;
	}

	rte_port_eventdev_writer_flush(port);
	rte_free(port);

	return 0;
}

static int rte_port_eventdev_writer_stats_read(void *port,
	struct rte_port_out_stats *stats, int clear)
{
	struct rte_port_eventdev_writer *p = port;

	if (stats != NULL)
		memcpy(stats, &p->stats, sizeof(p->stats));

	if (clear)
		memset(&p->stats, 0, sizeof(p->stats));

	return 0;
}

/*
 * Port EVENTDEV Writer Nodrop
 */
#ifdef RTE_PORT_STATS_COLLECT

#define RTE_PORT_EVENTDEV_WRITER_NODROP_STATS_PKTS_IN_ADD(port, val) \
	do {port->stats.n_pkts_in += val;} while (0)
#define RTE_PORT_EVENTDEV_WRITER_NODROP_STATS_PKTS_DROP_ADD(port, val) \
	do {port->stats.n_pkts_drop += val;} while (0)

#else

#define RTE_PORT_EVENTDEV_WRITER_NODROP_STATS_PKTS_IN_ADD(port, val)
#define RTE_PORT_EVENTDEV_WRITER_NODROP_STATS_PKTS_DROP_ADD(port, val)

#endif

struct rte_port_eventdev_writer_nodrop {
	struct rte_port_out_stats stats;

	struct rte_event ev[2 * RTE_PORT_IN_BURST_SIZE_MAX];

	uint32_t enq_burst_sz;
	uint32_t enq_buf_count;
	uint64_t bsz_mask;
	uint64_t n_retries;
	uint8_t eventdev_id;
	uint8_t port_id;
	uint8_t queue_id;
	uint8_t sched_type;
	uint8_t evt_op;
};


static void *
rte_port_eventdev_writer_nodrop_create(void *params, int socket_id)
{
	struct rte_port_eventdev_writer_nodrop_params *conf = params;
	struct rte_port_eventdev_writer_nodrop *port;
	unsigned int i;

	/* Check input parameters */
	if ((conf == NULL) ||
		(conf->enq_burst_sz == 0) ||
		(conf->enq_burst_sz > RTE_PORT_IN_BURST_SIZE_MAX) ||
		(!rte_is_power_of_2(conf->enq_burst_sz))) {
		PORT_LOG(ERR, "%s: Invalid input parameters", __func__);
		return NULL;
	}

	/* Memory allocation */
	port = rte_zmalloc_socket("PORT", sizeof(*port),
		RTE_CACHE_LINE_SIZE, socket_id);
	if (port == NULL) {
		PORT_LOG(ERR, "%s: Failed to allocate port", __func__);
		return NULL;
	}

	/* Initialization */
	port->enq_burst_sz = conf->enq_burst_sz;
	port->enq_buf_count = 0;
	port->bsz_mask = 1LLU << (conf->enq_burst_sz - 1);

	port->eventdev_id = conf->eventdev_id;
	port->port_id = conf->port_id;
	port->queue_id = conf->queue_id;
	port->sched_type = conf->sched_type;
	port->evt_op = conf->evt_op;
	memset(&port->ev, 0, sizeof(port->ev));

	for (i = 0; i < RTE_DIM(port->ev); i++) {
		port->ev[i].queue_id = port->queue_id;
		port->ev[i].sched_type = port->sched_type;
		port->ev[i].op = port->evt_op;
	}
	/*
	 * When n_retries is 0, retry until every event is sent, no matter how
	 * many attempts it takes. To keep the fast path branch-free, store
	 * UINT64_MAX instead of testing for 0 on every burst.
	 */
	port->n_retries = (conf->n_retries == 0) ? UINT64_MAX : conf->n_retries;

	return port;
}

static inline void
send_burst_nodrop(struct rte_port_eventdev_writer_nodrop *p)
{
	uint32_t nb_enq, i;

	nb_enq = rte_event_enqueue_burst(p->eventdev_id, p->port_id,
		p->ev, p->enq_buf_count);

	/* All the events were sent on the first try */
	if (nb_enq >= p->enq_buf_count) {
		p->enq_buf_count = 0;
		return;
	}

	for (i = 0; i < p->n_retries; i++) {
		nb_enq += rte_event_enqueue_burst(p->eventdev_id, p->port_id,
			p->ev + nb_enq,
			p->enq_buf_count - nb_enq);

		/* All the events were sent within the allowed retries */
		if (nb_enq >= p->enq_buf_count) {
			p->enq_buf_count = 0;
			return;
		}
	}
	/*
	 * Not all events were sent within the maximum allowed attempts:
	 * drop the remainder and free their mbufs.
	 */
	RTE_PORT_EVENTDEV_WRITER_NODROP_STATS_PKTS_DROP_ADD(p,
		p->enq_buf_count - nb_enq);
	for (; nb_enq < p->enq_buf_count; nb_enq++)
		rte_pktmbuf_free(p->ev[nb_enq].mbuf);

	p->enq_buf_count = 0;
}

static int
rte_port_eventdev_writer_nodrop_tx(void *port, struct rte_mbuf *pkt)
{
	struct rte_port_eventdev_writer_nodrop *p = port;

	p->ev[p->enq_buf_count++].mbuf = pkt;

	RTE_PORT_EVENTDEV_WRITER_NODROP_STATS_PKTS_IN_ADD(p, 1);
	if (p->enq_buf_count >= p->enq_burst_sz)
		send_burst_nodrop(p);

	return 0;
}

static int
rte_port_eventdev_writer_nodrop_tx_bulk(void *port,
	struct rte_mbuf **pkts,
	uint64_t pkts_mask)
{
	struct rte_port_eventdev_writer_nodrop *p = port;

	uint64_t bsz_mask = p->bsz_mask;
	uint32_t enq_buf_count = p->enq_buf_count;
	uint64_t expr = (pkts_mask & (pkts_mask + 1)) |
		((pkts_mask & bsz_mask) ^ bsz_mask);

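	/* Same fast-path test as rte_port_eventdev_writer_tx_bulk(): expr == 0
	 * iff pkts_mask is a full, contiguous burst of at least enq_burst_sz
	 * packets.
	 */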
	if (expr == 0) {
		uint64_t n_pkts = rte_popcount64(pkts_mask);
		uint32_t i, n_enq_ok;

		if (enq_buf_count)
			send_burst_nodrop(p);

		RTE_PORT_EVENTDEV_WRITER_NODROP_STATS_PKTS_IN_ADD(p, n_pkts);

		struct rte_event events[RTE_PORT_IN_BURST_SIZE_MAX] = {};

		for (i = 0; i < n_pkts; i++) {
			events[i].mbuf = pkts[i];
			events[i].queue_id = p->queue_id;
			events[i].sched_type = p->sched_type;
			events[i].op = p->evt_op;
		}

		n_enq_ok = rte_event_enqueue_burst(p->eventdev_id, p->port_id,
			events, n_pkts);

		if (n_enq_ok >= n_pkts)
			return 0;

		/*
		 * Not all events were enqueued in a single burst: move the
		 * remaining events to the buffer and retry via
		 * send_burst_nodrop().
		 */
		for (; n_enq_ok < n_pkts; n_enq_ok++) {
			struct rte_mbuf *pkt = pkts[n_enq_ok];
			p->ev[p->enq_buf_count++].mbuf = pkt;
		}
		send_burst_nodrop(p);
	} else {
		for (; pkts_mask;) {
			uint32_t pkt_index = rte_ctz64(pkts_mask);
			uint64_t pkt_mask = 1LLU << pkt_index;

			p->ev[enq_buf_count++].mbuf = pkts[pkt_index];

			RTE_PORT_EVENTDEV_WRITER_NODROP_STATS_PKTS_IN_ADD(p, 1);
			pkts_mask &= ~pkt_mask;
		}

		p->enq_buf_count = enq_buf_count;
		if (enq_buf_count >= p->enq_burst_sz)
			send_burst_nodrop(p);
	}

	return 0;
}

static int
rte_port_eventdev_writer_nodrop_flush(void *port)
{
	struct rte_port_eventdev_writer_nodrop *p = port;

	if (p->enq_buf_count > 0)
		send_burst_nodrop(p);

	return 0;
}

static int
rte_port_eventdev_writer_nodrop_free(void *port)
{
	if (port == NULL) {
		PORT_LOG(ERR, "%s: Port is NULL", __func__);
		return -EINVAL;
	}

	rte_port_eventdev_writer_nodrop_flush(port);
	rte_free(port);

	return 0;
}

static int rte_port_eventdev_writer_nodrop_stats_read(void *port,
	struct rte_port_out_stats *stats, int clear)
{
	struct rte_port_eventdev_writer_nodrop *p = port;

	if (stats != NULL)
		memcpy(stats, &p->stats, sizeof(p->stats));

	if (clear)
		memset(&p->stats, 0, sizeof(p->stats));

	return 0;
}

/*
 * Summary of port operations
 */
struct rte_port_in_ops rte_port_eventdev_reader_ops = {
	.f_create = rte_port_eventdev_reader_create,
	.f_free = rte_port_eventdev_reader_free,
	.f_rx = rte_port_eventdev_reader_rx,
	.f_stats = rte_port_eventdev_reader_stats_read,
};
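
/*
 * Usage sketch (illustrative, not compiled here): create a reader port
 * directly through its ops, assuming the event device and event port have
 * already been configured elsewhere.
 *
 *	struct rte_port_eventdev_reader_params rp = {
 *		.eventdev_id = 0,
 *		.port_id = 0,
 *	};
 *	void *in = rte_port_eventdev_reader_ops.f_create(&rp, SOCKET_ID_ANY);
 */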

struct rte_port_out_ops rte_port_eventdev_writer_ops = {
	.f_create = rte_port_eventdev_writer_create,
	.f_free = rte_port_eventdev_writer_free,
	.f_tx = rte_port_eventdev_writer_tx,
	.f_tx_bulk = rte_port_eventdev_writer_tx_bulk,
	.f_flush = rte_port_eventdev_writer_flush,
	.f_stats = rte_port_eventdev_writer_stats_read,
};
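
/*
 * Usage sketch (illustrative): a writer port enqueueing to event queue 0 with
 * atomic scheduling, flushing every 32 packets. Assumes the event device,
 * event port and queue are already set up; the ids below are placeholders.
 *
 *	struct rte_port_eventdev_writer_params wp = {
 *		.eventdev_id = 0,
 *		.port_id = 1,
 *		.queue_id = 0,
 *		.sched_type = RTE_SCHED_TYPE_ATOMIC,
 *		.evt_op = RTE_EVENT_OP_NEW,
 *		.enq_burst_sz = 32,
 *	};
 *	void *out = rte_port_eventdev_writer_ops.f_create(&wp, SOCKET_ID_ANY);
 */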

struct rte_port_out_ops rte_port_eventdev_writer_nodrop_ops = {
	.f_create = rte_port_eventdev_writer_nodrop_create,
	.f_free = rte_port_eventdev_writer_nodrop_free,
	.f_tx = rte_port_eventdev_writer_nodrop_tx,
	.f_tx_bulk = rte_port_eventdev_writer_nodrop_tx_bulk,
	.f_flush = rte_port_eventdev_writer_nodrop_flush,
	.f_stats = rte_port_eventdev_writer_nodrop_stats_read,
};