xref: /dpdk/lib/port/rte_port_eventdev.c (revision ae67895b507bb6af22263c79ba0d5c374b396485)
1 /* SPDX-License-Identifier: BSD-3-Clause
2  * Copyright(c) 2019 Intel Corporation
3  */
4 
5 #include <string.h>
6 #include <stdint.h>
7 
8 #include <rte_mbuf.h>
9 #include <rte_malloc.h>
10 
11 #include "rte_port_eventdev.h"
12 
13 #include "port_log.h"
14 
15 /*
16  * Port EVENTDEV Reader
17  */
18 #ifdef RTE_PORT_STATS_COLLECT
19 
20 #define RTE_PORT_EVENTDEV_READER_STATS_PKTS_IN_ADD(port, val) \
21 	do {port->stats.n_pkts_in += val;} while (0)
22 #define RTE_PORT_EVENTDEV_READER_STATS_PKTS_DROP_ADD(port, val) \
23 	do {port->stats.n_pkts_drop += val;} while (0)
24 
25 #else
26 
27 #define RTE_PORT_EVENTDEV_READER_STATS_PKTS_IN_ADD(port, val)
28 #define RTE_PORT_EVENTDEV_READER_STATS_PKTS_DROP_ADD(port, val)
29 
30 #endif
31 
32 struct rte_port_eventdev_reader {
33 	struct rte_port_in_stats stats;
34 
35 	uint8_t  eventdev_id;
36 	uint16_t port_id;
37 
38 	struct rte_event ev[RTE_PORT_IN_BURST_SIZE_MAX];
39 };
40 
41 static void *
rte_port_eventdev_reader_create(void * params,int socket_id)42 rte_port_eventdev_reader_create(void *params, int socket_id)
43 {
44 	struct rte_port_eventdev_reader_params *conf =
45 			params;
46 	struct rte_port_eventdev_reader *port;
47 
48 	/* Check input parameters */
49 	if (conf == NULL) {
50 		PORT_LOG(ERR, "%s: params is NULL", __func__);
51 		return NULL;
52 	}
53 
54 	/* Memory allocation */
55 	port = rte_zmalloc_socket("PORT", sizeof(*port),
56 		RTE_CACHE_LINE_SIZE, socket_id);
57 	if (port == NULL) {
58 		PORT_LOG(ERR, "%s: Failed to allocate port", __func__);
59 		return NULL;
60 	}
61 
62 	/* Initialization */
63 	port->eventdev_id = conf->eventdev_id;
64 	port->port_id = conf->port_id;
65 
66 	return port;
67 }
68 
69 static int
rte_port_eventdev_reader_rx(void * port,struct rte_mbuf ** pkts,uint32_t n_pkts)70 rte_port_eventdev_reader_rx(void *port, struct rte_mbuf **pkts, uint32_t n_pkts)
71 {
72 	struct rte_port_eventdev_reader *p = port;
73 	uint16_t rx_evts_cnt, i;
74 
75 	rx_evts_cnt = rte_event_dequeue_burst(p->eventdev_id, p->port_id,
76 			p->ev, n_pkts, 0);
77 
78 	for (i = 0; i < rx_evts_cnt; i++)
79 		pkts[i] = p->ev[i].mbuf;
80 
81 	RTE_PORT_EVENTDEV_READER_STATS_PKTS_IN_ADD(p, rx_evts_cnt);
82 
83 	return rx_evts_cnt;
84 }
85 
86 static int
rte_port_eventdev_reader_free(void * port)87 rte_port_eventdev_reader_free(void *port)
88 {
89 	if (port == NULL) {
90 		PORT_LOG(ERR, "%s: port is NULL", __func__);
91 		return -EINVAL;
92 	}
93 
94 	rte_free(port);
95 
96 	return 0;
97 }
98 
rte_port_eventdev_reader_stats_read(void * port,struct rte_port_in_stats * stats,int clear)99 static int rte_port_eventdev_reader_stats_read(void *port,
100 	struct rte_port_in_stats *stats, int clear)
101 {
102 	struct rte_port_eventdev_reader *p =
103 			port;
104 
105 	if (stats != NULL)
106 		memcpy(stats, &p->stats, sizeof(p->stats));
107 
108 	if (clear)
109 		memset(&p->stats, 0, sizeof(p->stats));
110 
111 	return 0;
112 }
113 
114 /*
115  * Port EVENTDEV Writer
116  */
117 #ifdef RTE_PORT_STATS_COLLECT
118 
119 #define RTE_PORT_EVENTDEV_WRITER_STATS_PKTS_IN_ADD(port, val) \
120 	do {port->stats.n_pkts_in += val;} while (0)
121 #define RTE_PORT_EVENTDEV_WRITER_STATS_PKTS_DROP_ADD(port, val) \
122 	do {port->stats.n_pkts_drop += val;} while (0)
123 
124 #else
125 
126 #define RTE_PORT_EVENTDEV_WRITER_STATS_PKTS_IN_ADD(port, val)
127 #define RTE_PORT_EVENTDEV_WRITER_STATS_PKTS_DROP_ADD(port, val)
128 
129 #endif
130 
131 struct rte_port_eventdev_writer {
132 	struct rte_port_out_stats stats;
133 
134 	struct rte_event ev[2 * RTE_PORT_IN_BURST_SIZE_MAX];
135 
136 	uint32_t enq_burst_sz;
137 	uint32_t enq_buf_count;
138 	uint64_t bsz_mask;
139 
140 	uint8_t eventdev_id;
141 	uint8_t port_id;
142 	uint8_t queue_id;
143 	uint8_t sched_type;
144 	uint8_t evt_op;
145 };
146 
147 static void *
rte_port_eventdev_writer_create(void * params,int socket_id)148 rte_port_eventdev_writer_create(void *params, int socket_id)
149 {
150 	struct rte_port_eventdev_writer_params *conf =
151 			params;
152 	struct rte_port_eventdev_writer *port;
153 	unsigned int i;
154 
155 	/* Check input parameters */
156 	if ((conf == NULL) ||
157 		(conf->enq_burst_sz == 0) ||
158 		(conf->enq_burst_sz > RTE_PORT_IN_BURST_SIZE_MAX) ||
159 		(!rte_is_power_of_2(conf->enq_burst_sz))) {
160 		PORT_LOG(ERR, "%s: Invalid input parameters", __func__);
161 		return NULL;
162 	}
163 
164 	/* Memory allocation */
165 	port = rte_zmalloc_socket("PORT", sizeof(*port),
166 		RTE_CACHE_LINE_SIZE, socket_id);
167 	if (port == NULL) {
168 		PORT_LOG(ERR, "%s: Failed to allocate port", __func__);
169 		return NULL;
170 	}
171 
172 	/* Initialization */
173 	port->enq_burst_sz = conf->enq_burst_sz;
174 	port->enq_buf_count = 0;
175 	port->bsz_mask = 1LLU << (conf->enq_burst_sz - 1);
176 
177 	port->eventdev_id = conf->eventdev_id;
178 	port->port_id = conf->port_id;
179 	port->queue_id = conf->queue_id;
180 	port->sched_type = conf->sched_type;
181 	port->evt_op = conf->evt_op;
182 	memset(&port->ev, 0, sizeof(port->ev));
183 
184 	for (i = 0; i < RTE_DIM(port->ev); i++) {
185 		port->ev[i].queue_id = port->queue_id;
186 		port->ev[i].sched_type = port->sched_type;
187 		port->ev[i].op = port->evt_op;
188 	}
189 
190 	return port;
191 }
192 
193 static inline void
send_burst(struct rte_port_eventdev_writer * p)194 send_burst(struct rte_port_eventdev_writer *p)
195 {
196 	uint32_t nb_enq;
197 
198 	nb_enq = rte_event_enqueue_burst(p->eventdev_id, p->port_id,
199 			p->ev, p->enq_buf_count);
200 
201 	RTE_PORT_EVENTDEV_WRITER_STATS_PKTS_DROP_ADD(p, p->enq_buf_count -
202 			nb_enq);
203 
204 	for (; nb_enq < p->enq_buf_count; nb_enq++)
205 		rte_pktmbuf_free(p->ev[nb_enq].mbuf);
206 
207 	p->enq_buf_count = 0;
208 }
209 
210 static int
rte_port_eventdev_writer_tx(void * port,struct rte_mbuf * pkt)211 rte_port_eventdev_writer_tx(void *port, struct rte_mbuf *pkt)
212 {
213 	struct rte_port_eventdev_writer *p = port;
214 
215 	p->ev[p->enq_buf_count++].mbuf  = pkt;
216 	RTE_PORT_EVENTDEV_WRITER_STATS_PKTS_IN_ADD(p, 1);
217 	if (p->enq_buf_count >= p->enq_burst_sz)
218 		send_burst(p);
219 
220 	return 0;
221 }
222 
223 static int
rte_port_eventdev_writer_tx_bulk(void * port,struct rte_mbuf ** pkts,uint64_t pkts_mask)224 rte_port_eventdev_writer_tx_bulk(void *port,
225 	struct rte_mbuf **pkts,
226 	uint64_t pkts_mask)
227 {
228 	struct rte_port_eventdev_writer *p =
229 			port;
230 	uint64_t bsz_mask = p->bsz_mask;
231 	uint32_t enq_buf_count = p->enq_buf_count;
232 	uint64_t expr = (pkts_mask & (pkts_mask + 1)) |
233 					((pkts_mask & bsz_mask) ^ bsz_mask);
234 
235 	if (expr == 0) {
236 		uint64_t n_pkts = rte_popcount64(pkts_mask);
237 		uint32_t i, n_enq_ok;
238 
239 		if (enq_buf_count)
240 			send_burst(p);
241 
242 		RTE_PORT_EVENTDEV_WRITER_STATS_PKTS_IN_ADD(p, n_pkts);
243 
244 		struct rte_event events[2 * RTE_PORT_IN_BURST_SIZE_MAX] = {};
245 		for (i = 0; i < n_pkts; i++) {
246 			events[i].mbuf = pkts[i];
247 			events[i].queue_id = p->queue_id;
248 			events[i].sched_type = p->sched_type;
249 			events[i].op = p->evt_op;
250 		}
251 
252 		n_enq_ok = rte_event_enqueue_burst(p->eventdev_id, p->port_id,
253 				events, n_pkts);
254 
255 		RTE_PORT_EVENTDEV_WRITER_STATS_PKTS_DROP_ADD(p,
256 				n_pkts - n_enq_ok);
257 		for (; n_enq_ok < n_pkts; n_enq_ok++)
258 			rte_pktmbuf_free(pkts[n_enq_ok]);
259 
260 	} else {
261 		for (; pkts_mask;) {
262 			uint32_t pkt_index = rte_ctz64(pkts_mask);
263 			uint64_t pkt_mask = 1LLU << pkt_index;
264 
265 			p->ev[enq_buf_count++].mbuf = pkts[pkt_index];
266 
267 			RTE_PORT_EVENTDEV_WRITER_STATS_PKTS_IN_ADD(p, 1);
268 			pkts_mask &= ~pkt_mask;
269 		}
270 
271 		p->enq_buf_count = enq_buf_count;
272 		if (enq_buf_count >= p->enq_burst_sz)
273 			send_burst(p);
274 	}
275 
276 	return 0;
277 }
278 
279 static int
rte_port_eventdev_writer_flush(void * port)280 rte_port_eventdev_writer_flush(void *port)
281 {
282 	struct rte_port_eventdev_writer *p =
283 			port;
284 
285 	if (p->enq_buf_count > 0)
286 		send_burst(p);
287 
288 	return 0;
289 }
290 
291 static int
rte_port_eventdev_writer_free(void * port)292 rte_port_eventdev_writer_free(void *port)
293 {
294 	if (port == NULL) {
295 		PORT_LOG(ERR, "%s: Port is NULL", __func__);
296 		return -EINVAL;
297 	}
298 
299 	rte_port_eventdev_writer_flush(port);
300 	rte_free(port);
301 
302 	return 0;
303 }
304 
rte_port_eventdev_writer_stats_read(void * port,struct rte_port_out_stats * stats,int clear)305 static int rte_port_eventdev_writer_stats_read(void *port,
306 	struct rte_port_out_stats *stats, int clear)
307 {
308 	struct rte_port_eventdev_writer *p =
309 			port;
310 
311 	if (stats != NULL)
312 		memcpy(stats, &p->stats, sizeof(p->stats));
313 
314 	if (clear)
315 		memset(&p->stats, 0, sizeof(p->stats));
316 
317 	return 0;
318 }
319 
320 /*
321  * Port EVENTDEV Writer Nodrop
322  */
323 #ifdef RTE_PORT_STATS_COLLECT
324 
325 #define RTE_PORT_EVENTDEV_WRITER_NODROP_STATS_PKTS_IN_ADD(port, val) \
326 	do {port->stats.n_pkts_in += val;} while (0)
327 #define RTE_PORT_EVENTDEV_WRITER_NODROP_STATS_PKTS_DROP_ADD(port, val) \
328 	do {port->stats.n_pkts_drop += val;} while (0)
329 
330 #else
331 
332 #define RTE_PORT_EVENTDEV_WRITER_NODROP_STATS_PKTS_IN_ADD(port, val)
333 #define RTE_PORT_EVENTDEV_WRITER_NODROP_STATS_PKTS_DROP_ADD(port, val)
334 
335 #endif
336 
337 struct rte_port_eventdev_writer_nodrop {
338 	struct rte_port_out_stats stats;
339 
340 	struct rte_event ev[2 * RTE_PORT_IN_BURST_SIZE_MAX];
341 
342 	uint32_t enq_burst_sz;
343 	uint32_t enq_buf_count;
344 	uint64_t bsz_mask;
345 	uint64_t n_retries;
346 	uint8_t eventdev_id;
347 	uint8_t port_id;
348 	uint8_t queue_id;
349 	uint8_t sched_type;
350 	uint8_t evt_op;
351 };
352 
353 
354 static void *
rte_port_eventdev_writer_nodrop_create(void * params,int socket_id)355 rte_port_eventdev_writer_nodrop_create(void *params, int socket_id)
356 {
357 	struct rte_port_eventdev_writer_nodrop_params *conf =
358 			params;
359 	struct rte_port_eventdev_writer_nodrop *port;
360 	unsigned int i;
361 
362 	/* Check input parameters */
363 	if ((conf == NULL) ||
364 		(conf->enq_burst_sz == 0) ||
365 		(conf->enq_burst_sz > RTE_PORT_IN_BURST_SIZE_MAX) ||
366 		(!rte_is_power_of_2(conf->enq_burst_sz))) {
367 		PORT_LOG(ERR, "%s: Invalid input parameters", __func__);
368 		return NULL;
369 	}
370 
371 	/* Memory allocation */
372 	port = rte_zmalloc_socket("PORT", sizeof(*port),
373 		RTE_CACHE_LINE_SIZE, socket_id);
374 	if (port == NULL) {
375 		PORT_LOG(ERR, "%s: Failed to allocate port", __func__);
376 		return NULL;
377 	}
378 
379 	/* Initialization */
380 	port->enq_burst_sz = conf->enq_burst_sz;
381 	port->enq_buf_count = 0;
382 	port->bsz_mask = 1LLU << (conf->enq_burst_sz - 1);
383 
384 	port->eventdev_id = conf->eventdev_id;
385 	port->port_id = conf->port_id;
386 	port->queue_id = conf->queue_id;
387 	port->sched_type = conf->sched_type;
388 	port->evt_op = conf->evt_op;
389 	memset(&port->ev, 0, sizeof(port->ev));
390 
391 	for (i = 0; i < RTE_DIM(port->ev); i++) {
392 		port->ev[i].queue_id = port->queue_id;
393 		port->ev[i].sched_type = port->sched_type;
394 		port->ev[i].op = port->evt_op;
395 	}
396 	/*
397 	 * When n_retries is 0 it means that we should wait for every event to
398 	 * send no matter how many retries should it take. To limit number of
399 	 * branches in fast path, we use UINT64_MAX instead of branching.
400 	 */
401 	port->n_retries = (conf->n_retries == 0) ? UINT64_MAX : conf->n_retries;
402 
403 	return port;
404 }
405 
406 static inline void
send_burst_nodrop(struct rte_port_eventdev_writer_nodrop * p)407 send_burst_nodrop(struct rte_port_eventdev_writer_nodrop *p)
408 {
409 	uint32_t nb_enq, i;
410 
411 	nb_enq = rte_event_enqueue_burst(p->eventdev_id, p->port_id,
412 			p->ev, p->enq_buf_count);
413 
414 	/* We sent all the packets in a first try */
415 	if (nb_enq >= p->enq_buf_count) {
416 		p->enq_buf_count = 0;
417 		return;
418 	}
419 
420 	for (i = 0; i < p->n_retries; i++) {
421 		nb_enq += rte_event_enqueue_burst(p->eventdev_id, p->port_id,
422 							p->ev + nb_enq,
423 							p->enq_buf_count - nb_enq);
424 
425 		/* We sent all the events in more than one try */
426 		if (nb_enq >= p->enq_buf_count) {
427 			p->enq_buf_count = 0;
428 			return;
429 		}
430 	}
431 	/* We didn't send the events in maximum allowed attempts */
432 	RTE_PORT_EVENTDEV_WRITER_NODROP_STATS_PKTS_DROP_ADD(p,
433 			p->enq_buf_count - nb_enq);
434 	for (; nb_enq < p->enq_buf_count; nb_enq++)
435 		rte_pktmbuf_free(p->ev[nb_enq].mbuf);
436 
437 	p->enq_buf_count = 0;
438 }
439 
440 static int
rte_port_eventdev_writer_nodrop_tx(void * port,struct rte_mbuf * pkt)441 rte_port_eventdev_writer_nodrop_tx(void *port, struct rte_mbuf *pkt)
442 {
443 	struct rte_port_eventdev_writer_nodrop *p = port;
444 
445 	p->ev[p->enq_buf_count++].mbuf = pkt;
446 
447 	RTE_PORT_EVENTDEV_WRITER_NODROP_STATS_PKTS_IN_ADD(p, 1);
448 	if (p->enq_buf_count >= p->enq_burst_sz)
449 		send_burst_nodrop(p);
450 
451 	return 0;
452 }
453 
454 static int
rte_port_eventdev_writer_nodrop_tx_bulk(void * port,struct rte_mbuf ** pkts,uint64_t pkts_mask)455 rte_port_eventdev_writer_nodrop_tx_bulk(void *port,
456 	struct rte_mbuf **pkts,
457 	uint64_t pkts_mask)
458 {
459 	struct rte_port_eventdev_writer_nodrop *p =
460 			port;
461 
462 	uint64_t bsz_mask = p->bsz_mask;
463 	uint32_t enq_buf_count = p->enq_buf_count;
464 	uint64_t expr = (pkts_mask & (pkts_mask + 1)) |
465 					((pkts_mask & bsz_mask) ^ bsz_mask);
466 
467 	if (expr == 0) {
468 		uint64_t n_pkts = rte_popcount64(pkts_mask);
469 		uint32_t i, n_enq_ok;
470 
471 		if (enq_buf_count)
472 			send_burst_nodrop(p);
473 
474 		RTE_PORT_EVENTDEV_WRITER_NODROP_STATS_PKTS_IN_ADD(p, n_pkts);
475 
476 		struct rte_event events[RTE_PORT_IN_BURST_SIZE_MAX] = {};
477 
478 		for (i = 0; i < n_pkts; i++) {
479 			events[i].mbuf = pkts[i];
480 			events[i].queue_id = p->queue_id;
481 			events[i].sched_type = p->sched_type;
482 			events[i].op = p->evt_op;
483 		}
484 
485 		n_enq_ok = rte_event_enqueue_burst(p->eventdev_id, p->port_id,
486 				events, n_pkts);
487 
488 		if (n_enq_ok >= n_pkts)
489 			return 0;
490 
491 		/*
492 		 * If we did not manage to enqueue all events in single burst,
493 		 * move remaining events to the buffer and call send burst.
494 		 */
495 		for (; n_enq_ok < n_pkts; n_enq_ok++) {
496 			struct rte_mbuf *pkt = pkts[n_enq_ok];
497 			p->ev[p->enq_buf_count++].mbuf = pkt;
498 		}
499 		send_burst_nodrop(p);
500 	} else {
501 		for (; pkts_mask;) {
502 			uint32_t pkt_index = rte_ctz64(pkts_mask);
503 			uint64_t pkt_mask = 1LLU << pkt_index;
504 
505 			p->ev[enq_buf_count++].mbuf = pkts[pkt_index];
506 
507 			RTE_PORT_EVENTDEV_WRITER_STATS_PKTS_IN_ADD(p, 1);
508 			pkts_mask &= ~pkt_mask;
509 		}
510 
511 		p->enq_buf_count = enq_buf_count;
512 		if (enq_buf_count >= p->enq_burst_sz)
513 			send_burst_nodrop(p);
514 	}
515 
516 	return 0;
517 }
518 
519 static int
rte_port_eventdev_writer_nodrop_flush(void * port)520 rte_port_eventdev_writer_nodrop_flush(void *port)
521 {
522 	struct rte_port_eventdev_writer_nodrop *p =
523 			port;
524 
525 	if (p->enq_buf_count > 0)
526 		send_burst_nodrop(p);
527 
528 	return 0;
529 }
530 
531 static int
rte_port_eventdev_writer_nodrop_free(void * port)532 rte_port_eventdev_writer_nodrop_free(void *port)
533 {
534 	if (port == NULL) {
535 		PORT_LOG(ERR, "%s: Port is NULL", __func__);
536 		return -EINVAL;
537 	}
538 
539 	rte_port_eventdev_writer_nodrop_flush(port);
540 	rte_free(port);
541 
542 	return 0;
543 }
544 
rte_port_eventdev_writer_nodrop_stats_read(void * port,struct rte_port_out_stats * stats,int clear)545 static int rte_port_eventdev_writer_nodrop_stats_read(void *port,
546 	struct rte_port_out_stats *stats, int clear)
547 {
548 	struct rte_port_eventdev_writer_nodrop *p =
549 			port;
550 
551 	if (stats != NULL)
552 		memcpy(stats, &p->stats, sizeof(p->stats));
553 
554 	if (clear)
555 		memset(&p->stats, 0, sizeof(p->stats));
556 
557 	return 0;
558 }
559 
560 /*
561  * Summary of port operations
562  */
563 struct rte_port_in_ops rte_port_eventdev_reader_ops = {
564 	.f_create = rte_port_eventdev_reader_create,
565 	.f_free = rte_port_eventdev_reader_free,
566 	.f_rx = rte_port_eventdev_reader_rx,
567 	.f_stats = rte_port_eventdev_reader_stats_read,
568 };
569 
570 struct rte_port_out_ops rte_port_eventdev_writer_ops = {
571 	.f_create = rte_port_eventdev_writer_create,
572 	.f_free = rte_port_eventdev_writer_free,
573 	.f_tx = rte_port_eventdev_writer_tx,
574 	.f_tx_bulk = rte_port_eventdev_writer_tx_bulk,
575 	.f_flush = rte_port_eventdev_writer_flush,
576 	.f_stats = rte_port_eventdev_writer_stats_read,
577 };
578 
579 struct rte_port_out_ops rte_port_eventdev_writer_nodrop_ops = {
580 	.f_create = rte_port_eventdev_writer_nodrop_create,
581 	.f_free = rte_port_eventdev_writer_nodrop_free,
582 	.f_tx = rte_port_eventdev_writer_nodrop_tx,
583 	.f_tx_bulk = rte_port_eventdev_writer_nodrop_tx_bulk,
584 	.f_flush = rte_port_eventdev_writer_nodrop_flush,
585 	.f_stats = rte_port_eventdev_writer_nodrop_stats_read,
586 };
587