xref: /dpdk/lib/eal/common/rte_service.c (revision f9dfb59edbccae50e7c5508348aa2b4b84413048)
/* SPDX-License-Identifier: BSD-3-Clause
 * Copyright(c) 2017 Intel Corporation
 */

#include <stdio.h>
#include <inttypes.h>
#include <string.h>

#include <rte_service.h>
#include <rte_service_component.h>

#include <rte_lcore.h>
#include <rte_branch_prediction.h>
#include <rte_common.h>
#include <rte_cycles.h>
#include <rte_atomic.h>
#include <rte_malloc.h>
#include <rte_spinlock.h>

#include "eal_private.h"

#define RTE_SERVICE_NUM_MAX 64

#define SERVICE_F_REGISTERED    (1 << 0)
#define SERVICE_F_STATS_ENABLED (1 << 1)
#define SERVICE_F_START_CHECK   (1 << 2)

/* runstates for services and lcores, denoting if they are active or not */
#define RUNSTATE_STOPPED 0
#define RUNSTATE_RUNNING 1

/* internal representation of a service */
struct rte_service_spec_impl {
	/* public part of the struct */
	struct rte_service_spec spec;

	/* Spin lock which, when taken, indicates that a service core is
	 * currently running this service callback. When not taken, a core
	 * may take the lock and then run the service callback.
	 */
	rte_spinlock_t execute_lock;

	/* API set/get-able variables */
	int8_t app_runstate;
	int8_t comp_runstate;
	uint8_t internal_flags;

	/* Indicates how many cores the service is mapped to run on.
	 * It does not indicate the number of cores the service is
	 * currently running on. The per-service statistics are kept
	 * in struct core_state below.
	 */
	uint32_t num_mapped_cores;
} __rte_cache_aligned;

struct service_stats {
	uint64_t calls;
	uint64_t cycles;
};

/* the internal values of a service core */
struct core_state {
	/* bitmap of service IDs that are run on this core */
	uint64_t service_mask;
	uint8_t runstate; /* running or stopped */
	uint8_t thread_active; /* indicates when thread is in service_run() */
	uint8_t is_service_core; /* set if core is currently a service core */
	uint8_t service_active_on_lcore[RTE_SERVICE_NUM_MAX];
	uint64_t loops;
	uint64_t cycles;
	struct service_stats service_stats[RTE_SERVICE_NUM_MAX];
} __rte_cache_aligned;

static uint32_t rte_service_count;
static struct rte_service_spec_impl *rte_services;
static struct core_state *lcore_states;
static uint32_t rte_service_library_initialized;

int32_t
rte_service_init(void)
{
	/* Hard limit due to the use of a uint64_t-based bitmask (and the
	 * clzl intrinsic).
	 */
	RTE_BUILD_BUG_ON(RTE_SERVICE_NUM_MAX > 64);

	if (rte_service_library_initialized) {
		RTE_LOG(NOTICE, EAL,
			"service library init() called, init flag %d\n",
			rte_service_library_initialized);
		return -EALREADY;
	}

	rte_services = rte_calloc("rte_services", RTE_SERVICE_NUM_MAX,
			sizeof(struct rte_service_spec_impl),
			RTE_CACHE_LINE_SIZE);
	if (!rte_services) {
		RTE_LOG(ERR, EAL, "error allocating rte services array\n");
		goto fail_mem;
	}

	lcore_states = rte_calloc("rte_service_core_states", RTE_MAX_LCORE,
			sizeof(struct core_state), RTE_CACHE_LINE_SIZE);
	if (!lcore_states) {
		RTE_LOG(ERR, EAL, "error allocating core states array\n");
		goto fail_mem;
	}

	int i;
	struct rte_config *cfg = rte_eal_get_configuration();
	for (i = 0; i < RTE_MAX_LCORE; i++) {
		if (lcore_config[i].core_role == ROLE_SERVICE) {
			if ((unsigned int)i == cfg->main_lcore)
				continue;
			rte_service_lcore_add(i);
		}
	}

	rte_service_library_initialized = 1;
	return 0;
fail_mem:
	rte_free(rte_services);
	rte_free(lcore_states);
	return -ENOMEM;
}

void
rte_service_finalize(void)
{
	if (!rte_service_library_initialized)
		return;

	rte_service_lcore_reset_all();
	rte_eal_mp_wait_lcore();

	rte_free(rte_services);
	rte_free(lcore_states);

	rte_service_library_initialized = 0;
}

static inline bool
service_registered(uint32_t id)
{
	return rte_services[id].internal_flags & SERVICE_F_REGISTERED;
}

static inline bool
service_valid(uint32_t id)
{
	return id < RTE_SERVICE_NUM_MAX && service_registered(id);
}

static struct rte_service_spec_impl *
service_get(uint32_t id)
{
	return &rte_services[id];
}

/* validate ID and retrieve service pointer, or return error value */
#define SERVICE_VALID_GET_OR_ERR_RET(id, service, retval) do {          \
	if (!service_valid(id))                                         \
		return retval;                                          \
	service = &rte_services[id];                                    \
} while (0)

/* Returns 1 if statistics should be collected for the service, and
 * 0 if statistics should not be collected for it.
 */
static inline int
service_stats_enabled(struct rte_service_spec_impl *impl)
{
	return !!(impl->internal_flags & SERVICE_F_STATS_ENABLED);
}

static inline int
service_mt_safe(struct rte_service_spec_impl *s)
{
	return !!(s->spec.capabilities & RTE_SERVICE_CAP_MT_SAFE);
}

int32_t
rte_service_set_stats_enable(uint32_t id, int32_t enabled)
{
	struct rte_service_spec_impl *s;
	SERVICE_VALID_GET_OR_ERR_RET(id, s, -EINVAL);

	if (enabled)
		s->internal_flags |= SERVICE_F_STATS_ENABLED;
	else
		s->internal_flags &= ~(SERVICE_F_STATS_ENABLED);

	return 0;
}

int32_t
rte_service_set_runstate_mapped_check(uint32_t id, int32_t enabled)
{
	struct rte_service_spec_impl *s;
	SERVICE_VALID_GET_OR_ERR_RET(id, s, -EINVAL);

	if (enabled)
		s->internal_flags |= SERVICE_F_START_CHECK;
	else
		s->internal_flags &= ~(SERVICE_F_START_CHECK);

	return 0;
}

uint32_t
rte_service_get_count(void)
{
	return rte_service_count;
}

int32_t
rte_service_get_by_name(const char *name, uint32_t *service_id)
{
	if (!service_id)
		return -EINVAL;

	int i;
	for (i = 0; i < RTE_SERVICE_NUM_MAX; i++) {
		if (service_registered(i) &&
				strcmp(name, rte_services[i].spec.name) == 0) {
			*service_id = i;
			return 0;
		}
	}

	return -ENODEV;
}

const char *
rte_service_get_name(uint32_t id)
{
	struct rte_service_spec_impl *s;
	SERVICE_VALID_GET_OR_ERR_RET(id, s, 0);
	return s->spec.name;
}

int32_t
rte_service_probe_capability(uint32_t id, uint32_t capability)
{
	struct rte_service_spec_impl *s;
	SERVICE_VALID_GET_OR_ERR_RET(id, s, -EINVAL);
	return !!(s->spec.capabilities & capability);
}

int32_t
rte_service_component_register(const struct rte_service_spec *spec,
			       uint32_t *id_ptr)
{
	uint32_t i;
	int32_t free_slot = -1;

	if (spec->callback == NULL || strlen(spec->name) == 0)
		return -EINVAL;

	for (i = 0; i < RTE_SERVICE_NUM_MAX; i++) {
		if (!service_registered(i)) {
			free_slot = i;
			break;
		}
	}

	if ((free_slot < 0) || (i == RTE_SERVICE_NUM_MAX))
		return -ENOSPC;

	struct rte_service_spec_impl *s = &rte_services[free_slot];
	s->spec = *spec;
	s->internal_flags |= SERVICE_F_REGISTERED | SERVICE_F_START_CHECK;

	rte_service_count++;

	if (id_ptr)
		*id_ptr = free_slot;

	return 0;
}

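/* Usage sketch (illustrative only, not part of the original file): a
 * component typically registers its service as below. The callback name
 * and spec values are hypothetical.
 *
 *	static int32_t my_service_cb(void *userdata) { return 0; }
 *
 *	struct rte_service_spec spec = {
 *		.name = "my_service",
 *		.callback = my_service_cb,
 *		.callback_userdata = NULL,
 *	};
 *	uint32_t id;
 *	int ret = rte_service_component_register(&spec, &id);
 *	if (ret == 0)
 *		rte_service_component_runstate_set(id, 1);
 */
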
int32_t
rte_service_component_unregister(uint32_t id)
{
	uint32_t i;
	struct rte_service_spec_impl *s;
	SERVICE_VALID_GET_OR_ERR_RET(id, s, -EINVAL);

	rte_service_count--;

	s->internal_flags &= ~(SERVICE_F_REGISTERED);

	/* clear the run-bit in all cores */
	for (i = 0; i < RTE_MAX_LCORE; i++)
		lcore_states[i].service_mask &= ~(UINT64_C(1) << id);

	memset(&rte_services[id], 0, sizeof(struct rte_service_spec_impl));

	return 0;
}

int32_t
rte_service_component_runstate_set(uint32_t id, uint32_t runstate)
{
	struct rte_service_spec_impl *s;
	SERVICE_VALID_GET_OR_ERR_RET(id, s, -EINVAL);

	/* comp_runstate acts as the guard variable. Use store-release
	 * memory order. This synchronizes with the load-acquire in
	 * service_run() and rte_service_runstate_get().
	 */
	if (runstate)
		__atomic_store_n(&s->comp_runstate, RUNSTATE_RUNNING,
			__ATOMIC_RELEASE);
	else
		__atomic_store_n(&s->comp_runstate, RUNSTATE_STOPPED,
			__ATOMIC_RELEASE);

	return 0;
}

int32_t
rte_service_runstate_set(uint32_t id, uint32_t runstate)
{
	struct rte_service_spec_impl *s;
	SERVICE_VALID_GET_OR_ERR_RET(id, s, -EINVAL);

	/* app_runstate acts as the guard variable. Use store-release
	 * memory order. This synchronizes with the load-acquire in
	 * service_run() and rte_service_runstate_get().
	 */
	if (runstate)
		__atomic_store_n(&s->app_runstate, RUNSTATE_RUNNING,
			__ATOMIC_RELEASE);
	else
		__atomic_store_n(&s->app_runstate, RUNSTATE_STOPPED,
			__ATOMIC_RELEASE);

	return 0;
}

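/* Usage sketch (illustrative only): a service only reports as runnable
 * once both the component and the application have set their runstates,
 * and (unless the start check is disabled) at least one core is mapped:
 *
 *	rte_service_component_runstate_set(id, 1);  // component side
 *	rte_service_runstate_set(id, 1);            // application side
 *	// rte_service_runstate_get(id) now returns 1 once a core is
 *	// mapped, or immediately if the start check was disabled.
 */
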
int32_t
rte_service_runstate_get(uint32_t id)
{
	struct rte_service_spec_impl *s;
	SERVICE_VALID_GET_OR_ERR_RET(id, s, -EINVAL);

	/* comp_runstate and app_runstate act as the guard variables.
	 * Use load-acquire memory order. This synchronizes with
	 * store-release in service state set functions.
	 */
	if (__atomic_load_n(&s->comp_runstate, __ATOMIC_ACQUIRE) ==
			RUNSTATE_RUNNING &&
	    __atomic_load_n(&s->app_runstate, __ATOMIC_ACQUIRE) ==
			RUNSTATE_RUNNING) {
		int check_disabled = !(s->internal_flags &
			SERVICE_F_START_CHECK);
		int lcore_mapped = (__atomic_load_n(&s->num_mapped_cores,
			__ATOMIC_RELAXED) > 0);

		return (check_disabled | lcore_mapped);
	} else
		return 0;
}

static inline void
service_runner_do_callback(struct rte_service_spec_impl *s,
			   struct core_state *cs, uint32_t service_idx)
{
	void *userdata = s->spec.callback_userdata;

	if (service_stats_enabled(s)) {
		uint64_t start = rte_rdtsc();
		int rc = s->spec.callback(userdata);

		/* The lcore service worker thread is the only writer,
		 * and thus only a non-atomic load and an atomic store
		 * are needed, and not the more expensive atomic
		 * add.
		 */
		struct service_stats *service_stats =
			&cs->service_stats[service_idx];

		if (likely(rc != -EAGAIN)) {
			uint64_t end = rte_rdtsc();
			uint64_t cycles = end - start;

			__atomic_store_n(&cs->cycles, cs->cycles + cycles,
				__ATOMIC_RELAXED);
			__atomic_store_n(&service_stats->cycles,
				service_stats->cycles + cycles,
				__ATOMIC_RELAXED);
		}

		__atomic_store_n(&service_stats->calls,
			service_stats->calls + 1, __ATOMIC_RELAXED);
	} else
		s->spec.callback(userdata);
}

/* Expects that the service 's' is valid. */
static int32_t
service_run(uint32_t i, struct core_state *cs, uint64_t service_mask,
	    struct rte_service_spec_impl *s, uint32_t serialize_mt_unsafe)
{
	if (!s)
		return -EINVAL;

	/* comp_runstate and app_runstate act as the guard variables.
	 * Use load-acquire memory order. This synchronizes with
	 * store-release in service state set functions.
	 */
	if (__atomic_load_n(&s->comp_runstate, __ATOMIC_ACQUIRE) !=
			RUNSTATE_RUNNING ||
	    __atomic_load_n(&s->app_runstate, __ATOMIC_ACQUIRE) !=
			RUNSTATE_RUNNING ||
	    !(service_mask & (UINT64_C(1) << i))) {
		cs->service_active_on_lcore[i] = 0;
		return -ENOEXEC;
	}

	cs->service_active_on_lcore[i] = 1;

	if ((service_mt_safe(s) == 0) && (serialize_mt_unsafe == 1)) {
		if (!rte_spinlock_trylock(&s->execute_lock))
			return -EBUSY;

		service_runner_do_callback(s, cs, i);
		rte_spinlock_unlock(&s->execute_lock);
	} else
		service_runner_do_callback(s, cs, i);

	return 0;
}

439 
440 int32_t
441 rte_service_may_be_active(uint32_t id)
442 {
443 	uint32_t ids[RTE_MAX_LCORE] = {0};
444 	int32_t lcore_count = rte_service_lcore_list(ids, RTE_MAX_LCORE);
445 	int i;
446 
447 	if (!service_valid(id))
448 		return -EINVAL;
449 
450 	for (i = 0; i < lcore_count; i++) {
451 		if (lcore_states[ids[i]].service_active_on_lcore[id])
452 			return 1;
453 	}
454 
455 	return 0;
456 }
457 
int32_t
rte_service_run_iter_on_app_lcore(uint32_t id, uint32_t serialize_mt_unsafe)
{
	struct core_state *cs = &lcore_states[rte_lcore_id()];
	struct rte_service_spec_impl *s;

	SERVICE_VALID_GET_OR_ERR_RET(id, s, -EINVAL);

	/* Increment num_mapped_cores to reflect that this core is
	 * now mapped to, and capable of running, the service.
	 */
	__atomic_add_fetch(&s->num_mapped_cores, 1, __ATOMIC_RELAXED);

	int ret = service_run(id, cs, UINT64_MAX, s, serialize_mt_unsafe);

	__atomic_sub_fetch(&s->num_mapped_cores, 1, __ATOMIC_RELAXED);

	return ret;
}

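/* Usage sketch (illustrative only): an application lcore may run a
 * single iteration of a service itself, e.g. when no service cores are
 * configured. The service name is hypothetical.
 *
 *	uint32_t id;
 *	if (rte_service_get_by_name("my_service", &id) == 0)
 *		rte_service_run_iter_on_app_lcore(id, 1);
 *
 * With serialize_mt_unsafe == 1, MT-unsafe services are run under the
 * execute_lock, and -EBUSY is returned if another core holds it.
 */
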
static int32_t
service_runner_func(void *arg)
{
	RTE_SET_USED(arg);
	uint8_t i;
	const int lcore = rte_lcore_id();
	struct core_state *cs = &lcore_states[lcore];

	__atomic_store_n(&cs->thread_active, 1, __ATOMIC_SEQ_CST);

	/* runstate acts as the guard variable. Use load-acquire
	 * memory order here to synchronize with store-release
	 * in runstate update functions.
	 */
	while (__atomic_load_n(&cs->runstate, __ATOMIC_ACQUIRE) ==
			RUNSTATE_RUNNING) {

		const uint64_t service_mask = cs->service_mask;
		uint8_t start_id;
		uint8_t end_id;

		if (service_mask == 0)
			continue;

		start_id = __builtin_ctzl(service_mask);
		end_id = 64 - __builtin_clzl(service_mask);

		for (i = start_id; i < end_id; i++) {
			/* return value ignored as no change to code flow */
			service_run(i, cs, service_mask, service_get(i), 1);
		}

		__atomic_store_n(&cs->loops, cs->loops + 1, __ATOMIC_RELAXED);
	}

	/* Switch off this core for all services, to ensure that future
	 * calls to may_be_active() know this core is switched off.
	 */
	for (i = 0; i < RTE_SERVICE_NUM_MAX; i++)
		cs->service_active_on_lcore[i] = 0;

	/* Use SEQ CST memory ordering to avoid any re-ordering around
	 * this store, ensuring that once this store is visible, the service
	 * lcore thread really is done in service cores code.
	 */
	__atomic_store_n(&cs->thread_active, 0, __ATOMIC_SEQ_CST);
	return 0;
}

int32_t
rte_service_lcore_may_be_active(uint32_t lcore)
{
	if (lcore >= RTE_MAX_LCORE || !lcore_states[lcore].is_service_core)
		return -EINVAL;

	/* Load thread_active using ACQUIRE to avoid instructions dependent on
	 * the result being re-ordered before this load completes.
	 */
	return __atomic_load_n(&lcore_states[lcore].thread_active,
			       __ATOMIC_ACQUIRE);
}

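/* Usage sketch (illustrative only): to quiesce a service core before
 * tearing down resources, stop it and then poll until the runner thread
 * has left service_run(). rte_pause() is assumed from <rte_pause.h>.
 *
 *	rte_service_lcore_stop(lcore);
 *	while (rte_service_lcore_may_be_active(lcore) == 1)
 *		rte_pause();
 */
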
int32_t
rte_service_lcore_count(void)
{
	int32_t count = 0;
	uint32_t i;
	for (i = 0; i < RTE_MAX_LCORE; i++)
		count += lcore_states[i].is_service_core;
	return count;
}

int32_t
rte_service_lcore_list(uint32_t array[], uint32_t n)
{
	uint32_t count = rte_service_lcore_count();
	if (count > n)
		return -ENOMEM;

	if (!array)
		return -EINVAL;

	uint32_t i;
	uint32_t idx = 0;
	for (i = 0; i < RTE_MAX_LCORE; i++) {
		struct core_state *cs = &lcore_states[i];
		if (cs->is_service_core) {
			array[idx] = i;
			idx++;
		}
	}

	return count;
}

int32_t
rte_service_lcore_count_services(uint32_t lcore)
{
	if (lcore >= RTE_MAX_LCORE)
		return -EINVAL;

	struct core_state *cs = &lcore_states[lcore];
	if (!cs->is_service_core)
		return -ENOTSUP;

	return __builtin_popcountll(cs->service_mask);
}

int32_t
rte_service_start_with_defaults(void)
{
	/* create a default mapping from cores to services, then start the
	 * services to make them transparent to unaware applications.
	 */
	uint32_t i;
	int ret;
	uint32_t count = rte_service_get_count();

	int32_t lcore_iter = 0;
	uint32_t ids[RTE_MAX_LCORE] = {0};
	int32_t lcore_count = rte_service_lcore_list(ids, RTE_MAX_LCORE);

	if (lcore_count == 0)
		return -ENOTSUP;

	for (i = 0; (int)i < lcore_count; i++)
		rte_service_lcore_start(ids[i]);

	for (i = 0; i < count; i++) {
		/* Do 1:1 core mapping here, with each service getting
		 * assigned a single core by default. Multiple services
		 * multiplex onto a single core, or map 1:1 if there is the
		 * same number of services as service cores.
		 */
		ret = rte_service_map_lcore_set(i, ids[lcore_iter], 1);
		if (ret)
			return -ENODEV;

		lcore_iter++;
		if (lcore_iter >= lcore_count)
			lcore_iter = 0;

		ret = rte_service_runstate_set(i, 1);
		if (ret)
			return -ENOEXEC;
	}

	return 0;
}

static int32_t
service_update(uint32_t sid, uint32_t lcore, uint32_t *set, uint32_t *enabled)
{
	/* validate ID, or return error value */
	if (!service_valid(sid) || lcore >= RTE_MAX_LCORE ||
			!lcore_states[lcore].is_service_core)
		return -EINVAL;

	uint64_t sid_mask = UINT64_C(1) << sid;
	if (set) {
		uint64_t lcore_mapped = lcore_states[lcore].service_mask &
			sid_mask;

		if (*set && !lcore_mapped) {
			lcore_states[lcore].service_mask |= sid_mask;
			__atomic_add_fetch(&rte_services[sid].num_mapped_cores,
				1, __ATOMIC_RELAXED);
		}
		if (!*set && lcore_mapped) {
			lcore_states[lcore].service_mask &= ~(sid_mask);
			__atomic_sub_fetch(&rte_services[sid].num_mapped_cores,
				1, __ATOMIC_RELAXED);
		}
	}

	if (enabled)
		*enabled = !!(lcore_states[lcore].service_mask & (sid_mask));

	return 0;
}

int32_t
rte_service_map_lcore_set(uint32_t id, uint32_t lcore, uint32_t enabled)
{
	uint32_t on = enabled > 0;
	return service_update(id, lcore, &on, 0);
}

int32_t
rte_service_map_lcore_get(uint32_t id, uint32_t lcore)
{
	uint32_t enabled;
	int ret = service_update(id, lcore, 0, &enabled);
	if (ret == 0)
		return enabled;
	return ret;
}

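/* Usage sketch (illustrative only): the typical application flow for
 * running a service on a dedicated service core, here using lcore 7 as
 * a hypothetical example:
 *
 *	rte_service_lcore_add(7);            // make lcore 7 a service core
 *	rte_service_map_lcore_set(id, 7, 1); // map the service to it
 *	rte_service_runstate_set(id, 1);     // allow the service to run
 *	rte_service_lcore_start(7);          // start the runner loop
 */
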
static void
set_lcore_state(uint32_t lcore, int32_t state)
{
	/* mark core state in hugepage backed config */
	struct rte_config *cfg = rte_eal_get_configuration();
	cfg->lcore_role[lcore] = state;

	/* mark state in process local lcore_config */
	lcore_config[lcore].core_role = state;

	/* update per-lcore optimized state tracking */
	lcore_states[lcore].is_service_core = (state == ROLE_SERVICE);
}

int32_t
rte_service_lcore_reset_all(void)
{
	/* loop over cores, reset all to mask 0 */
	uint32_t i;
	for (i = 0; i < RTE_MAX_LCORE; i++) {
		if (lcore_states[i].is_service_core) {
			lcore_states[i].service_mask = 0;
			set_lcore_state(i, ROLE_RTE);
			/* runstate acts as the guard variable. Use
			 * store-release memory order here to synchronize
			 * with load-acquire in runstate read functions.
			 */
			__atomic_store_n(&lcore_states[i].runstate,
				RUNSTATE_STOPPED, __ATOMIC_RELEASE);
		}
	}
	for (i = 0; i < RTE_SERVICE_NUM_MAX; i++)
		__atomic_store_n(&rte_services[i].num_mapped_cores, 0,
			__ATOMIC_RELAXED);

	return 0;
}

int32_t
rte_service_lcore_add(uint32_t lcore)
{
	if (lcore >= RTE_MAX_LCORE)
		return -EINVAL;
	if (lcore_states[lcore].is_service_core)
		return -EALREADY;

	set_lcore_state(lcore, ROLE_SERVICE);

	/* ensure that after adding a core the mask and state are defaults */
	lcore_states[lcore].service_mask = 0;
	/* Use store-release memory order here to synchronize with
	 * load-acquire in runstate read functions.
	 */
	__atomic_store_n(&lcore_states[lcore].runstate, RUNSTATE_STOPPED,
		__ATOMIC_RELEASE);

	return rte_eal_wait_lcore(lcore);
}

int32_t
rte_service_lcore_del(uint32_t lcore)
{
	if (lcore >= RTE_MAX_LCORE)
		return -EINVAL;

	struct core_state *cs = &lcore_states[lcore];
	if (!cs->is_service_core)
		return -EINVAL;

	/* runstate acts as the guard variable. Use load-acquire
	 * memory order here to synchronize with store-release
	 * in runstate update functions.
	 */
	if (__atomic_load_n(&cs->runstate, __ATOMIC_ACQUIRE) !=
			RUNSTATE_STOPPED)
		return -EBUSY;

	set_lcore_state(lcore, ROLE_RTE);

	rte_smp_wmb();
	return 0;
}

int32_t
rte_service_lcore_start(uint32_t lcore)
{
	if (lcore >= RTE_MAX_LCORE)
		return -EINVAL;

	struct core_state *cs = &lcore_states[lcore];
	if (!cs->is_service_core)
		return -EINVAL;

	/* runstate acts as the guard variable. Use load-acquire
	 * memory order here to synchronize with store-release
	 * in runstate update functions.
	 */
	if (__atomic_load_n(&cs->runstate, __ATOMIC_ACQUIRE) ==
			RUNSTATE_RUNNING)
		return -EALREADY;

	/* Set the core to the run state first, and then launch; otherwise
	 * the launched thread would return immediately, as the runstate is
	 * what keeps it in the service poll loop.
	 */
	/* Use store-release memory order here to synchronize with
	 * load-acquire in runstate read functions.
	 */
	__atomic_store_n(&cs->runstate, RUNSTATE_RUNNING, __ATOMIC_RELEASE);

	int ret = rte_eal_remote_launch(service_runner_func, 0, lcore);
	/* returns -EBUSY if the core is already launched, 0 on success */
	return ret;
}

int32_t
rte_service_lcore_stop(uint32_t lcore)
{
	if (lcore >= RTE_MAX_LCORE)
		return -EINVAL;

	/* runstate acts as the guard variable. Use load-acquire
	 * memory order here to synchronize with store-release
	 * in runstate update functions.
	 */
	if (__atomic_load_n(&lcore_states[lcore].runstate, __ATOMIC_ACQUIRE) ==
			RUNSTATE_STOPPED)
		return -EALREADY;

	uint32_t i;
	struct core_state *cs = &lcore_states[lcore];
	uint64_t service_mask = cs->service_mask;

	for (i = 0; i < RTE_SERVICE_NUM_MAX; i++) {
		int32_t enabled = service_mask & (UINT64_C(1) << i);
		int32_t service_running = rte_service_runstate_get(i);
		int32_t only_core = (1 ==
			__atomic_load_n(&rte_services[i].num_mapped_cores,
				__ATOMIC_RELAXED));

		/* if the core is mapped, and the service is running, and this
		 * is the only core that is mapped, the service would cease to
		 * run if this core stopped, so fail instead.
		 */
		if (enabled && service_running && only_core)
			return -EBUSY;
	}

	/* Use store-release memory order here to synchronize with
	 * load-acquire in runstate read functions.
	 */
	__atomic_store_n(&lcore_states[lcore].runstate, RUNSTATE_STOPPED,
		__ATOMIC_RELEASE);

	return 0;
}

static uint64_t
lcore_attr_get_loops(unsigned int lcore)
{
	struct core_state *cs = &lcore_states[lcore];

	return __atomic_load_n(&cs->loops, __ATOMIC_RELAXED);
}

static uint64_t
lcore_attr_get_cycles(unsigned int lcore)
{
	struct core_state *cs = &lcore_states[lcore];

	return __atomic_load_n(&cs->cycles, __ATOMIC_RELAXED);
}

static uint64_t
lcore_attr_get_service_calls(uint32_t service_id, unsigned int lcore)
{
	struct core_state *cs = &lcore_states[lcore];

	return __atomic_load_n(&cs->service_stats[service_id].calls,
		__ATOMIC_RELAXED);
}

static uint64_t
lcore_attr_get_service_cycles(uint32_t service_id, unsigned int lcore)
{
	struct core_state *cs = &lcore_states[lcore];

	return __atomic_load_n(&cs->service_stats[service_id].cycles,
		__ATOMIC_RELAXED);
}

typedef uint64_t (*lcore_attr_get_fun)(uint32_t service_id,
				       unsigned int lcore);

static uint64_t
attr_get(uint32_t id, lcore_attr_get_fun lcore_attr_get)
{
	unsigned int lcore;
	uint64_t sum = 0;

	for (lcore = 0; lcore < RTE_MAX_LCORE; lcore++) {
		if (lcore_states[lcore].is_service_core)
			sum += lcore_attr_get(id, lcore);
	}

	return sum;
}

static uint64_t
attr_get_service_calls(uint32_t service_id)
{
	return attr_get(service_id, lcore_attr_get_service_calls);
}

static uint64_t
attr_get_service_cycles(uint32_t service_id)
{
	return attr_get(service_id, lcore_attr_get_service_cycles);
}

int32_t
rte_service_attr_get(uint32_t id, uint32_t attr_id, uint64_t *attr_value)
{
	if (!service_valid(id))
		return -EINVAL;

	if (!attr_value)
		return -EINVAL;

	switch (attr_id) {
	case RTE_SERVICE_ATTR_CALL_COUNT:
		*attr_value = attr_get_service_calls(id);
		return 0;
	case RTE_SERVICE_ATTR_CYCLES:
		*attr_value = attr_get_service_cycles(id);
		return 0;
	default:
		return -EINVAL;
	}
}

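/* Usage sketch (illustrative only): reading the aggregate call count of
 * a service. Per-service statistics must first be enabled, or the
 * counters stay at zero.
 *
 *	uint64_t calls;
 *	rte_service_set_stats_enable(id, 1);
 *	// ... let the service run for a while ...
 *	if (rte_service_attr_get(id, RTE_SERVICE_ATTR_CALL_COUNT,
 *			&calls) == 0)
 *		printf("calls: %" PRIu64 "\n", calls);
 */
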
int32_t
rte_service_lcore_attr_get(uint32_t lcore, uint32_t attr_id,
			   uint64_t *attr_value)
{
	struct core_state *cs;

	if (lcore >= RTE_MAX_LCORE || !attr_value)
		return -EINVAL;

	cs = &lcore_states[lcore];
	if (!cs->is_service_core)
		return -ENOTSUP;

	switch (attr_id) {
	case RTE_SERVICE_LCORE_ATTR_LOOPS:
		*attr_value = lcore_attr_get_loops(lcore);
		return 0;
	case RTE_SERVICE_LCORE_ATTR_CYCLES:
		*attr_value = lcore_attr_get_cycles(lcore);
		return 0;
	default:
		return -EINVAL;
	}
}

int32_t
rte_service_attr_reset_all(uint32_t id)
{
	unsigned int lcore;

	if (!service_valid(id))
		return -EINVAL;

	for (lcore = 0; lcore < RTE_MAX_LCORE; lcore++) {
		struct core_state *cs = &lcore_states[lcore];

		cs->service_stats[id] = (struct service_stats) {};
	}

	return 0;
}

int32_t
rte_service_lcore_attr_reset_all(uint32_t lcore)
{
	struct core_state *cs;

	if (lcore >= RTE_MAX_LCORE)
		return -EINVAL;

	cs = &lcore_states[lcore];
	if (!cs->is_service_core)
		return -ENOTSUP;

	cs->loops = 0;
	cs->cycles = 0;

	return 0;
}

static void
service_dump_one(FILE *f, uint32_t id)
{
	struct rte_service_spec_impl *s;
	uint64_t service_calls;
	uint64_t service_cycles;

	service_calls = attr_get_service_calls(id);
	service_cycles = attr_get_service_cycles(id);

	/* avoid divide by zero */
	if (service_calls == 0)
		service_calls = 1;

	s = service_get(id);

	fprintf(f, "  %s: stats %d\tcalls %"PRIu64"\tcycles %"
		PRIu64"\tavg: %"PRIu64"\n",
		s->spec.name, service_stats_enabled(s), service_calls,
		service_cycles, service_cycles / service_calls);
}

static void
service_dump_calls_per_lcore(FILE *f, uint32_t lcore)
{
	uint32_t i;
	struct core_state *cs = &lcore_states[lcore];

	fprintf(f, "%02"PRIu32"\t", lcore);
	for (i = 0; i < RTE_SERVICE_NUM_MAX; i++) {
		if (!service_registered(i))
			continue;
		fprintf(f, "%"PRIu64"\t", cs->service_stats[i].calls);
	}
	fprintf(f, "\n");
}

int32_t
rte_service_dump(FILE *f, uint32_t id)
{
	uint32_t i;
	int print_one = (id != UINT32_MAX);

	/* print only the specified service */
	if (print_one) {
		struct rte_service_spec_impl *s;
		SERVICE_VALID_GET_OR_ERR_RET(id, s, -EINVAL);
		fprintf(f, "Service %s Summary\n", s->spec.name);
		service_dump_one(f, id);
		return 0;
	}

	/* print all services, as UINT32_MAX was passed as id */
	fprintf(f, "Services Summary\n");
	for (i = 0; i < RTE_SERVICE_NUM_MAX; i++) {
		if (!service_registered(i))
			continue;
		service_dump_one(f, i);
	}

	fprintf(f, "Service Cores Summary\n");
	for (i = 0; i < RTE_MAX_LCORE; i++) {
		if (lcore_config[i].core_role != ROLE_SERVICE)
			continue;

		service_dump_calls_per_lcore(f, i);
	}

	return 0;
}
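
/* Usage sketch (illustrative only): dump statistics for every
 * registered service and service core to stdout by passing UINT32_MAX
 * as the id:
 *
 *	rte_service_dump(stdout, UINT32_MAX);
 */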
1045