/dpdk/lib/port/rte_port_eventdev.c (revision daa02b5cddbb8e11b31d41e2bf7bb1ae64dcae2f)
/* SPDX-License-Identifier: BSD-3-Clause
 * Copyright(c) 2019 Intel Corporation
 */

#include <string.h>
#include <stdint.h>

#include <rte_mbuf.h>
#include <rte_malloc.h>

#include "rte_port_eventdev.h"

/*
 * Port EVENTDEV Reader
 */
#ifdef RTE_PORT_STATS_COLLECT

#define RTE_PORT_EVENTDEV_READER_STATS_PKTS_IN_ADD(port, val) \
	do {port->stats.n_pkts_in += val;} while (0)
#define RTE_PORT_EVENTDEV_READER_STATS_PKTS_DROP_ADD(port, val) \
	do {port->stats.n_pkts_drop += val;} while (0)

#else

#define RTE_PORT_EVENTDEV_READER_STATS_PKTS_IN_ADD(port, val)
#define RTE_PORT_EVENTDEV_READER_STATS_PKTS_DROP_ADD(port, val)

#endif

struct rte_port_eventdev_reader {
	struct rte_port_in_stats stats;

	uint8_t  eventdev_id;
	uint16_t port_id;

	struct rte_event ev[RTE_PORT_IN_BURST_SIZE_MAX];
};

static void *
rte_port_eventdev_reader_create(void *params, int socket_id)
{
	struct rte_port_eventdev_reader_params *conf =
			params;
	struct rte_port_eventdev_reader *port;

	/* Check input parameters */
	if (conf == NULL) {
		RTE_LOG(ERR, PORT, "%s: params is NULL\n", __func__);
		return NULL;
	}

	/* Memory allocation */
	port = rte_zmalloc_socket("PORT", sizeof(*port),
		RTE_CACHE_LINE_SIZE, socket_id);
	if (port == NULL) {
		RTE_LOG(ERR, PORT, "%s: Failed to allocate port\n", __func__);
		return NULL;
	}

	/* Initialization */
	port->eventdev_id = conf->eventdev_id;
	port->port_id = conf->port_id;

	return port;
}

static int
rte_port_eventdev_reader_rx(void *port, struct rte_mbuf **pkts, uint32_t n_pkts)
{
	struct rte_port_eventdev_reader *p = port;
	uint16_t rx_evts_cnt, i;

	rx_evts_cnt = rte_event_dequeue_burst(p->eventdev_id, p->port_id,
			p->ev, n_pkts, 0);

	for (i = 0; i < rx_evts_cnt; i++)
		pkts[i] = p->ev[i].mbuf;

	RTE_PORT_EVENTDEV_READER_STATS_PKTS_IN_ADD(p, rx_evts_cnt);

	return rx_evts_cnt;
}

static int
rte_port_eventdev_reader_free(void *port)
{
	if (port == NULL) {
		RTE_LOG(ERR, PORT, "%s: port is NULL\n", __func__);
		return -EINVAL;
	}

	rte_free(port);

	return 0;
}

static int rte_port_eventdev_reader_stats_read(void *port,
	struct rte_port_in_stats *stats, int clear)
{
	struct rte_port_eventdev_reader *p =
			port;

	if (stats != NULL)
		memcpy(stats, &p->stats, sizeof(p->stats));

	if (clear)
		memset(&p->stats, 0, sizeof(p->stats));

	return 0;
}

/*
 * Port EVENTDEV Writer
 */
#ifdef RTE_PORT_STATS_COLLECT

#define RTE_PORT_EVENTDEV_WRITER_STATS_PKTS_IN_ADD(port, val) \
	do {port->stats.n_pkts_in += val;} while (0)
#define RTE_PORT_EVENTDEV_WRITER_STATS_PKTS_DROP_ADD(port, val) \
	do {port->stats.n_pkts_drop += val;} while (0)

#else

#define RTE_PORT_EVENTDEV_WRITER_STATS_PKTS_IN_ADD(port, val)
#define RTE_PORT_EVENTDEV_WRITER_STATS_PKTS_DROP_ADD(port, val)

#endif

struct rte_port_eventdev_writer {
	struct rte_port_out_stats stats;

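	/*
	 * Event buffer is sized to 2x the burst size: the slow path of
	 * tx_bulk can append up to RTE_PORT_IN_BURST_SIZE_MAX events on top
	 * of an almost full buffer before it flushes.
	 */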
	struct rte_event ev[2 * RTE_PORT_IN_BURST_SIZE_MAX];

	uint32_t enq_burst_sz;
	uint32_t enq_buf_count;
	uint64_t bsz_mask;

	uint8_t eventdev_id;
	uint8_t port_id;
	uint8_t queue_id;
	uint8_t sched_type;
	uint8_t evt_op;
};

static void *
rte_port_eventdev_writer_create(void *params, int socket_id)
{
	struct rte_port_eventdev_writer_params *conf =
			params;
	struct rte_port_eventdev_writer *port;
	unsigned int i;

	/* Check input parameters */
	if ((conf == NULL) ||
		(conf->enq_burst_sz == 0) ||
		(conf->enq_burst_sz > RTE_PORT_IN_BURST_SIZE_MAX) ||
		(!rte_is_power_of_2(conf->enq_burst_sz))) {
		RTE_LOG(ERR, PORT, "%s: Invalid input parameters\n", __func__);
		return NULL;
	}

	/* Memory allocation */
	port = rte_zmalloc_socket("PORT", sizeof(*port),
		RTE_CACHE_LINE_SIZE, socket_id);
	if (port == NULL) {
		RTE_LOG(ERR, PORT, "%s: Failed to allocate port\n", __func__);
		return NULL;
	}

	/* Initialization */
	port->enq_burst_sz = conf->enq_burst_sz;
	port->enq_buf_count = 0;
	port->bsz_mask = 1LLU << (conf->enq_burst_sz - 1);

	port->eventdev_id = conf->eventdev_id;
	port->port_id = conf->port_id;
	port->queue_id = conf->queue_id;
	port->sched_type = conf->sched_type;
	port->evt_op = conf->evt_op;
	memset(&port->ev, 0, sizeof(port->ev));

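	/*
	 * Pre-fill the constant event metadata so that the TX fast path only
	 * has to set the mbuf pointer.
	 */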
	for (i = 0; i < RTE_DIM(port->ev); i++) {
		port->ev[i].queue_id = port->queue_id;
		port->ev[i].sched_type = port->sched_type;
		port->ev[i].op = port->evt_op;
	}

	return port;
}

static inline void
send_burst(struct rte_port_eventdev_writer *p)
{
	uint32_t nb_enq;

	nb_enq = rte_event_enqueue_burst(p->eventdev_id, p->port_id,
			p->ev, p->enq_buf_count);

	RTE_PORT_EVENTDEV_WRITER_STATS_PKTS_DROP_ADD(p, p->enq_buf_count -
			nb_enq);

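	/* Free the mbufs of any events the device did not accept. */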
	for (; nb_enq < p->enq_buf_count; nb_enq++)
		rte_pktmbuf_free(p->ev[nb_enq].mbuf);

	p->enq_buf_count = 0;
}

static int
rte_port_eventdev_writer_tx(void *port, struct rte_mbuf *pkt)
{
	struct rte_port_eventdev_writer *p = port;

	p->ev[p->enq_buf_count++].mbuf = pkt;
	RTE_PORT_EVENTDEV_WRITER_STATS_PKTS_IN_ADD(p, 1);
	if (p->enq_buf_count >= p->enq_burst_sz)
		send_burst(p);

	return 0;
}

static int
rte_port_eventdev_writer_tx_bulk(void *port,
	struct rte_mbuf **pkts,
	uint64_t pkts_mask)
{
	struct rte_port_eventdev_writer *p =
			port;
	uint64_t bsz_mask = p->bsz_mask;
	uint32_t enq_buf_count = p->enq_buf_count;
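	/*
	 * expr == 0 only when pkts_mask is a contiguous run of bits starting
	 * at bit 0 that covers at least one full burst, i.e. the fast path
	 * below handles exactly the "full, gapless burst" case.
	 */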
	uint64_t expr = (pkts_mask & (pkts_mask + 1)) |
					((pkts_mask & bsz_mask) ^ bsz_mask);

	if (expr == 0) {
		uint64_t n_pkts = __builtin_popcountll(pkts_mask);
		uint32_t i, n_enq_ok;

		if (enq_buf_count)
			send_burst(p);

		RTE_PORT_EVENTDEV_WRITER_STATS_PKTS_IN_ADD(p, n_pkts);

		struct rte_event events[2 * RTE_PORT_IN_BURST_SIZE_MAX] = {};
		for (i = 0; i < n_pkts; i++) {
			events[i].mbuf = pkts[i];
			events[i].queue_id = p->queue_id;
			events[i].sched_type = p->sched_type;
			events[i].op = p->evt_op;
		}

		n_enq_ok = rte_event_enqueue_burst(p->eventdev_id, p->port_id,
				events, n_pkts);

		RTE_PORT_EVENTDEV_WRITER_STATS_PKTS_DROP_ADD(p,
				n_pkts - n_enq_ok);
		for (; n_enq_ok < n_pkts; n_enq_ok++)
			rte_pktmbuf_free(pkts[n_enq_ok]);

	} else {
		for (; pkts_mask;) {
			uint32_t pkt_index = __builtin_ctzll(pkts_mask);
			uint64_t pkt_mask = 1LLU << pkt_index;

			p->ev[enq_buf_count++].mbuf = pkts[pkt_index];

			RTE_PORT_EVENTDEV_WRITER_STATS_PKTS_IN_ADD(p, 1);
			pkts_mask &= ~pkt_mask;
		}

		p->enq_buf_count = enq_buf_count;
		if (enq_buf_count >= p->enq_burst_sz)
			send_burst(p);
	}

	return 0;
}

static int
rte_port_eventdev_writer_flush(void *port)
{
	struct rte_port_eventdev_writer *p =
			port;

	if (p->enq_buf_count > 0)
		send_burst(p);

	return 0;
}

static int
rte_port_eventdev_writer_free(void *port)
{
	if (port == NULL) {
		RTE_LOG(ERR, PORT, "%s: Port is NULL\n", __func__);
		return -EINVAL;
	}

	rte_port_eventdev_writer_flush(port);
	rte_free(port);

	return 0;
}

static int rte_port_eventdev_writer_stats_read(void *port,
	struct rte_port_out_stats *stats, int clear)
{
	struct rte_port_eventdev_writer *p =
			port;

	if (stats != NULL)
		memcpy(stats, &p->stats, sizeof(p->stats));

	if (clear)
		memset(&p->stats, 0, sizeof(p->stats));

	return 0;
}

/*
 * Port EVENTDEV Writer Nodrop
 */
#ifdef RTE_PORT_STATS_COLLECT

#define RTE_PORT_EVENTDEV_WRITER_NODROP_STATS_PKTS_IN_ADD(port, val) \
	do {port->stats.n_pkts_in += val;} while (0)
#define RTE_PORT_EVENTDEV_WRITER_NODROP_STATS_PKTS_DROP_ADD(port, val) \
	do {port->stats.n_pkts_drop += val;} while (0)

#else

#define RTE_PORT_EVENTDEV_WRITER_NODROP_STATS_PKTS_IN_ADD(port, val)
#define RTE_PORT_EVENTDEV_WRITER_NODROP_STATS_PKTS_DROP_ADD(port, val)

#endif

struct rte_port_eventdev_writer_nodrop {
	struct rte_port_out_stats stats;

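	/* Same 2x sizing rationale as the drop writer's event buffer above. */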
	struct rte_event ev[2 * RTE_PORT_IN_BURST_SIZE_MAX];

	uint32_t enq_burst_sz;
	uint32_t enq_buf_count;
	uint64_t bsz_mask;
	uint64_t n_retries;
	uint8_t eventdev_id;
	uint8_t port_id;
	uint8_t queue_id;
	uint8_t sched_type;
	uint8_t evt_op;
};


static void *
rte_port_eventdev_writer_nodrop_create(void *params, int socket_id)
{
	struct rte_port_eventdev_writer_nodrop_params *conf =
			params;
	struct rte_port_eventdev_writer_nodrop *port;
	unsigned int i;

	/* Check input parameters */
	if ((conf == NULL) ||
		(conf->enq_burst_sz == 0) ||
		(conf->enq_burst_sz > RTE_PORT_IN_BURST_SIZE_MAX) ||
		(!rte_is_power_of_2(conf->enq_burst_sz))) {
		RTE_LOG(ERR, PORT, "%s: Invalid input parameters\n", __func__);
		return NULL;
	}

	/* Memory allocation */
	port = rte_zmalloc_socket("PORT", sizeof(*port),
		RTE_CACHE_LINE_SIZE, socket_id);
	if (port == NULL) {
		RTE_LOG(ERR, PORT, "%s: Failed to allocate port\n", __func__);
		return NULL;
	}

	/* Initialization */
	port->enq_burst_sz = conf->enq_burst_sz;
	port->enq_buf_count = 0;
	port->bsz_mask = 1LLU << (conf->enq_burst_sz - 1);

	port->eventdev_id = conf->eventdev_id;
	port->port_id = conf->port_id;
	port->queue_id = conf->queue_id;
	port->sched_type = conf->sched_type;
	port->evt_op = conf->evt_op;
	memset(&port->ev, 0, sizeof(port->ev));

	for (i = 0; i < RTE_DIM(port->ev); i++) {
		port->ev[i].queue_id = port->queue_id;
		port->ev[i].sched_type = port->sched_type;
		port->ev[i].op = port->evt_op;
	}
	/*
	 * When n_retries is 0, retry indefinitely: keep resending until every
	 * event has been accepted, no matter how many attempts it takes. To
	 * limit the number of branches in the fast path, this is expressed as
	 * UINT64_MAX retries rather than a special case.
	 */
	port->n_retries = (conf->n_retries == 0) ? UINT64_MAX : conf->n_retries;

	return port;
}

static inline void
send_burst_nodrop(struct rte_port_eventdev_writer_nodrop *p)
{
	uint32_t nb_enq, i;

	nb_enq = rte_event_enqueue_burst(p->eventdev_id, p->port_id,
			p->ev, p->enq_buf_count);

	/* We sent all the events on the first try */
	if (nb_enq >= p->enq_buf_count) {
		p->enq_buf_count = 0;
		return;
	}

	for (i = 0; i < p->n_retries; i++) {
		nb_enq += rte_event_enqueue_burst(p->eventdev_id, p->port_id,
							p->ev + nb_enq,
							p->enq_buf_count - nb_enq);

		/* We sent all the events after one or more retries */
		if (nb_enq >= p->enq_buf_count) {
			p->enq_buf_count = 0;
			return;
		}
	}
	/* Events still unsent after the maximum number of attempts are dropped */
	RTE_PORT_EVENTDEV_WRITER_NODROP_STATS_PKTS_DROP_ADD(p,
			p->enq_buf_count - nb_enq);
	for (; nb_enq < p->enq_buf_count; nb_enq++)
		rte_pktmbuf_free(p->ev[nb_enq].mbuf);

	p->enq_buf_count = 0;
}

static int
rte_port_eventdev_writer_nodrop_tx(void *port, struct rte_mbuf *pkt)
{
	struct rte_port_eventdev_writer_nodrop *p = port;

	p->ev[p->enq_buf_count++].mbuf = pkt;

	RTE_PORT_EVENTDEV_WRITER_NODROP_STATS_PKTS_IN_ADD(p, 1);
	if (p->enq_buf_count >= p->enq_burst_sz)
		send_burst_nodrop(p);

	return 0;
}

static int
rte_port_eventdev_writer_nodrop_tx_bulk(void *port,
	struct rte_mbuf **pkts,
	uint64_t pkts_mask)
{
	struct rte_port_eventdev_writer_nodrop *p =
			port;

	uint64_t bsz_mask = p->bsz_mask;
	uint32_t enq_buf_count = p->enq_buf_count;
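	/* Same "full, gapless burst" fast-path check as the drop writer's tx_bulk. */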
	uint64_t expr = (pkts_mask & (pkts_mask + 1)) |
					((pkts_mask & bsz_mask) ^ bsz_mask);

	if (expr == 0) {
		uint64_t n_pkts = __builtin_popcountll(pkts_mask);
		uint32_t i, n_enq_ok;

		if (enq_buf_count)
			send_burst_nodrop(p);

		RTE_PORT_EVENTDEV_WRITER_NODROP_STATS_PKTS_IN_ADD(p, n_pkts);

		struct rte_event events[RTE_PORT_IN_BURST_SIZE_MAX] = {};

		for (i = 0; i < n_pkts; i++) {
			events[i].mbuf = pkts[i];
			events[i].queue_id = p->queue_id;
			events[i].sched_type = p->sched_type;
			events[i].op = p->evt_op;
		}

		n_enq_ok = rte_event_enqueue_burst(p->eventdev_id, p->port_id,
				events, n_pkts);

		if (n_enq_ok >= n_pkts)
			return 0;

		/*
		 * If we did not manage to enqueue all events in a single
		 * burst, move the remaining events to the buffer and flush it
		 * with send_burst_nodrop(), which retries as needed.
		 */
		for (; n_enq_ok < n_pkts; n_enq_ok++) {
			struct rte_mbuf *pkt = pkts[n_enq_ok];
			p->ev[p->enq_buf_count++].mbuf = pkt;
		}
		send_burst_nodrop(p);
	} else {
		for (; pkts_mask;) {
			uint32_t pkt_index = __builtin_ctzll(pkts_mask);
			uint64_t pkt_mask = 1LLU << pkt_index;

			p->ev[enq_buf_count++].mbuf = pkts[pkt_index];

			RTE_PORT_EVENTDEV_WRITER_NODROP_STATS_PKTS_IN_ADD(p, 1);
			pkts_mask &= ~pkt_mask;
		}

		p->enq_buf_count = enq_buf_count;
		if (enq_buf_count >= p->enq_burst_sz)
			send_burst_nodrop(p);
	}

	return 0;
}

static int
rte_port_eventdev_writer_nodrop_flush(void *port)
{
	struct rte_port_eventdev_writer_nodrop *p =
			port;

	if (p->enq_buf_count > 0)
		send_burst_nodrop(p);

	return 0;
}

static int
rte_port_eventdev_writer_nodrop_free(void *port)
{
	if (port == NULL) {
		RTE_LOG(ERR, PORT, "%s: Port is NULL\n", __func__);
		return -EINVAL;
	}

	rte_port_eventdev_writer_nodrop_flush(port);
	rte_free(port);

	return 0;
}

static int rte_port_eventdev_writer_nodrop_stats_read(void *port,
	struct rte_port_out_stats *stats, int clear)
{
	struct rte_port_eventdev_writer_nodrop *p =
			port;

	if (stats != NULL)
		memcpy(stats, &p->stats, sizeof(p->stats));

	if (clear)
		memset(&p->stats, 0, sizeof(p->stats));

	return 0;
}

/*
 * Summary of port operations
 */
struct rte_port_in_ops rte_port_eventdev_reader_ops = {
	.f_create = rte_port_eventdev_reader_create,
	.f_free = rte_port_eventdev_reader_free,
	.f_rx = rte_port_eventdev_reader_rx,
	.f_stats = rte_port_eventdev_reader_stats_read,
};

struct rte_port_out_ops rte_port_eventdev_writer_ops = {
	.f_create = rte_port_eventdev_writer_create,
	.f_free = rte_port_eventdev_writer_free,
	.f_tx = rte_port_eventdev_writer_tx,
	.f_tx_bulk = rte_port_eventdev_writer_tx_bulk,
	.f_flush = rte_port_eventdev_writer_flush,
	.f_stats = rte_port_eventdev_writer_stats_read,
};

struct rte_port_out_ops rte_port_eventdev_writer_nodrop_ops = {
	.f_create = rte_port_eventdev_writer_nodrop_create,
	.f_free = rte_port_eventdev_writer_nodrop_free,
	.f_tx = rte_port_eventdev_writer_nodrop_tx,
	.f_tx_bulk = rte_port_eventdev_writer_nodrop_tx_bulk,
	.f_flush = rte_port_eventdev_writer_nodrop_flush,
	.f_stats = rte_port_eventdev_writer_nodrop_stats_read,
};