/* SPDX-License-Identifier: BSD-3-Clause
 * Copyright(c) 2017 Intel Corporation
 */

#include <stdio.h>
#include <inttypes.h>
#include <string.h>

#include <rte_service.h>
#include <rte_service_component.h>

#include <rte_lcore.h>
#include <rte_branch_prediction.h>
#include <rte_common.h>
#include <rte_cycles.h>
#include <rte_atomic.h>
#include <rte_malloc.h>
#include <rte_spinlock.h>

#include "eal_private.h"

#define RTE_SERVICE_NUM_MAX 64

#define SERVICE_F_REGISTERED    (1 << 0)
#define SERVICE_F_STATS_ENABLED (1 << 1)
#define SERVICE_F_START_CHECK   (1 << 2)

/* runstates for services and lcores, denoting if they are active or not */
#define RUNSTATE_STOPPED 0
#define RUNSTATE_RUNNING 1

/* internal representation of a service */
struct rte_service_spec_impl {
	/* public part of the struct */
	struct rte_service_spec spec;

	/* Spin lock that, when held, indicates a service core is currently
	 * running this service callback. When not held, a core may take the
	 * lock and then run the service callback.
	 */
	rte_spinlock_t execute_lock;

	/* API set/get-able variables */
	int8_t app_runstate;
	int8_t comp_runstate;
	uint8_t internal_flags;

	/* Indicates how many cores the service is mapped to run on.
	 * It does not indicate the number of cores the service is currently
	 * running on.
	 */
	uint32_t num_mapped_cores;
} __rte_cache_aligned;

struct service_stats {
	uint64_t calls;
	uint64_t cycles;
};

/* the internal values of a service core */
struct core_state {
	/* bitmask of service IDs mapped to run on this core */
	uint64_t service_mask;
	uint8_t runstate; /* running or stopped */
	uint8_t thread_active; /* indicates when thread is in service_run() */
	uint8_t is_service_core; /* set if core is currently a service core */
	uint8_t service_active_on_lcore[RTE_SERVICE_NUM_MAX];
	uint64_t loops;
	uint64_t cycles;
	struct service_stats service_stats[RTE_SERVICE_NUM_MAX];
} __rte_cache_aligned;

static uint32_t rte_service_count;
static struct rte_service_spec_impl *rte_services;
static struct core_state *lcore_states;
static uint32_t rte_service_library_initialized;

int32_t
rte_service_init(void)
{
	/* Hard limit due to the use of a uint64_t-based bitmask (and the
	 * clzl intrinsic).
	 */
	RTE_BUILD_BUG_ON(RTE_SERVICE_NUM_MAX > 64);

	if (rte_service_library_initialized) {
		RTE_LOG(NOTICE, EAL,
			"service library init() called, init flag %d\n",
			rte_service_library_initialized);
		return -EALREADY;
	}

	rte_services = rte_calloc("rte_services", RTE_SERVICE_NUM_MAX,
			sizeof(struct rte_service_spec_impl),
			RTE_CACHE_LINE_SIZE);
	if (!rte_services) {
		RTE_LOG(ERR, EAL, "error allocating rte services array\n");
		goto fail_mem;
	}

	lcore_states = rte_calloc("rte_service_core_states", RTE_MAX_LCORE,
			sizeof(struct core_state), RTE_CACHE_LINE_SIZE);
	if (!lcore_states) {
		RTE_LOG(ERR, EAL, "error allocating core states array\n");
		goto fail_mem;
	}

	int i;
	struct rte_config *cfg = rte_eal_get_configuration();
	for (i = 0; i < RTE_MAX_LCORE; i++) {
		if (lcore_config[i].core_role == ROLE_SERVICE) {
			if ((unsigned int)i == cfg->main_lcore)
				continue;
			rte_service_lcore_add(i);
		}
	}

	rte_service_library_initialized = 1;
	return 0;
fail_mem:
	rte_free(rte_services);
	rte_free(lcore_states);
	return -ENOMEM;
}

void
rte_service_finalize(void)
{
	if (!rte_service_library_initialized)
		return;

	rte_service_lcore_reset_all();
	rte_eal_mp_wait_lcore();

	rte_free(rte_services);
	rte_free(lcore_states);

	rte_service_library_initialized = 0;
}

static inline bool
service_registered(uint32_t id)
{
	return rte_services[id].internal_flags & SERVICE_F_REGISTERED;
}

static inline bool
service_valid(uint32_t id)
{
	return id < RTE_SERVICE_NUM_MAX && service_registered(id);
}

static struct rte_service_spec_impl *
service_get(uint32_t id)
{
	return &rte_services[id];
}

/* validate ID and retrieve service pointer, or return error value */
#define SERVICE_VALID_GET_OR_ERR_RET(id, service, retval) do {          \
	if (!service_valid(id))                                         \
		return retval;                                          \
	service = &rte_services[id];                                    \
} while (0)

/* Returns 1 if statistics should be collected for the service,
 * or 0 if they should not be collected.
 */
static inline int
service_stats_enabled(struct rte_service_spec_impl *impl)
{
	return !!(impl->internal_flags & SERVICE_F_STATS_ENABLED);
}

static inline int
service_mt_safe(struct rte_service_spec_impl *s)
{
	return !!(s->spec.capabilities & RTE_SERVICE_CAP_MT_SAFE);
}

int32_t
rte_service_set_stats_enable(uint32_t id, int32_t enabled)
{
	struct rte_service_spec_impl *s;
	SERVICE_VALID_GET_OR_ERR_RET(id, s, 0);

	if (enabled)
		s->internal_flags |= SERVICE_F_STATS_ENABLED;
	else
		s->internal_flags &= ~(SERVICE_F_STATS_ENABLED);

	return 0;
}

int32_t
rte_service_set_runstate_mapped_check(uint32_t id, int32_t enabled)
{
	struct rte_service_spec_impl *s;
	SERVICE_VALID_GET_OR_ERR_RET(id, s, 0);

	if (enabled)
		s->internal_flags |= SERVICE_F_START_CHECK;
	else
		s->internal_flags &= ~(SERVICE_F_START_CHECK);

	return 0;
}

uint32_t
rte_service_get_count(void)
{
	return rte_service_count;
}

int32_t
rte_service_get_by_name(const char *name, uint32_t *service_id)
{
	if (!service_id)
		return -EINVAL;

	int i;
	for (i = 0; i < RTE_SERVICE_NUM_MAX; i++) {
		if (service_registered(i) &&
				strcmp(name, rte_services[i].spec.name) == 0) {
			*service_id = i;
			return 0;
		}
	}

	return -ENODEV;
}

const char *
rte_service_get_name(uint32_t id)
{
	struct rte_service_spec_impl *s;
	SERVICE_VALID_GET_OR_ERR_RET(id, s, 0);
	return s->spec.name;
}

int32_t
rte_service_probe_capability(uint32_t id, uint32_t capability)
{
	struct rte_service_spec_impl *s;
	SERVICE_VALID_GET_OR_ERR_RET(id, s, -EINVAL);
	return !!(s->spec.capabilities & capability);
}

int32_t
rte_service_component_register(const struct rte_service_spec *spec,
			       uint32_t *id_ptr)
{
	uint32_t i;
	int32_t free_slot = -1;

	if (spec->callback == NULL || strlen(spec->name) == 0)
		return -EINVAL;

	for (i = 0; i < RTE_SERVICE_NUM_MAX; i++) {
		if (!service_registered(i)) {
			free_slot = i;
			break;
		}
	}

	if ((free_slot < 0) || (i == RTE_SERVICE_NUM_MAX))
		return -ENOSPC;

	struct rte_service_spec_impl *s = &rte_services[free_slot];
	s->spec = *spec;
	s->internal_flags |= SERVICE_F_REGISTERED | SERVICE_F_START_CHECK;

	rte_service_count++;

	if (id_ptr)
		*id_ptr = free_slot;

	return 0;
}
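
/* Illustrative sketch, compiled out by the hypothetical
 * SERVICE_USAGE_EXAMPLES define: how a component might register a
 * service with the API above. The callback and its userdata are
 * placeholders.
 */
#ifdef SERVICE_USAGE_EXAMPLES
static int32_t
example_service_cb(void *userdata)
{
	/* poll hardware or process a burst of work here */
	RTE_SET_USED(userdata);
	return 0;
}

static int32_t
example_component_register(uint32_t *sid)
{
	struct rte_service_spec spec;

	memset(&spec, 0, sizeof(spec));
	snprintf(spec.name, sizeof(spec.name), "example_service");
	spec.callback = example_service_cb;
	spec.callback_userdata = NULL;
	/* set RTE_SERVICE_CAP_MT_SAFE here if the callback tolerates
	 * concurrent invocation from multiple lcores
	 */
	spec.capabilities = 0;

	return rte_service_component_register(&spec, sid);
}
#endif /* SERVICE_USAGE_EXAMPLES */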

int32_t
rte_service_component_unregister(uint32_t id)
{
	uint32_t i;
	struct rte_service_spec_impl *s;
	SERVICE_VALID_GET_OR_ERR_RET(id, s, -EINVAL);

	rte_service_count--;

	s->internal_flags &= ~(SERVICE_F_REGISTERED);

	/* clear the run-bit in all cores */
	for (i = 0; i < RTE_MAX_LCORE; i++)
		lcore_states[i].service_mask &= ~(UINT64_C(1) << id);

	memset(&rte_services[id], 0, sizeof(struct rte_service_spec_impl));

	return 0;
}

int32_t
rte_service_component_runstate_set(uint32_t id, uint32_t runstate)
{
	struct rte_service_spec_impl *s;
	SERVICE_VALID_GET_OR_ERR_RET(id, s, -EINVAL);

	/* comp_runstate acts as the guard variable. Use store-release
	 * memory order. This synchronizes with the load-acquire in
	 * service_run and rte_service_runstate_get.
	 */
	if (runstate)
		__atomic_store_n(&s->comp_runstate, RUNSTATE_RUNNING,
			__ATOMIC_RELEASE);
	else
		__atomic_store_n(&s->comp_runstate, RUNSTATE_STOPPED,
			__ATOMIC_RELEASE);

	return 0;
}

int32_t
rte_service_runstate_set(uint32_t id, uint32_t runstate)
{
	struct rte_service_spec_impl *s;
	SERVICE_VALID_GET_OR_ERR_RET(id, s, -EINVAL);

	/* app_runstate acts as the guard variable. Use store-release
	 * memory order. This synchronizes with the load-acquire in
	 * service_run and rte_service_runstate_get.
	 */
	if (runstate)
		__atomic_store_n(&s->app_runstate, RUNSTATE_RUNNING,
			__ATOMIC_RELEASE);
	else
		__atomic_store_n(&s->app_runstate, RUNSTATE_STOPPED,
			__ATOMIC_RELEASE);

	return 0;
}

int32_t
rte_service_runstate_get(uint32_t id)
{
	struct rte_service_spec_impl *s;
	SERVICE_VALID_GET_OR_ERR_RET(id, s, -EINVAL);

	/* comp_runstate and app_runstate act as the guard variables.
	 * Use load-acquire memory order. This synchronizes with
	 * store-release in service state set functions.
	 */
	if (__atomic_load_n(&s->comp_runstate, __ATOMIC_ACQUIRE) ==
			RUNSTATE_RUNNING &&
	    __atomic_load_n(&s->app_runstate, __ATOMIC_ACQUIRE) ==
			RUNSTATE_RUNNING) {
		int check_disabled = !(s->internal_flags &
			SERVICE_F_START_CHECK);
		int lcore_mapped = (__atomic_load_n(&s->num_mapped_cores,
			__ATOMIC_RELAXED) > 0);

		return (check_disabled | lcore_mapped);
	} else
		return 0;
}

static inline void
service_runner_do_callback(struct rte_service_spec_impl *s,
			   struct core_state *cs, uint32_t service_idx)
{
	void *userdata = s->spec.callback_userdata;

	if (service_stats_enabled(s)) {
		uint64_t start = rte_rdtsc();
		int rc = s->spec.callback(userdata);

		/* The lcore service worker thread is the only writer, and
		 * thus only a non-atomic load and an atomic store are
		 * needed, not the more expensive atomic add.
		 */
		struct service_stats *service_stats =
			&cs->service_stats[service_idx];

		if (likely(rc != -EAGAIN)) {
			uint64_t end = rte_rdtsc();
			uint64_t cycles = end - start;

			__atomic_store_n(&cs->cycles, cs->cycles + cycles,
				__ATOMIC_RELAXED);
			__atomic_store_n(&service_stats->cycles,
				service_stats->cycles + cycles,
				__ATOMIC_RELAXED);
		}

		__atomic_store_n(&service_stats->calls,
			service_stats->calls + 1, __ATOMIC_RELAXED);
	} else
		s->spec.callback(userdata);
}

/* Expects the service 's' is valid. */
static int32_t
service_run(uint32_t i, struct core_state *cs, uint64_t service_mask,
	    struct rte_service_spec_impl *s, uint32_t serialize_mt_unsafe)
{
	if (!s)
		return -EINVAL;

	/* comp_runstate and app_runstate act as the guard variables.
	 * Use load-acquire memory order. This synchronizes with
	 * store-release in service state set functions.
	 */
	if (__atomic_load_n(&s->comp_runstate, __ATOMIC_ACQUIRE) !=
			RUNSTATE_RUNNING ||
	    __atomic_load_n(&s->app_runstate, __ATOMIC_ACQUIRE) !=
			RUNSTATE_RUNNING ||
	    !(service_mask & (UINT64_C(1) << i))) {
		cs->service_active_on_lcore[i] = 0;
		return -ENOEXEC;
	}

	cs->service_active_on_lcore[i] = 1;

	if ((service_mt_safe(s) == 0) && (serialize_mt_unsafe == 1)) {
		if (!rte_spinlock_trylock(&s->execute_lock))
			return -EBUSY;

		service_runner_do_callback(s, cs, i);
		rte_spinlock_unlock(&s->execute_lock);
	} else
		service_runner_do_callback(s, cs, i);

	return 0;
}

int32_t
rte_service_may_be_active(uint32_t id)
{
	uint32_t ids[RTE_MAX_LCORE] = {0};
	int32_t lcore_count = rte_service_lcore_list(ids, RTE_MAX_LCORE);
	int i;

	if (!service_valid(id))
		return -EINVAL;

	for (i = 0; i < lcore_count; i++) {
		if (lcore_states[ids[i]].service_active_on_lcore[id])
			return 1;
	}

	return 0;
}

int32_t
rte_service_run_iter_on_app_lcore(uint32_t id, uint32_t serialize_mt_unsafe)
{
	struct core_state *cs = &lcore_states[rte_lcore_id()];
	struct rte_service_spec_impl *s;

	SERVICE_VALID_GET_OR_ERR_RET(id, s, -EINVAL);

	/* Increment num_mapped_cores to reflect that this core is
	 * now capable of running the service.
	 */
	__atomic_add_fetch(&s->num_mapped_cores, 1, __ATOMIC_RELAXED);

	int ret = service_run(id, cs, UINT64_MAX, s, serialize_mt_unsafe);

	__atomic_sub_fetch(&s->num_mapped_cores, 1, __ATOMIC_RELAXED);

	return ret;
}
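
/* Illustrative sketch, compiled out by the hypothetical
 * SERVICE_USAGE_EXAMPLES define: an application lcore driving a service
 * directly via the function above. With serialize_mt_unsafe set to 1, an
 * MT-unsafe service is serialized against service cores through the
 * execute_lock, and -EBUSY simply means another core held the lock.
 */
#ifdef SERVICE_USAGE_EXAMPLES
static void
example_app_poll_loop(uint32_t service_id, volatile int *quit)
{
	while (!*quit) {
		int ret = rte_service_run_iter_on_app_lcore(service_id, 1);

		if (ret == -EBUSY)
			continue; /* another core ran the service */
	}
}
#endif /* SERVICE_USAGE_EXAMPLES */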

static int32_t
service_runner_func(void *arg)
{
	RTE_SET_USED(arg);
	uint8_t i;
	const int lcore = rte_lcore_id();
	struct core_state *cs = &lcore_states[lcore];

	__atomic_store_n(&cs->thread_active, 1, __ATOMIC_SEQ_CST);

	/* runstate acts as the guard variable. Use load-acquire
	 * memory order here to synchronize with store-release
	 * in runstate update functions.
	 */
	while (__atomic_load_n(&cs->runstate, __ATOMIC_ACQUIRE) ==
			RUNSTATE_RUNNING) {

		const uint64_t service_mask = cs->service_mask;
		uint8_t start_id;
		uint8_t end_id;

		if (service_mask == 0)
			continue;

		start_id = __builtin_ctzl(service_mask);
		end_id = 64 - __builtin_clzl(service_mask);

		for (i = start_id; i < end_id; i++) {
			/* return value ignored as no change to code flow */
			service_run(i, cs, service_mask, service_get(i), 1);
		}

		__atomic_store_n(&cs->loops, cs->loops + 1, __ATOMIC_RELAXED);
	}

	/* Switch off this core for all services, to ensure that future
	 * calls to rte_service_may_be_active() know this core is switched
	 * off.
	 */
	for (i = 0; i < RTE_SERVICE_NUM_MAX; i++)
		cs->service_active_on_lcore[i] = 0;

	/* Use SEQ CST memory ordering to avoid any re-ordering around
	 * this store, ensuring that once this store is visible, the service
	 * lcore thread really is done in service cores code.
	 */
	__atomic_store_n(&cs->thread_active, 0, __ATOMIC_SEQ_CST);
	return 0;
}
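
/* Worked example for the mask bounds computed in service_runner_func()
 * above: with service_mask = 0x16 (binary 10110, services 1, 2 and 4
 * mapped), __builtin_ctzl() gives start_id = 1 and 64 - __builtin_clzl()
 * gives end_id = 5, so the loop probes IDs 1..4 and service_run()
 * filters out the unmapped ID 3 via its mask check.
 */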

int32_t
rte_service_lcore_may_be_active(uint32_t lcore)
{
	if (lcore >= RTE_MAX_LCORE || !lcore_states[lcore].is_service_core)
		return -EINVAL;

	/* Load thread_active using ACQUIRE to avoid instructions dependent on
	 * the result being re-ordered before this load completes.
	 */
	return __atomic_load_n(&lcore_states[lcore].thread_active,
			       __ATOMIC_ACQUIRE);
}

int32_t
rte_service_lcore_count(void)
{
	int32_t count = 0;
	uint32_t i;
	for (i = 0; i < RTE_MAX_LCORE; i++)
		count += lcore_states[i].is_service_core;
	return count;
}

int32_t
rte_service_lcore_list(uint32_t array[], uint32_t n)
{
	uint32_t count = rte_service_lcore_count();
	if (count > n)
		return -ENOMEM;

	if (!array)
		return -EINVAL;

	uint32_t i;
	uint32_t idx = 0;
	for (i = 0; i < RTE_MAX_LCORE; i++) {
		struct core_state *cs = &lcore_states[i];
		if (cs->is_service_core) {
			array[idx] = i;
			idx++;
		}
	}

	return count;
}

int32_t
rte_service_lcore_count_services(uint32_t lcore)
{
	if (lcore >= RTE_MAX_LCORE)
		return -EINVAL;

	struct core_state *cs = &lcore_states[lcore];
	if (!cs->is_service_core)
		return -ENOTSUP;

	return __builtin_popcountll(cs->service_mask);
}

int32_t
rte_service_start_with_defaults(void)
{
	/* create a default mapping from cores to services, then start the
	 * services to make them transparent to unaware applications.
	 */
	uint32_t i;
	int ret;
	uint32_t count = rte_service_get_count();

	int32_t lcore_iter = 0;
	uint32_t ids[RTE_MAX_LCORE] = {0};
	int32_t lcore_count = rte_service_lcore_list(ids, RTE_MAX_LCORE);

	if (lcore_count == 0)
		return -ENOTSUP;

	for (i = 0; (int)i < lcore_count; i++)
		rte_service_lcore_start(ids[i]);

	for (i = 0; i < count; i++) {
		/* Do a 1:1 core mapping here, with each service getting
		 * assigned a single core by default. Multiple services
		 * multiplex onto a single core, or map 1:1 if there are as
		 * many services as service cores.
		 */
		ret = rte_service_map_lcore_set(i, ids[lcore_iter], 1);
		if (ret)
			return -ENODEV;

		lcore_iter++;
		if (lcore_iter >= lcore_count)
			lcore_iter = 0;

		ret = rte_service_runstate_set(i, 1);
		if (ret)
			return -ENOEXEC;
	}

	return 0;
}
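
/* Illustrative sketch, compiled out by the hypothetical
 * SERVICE_USAGE_EXAMPLES define: an application that does not manage
 * services itself can call the function above once after rte_eal_init(),
 * provided service cores were reserved on the EAL command line (for
 * example with the -s service coremask option).
 */
#ifdef SERVICE_USAGE_EXAMPLES
static int
example_enable_default_services(void)
{
	return rte_service_start_with_defaults();
}
#endif /* SERVICE_USAGE_EXAMPLES */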

static int32_t
service_update(uint32_t sid, uint32_t lcore, uint32_t *set, uint32_t *enabled)
{
	/* validate ID, or return error value */
	if (!service_valid(sid) || lcore >= RTE_MAX_LCORE ||
			!lcore_states[lcore].is_service_core)
		return -EINVAL;

	uint64_t sid_mask = UINT64_C(1) << sid;
	if (set) {
		uint64_t lcore_mapped = lcore_states[lcore].service_mask &
			sid_mask;

		if (*set && !lcore_mapped) {
			lcore_states[lcore].service_mask |= sid_mask;
			__atomic_add_fetch(&rte_services[sid].num_mapped_cores,
				1, __ATOMIC_RELAXED);
		}
		if (!*set && lcore_mapped) {
			lcore_states[lcore].service_mask &= ~(sid_mask);
			__atomic_sub_fetch(&rte_services[sid].num_mapped_cores,
				1, __ATOMIC_RELAXED);
		}
	}

	if (enabled)
		*enabled = !!(lcore_states[lcore].service_mask & (sid_mask));

	return 0;
}

int32_t
rte_service_map_lcore_set(uint32_t id, uint32_t lcore, uint32_t enabled)
{
	uint32_t on = enabled > 0;
	return service_update(id, lcore, &on, 0);
}

int32_t
rte_service_map_lcore_get(uint32_t id, uint32_t lcore)
{
	uint32_t enabled;
	int ret = service_update(id, lcore, 0, &enabled);
	if (ret == 0)
		return enabled;
	return ret;
}
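
/* Illustrative sketch, compiled out by the hypothetical
 * SERVICE_USAGE_EXAMPLES define: explicit service-to-core wiring with
 * the mapping API above, for applications wanting full control rather
 * than rte_service_start_with_defaults().
 */
#ifdef SERVICE_USAGE_EXAMPLES
static int
example_wire_service(uint32_t service_id, uint32_t lcore)
{
	int ret;

	/* turn the lcore into a service core */
	ret = rte_service_lcore_add(lcore);
	if (ret && ret != -EALREADY)
		return ret;

	/* map the service to the core and allow the service to run */
	ret = rte_service_map_lcore_set(service_id, lcore, 1);
	if (ret)
		return ret;
	ret = rte_service_runstate_set(service_id, 1);
	if (ret)
		return ret;

	/* launch the service core poll loop */
	return rte_service_lcore_start(lcore);
}
#endif /* SERVICE_USAGE_EXAMPLES */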

static void
set_lcore_state(uint32_t lcore, int32_t state)
{
	/* mark core state in hugepage backed config */
	struct rte_config *cfg = rte_eal_get_configuration();
	cfg->lcore_role[lcore] = state;

	/* mark state in process local lcore_config */
	lcore_config[lcore].core_role = state;

	/* update per-lcore optimized state tracking */
	lcore_states[lcore].is_service_core = (state == ROLE_SERVICE);
}

int32_t
rte_service_lcore_reset_all(void)
{
	/* loop over cores, reset all to mask 0 */
	uint32_t i;
	for (i = 0; i < RTE_MAX_LCORE; i++) {
		if (lcore_states[i].is_service_core) {
			lcore_states[i].service_mask = 0;
			set_lcore_state(i, ROLE_RTE);
			/* runstate acts as the guard variable. Use
			 * store-release memory order here to synchronize
			 * with load-acquire in runstate read functions.
			 */
			__atomic_store_n(&lcore_states[i].runstate,
				RUNSTATE_STOPPED, __ATOMIC_RELEASE);
		}
	}
	for (i = 0; i < RTE_SERVICE_NUM_MAX; i++)
		__atomic_store_n(&rte_services[i].num_mapped_cores, 0,
			__ATOMIC_RELAXED);

	return 0;
}

int32_t
rte_service_lcore_add(uint32_t lcore)
{
	if (lcore >= RTE_MAX_LCORE)
		return -EINVAL;
	if (lcore_states[lcore].is_service_core)
		return -EALREADY;

	set_lcore_state(lcore, ROLE_SERVICE);

	/* ensure that after adding a core the mask and state are defaults */
	lcore_states[lcore].service_mask = 0;
	/* Use store-release memory order here to synchronize with
	 * load-acquire in runstate read functions.
	 */
	__atomic_store_n(&lcore_states[lcore].runstate, RUNSTATE_STOPPED,
		__ATOMIC_RELEASE);

	return rte_eal_wait_lcore(lcore);
}

int32_t
rte_service_lcore_del(uint32_t lcore)
{
	if (lcore >= RTE_MAX_LCORE)
		return -EINVAL;

	struct core_state *cs = &lcore_states[lcore];
	if (!cs->is_service_core)
		return -EINVAL;

	/* runstate acts as the guard variable. Use load-acquire
	 * memory order here to synchronize with store-release
	 * in runstate update functions.
	 */
	if (__atomic_load_n(&cs->runstate, __ATOMIC_ACQUIRE) !=
			RUNSTATE_STOPPED)
		return -EBUSY;

	set_lcore_state(lcore, ROLE_RTE);

	rte_smp_wmb();
	return 0;
}

int32_t
rte_service_lcore_start(uint32_t lcore)
{
	if (lcore >= RTE_MAX_LCORE)
		return -EINVAL;

	struct core_state *cs = &lcore_states[lcore];
	if (!cs->is_service_core)
		return -EINVAL;

	/* runstate acts as the guard variable. Use load-acquire
	 * memory order here to synchronize with store-release
	 * in runstate update functions.
	 */
	if (__atomic_load_n(&cs->runstate, __ATOMIC_ACQUIRE) ==
			RUNSTATE_RUNNING)
		return -EALREADY;

	/* Set the core to run state first, and then launch; otherwise the
	 * launched thread returns immediately, as the runstate keeps it in
	 * the service poll loop. Use store-release memory order here to
	 * synchronize with load-acquire in runstate read functions.
	 */
	__atomic_store_n(&cs->runstate, RUNSTATE_RUNNING, __ATOMIC_RELEASE);

	int ret = rte_eal_remote_launch(service_runner_func, 0, lcore);
	/* returns -EBUSY if the core is already launched, 0 on success */
	return ret;
}

int32_t
rte_service_lcore_stop(uint32_t lcore)
{
	if (lcore >= RTE_MAX_LCORE)
		return -EINVAL;

	/* runstate acts as the guard variable. Use load-acquire
	 * memory order here to synchronize with store-release
	 * in runstate update functions.
	 */
	if (__atomic_load_n(&lcore_states[lcore].runstate, __ATOMIC_ACQUIRE) ==
			RUNSTATE_STOPPED)
		return -EALREADY;

	uint32_t i;
	struct core_state *cs = &lcore_states[lcore];
	uint64_t service_mask = cs->service_mask;

	for (i = 0; i < RTE_SERVICE_NUM_MAX; i++) {
		int32_t enabled = !!(service_mask & (UINT64_C(1) << i));
		int32_t service_running = rte_service_runstate_get(i);
		int32_t only_core = (1 ==
			__atomic_load_n(&rte_services[i].num_mapped_cores,
				__ATOMIC_RELAXED));

		/* if the core is mapped, and the service is running, and this
		 * is the only core that is mapped, the service would cease to
		 * run if this core stopped, so fail instead.
		 */
		if (enabled && service_running && only_core)
			return -EBUSY;
	}

	/* Use store-release memory order here to synchronize with
	 * load-acquire in runstate read functions.
	 */
	__atomic_store_n(&lcore_states[lcore].runstate, RUNSTATE_STOPPED,
		__ATOMIC_RELEASE);

	return 0;
}
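
/* Illustrative sketch, compiled out by the hypothetical
 * SERVICE_USAGE_EXAMPLES define: an orderly shutdown that satisfies the
 * checks in rte_service_lcore_stop() above, stopping the service before
 * the core so the -EBUSY "only mapped core" case cannot trigger.
 */
#ifdef SERVICE_USAGE_EXAMPLES
static int
example_unwire_service(uint32_t service_id, uint32_t lcore)
{
	int ret;

	ret = rte_service_runstate_set(service_id, 0);
	if (ret)
		return ret;

	/* wait until no lcore is still executing the service callback */
	while (rte_service_may_be_active(service_id) == 1)
		rte_delay_ms(1);

	ret = rte_service_lcore_stop(lcore);
	if (ret && ret != -EALREADY)
		return ret;

	return rte_service_lcore_del(lcore);
}
#endif /* SERVICE_USAGE_EXAMPLES */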

static uint64_t
lcore_attr_get_loops(unsigned int lcore)
{
	struct core_state *cs = &lcore_states[lcore];

	return __atomic_load_n(&cs->loops, __ATOMIC_RELAXED);
}

static uint64_t
lcore_attr_get_cycles(unsigned int lcore)
{
	struct core_state *cs = &lcore_states[lcore];

	return __atomic_load_n(&cs->cycles, __ATOMIC_RELAXED);
}

static uint64_t
lcore_attr_get_service_calls(uint32_t service_id, unsigned int lcore)
{
	struct core_state *cs = &lcore_states[lcore];

	return __atomic_load_n(&cs->service_stats[service_id].calls,
		__ATOMIC_RELAXED);
}

static uint64_t
lcore_attr_get_service_cycles(uint32_t service_id, unsigned int lcore)
{
	struct core_state *cs = &lcore_states[lcore];

	return __atomic_load_n(&cs->service_stats[service_id].cycles,
		__ATOMIC_RELAXED);
}

typedef uint64_t (*lcore_attr_get_fun)(uint32_t service_id,
				       unsigned int lcore);

static uint64_t
attr_get(uint32_t id, lcore_attr_get_fun lcore_attr_get)
{
	unsigned int lcore;
	uint64_t sum = 0;

	for (lcore = 0; lcore < RTE_MAX_LCORE; lcore++) {
		if (lcore_states[lcore].is_service_core)
			sum += lcore_attr_get(id, lcore);
	}

	return sum;
}

static uint64_t
attr_get_service_calls(uint32_t service_id)
{
	return attr_get(service_id, lcore_attr_get_service_calls);
}

static uint64_t
attr_get_service_cycles(uint32_t service_id)
{
	return attr_get(service_id, lcore_attr_get_service_cycles);
}

int32_t
rte_service_attr_get(uint32_t id, uint32_t attr_id, uint64_t *attr_value)
{
	if (!service_valid(id))
		return -EINVAL;

	if (!attr_value)
		return -EINVAL;

	switch (attr_id) {
	case RTE_SERVICE_ATTR_CALL_COUNT:
		*attr_value = attr_get_service_calls(id);
		return 0;
	case RTE_SERVICE_ATTR_CYCLES:
		*attr_value = attr_get_service_cycles(id);
		return 0;
	default:
		return -EINVAL;
	}
}
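
/* Illustrative sketch, compiled out by the hypothetical
 * SERVICE_USAGE_EXAMPLES define: reading the per-service statistics
 * summed above. Statistics must first be enabled for the service, and
 * cycle counts come from rte_rdtsc() in service_runner_do_callback().
 */
#ifdef SERVICE_USAGE_EXAMPLES
static void
example_print_service_stats(uint32_t service_id)
{
	uint64_t calls = 0;
	uint64_t cycles = 0;

	rte_service_set_stats_enable(service_id, 1);
	/* ... let the service run for a while ... */
	rte_service_attr_get(service_id, RTE_SERVICE_ATTR_CALL_COUNT, &calls);
	rte_service_attr_get(service_id, RTE_SERVICE_ATTR_CYCLES, &cycles);
	printf("calls %" PRIu64 ", cycles %" PRIu64 "\n", calls, cycles);
}
#endif /* SERVICE_USAGE_EXAMPLES */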

int32_t
rte_service_lcore_attr_get(uint32_t lcore, uint32_t attr_id,
			   uint64_t *attr_value)
{
	struct core_state *cs;

	if (lcore >= RTE_MAX_LCORE || !attr_value)
		return -EINVAL;

	cs = &lcore_states[lcore];
	if (!cs->is_service_core)
		return -ENOTSUP;

	switch (attr_id) {
	case RTE_SERVICE_LCORE_ATTR_LOOPS:
		*attr_value = lcore_attr_get_loops(lcore);
		return 0;
	case RTE_SERVICE_LCORE_ATTR_CYCLES:
		*attr_value = lcore_attr_get_cycles(lcore);
		return 0;
	default:
		return -EINVAL;
	}
}

int32_t
rte_service_attr_reset_all(uint32_t id)
{
	unsigned int lcore;

	if (!service_valid(id))
		return -EINVAL;

	for (lcore = 0; lcore < RTE_MAX_LCORE; lcore++) {
		struct core_state *cs = &lcore_states[lcore];

		cs->service_stats[id] = (struct service_stats) {};
	}

	return 0;
}

int32_t
rte_service_lcore_attr_reset_all(uint32_t lcore)
{
	struct core_state *cs;

	if (lcore >= RTE_MAX_LCORE)
		return -EINVAL;

	cs = &lcore_states[lcore];
	if (!cs->is_service_core)
		return -ENOTSUP;

	cs->loops = 0;

	return 0;
}

static void
service_dump_one(FILE *f, uint32_t id)
{
	struct rte_service_spec_impl *s;
	uint64_t service_calls;
	uint64_t service_cycles;

	service_calls = attr_get_service_calls(id);
	service_cycles = attr_get_service_cycles(id);

	/* avoid divide by zero */
	if (service_calls == 0)
		service_calls = 1;

	s = service_get(id);

	fprintf(f, "  %s: stats %d\tcalls %"PRIu64"\tcycles %"
		PRIu64"\tavg: %"PRIu64"\n",
		s->spec.name, service_stats_enabled(s), service_calls,
		service_cycles, service_cycles / service_calls);
}

static void
service_dump_calls_per_lcore(FILE *f, uint32_t lcore)
{
	uint32_t i;
	struct core_state *cs = &lcore_states[lcore];

	fprintf(f, "%02d\t", lcore);
	for (i = 0; i < RTE_SERVICE_NUM_MAX; i++) {
		if (!service_registered(i))
			continue;
		fprintf(f, "%"PRIu64"\t", cs->service_stats[i].calls);
	}
	fprintf(f, "\n");
}

int32_t
rte_service_dump(FILE *f, uint32_t id)
{
	uint32_t i;
	int print_one = (id != UINT32_MAX);

	/* print only the specified service */
	if (print_one) {
		struct rte_service_spec_impl *s;
		SERVICE_VALID_GET_OR_ERR_RET(id, s, -EINVAL);
		fprintf(f, "Service %s Summary\n", s->spec.name);
		service_dump_one(f, id);
		return 0;
	}

	/* print all services, as UINT32_MAX was passed as id */
	fprintf(f, "Services Summary\n");
	for (i = 0; i < RTE_SERVICE_NUM_MAX; i++) {
		if (!service_registered(i))
			continue;
		service_dump_one(f, i);
	}

	fprintf(f, "Service Cores Summary\n");
	for (i = 0; i < RTE_MAX_LCORE; i++) {
		if (lcore_config[i].core_role != ROLE_SERVICE)
			continue;

		service_dump_calls_per_lcore(f, i);
	}

	return 0;
}
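
/* Illustrative sketch, compiled out by the hypothetical
 * SERVICE_USAGE_EXAMPLES define: dumping statistics for every service,
 * using the UINT32_MAX convention handled above.
 */
#ifdef SERVICE_USAGE_EXAMPLES
static void
example_dump_all_services(void)
{
	rte_service_dump(stdout, UINT32_MAX);
}
#endif /* SERVICE_USAGE_EXAMPLES */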
1043