/* SPDX-License-Identifier: BSD-3-Clause
 * Copyright(c) 2023 Ericsson AB
 */

#include <stdbool.h>
#include <stdint.h>

#include <rte_branch_prediction.h>
#include <rte_common.h>
#include <rte_lcore.h>
#include <rte_random.h>
#include <rte_service_component.h>

#include "eventdev_pmd.h"

#include <rte_dispatcher.h>

#define EVD_MAX_PORTS_PER_LCORE 4
#define EVD_MAX_HANDLERS 32
#define EVD_MAX_FINALIZERS 16
#define EVD_AVG_PRIO_INTERVAL 2000
#define EVD_SERVICE_NAME "dispatcher"

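/*
 * An event port bound to a particular lcore, together with the
 * dequeue parameters to use when polling it.
 */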
struct rte_dispatcher_lcore_port {
	uint8_t port_id;
	uint16_t batch_size;
	uint64_t timeout;
};

struct rte_dispatcher_handler {
	int id;
	rte_dispatcher_match_t match_fun;
	void *match_data;
	rte_dispatcher_process_t process_fun;
	void *process_data;
};

struct rte_dispatcher_finalizer {
	int id;
	rte_dispatcher_finalize_t finalize_fun;
	void *finalize_data;
};

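/*
 * Per-lcore dispatcher state. Every lcore has its own copy of the
 * handler table, so the match-based reordering performed by
 * evd_prioritize_handler() on one lcore does not disturb the others.
 */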
struct rte_dispatcher_lcore {
	uint8_t num_ports;
	uint16_t num_handlers;
	int32_t prio_count;
	struct rte_dispatcher_lcore_port ports[EVD_MAX_PORTS_PER_LCORE];
	struct rte_dispatcher_handler handlers[EVD_MAX_HANDLERS];
	struct rte_dispatcher_stats stats;
} __rte_cache_aligned;

struct rte_dispatcher {
	uint8_t event_dev_id;
	int socket_id;
	uint32_t service_id;
	struct rte_dispatcher_lcore lcores[RTE_MAX_LCORE];
	uint16_t num_finalizers;
	struct rte_dispatcher_finalizer finalizers[EVD_MAX_FINALIZERS];
};

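/*
 * Find the first handler whose match function accepts the event.
 * Returns the handler's index in the per-lcore table, or -1 if no
 * handler matched.
 */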
static int
evd_lookup_handler_idx(struct rte_dispatcher_lcore *lcore,
	const struct rte_event *event)
{
	uint16_t i;

	for (i = 0; i < lcore->num_handlers; i++) {
		struct rte_dispatcher_handler *handler =
			&lcore->handlers[i];

		if (handler->match_fun(event, handler->match_data))
			return i;
	}

	return -1;
}

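/*
 * Swap the handler with its predecessor, so that frequently-matching
 * handlers gradually move toward the front of the table, where the
 * linear search starts.
 */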
static void
evd_prioritize_handler(struct rte_dispatcher_lcore *lcore,
	int handler_idx)
{
	struct rte_dispatcher_handler tmp;

	if (handler_idx == 0)
		return;

	/* Let the lucky handler "bubble" up the list */

	tmp = lcore->handlers[handler_idx - 1];
	lcore->handlers[handler_idx - 1] = lcore->handlers[handler_idx];
	lcore->handlers[handler_idx] = tmp;
}

static inline void
evd_consider_prioritize_handler(struct rte_dispatcher_lcore *lcore,
	int handler_idx, uint16_t handler_events)
{
	lcore->prio_count -= handler_events;

	if (unlikely(lcore->prio_count <= 0)) {
		evd_prioritize_handler(lcore, handler_idx);

		/*
		 * Randomize the interval in the unlikely case
		 * the traffic follows some very strict pattern.
		 */
		lcore->prio_count =
			rte_rand_max(EVD_AVG_PRIO_INTERVAL) +
			EVD_AVG_PRIO_INTERVAL / 2;
	}
}

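/*
 * Demultiplex a burst of events: first sort the events into
 * per-handler bursts, then hand each non-empty burst to its handler's
 * process function, and finally invoke any registered finalizers.
 */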
static inline void
evd_dispatch_events(struct rte_dispatcher *dispatcher,
	struct rte_dispatcher_lcore *lcore,
	struct rte_dispatcher_lcore_port *port,
	struct rte_event *events, uint16_t num_events)
{
	int i;
	struct rte_event bursts[EVD_MAX_HANDLERS][num_events];
	uint16_t burst_lens[EVD_MAX_HANDLERS] = { 0 };
	uint16_t drop_count = 0;
	uint16_t dispatch_count;
	uint16_t dispatched = 0;

	for (i = 0; i < num_events; i++) {
		struct rte_event *event = &events[i];
		int handler_idx;

		handler_idx = evd_lookup_handler_idx(lcore, event);

		if (unlikely(handler_idx < 0)) {
			drop_count++;
			continue;
		}

		bursts[handler_idx][burst_lens[handler_idx]] = *event;
		burst_lens[handler_idx]++;
	}

	dispatch_count = num_events - drop_count;

	for (i = 0; i < lcore->num_handlers &&
		 dispatched < dispatch_count; i++) {
		struct rte_dispatcher_handler *handler =
			&lcore->handlers[i];
		uint16_t len = burst_lens[i];

		if (len == 0)
			continue;

		handler->process_fun(dispatcher->event_dev_id, port->port_id,
				     bursts[i], len, handler->process_data);

		dispatched += len;

		/*
		 * Safe, since any reshuffling will only involve
		 * already-processed handlers.
		 */
		evd_consider_prioritize_handler(lcore, i, len);
	}

	lcore->stats.ev_batch_count++;
	lcore->stats.ev_dispatch_count += dispatch_count;
	lcore->stats.ev_drop_count += drop_count;

	for (i = 0; i < dispatcher->num_finalizers; i++) {
		struct rte_dispatcher_finalizer *finalizer =
			&dispatcher->finalizers[i];

		finalizer->finalize_fun(dispatcher->event_dev_id,
					port->port_id,
					finalizer->finalize_data);
	}
}

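/* Poll a single event port and dispatch whatever was dequeued. */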
static __rte_always_inline uint16_t
evd_port_dequeue(struct rte_dispatcher *dispatcher,
	struct rte_dispatcher_lcore *lcore,
	struct rte_dispatcher_lcore_port *port)
{
	uint16_t batch_size = port->batch_size;
	struct rte_event events[batch_size];
	uint16_t n;

	n = rte_event_dequeue_burst(dispatcher->event_dev_id, port->port_id,
				    events, batch_size, port->timeout);

	if (likely(n > 0))
		evd_dispatch_events(dispatcher, lcore, port, events, n);

	lcore->stats.poll_count++;

	return n;
}

static __rte_always_inline uint16_t
evd_lcore_process(struct rte_dispatcher *dispatcher,
	struct rte_dispatcher_lcore *lcore)
{
	uint16_t i;
	uint16_t event_count = 0;

	for (i = 0; i < lcore->num_ports; i++) {
		struct rte_dispatcher_lcore_port *port =
			&lcore->ports[i];

		event_count += evd_port_dequeue(dispatcher, lcore, port);
	}

	return event_count;
}

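/*
 * Service function run by the service lcores. Returning -EAGAIN when
 * no events were dequeued tells the service core framework that this
 * iteration was idle.
 */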
static int32_t
evd_process(void *userdata)
{
	struct rte_dispatcher *dispatcher = userdata;
	unsigned int lcore_id = rte_lcore_id();
	struct rte_dispatcher_lcore *lcore =
		&dispatcher->lcores[lcore_id];
	uint64_t event_count;

	event_count = evd_lcore_process(dispatcher, lcore);

	if (unlikely(event_count == 0))
		return -EAGAIN;

	return 0;
}

static int
evd_service_register(struct rte_dispatcher *dispatcher)
{
	struct rte_service_spec service = {
		.callback = evd_process,
		.callback_userdata = dispatcher,
		.capabilities = RTE_SERVICE_CAP_MT_SAFE,
		.socket_id = dispatcher->socket_id
	};
	int rc;

	snprintf(service.name, sizeof(service.name), EVD_SERVICE_NAME);

	rc = rte_service_component_register(&service, &dispatcher->service_id);
	if (rc != 0)
		RTE_EDEV_LOG_ERR("Registration of dispatcher service "
				 "%s failed with error code %d",
				 service.name, rc);

	return rc;
}

static int
evd_service_unregister(struct rte_dispatcher *dispatcher)
{
	int rc;

	rc = rte_service_component_unregister(dispatcher->service_id);
	if (rc != 0)
		RTE_EDEV_LOG_ERR("Unregistration of dispatcher service "
				 "failed with error code %d", rc);

	return rc;
}

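/*
 * Typical usage, as a minimal sketch. The event device and its ports,
 * along with the dev_id, port_id and lcore_id values and the match
 * and process callbacks, are placeholders assumed to be set up
 * elsewhere:
 *
 *	struct rte_dispatcher *d = rte_dispatcher_create(dev_id);
 *	rte_dispatcher_bind_port_to_lcore(d, port_id, 32, 0, lcore_id);
 *	rte_dispatcher_register(d, match_fun, NULL, process_fun, NULL);
 *	rte_service_map_lcore_set(rte_dispatcher_service_id_get(d),
 *				  lcore_id, 1);
 *	rte_dispatcher_start(d);
 */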
struct rte_dispatcher *
rte_dispatcher_create(uint8_t event_dev_id)
{
	int socket_id;
	struct rte_dispatcher *dispatcher;
	int rc;

	socket_id = rte_event_dev_socket_id(event_dev_id);

	dispatcher =
		rte_malloc_socket("dispatcher", sizeof(struct rte_dispatcher),
				  RTE_CACHE_LINE_SIZE, socket_id);

	if (dispatcher == NULL) {
		RTE_EDEV_LOG_ERR("Unable to allocate memory for dispatcher");
		rte_errno = ENOMEM;
		return NULL;
	}

	*dispatcher = (struct rte_dispatcher) {
		.event_dev_id = event_dev_id,
		.socket_id = socket_id
	};

	rc = evd_service_register(dispatcher);
	if (rc < 0) {
		rte_free(dispatcher);
		rte_errno = -rc;
		return NULL;
	}

	return dispatcher;
}

int
rte_dispatcher_free(struct rte_dispatcher *dispatcher)
{
	int rc;

	if (dispatcher == NULL)
		return 0;

	rc = evd_service_unregister(dispatcher);
	if (rc != 0)
		return rc;

	rte_free(dispatcher);

	return 0;
}

uint32_t
rte_dispatcher_service_id_get(const struct rte_dispatcher *dispatcher)
{
	return dispatcher->service_id;
}

static int
lcore_port_index(struct rte_dispatcher_lcore *lcore,
	uint8_t event_port_id)
{
	uint16_t i;

	for (i = 0; i < lcore->num_ports; i++) {
		struct rte_dispatcher_lcore_port *port =
			&lcore->ports[i];

		if (port->port_id == event_port_id)
			return i;
	}

	return -1;
}

int
rte_dispatcher_bind_port_to_lcore(struct rte_dispatcher *dispatcher,
	uint8_t event_port_id, uint16_t batch_size, uint64_t timeout,
	unsigned int lcore_id)
{
	struct rte_dispatcher_lcore *lcore;
	struct rte_dispatcher_lcore_port *port;

	lcore = &dispatcher->lcores[lcore_id];

	if (lcore->num_ports == EVD_MAX_PORTS_PER_LCORE)
		return -ENOMEM;

	if (lcore_port_index(lcore, event_port_id) >= 0)
		return -EEXIST;

	port = &lcore->ports[lcore->num_ports];

	*port = (struct rte_dispatcher_lcore_port) {
		.port_id = event_port_id,
		.batch_size = batch_size,
		.timeout = timeout
	};

	lcore->num_ports++;

	return 0;
}

int
rte_dispatcher_unbind_port_from_lcore(struct rte_dispatcher *dispatcher,
	uint8_t event_port_id, unsigned int lcore_id)
{
	struct rte_dispatcher_lcore *lcore;
	int port_idx;
	struct rte_dispatcher_lcore_port *port;
	struct rte_dispatcher_lcore_port *last;

	lcore = &dispatcher->lcores[lcore_id];

	port_idx = lcore_port_index(lcore, event_port_id);

	if (port_idx < 0)
		return -ENOENT;

	port = &lcore->ports[port_idx];
	last = &lcore->ports[lcore->num_ports - 1];

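	/* Remove the port by overwriting it with the last entry. */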
	if (port != last)
		*port = *last;

	lcore->num_ports--;

	return 0;
}

static struct rte_dispatcher_handler *
evd_lcore_get_handler_by_id(struct rte_dispatcher_lcore *lcore, int handler_id)
{
	uint16_t i;

	for (i = 0; i < lcore->num_handlers; i++) {
		struct rte_dispatcher_handler *handler =
			&lcore->handlers[i];

		if (handler->id == handler_id)
			return handler;
	}

	return NULL;
}

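/*
 * All lcores are kept with identical handler tables, so lcore 0 can
 * serve as the reference when searching for a free handler id.
 */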
static int
evd_alloc_handler_id(struct rte_dispatcher *dispatcher)
{
	int handler_id = 0;
	struct rte_dispatcher_lcore *reference_lcore =
		&dispatcher->lcores[0];

	if (reference_lcore->num_handlers == EVD_MAX_HANDLERS)
		return -1;

	while (evd_lcore_get_handler_by_id(reference_lcore, handler_id) != NULL)
		handler_id++;

	return handler_id;
}

static void
evd_lcore_install_handler(struct rte_dispatcher_lcore *lcore,
	const struct rte_dispatcher_handler *handler)
{
	int handler_idx = lcore->num_handlers;

	lcore->handlers[handler_idx] = *handler;
	lcore->num_handlers++;
}

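/* Install a copy of the handler on every lcore. */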
static void
evd_install_handler(struct rte_dispatcher *dispatcher,
	const struct rte_dispatcher_handler *handler)
{
	int i;

	for (i = 0; i < RTE_MAX_LCORE; i++) {
		struct rte_dispatcher_lcore *lcore =
			&dispatcher->lcores[i];
		evd_lcore_install_handler(lcore, handler);
	}
}

int
rte_dispatcher_register(struct rte_dispatcher *dispatcher,
	rte_dispatcher_match_t match_fun, void *match_data,
	rte_dispatcher_process_t process_fun, void *process_data)
{
	struct rte_dispatcher_handler handler = {
		.match_fun = match_fun,
		.match_data = match_data,
		.process_fun = process_fun,
		.process_data = process_data
	};

	handler.id = evd_alloc_handler_id(dispatcher);

	if (handler.id < 0)
		return -ENOMEM;

	evd_install_handler(dispatcher, &handler);

	return handler.id;
}

static int
evd_lcore_uninstall_handler(struct rte_dispatcher_lcore *lcore,
	int handler_id)
{
	struct rte_dispatcher_handler *unreg_handler;
	int handler_idx;
	uint16_t last_idx;

	unreg_handler = evd_lcore_get_handler_by_id(lcore, handler_id);

	if (unreg_handler == NULL) {
		RTE_EDEV_LOG_ERR("Invalid handler id %d", handler_id);
		return -EINVAL;
	}

	handler_idx = unreg_handler - &lcore->handlers[0];

	last_idx = lcore->num_handlers - 1;

	if (handler_idx != last_idx) {
		/* move all handlers to maintain handler order */
		int n = last_idx - handler_idx;
		memmove(unreg_handler, unreg_handler + 1,
			sizeof(struct rte_dispatcher_handler) * n);
	}

	lcore->num_handlers--;

	return 0;
}

static int
evd_uninstall_handler(struct rte_dispatcher *dispatcher, int handler_id)
{
	unsigned int lcore_id;

	for (lcore_id = 0; lcore_id < RTE_MAX_LCORE; lcore_id++) {
		struct rte_dispatcher_lcore *lcore =
			&dispatcher->lcores[lcore_id];
		int rc;

		rc = evd_lcore_uninstall_handler(lcore, handler_id);
		if (rc < 0)
			return rc;
	}

	return 0;
}

int
rte_dispatcher_unregister(struct rte_dispatcher *dispatcher, int handler_id)
{
	return evd_uninstall_handler(dispatcher, handler_id);
}

static struct rte_dispatcher_finalizer *
evd_get_finalizer_by_id(struct rte_dispatcher *dispatcher,
	int finalizer_id)
{
	int i;

	for (i = 0; i < dispatcher->num_finalizers; i++) {
		struct rte_dispatcher_finalizer *finalizer =
			&dispatcher->finalizers[i];

		if (finalizer->id == finalizer_id)
			return finalizer;
	}

	return NULL;
}

static int
evd_alloc_finalizer_id(struct rte_dispatcher *dispatcher)
{
	int finalizer_id = 0;

	while (evd_get_finalizer_by_id(dispatcher, finalizer_id) != NULL)
		finalizer_id++;

	return finalizer_id;
}

static struct rte_dispatcher_finalizer *
evd_alloc_finalizer(struct rte_dispatcher *dispatcher)
{
	int finalizer_idx;
	struct rte_dispatcher_finalizer *finalizer;

	if (dispatcher->num_finalizers == EVD_MAX_FINALIZERS)
		return NULL;

	finalizer_idx = dispatcher->num_finalizers;
	finalizer = &dispatcher->finalizers[finalizer_idx];

	finalizer->id = evd_alloc_finalizer_id(dispatcher);

	dispatcher->num_finalizers++;

	return finalizer;
}

int
rte_dispatcher_finalize_register(struct rte_dispatcher *dispatcher,
	rte_dispatcher_finalize_t finalize_fun, void *finalize_data)
{
	struct rte_dispatcher_finalizer *finalizer;

	finalizer = evd_alloc_finalizer(dispatcher);

	if (finalizer == NULL)
		return -ENOMEM;

	finalizer->finalize_fun = finalize_fun;
	finalizer->finalize_data = finalize_data;

	return finalizer->id;
}

int
rte_dispatcher_finalize_unregister(struct rte_dispatcher *dispatcher,
	int finalizer_id)
{
	struct rte_dispatcher_finalizer *unreg_finalizer;
	int finalizer_idx;
	uint16_t last_idx;

	unreg_finalizer = evd_get_finalizer_by_id(dispatcher, finalizer_id);

	if (unreg_finalizer == NULL) {
		RTE_EDEV_LOG_ERR("Invalid finalizer id %d", finalizer_id);
		return -EINVAL;
	}

	finalizer_idx = unreg_finalizer - &dispatcher->finalizers[0];

	last_idx = dispatcher->num_finalizers - 1;

	if (finalizer_idx != last_idx) {
		/* move all finalizers to maintain order */
		int n = last_idx - finalizer_idx;
		memmove(unreg_finalizer, unreg_finalizer + 1,
			sizeof(struct rte_dispatcher_finalizer) * n);
	}

	dispatcher->num_finalizers--;

	return 0;
}

static void
evd_set_service_runstate(struct rte_dispatcher *dispatcher, int state)
{
	int rc;

	rc = rte_service_component_runstate_set(dispatcher->service_id,
						state);
	/*
	 * The only cause of a runstate_set() failure is an invalid
	 * service id, which in turn means the dispatcher instance's
	 * state is invalid.
	 */
	if (rc != 0)
		RTE_EDEV_LOG_ERR("Unexpected error %d occurred while setting "
				 "service component run state to %d", rc,
				 state);

	RTE_VERIFY(rc == 0);
}

void
rte_dispatcher_start(struct rte_dispatcher *dispatcher)
{
	evd_set_service_runstate(dispatcher, 1);
}

void
rte_dispatcher_stop(struct rte_dispatcher *dispatcher)
{
	evd_set_service_runstate(dispatcher, 0);
}

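/* Add one lcore's counters to the aggregate. */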
static void
evd_aggregate_stats(struct rte_dispatcher_stats *result,
	const struct rte_dispatcher_stats *part)
{
	result->poll_count += part->poll_count;
	result->ev_batch_count += part->ev_batch_count;
	result->ev_dispatch_count += part->ev_dispatch_count;
	result->ev_drop_count += part->ev_drop_count;
}

void
rte_dispatcher_stats_get(const struct rte_dispatcher *dispatcher,
	struct rte_dispatcher_stats *stats)
{
	unsigned int lcore_id;

	*stats = (struct rte_dispatcher_stats) {};

	for (lcore_id = 0; lcore_id < RTE_MAX_LCORE; lcore_id++) {
		const struct rte_dispatcher_lcore *lcore =
			&dispatcher->lcores[lcore_id];

		evd_aggregate_stats(stats, &lcore->stats);
	}
}

void
rte_dispatcher_stats_reset(struct rte_dispatcher *dispatcher)
{
	unsigned int lcore_id;

	for (lcore_id = 0; lcore_id < RTE_MAX_LCORE; lcore_id++) {
		struct rte_dispatcher_lcore *lcore =
			&dispatcher->lcores[lcore_id];

		lcore->stats = (struct rte_dispatcher_stats) {};
	}
}