xref: /dpdk/lib/dispatcher/rte_dispatcher.c (revision c6552d9a8deffa448de2d5e2e726f50508c1efd2)
1 /* SPDX-License-Identifier: BSD-3-Clause
2  * Copyright(c) 2023 Ericsson AB
3  */
4 
5 #include <stdbool.h>
6 #include <stdint.h>
7 
8 #include <rte_branch_prediction.h>
9 #include <rte_common.h>
10 #include <rte_lcore.h>
11 #include <rte_random.h>
12 #include <rte_service_component.h>
13 
14 #include "eventdev_pmd.h"
15 
16 #include <rte_dispatcher.h>
17 
/* Max number of event ports that may be bound to any single lcore. */
#define EVD_MAX_PORTS_PER_LCORE 4
/* Max number of handlers that may be registered on a dispatcher. */
#define EVD_MAX_HANDLERS 32
/* Max number of finalize callbacks that may be registered. */
#define EVD_MAX_FINALIZERS 16
/* Average number of dispatched events between handler reprioritizations. */
#define EVD_AVG_PRIO_INTERVAL 2000
/* Name under which the dispatcher service component is registered. */
#define EVD_SERVICE_NAME "dispatcher"
23 
/* Per-lcore binding of one event port: the port id plus the dequeue
 * parameters used when polling it.
 */
struct rte_dispatcher_lcore_port {
	uint8_t port_id;	/* Event port to dequeue from. */
	uint16_t batch_size;	/* Max events per dequeue burst. */
	uint64_t timeout;	/* Dequeue timeout (see rte_event_dequeue_burst()). */
};
29 
/* A registered event handler: a match predicate plus a process callback. */
struct rte_dispatcher_handler {
	int id;			/* Id returned by rte_dispatcher_register(). */
	rte_dispatcher_match_t match_fun;	/* Decides if an event belongs to this handler. */
	void *match_data;	/* Opaque user data passed to match_fun. */
	rte_dispatcher_process_t process_fun;	/* Consumes a burst of matched events. */
	void *process_data;	/* Opaque user data passed to process_fun. */
};
37 
/* A registered finalize callback, run after each dispatched batch. */
struct rte_dispatcher_finalizer {
	int id;			/* Id returned by rte_dispatcher_finalize_register(). */
	rte_dispatcher_finalize_t finalize_fun;	/* Called once per dispatched batch. */
	void *finalize_data;	/* Opaque user data passed to finalize_fun. */
};
43 
/* Per-lcore dispatcher state; cache-aligned and guarded to avoid
 * false sharing between lcores.
 */
struct __rte_cache_aligned rte_dispatcher_lcore {
	uint8_t num_ports;	/* Number of valid entries in ports[]. */
	uint16_t num_handlers;	/* Number of valid entries in handlers[]. */
	int32_t prio_count;	/* Events left until the next handler reprioritization. */
	struct rte_dispatcher_lcore_port ports[EVD_MAX_PORTS_PER_LCORE];
	/* Per-lcore copy of the handler list; order encodes match priority. */
	struct rte_dispatcher_handler handlers[EVD_MAX_HANDLERS];
	struct rte_dispatcher_stats stats;	/* Per-lcore counters; aggregated on demand. */
	RTE_CACHE_GUARD;	/* Pad so neighboring lcore state stays off this line. */
};
53 
/* Dispatcher instance state. */
struct rte_dispatcher {
	uint8_t event_dev_id;	/* Event device all bound ports belong to. */
	int socket_id;		/* NUMA socket the instance was allocated on. */
	uint32_t service_id;	/* Id of the registered service component. */
	struct rte_dispatcher_lcore lcores[RTE_MAX_LCORE];
	uint16_t num_finalizers;	/* Number of valid entries in finalizers[]. */
	struct rte_dispatcher_finalizer finalizers[EVD_MAX_FINALIZERS];
};
62 
63 static int
evd_lookup_handler_idx(struct rte_dispatcher_lcore * lcore,const struct rte_event * event)64 evd_lookup_handler_idx(struct rte_dispatcher_lcore *lcore,
65 	const struct rte_event *event)
66 {
67 	uint16_t i;
68 
69 	for (i = 0; i < lcore->num_handlers; i++) {
70 		struct rte_dispatcher_handler *handler =
71 			&lcore->handlers[i];
72 
73 		if (handler->match_fun(event, handler->match_data))
74 			return i;
75 	}
76 
77 	return -1;
78 }
79 
80 static void
evd_prioritize_handler(struct rte_dispatcher_lcore * lcore,int handler_idx)81 evd_prioritize_handler(struct rte_dispatcher_lcore *lcore,
82 	int handler_idx)
83 {
84 	struct rte_dispatcher_handler tmp;
85 
86 	if (handler_idx == 0)
87 		return;
88 
89 	/* Let the lucky handler "bubble" up the list */
90 
91 	tmp = lcore->handlers[handler_idx - 1];
92 	lcore->handlers[handler_idx - 1] = lcore->handlers[handler_idx];
93 	lcore->handlers[handler_idx] = tmp;
94 }
95 
96 static inline void
evd_consider_prioritize_handler(struct rte_dispatcher_lcore * lcore,int handler_idx,uint16_t handler_events)97 evd_consider_prioritize_handler(struct rte_dispatcher_lcore *lcore,
98 	int handler_idx, uint16_t handler_events)
99 {
100 	lcore->prio_count -= handler_events;
101 
102 	if (unlikely(lcore->prio_count <= 0)) {
103 		evd_prioritize_handler(lcore, handler_idx);
104 
105 		/*
106 		 * Randomize the interval in the unlikely case
107 		 * the traffic follow some very strict pattern.
108 		 */
109 		lcore->prio_count =
110 			rte_rand_max(EVD_AVG_PRIO_INTERVAL) +
111 			EVD_AVG_PRIO_INTERVAL / 2;
112 	}
113 }
114 
/*
 * Demultiplex a burst of events dequeued from @port: sort the events
 * into per-handler bursts (counting events no handler matches as
 * drops), deliver each non-empty burst to its handler's process
 * callback, update the lcore statistics, and finally invoke all
 * registered finalizers once.
 *
 * Caller (evd_port_dequeue) guarantees num_events > 0, so the VLAs
 * below never have zero length.
 */
static inline void
evd_dispatch_events(struct rte_dispatcher *dispatcher,
	struct rte_dispatcher_lcore *lcore,
	struct rte_dispatcher_lcore_port *port,
	struct rte_event *events, uint16_t num_events)
{
	int i;
	/* One burst buffer per possible handler slot. */
	struct rte_event bursts[EVD_MAX_HANDLERS][num_events];
	uint16_t burst_lens[EVD_MAX_HANDLERS] = { 0 };
	uint16_t drop_count = 0;
	uint16_t dispatch_count;
	uint16_t dispatched = 0;

	for (i = 0; i < num_events; i++) {
		struct rte_event *event = &events[i];
		int handler_idx;

		handler_idx = evd_lookup_handler_idx(lcore, event);

		/* No handler claimed the event -> it is dropped. */
		if (unlikely(handler_idx < 0)) {
			drop_count++;
			continue;
		}

		bursts[handler_idx][burst_lens[handler_idx]] = *event;
		burst_lens[handler_idx]++;
	}

	dispatch_count = num_events - drop_count;

	/* Stop early once every non-dropped event has been delivered. */
	for (i = 0; i < lcore->num_handlers &&
		 dispatched < dispatch_count; i++) {
		struct rte_dispatcher_handler *handler =
			&lcore->handlers[i];
		uint16_t len = burst_lens[i];

		if (len == 0)
			continue;

		handler->process_fun(dispatcher->event_dev_id, port->port_id,
				     bursts[i], len, handler->process_data);

		dispatched += len;

		/*
		 * Safe, since any reshuffling will only involve
		 * already-processed handlers.
		 */
		evd_consider_prioritize_handler(lcore, i, len);
	}

	lcore->stats.ev_batch_count++;
	lcore->stats.ev_dispatch_count += dispatch_count;
	lcore->stats.ev_drop_count += drop_count;

	/* Finalizers run once per batch, after all handlers have run. */
	for (i = 0; i < dispatcher->num_finalizers; i++) {
		struct rte_dispatcher_finalizer *finalizer =
			&dispatcher->finalizers[i];

		finalizer->finalize_fun(dispatcher->event_dev_id,
					port->port_id,
					finalizer->finalize_data);
	}
}
179 
180 static __rte_always_inline uint16_t
evd_port_dequeue(struct rte_dispatcher * dispatcher,struct rte_dispatcher_lcore * lcore,struct rte_dispatcher_lcore_port * port)181 evd_port_dequeue(struct rte_dispatcher *dispatcher,
182 	struct rte_dispatcher_lcore *lcore,
183 	struct rte_dispatcher_lcore_port *port)
184 {
185 	uint16_t batch_size = port->batch_size;
186 	struct rte_event events[batch_size];
187 	uint16_t n;
188 
189 	n = rte_event_dequeue_burst(dispatcher->event_dev_id, port->port_id,
190 				    events, batch_size, port->timeout);
191 
192 	if (likely(n > 0))
193 		evd_dispatch_events(dispatcher, lcore, port, events, n);
194 
195 	lcore->stats.poll_count++;
196 
197 	return n;
198 }
199 
200 static __rte_always_inline uint16_t
evd_lcore_process(struct rte_dispatcher * dispatcher,struct rte_dispatcher_lcore * lcore)201 evd_lcore_process(struct rte_dispatcher *dispatcher,
202 	struct rte_dispatcher_lcore *lcore)
203 {
204 	uint16_t i;
205 	uint16_t event_count = 0;
206 
207 	for (i = 0; i < lcore->num_ports; i++) {
208 		struct rte_dispatcher_lcore_port *port =
209 			&lcore->ports[i];
210 
211 		event_count += evd_port_dequeue(dispatcher, lcore, port);
212 	}
213 
214 	return event_count;
215 }
216 
217 static int32_t
evd_process(void * userdata)218 evd_process(void *userdata)
219 {
220 	struct rte_dispatcher *dispatcher = userdata;
221 	unsigned int lcore_id = rte_lcore_id();
222 	struct rte_dispatcher_lcore *lcore =
223 		&dispatcher->lcores[lcore_id];
224 	uint64_t event_count;
225 
226 	event_count = evd_lcore_process(dispatcher, lcore);
227 
228 	if (unlikely(event_count == 0))
229 		return -EAGAIN;
230 
231 	return 0;
232 }
233 
234 static int
evd_service_register(struct rte_dispatcher * dispatcher)235 evd_service_register(struct rte_dispatcher *dispatcher)
236 {
237 	struct rte_service_spec service = {
238 		.callback = evd_process,
239 		.callback_userdata = dispatcher,
240 		.capabilities = RTE_SERVICE_CAP_MT_SAFE,
241 		.socket_id = dispatcher->socket_id
242 	};
243 	int rc;
244 
245 	snprintf(service.name, sizeof(service.name), EVD_SERVICE_NAME);
246 
247 	rc = rte_service_component_register(&service, &dispatcher->service_id);
248 	if (rc != 0)
249 		RTE_EDEV_LOG_ERR("Registration of dispatcher service "
250 				 "%s failed with error code %d",
251 				 service.name, rc);
252 
253 	return rc;
254 }
255 
256 static int
evd_service_unregister(struct rte_dispatcher * dispatcher)257 evd_service_unregister(struct rte_dispatcher *dispatcher)
258 {
259 	int rc;
260 
261 	rc = rte_service_component_unregister(dispatcher->service_id);
262 	if (rc != 0)
263 		RTE_EDEV_LOG_ERR("Unregistration of dispatcher service "
264 				 "failed with error code %d", rc);
265 
266 	return rc;
267 }
268 
269 struct rte_dispatcher *
rte_dispatcher_create(uint8_t event_dev_id)270 rte_dispatcher_create(uint8_t event_dev_id)
271 {
272 	int socket_id;
273 	struct rte_dispatcher *dispatcher;
274 	int rc;
275 
276 	socket_id = rte_event_dev_socket_id(event_dev_id);
277 
278 	dispatcher =
279 		rte_malloc_socket("dispatcher", sizeof(struct rte_dispatcher),
280 				  RTE_CACHE_LINE_SIZE, socket_id);
281 
282 	if (dispatcher == NULL) {
283 		RTE_EDEV_LOG_ERR("Unable to allocate memory for dispatcher");
284 		rte_errno = ENOMEM;
285 		return NULL;
286 	}
287 
288 	*dispatcher = (struct rte_dispatcher) {
289 		.event_dev_id = event_dev_id,
290 		.socket_id = socket_id
291 	};
292 
293 	rc = evd_service_register(dispatcher);
294 	if (rc < 0) {
295 		rte_free(dispatcher);
296 		rte_errno = -rc;
297 		return NULL;
298 	}
299 
300 	return dispatcher;
301 }
302 
303 int
rte_dispatcher_free(struct rte_dispatcher * dispatcher)304 rte_dispatcher_free(struct rte_dispatcher *dispatcher)
305 {
306 	int rc;
307 
308 	if (dispatcher == NULL)
309 		return 0;
310 
311 	rc = evd_service_unregister(dispatcher);
312 	if (rc != 0)
313 		return rc;
314 
315 	rte_free(dispatcher);
316 
317 	return 0;
318 }
319 
320 uint32_t
rte_dispatcher_service_id_get(const struct rte_dispatcher * dispatcher)321 rte_dispatcher_service_id_get(const struct rte_dispatcher *dispatcher)
322 {
323 	return dispatcher->service_id;
324 }
325 
326 static int
lcore_port_index(struct rte_dispatcher_lcore * lcore,uint8_t event_port_id)327 lcore_port_index(struct rte_dispatcher_lcore *lcore,
328 	uint8_t event_port_id)
329 {
330 	uint16_t i;
331 
332 	for (i = 0; i < lcore->num_ports; i++) {
333 		struct rte_dispatcher_lcore_port *port =
334 			&lcore->ports[i];
335 
336 		if (port->port_id == event_port_id)
337 			return i;
338 	}
339 
340 	return -1;
341 }
342 
343 int
rte_dispatcher_bind_port_to_lcore(struct rte_dispatcher * dispatcher,uint8_t event_port_id,uint16_t batch_size,uint64_t timeout,unsigned int lcore_id)344 rte_dispatcher_bind_port_to_lcore(struct rte_dispatcher *dispatcher,
345 	uint8_t event_port_id, uint16_t batch_size, uint64_t timeout,
346 	unsigned int lcore_id)
347 {
348 	struct rte_dispatcher_lcore *lcore;
349 	struct rte_dispatcher_lcore_port *port;
350 
351 	lcore =	&dispatcher->lcores[lcore_id];
352 
353 	if (lcore->num_ports == EVD_MAX_PORTS_PER_LCORE)
354 		return -ENOMEM;
355 
356 	if (lcore_port_index(lcore, event_port_id) >= 0)
357 		return -EEXIST;
358 
359 	port = &lcore->ports[lcore->num_ports];
360 
361 	*port = (struct rte_dispatcher_lcore_port) {
362 		.port_id = event_port_id,
363 		.batch_size = batch_size,
364 		.timeout = timeout
365 	};
366 
367 	lcore->num_ports++;
368 
369 	return 0;
370 }
371 
372 int
rte_dispatcher_unbind_port_from_lcore(struct rte_dispatcher * dispatcher,uint8_t event_port_id,unsigned int lcore_id)373 rte_dispatcher_unbind_port_from_lcore(struct rte_dispatcher *dispatcher,
374 	uint8_t event_port_id, unsigned int lcore_id)
375 {
376 	struct rte_dispatcher_lcore *lcore;
377 	int port_idx;
378 	struct rte_dispatcher_lcore_port *port;
379 	struct rte_dispatcher_lcore_port *last;
380 
381 	lcore =	&dispatcher->lcores[lcore_id];
382 
383 	port_idx = lcore_port_index(lcore, event_port_id);
384 
385 	if (port_idx < 0)
386 		return -ENOENT;
387 
388 	port = &lcore->ports[port_idx];
389 	last = &lcore->ports[lcore->num_ports - 1];
390 
391 	if (port != last)
392 		*port = *last;
393 
394 	lcore->num_ports--;
395 
396 	return 0;
397 }
398 
399 static struct rte_dispatcher_handler *
evd_lcore_get_handler_by_id(struct rte_dispatcher_lcore * lcore,int handler_id)400 evd_lcore_get_handler_by_id(struct rte_dispatcher_lcore *lcore, int handler_id)
401 {
402 	uint16_t i;
403 
404 	for (i = 0; i < lcore->num_handlers; i++) {
405 		struct rte_dispatcher_handler *handler =
406 			&lcore->handlers[i];
407 
408 		if (handler->id == handler_id)
409 			return handler;
410 	}
411 
412 	return NULL;
413 }
414 
415 static int
evd_alloc_handler_id(struct rte_dispatcher * dispatcher)416 evd_alloc_handler_id(struct rte_dispatcher *dispatcher)
417 {
418 	int handler_id = 0;
419 	struct rte_dispatcher_lcore *reference_lcore =
420 		&dispatcher->lcores[0];
421 
422 	if (reference_lcore->num_handlers == EVD_MAX_HANDLERS)
423 		return -1;
424 
425 	while (evd_lcore_get_handler_by_id(reference_lcore, handler_id) != NULL)
426 		handler_id++;
427 
428 	return handler_id;
429 }
430 
431 static void
evd_lcore_install_handler(struct rte_dispatcher_lcore * lcore,const struct rte_dispatcher_handler * handler)432 evd_lcore_install_handler(struct rte_dispatcher_lcore *lcore,
433 	const struct rte_dispatcher_handler *handler)
434 {
435 	int handler_idx = lcore->num_handlers;
436 
437 	lcore->handlers[handler_idx] = *handler;
438 	lcore->num_handlers++;
439 }
440 
441 static void
evd_install_handler(struct rte_dispatcher * dispatcher,const struct rte_dispatcher_handler * handler)442 evd_install_handler(struct rte_dispatcher *dispatcher,
443 	const struct rte_dispatcher_handler *handler)
444 {
445 	int i;
446 
447 	for (i = 0; i < RTE_MAX_LCORE; i++) {
448 		struct rte_dispatcher_lcore *lcore =
449 			&dispatcher->lcores[i];
450 		evd_lcore_install_handler(lcore, handler);
451 	}
452 }
453 
454 int
rte_dispatcher_register(struct rte_dispatcher * dispatcher,rte_dispatcher_match_t match_fun,void * match_data,rte_dispatcher_process_t process_fun,void * process_data)455 rte_dispatcher_register(struct rte_dispatcher *dispatcher,
456 	rte_dispatcher_match_t match_fun, void *match_data,
457 	rte_dispatcher_process_t process_fun, void *process_data)
458 {
459 	struct rte_dispatcher_handler handler = {
460 		.match_fun = match_fun,
461 		.match_data = match_data,
462 		.process_fun = process_fun,
463 		.process_data = process_data
464 	};
465 
466 	handler.id = evd_alloc_handler_id(dispatcher);
467 
468 	if (handler.id < 0)
469 		return -ENOMEM;
470 
471 	evd_install_handler(dispatcher, &handler);
472 
473 	return handler.id;
474 }
475 
/*
 * Remove the handler with the given id from one lcore's handler
 * array, compacting the array with memmove() so the relative order
 * of the remaining handlers — which encodes match priority — is
 * preserved. Returns -EINVAL if the id is unknown on this lcore.
 */
static int
evd_lcore_uninstall_handler(struct rte_dispatcher_lcore *lcore,
	int handler_id)
{
	struct rte_dispatcher_handler *unreg_handler;
	int handler_idx;
	uint16_t last_idx;

	unreg_handler = evd_lcore_get_handler_by_id(lcore, handler_id);

	if (unreg_handler == NULL) {
		RTE_EDEV_LOG_ERR("Invalid handler id %d", handler_id);
		return -EINVAL;
	}

	/* Array index of the handler being removed. */
	handler_idx = unreg_handler - &lcore->handlers[0];

	last_idx = lcore->num_handlers - 1;

	if (handler_idx != last_idx) {
		/* move all handlers to maintain handler order */
		int n = last_idx - handler_idx;
		memmove(unreg_handler, unreg_handler + 1,
			sizeof(struct rte_dispatcher_handler) * n);
	}

	lcore->num_handlers--;

	return 0;
}
506 
507 static int
evd_uninstall_handler(struct rte_dispatcher * dispatcher,int handler_id)508 evd_uninstall_handler(struct rte_dispatcher *dispatcher, int handler_id)
509 {
510 	unsigned int lcore_id;
511 
512 	for (lcore_id = 0; lcore_id < RTE_MAX_LCORE; lcore_id++) {
513 		struct rte_dispatcher_lcore *lcore =
514 			&dispatcher->lcores[lcore_id];
515 		int rc;
516 
517 		rc = evd_lcore_uninstall_handler(lcore, handler_id);
518 		if (rc < 0)
519 			return rc;
520 	}
521 
522 	return 0;
523 }
524 
/*
 * Unregister a previously registered handler from all lcores.
 * Returns 0 on success, or -EINVAL for an unknown handler id.
 */
int
rte_dispatcher_unregister(struct rte_dispatcher *dispatcher, int handler_id)
{
	return evd_uninstall_handler(dispatcher, handler_id);
}
530 
531 static struct rte_dispatcher_finalizer *
evd_get_finalizer_by_id(struct rte_dispatcher * dispatcher,int handler_id)532 evd_get_finalizer_by_id(struct rte_dispatcher *dispatcher,
533 		       int handler_id)
534 {
535 	int i;
536 
537 	for (i = 0; i < dispatcher->num_finalizers; i++) {
538 		struct rte_dispatcher_finalizer *finalizer =
539 			&dispatcher->finalizers[i];
540 
541 		if (finalizer->id == handler_id)
542 			return finalizer;
543 	}
544 
545 	return NULL;
546 }
547 
548 static int
evd_alloc_finalizer_id(struct rte_dispatcher * dispatcher)549 evd_alloc_finalizer_id(struct rte_dispatcher *dispatcher)
550 {
551 	int finalizer_id = 0;
552 
553 	while (evd_get_finalizer_by_id(dispatcher, finalizer_id) != NULL)
554 		finalizer_id++;
555 
556 	return finalizer_id;
557 }
558 
559 static struct rte_dispatcher_finalizer *
evd_alloc_finalizer(struct rte_dispatcher * dispatcher)560 evd_alloc_finalizer(struct rte_dispatcher *dispatcher)
561 {
562 	int finalizer_idx;
563 	struct rte_dispatcher_finalizer *finalizer;
564 
565 	if (dispatcher->num_finalizers == EVD_MAX_FINALIZERS)
566 		return NULL;
567 
568 	finalizer_idx = dispatcher->num_finalizers;
569 	finalizer = &dispatcher->finalizers[finalizer_idx];
570 
571 	finalizer->id = evd_alloc_finalizer_id(dispatcher);
572 
573 	dispatcher->num_finalizers++;
574 
575 	return finalizer;
576 }
577 
578 int
rte_dispatcher_finalize_register(struct rte_dispatcher * dispatcher,rte_dispatcher_finalize_t finalize_fun,void * finalize_data)579 rte_dispatcher_finalize_register(struct rte_dispatcher *dispatcher,
580 	rte_dispatcher_finalize_t finalize_fun, void *finalize_data)
581 {
582 	struct rte_dispatcher_finalizer *finalizer;
583 
584 	finalizer = evd_alloc_finalizer(dispatcher);
585 
586 	if (finalizer == NULL)
587 		return -ENOMEM;
588 
589 	finalizer->finalize_fun = finalize_fun;
590 	finalizer->finalize_data = finalize_data;
591 
592 	return finalizer->id;
593 }
594 
/*
 * Unregister a finalize callback, compacting the finalizer array
 * with memmove() so the registration order of the remaining
 * finalizers is preserved. Returns -EINVAL for an unknown id.
 */
int
rte_dispatcher_finalize_unregister(struct rte_dispatcher *dispatcher,
	int finalizer_id)
{
	struct rte_dispatcher_finalizer *unreg_finalizer;
	int finalizer_idx;
	uint16_t last_idx;

	unreg_finalizer = evd_get_finalizer_by_id(dispatcher, finalizer_id);

	if (unreg_finalizer == NULL) {
		RTE_EDEV_LOG_ERR("Invalid finalizer id %d", finalizer_id);
		return -EINVAL;
	}

	/* Array index of the finalizer being removed. */
	finalizer_idx = unreg_finalizer - &dispatcher->finalizers[0];

	last_idx = dispatcher->num_finalizers - 1;

	if (finalizer_idx != last_idx) {
		/* move all finalizers to maintain order */
		int n = last_idx - finalizer_idx;
		memmove(unreg_finalizer, unreg_finalizer + 1,
			sizeof(struct rte_dispatcher_finalizer) * n);
	}

	dispatcher->num_finalizers--;

	return 0;
}
625 
626 static void
evd_set_service_runstate(struct rte_dispatcher * dispatcher,int state)627 evd_set_service_runstate(struct rte_dispatcher *dispatcher, int state)
628 {
629 	int rc;
630 
631 	rc = rte_service_component_runstate_set(dispatcher->service_id,
632 						state);
633 	/*
634 	 * The only cause of a runstate_set() failure is an invalid
635 	 * service id, which in turns means the dispatcher instance's
636 	 * state is invalid.
637 	 */
638 	if (rc != 0)
639 		RTE_EDEV_LOG_ERR("Unexpected error %d occurred while setting "
640 				 "service component run state to %d", rc,
641 				 state);
642 
643 	RTE_VERIFY(rc == 0);
644 }
645 
/*
 * Start the dispatcher: lcores mapped to its service begin invoking
 * evd_process().
 */
void
rte_dispatcher_start(struct rte_dispatcher *dispatcher)
{
	evd_set_service_runstate(dispatcher, 1);
}
651 
/*
 * Stop the dispatcher: its service component is taken out of the
 * running state.
 */
void
rte_dispatcher_stop(struct rte_dispatcher *dispatcher)
{
	evd_set_service_runstate(dispatcher, 0);
}
657 
658 static void
evd_aggregate_stats(struct rte_dispatcher_stats * result,const struct rte_dispatcher_stats * part)659 evd_aggregate_stats(struct rte_dispatcher_stats *result,
660 	const struct rte_dispatcher_stats *part)
661 {
662 	result->poll_count += part->poll_count;
663 	result->ev_batch_count += part->ev_batch_count;
664 	result->ev_dispatch_count += part->ev_dispatch_count;
665 	result->ev_drop_count += part->ev_drop_count;
666 }
667 
668 void
rte_dispatcher_stats_get(const struct rte_dispatcher * dispatcher,struct rte_dispatcher_stats * stats)669 rte_dispatcher_stats_get(const struct rte_dispatcher *dispatcher,
670 	struct rte_dispatcher_stats *stats)
671 {
672 	unsigned int lcore_id;
673 
674 	*stats = (struct rte_dispatcher_stats) {};
675 
676 	for (lcore_id = 0; lcore_id < RTE_MAX_LCORE; lcore_id++) {
677 		const struct rte_dispatcher_lcore *lcore =
678 			&dispatcher->lcores[lcore_id];
679 
680 		evd_aggregate_stats(stats, &lcore->stats);
681 	}
682 }
683 
684 void
rte_dispatcher_stats_reset(struct rte_dispatcher * dispatcher)685 rte_dispatcher_stats_reset(struct rte_dispatcher *dispatcher)
686 {
687 	unsigned int lcore_id;
688 
689 	for (lcore_id = 0; lcore_id < RTE_MAX_LCORE; lcore_id++) {
690 		struct rte_dispatcher_lcore *lcore =
691 			&dispatcher->lcores[lcore_id];
692 
693 		lcore->stats = (struct rte_dispatcher_stats) {};
694 	}
695 }
696