xref: /dpdk/lib/eal/common/rte_service.c (revision 7917b0d38e92e8b9ec5a870415b791420e10f11a)
/* SPDX-License-Identifier: BSD-3-Clause
 * Copyright(c) 2017 Intel Corporation
 */

#include <stdio.h>
#include <inttypes.h>
#include <string.h>

#include <rte_service.h>
#include <rte_service_component.h>

#include <eal_trace_internal.h>
#include <rte_lcore.h>
#include <rte_bitset.h>
#include <rte_branch_prediction.h>
#include <rte_common.h>
#include <rte_cycles.h>
#include <rte_atomic.h>
#include <rte_malloc.h>
#include <rte_spinlock.h>
#include <rte_trace_point.h>

#include "eal_private.h"

#define RTE_SERVICE_NUM_MAX 64

#define SERVICE_F_REGISTERED    (1 << 0)
#define SERVICE_F_STATS_ENABLED (1 << 1)
#define SERVICE_F_START_CHECK   (1 << 2)

/* runstates for services and lcores, denoting if they are active or not */
#define RUNSTATE_STOPPED 0
#define RUNSTATE_RUNNING 1

/* internal representation of a service */
struct __rte_cache_aligned rte_service_spec_impl {
	/* public part of the struct */
	struct rte_service_spec spec;

	/* spin lock that when set indicates a service core is currently
	 * running this service callback. When not set, a core may take the
	 * lock and then run the service callback.
	 */
	rte_spinlock_t execute_lock;

	/* API set/get-able variables */
	RTE_ATOMIC(int8_t) app_runstate;
	RTE_ATOMIC(int8_t) comp_runstate;
	uint8_t internal_flags;

	/* Indicates how many cores the service is mapped to run on.
	 * It does not indicate the number of cores the service is
	 * currently running on.
	 */
	RTE_ATOMIC(uint32_t) num_mapped_cores;
};

struct service_stats {
	RTE_ATOMIC(uint64_t) calls;
	RTE_ATOMIC(uint64_t) idle_calls;
	RTE_ATOMIC(uint64_t) error_calls;
	RTE_ATOMIC(uint64_t) cycles;
};

/* the internal values of a service core */
struct __rte_cache_aligned core_state {
	/* bitset of the service IDs mapped to run on this core */
	RTE_BITSET_DECLARE(mapped_services, RTE_SERVICE_NUM_MAX);
	RTE_ATOMIC(uint8_t) runstate; /* running or stopped */
	RTE_ATOMIC(uint8_t) thread_active; /* indicates when thread is in service_run() */
	uint8_t is_service_core; /* set if core is currently a service core */
	RTE_BITSET_DECLARE(service_active_on_lcore, RTE_SERVICE_NUM_MAX);
	RTE_ATOMIC(uint64_t) loops;
	RTE_ATOMIC(uint64_t) cycles;
	struct service_stats service_stats[RTE_SERVICE_NUM_MAX];
};

static uint32_t rte_service_count;
static struct rte_service_spec_impl *rte_services;
static struct core_state *lcore_states;
static uint32_t rte_service_library_initialized;

int32_t
rte_service_init(void)
{
	if (rte_service_library_initialized) {
		EAL_LOG(NOTICE,
			"service library init() called, init flag %d",
			rte_service_library_initialized);
		return -EALREADY;
	}

	rte_services = rte_calloc("rte_services", RTE_SERVICE_NUM_MAX,
			sizeof(struct rte_service_spec_impl),
			RTE_CACHE_LINE_SIZE);
	if (!rte_services) {
		EAL_LOG(ERR, "error allocating rte services array");
		goto fail_mem;
	}

	lcore_states = rte_calloc("rte_service_core_states", RTE_MAX_LCORE,
			sizeof(struct core_state), RTE_CACHE_LINE_SIZE);
	if (!lcore_states) {
		EAL_LOG(ERR, "error allocating core states array");
		goto fail_mem;
	}

	int i;
	struct rte_config *cfg = rte_eal_get_configuration();
	for (i = 0; i < RTE_MAX_LCORE; i++) {
		if (lcore_config[i].core_role == ROLE_SERVICE) {
			if ((unsigned int)i == cfg->main_lcore)
				continue;
			rte_service_lcore_add(i);
		}
	}

	rte_service_library_initialized = 1;
	return 0;
fail_mem:
	rte_free(rte_services);
	rte_free(lcore_states);
	return -ENOMEM;
}

void
rte_service_finalize(void)
{
	if (!rte_service_library_initialized)
		return;

	rte_service_lcore_reset_all();
	rte_eal_mp_wait_lcore();

	rte_free(rte_services);
	rte_free(lcore_states);

	rte_service_library_initialized = 0;
}

static inline bool
service_registered(uint32_t id)
{
	return rte_services[id].internal_flags & SERVICE_F_REGISTERED;
}

static inline bool
service_valid(uint32_t id)
{
	return id < RTE_SERVICE_NUM_MAX && service_registered(id);
}

static struct rte_service_spec_impl *
service_get(uint32_t id)
{
	return &rte_services[id];
}

/* validate ID and retrieve service pointer, or return error value */
#define SERVICE_VALID_GET_OR_ERR_RET(id, service, retval) do {          \
	if (!service_valid(id))                                         \
		return retval;                                          \
	service = &rte_services[id];                                    \
} while (0)

/* Returns 1 if statistics should be collected for the service,
 * or 0 if they should not be.
 */
static inline int
service_stats_enabled(struct rte_service_spec_impl *impl)
{
	return !!(impl->internal_flags & SERVICE_F_STATS_ENABLED);
}

static inline int
service_mt_safe(struct rte_service_spec_impl *s)
{
	return !!(s->spec.capabilities & RTE_SERVICE_CAP_MT_SAFE);
}

int32_t
rte_service_set_stats_enable(uint32_t id, int32_t enabled)
{
	struct rte_service_spec_impl *s;
	SERVICE_VALID_GET_OR_ERR_RET(id, s, 0);

	if (enabled)
		s->internal_flags |= SERVICE_F_STATS_ENABLED;
	else
		s->internal_flags &= ~(SERVICE_F_STATS_ENABLED);

	return 0;
}

int32_t
rte_service_set_runstate_mapped_check(uint32_t id, int32_t enabled)
{
	struct rte_service_spec_impl *s;
	SERVICE_VALID_GET_OR_ERR_RET(id, s, 0);

	if (enabled)
		s->internal_flags |= SERVICE_F_START_CHECK;
	else
		s->internal_flags &= ~(SERVICE_F_START_CHECK);

	return 0;
}

uint32_t
rte_service_get_count(void)
{
	return rte_service_count;
}

int32_t
rte_service_get_by_name(const char *name, uint32_t *service_id)
{
	if (!service_id)
		return -EINVAL;

	int i;
	for (i = 0; i < RTE_SERVICE_NUM_MAX; i++) {
		if (service_registered(i) &&
				strcmp(name, rte_services[i].spec.name) == 0) {
			*service_id = i;
			return 0;
		}
	}

	return -ENODEV;
}

const char *
rte_service_get_name(uint32_t id)
{
	struct rte_service_spec_impl *s;
	SERVICE_VALID_GET_OR_ERR_RET(id, s, 0);
	return s->spec.name;
}

int32_t
rte_service_probe_capability(uint32_t id, uint32_t capability)
{
	struct rte_service_spec_impl *s;
	SERVICE_VALID_GET_OR_ERR_RET(id, s, -EINVAL);
	return !!(s->spec.capabilities & capability);
}

int32_t
rte_service_component_register(const struct rte_service_spec *spec,
			       uint32_t *id_ptr)
{
	uint32_t i;
	int32_t free_slot = -1;

	if (spec->callback == NULL || strlen(spec->name) == 0)
		return -EINVAL;

	for (i = 0; i < RTE_SERVICE_NUM_MAX; i++) {
		if (!service_registered(i)) {
			free_slot = i;
			break;
		}
	}

	if ((free_slot < 0) || (i == RTE_SERVICE_NUM_MAX))
		return -ENOSPC;

	struct rte_service_spec_impl *s = &rte_services[free_slot];
	s->spec = *spec;
	s->internal_flags |= SERVICE_F_REGISTERED | SERVICE_F_START_CHECK;

	rte_service_count++;

	if (id_ptr)
		*id_ptr = free_slot;

	rte_eal_trace_service_component_register(free_slot, spec->name);

	return 0;
}

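/*
 * Usage note (illustrative sketch, not part of the original file): a
 * component typically registers its service along these lines. The
 * callback name and userdata below are hypothetical.
 *
 *	static int32_t my_service_cb(void *userdata);
 *
 *	struct rte_service_spec spec = {
 *		.name = "my_service",
 *		.callback = my_service_cb,
 *		.callback_userdata = &my_state,
 *	};
 *	uint32_t sid;
 *	int ret = rte_service_component_register(&spec, &sid);
 *
 * The component must then call rte_service_component_runstate_set(sid, 1)
 * before the service can actually be run.
 */
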
int32_t
rte_service_component_unregister(uint32_t id)
{
	uint32_t i;
	struct rte_service_spec_impl *s;
	SERVICE_VALID_GET_OR_ERR_RET(id, s, -EINVAL);

	rte_service_count--;

	s->internal_flags &= ~(SERVICE_F_REGISTERED);

	/* clear the run-bit in all cores */
	for (i = 0; i < RTE_MAX_LCORE; i++)
		rte_bitset_clear(lcore_states[i].mapped_services, id);

	memset(&rte_services[id], 0, sizeof(struct rte_service_spec_impl));

	return 0;
}

int32_t
rte_service_component_runstate_set(uint32_t id, uint32_t runstate)
{
	struct rte_service_spec_impl *s;
	SERVICE_VALID_GET_OR_ERR_RET(id, s, -EINVAL);

	/* comp_runstate acts as the guard variable. Use store-release
	 * memory order. This synchronizes with the load-acquires in the
	 * service_run and rte_service_runstate_get functions.
	 */
	if (runstate)
		rte_atomic_store_explicit(&s->comp_runstate, RUNSTATE_RUNNING,
			rte_memory_order_release);
	else
		rte_atomic_store_explicit(&s->comp_runstate, RUNSTATE_STOPPED,
			rte_memory_order_release);

	return 0;
}

int32_t
rte_service_runstate_set(uint32_t id, uint32_t runstate)
{
	struct rte_service_spec_impl *s;
	SERVICE_VALID_GET_OR_ERR_RET(id, s, -EINVAL);

	/* app_runstate acts as the guard variable. Use store-release
	 * memory order. This synchronizes with the load-acquires in the
	 * service_run and rte_service_runstate_get functions.
	 */
	if (runstate)
		rte_atomic_store_explicit(&s->app_runstate, RUNSTATE_RUNNING,
			rte_memory_order_release);
	else
		rte_atomic_store_explicit(&s->app_runstate, RUNSTATE_STOPPED,
			rte_memory_order_release);

	rte_eal_trace_service_runstate_set(id, runstate);
	return 0;
}

int32_t
rte_service_runstate_get(uint32_t id)
{
	struct rte_service_spec_impl *s;
	SERVICE_VALID_GET_OR_ERR_RET(id, s, -EINVAL);

	/* comp_runstate and app_runstate act as the guard variables.
	 * Use load-acquire memory order. This synchronizes with
	 * store-release in service state set functions.
	 */
	if (rte_atomic_load_explicit(&s->comp_runstate, rte_memory_order_acquire) ==
			RUNSTATE_RUNNING &&
	    rte_atomic_load_explicit(&s->app_runstate, rte_memory_order_acquire) ==
			RUNSTATE_RUNNING) {
		int check_disabled = !(s->internal_flags &
			SERVICE_F_START_CHECK);
		int lcore_mapped = (rte_atomic_load_explicit(&s->num_mapped_cores,
			rte_memory_order_relaxed) > 0);

		return (check_disabled | lcore_mapped);
	} else
		return 0;
}
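
/* Note (editorial): a service is runnable only when both the component
 * runstate and the application runstate are RUNNING, as checked above;
 * rte_service_runstate_get() additionally reports 0 when the start-check
 * flag is enabled and no lcore is mapped to the service.
 */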

static void
service_counter_add(RTE_ATOMIC(uint64_t) *counter, uint64_t operand)
{
	/* The lcore service worker thread is the only writer, and
	 * thus only a non-atomic load and an atomic store are needed,
	 * rather than the more expensive atomic add.
	 */
	uint64_t value;

	value = rte_atomic_load_explicit(counter, rte_memory_order_relaxed);

	rte_atomic_store_explicit(counter, value + operand,
				  rte_memory_order_relaxed);
}
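
/* Note (editorial): this single-writer scheme relies on each counter being
 * written only by its owning service lcore; readers such as the
 * lcore_attr_get_*() helpers below use relaxed atomic loads, so they may
 * observe a slightly stale value but never a torn one.
 */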

static inline void
service_runner_do_callback(struct rte_service_spec_impl *s,
			   struct core_state *cs, uint32_t service_idx)
{
	rte_eal_trace_service_run_begin(service_idx, rte_lcore_id());
	void *userdata = s->spec.callback_userdata;

	if (service_stats_enabled(s)) {
		uint64_t start = rte_rdtsc();
		int rc = s->spec.callback(userdata);

		struct service_stats *service_stats =
			&cs->service_stats[service_idx];

		service_counter_add(&service_stats->calls, 1);

		if (rc == -EAGAIN)
			service_counter_add(&service_stats->idle_calls, 1);
		else if (rc != 0)
			service_counter_add(&service_stats->error_calls, 1);

		if (likely(rc != -EAGAIN)) {
			uint64_t end = rte_rdtsc();
			uint64_t cycles = end - start;

			service_counter_add(&cs->cycles, cycles);
			service_counter_add(&service_stats->cycles, cycles);
		}
	} else {
		s->spec.callback(userdata);
	}
	rte_eal_trace_service_run_end(service_idx, rte_lcore_id());
}

/* Expects that the service 's' is valid. */
static int32_t
service_run(uint32_t i, struct core_state *cs, const uint64_t *mapped_services,
	    struct rte_service_spec_impl *s, uint32_t serialize_mt_unsafe)
{
	if (!s)
		return -EINVAL;

	/* comp_runstate and app_runstate act as the guard variables.
	 * Use load-acquire memory order. This synchronizes with
	 * store-release in service state set functions.
	 */
	if (rte_atomic_load_explicit(&s->comp_runstate, rte_memory_order_acquire) !=
			RUNSTATE_RUNNING ||
	    rte_atomic_load_explicit(&s->app_runstate, rte_memory_order_acquire) !=
			RUNSTATE_RUNNING ||
	    !rte_bitset_test(mapped_services, i)) {
		rte_bitset_clear(cs->service_active_on_lcore, i);
		return -ENOEXEC;
	}

	rte_bitset_set(cs->service_active_on_lcore, i);

	if ((service_mt_safe(s) == 0) && (serialize_mt_unsafe == 1)) {
		if (!rte_spinlock_trylock(&s->execute_lock))
			return -EBUSY;

		service_runner_do_callback(s, cs, i);
		rte_spinlock_unlock(&s->execute_lock);
	} else
		service_runner_do_callback(s, cs, i);

	return 0;
}
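
/* Note (editorial): service_run() uses a trylock rather than a blocking
 * lock when serializing an MT-unsafe service, so a core that loses the
 * race returns -EBUSY and can continue with its other mapped services
 * instead of stalling on the lock.
 */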

int32_t
rte_service_may_be_active(uint32_t id)
{
	uint32_t ids[RTE_MAX_LCORE] = {0};
	int32_t lcore_count = rte_service_lcore_list(ids, RTE_MAX_LCORE);
	int i;

	if (!service_valid(id))
		return -EINVAL;

	for (i = 0; i < lcore_count; i++) {
		if (rte_bitset_test(lcore_states[ids[i]].service_active_on_lcore, id))
			return 1;
	}

	return 0;
}

int32_t
rte_service_run_iter_on_app_lcore(uint32_t id, uint32_t serialize_mt_unsafe)
{
	struct core_state *cs = &lcore_states[rte_lcore_id()];
	struct rte_service_spec_impl *s;

	SERVICE_VALID_GET_OR_ERR_RET(id, s, -EINVAL);

	/* Increment num_mapped_cores to reflect that this core is
	 * now capable of running the service.
	 */
	rte_atomic_fetch_add_explicit(&s->num_mapped_cores, 1, rte_memory_order_relaxed);

	RTE_BITSET_DECLARE(all_services, RTE_SERVICE_NUM_MAX);
	rte_bitset_set_all(all_services, RTE_SERVICE_NUM_MAX);
	int ret = service_run(id, cs, all_services, s, serialize_mt_unsafe);

	rte_atomic_fetch_sub_explicit(&s->num_mapped_cores, 1, rte_memory_order_relaxed);

	return ret;
}
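
/*
 * Usage note (illustrative sketch, not part of the original file): an
 * application lcore can poll a service directly from its own loop, e.g.
 *
 *	while (!done)
 *		rte_service_run_iter_on_app_lcore(sid, 1);
 *
 * where 'done' and 'sid' are hypothetical. Passing 1 for
 * serialize_mt_unsafe makes concurrent polling of an MT-unsafe service
 * safe, at the cost of contending on the execute lock.
 */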

static int32_t
service_runner_func(void *arg)
{
	RTE_SET_USED(arg);
	const int lcore = rte_lcore_id();
	struct core_state *cs = &lcore_states[lcore];

	rte_atomic_store_explicit(&cs->thread_active, 1, rte_memory_order_seq_cst);

	/* runstate acts as the guard variable. Use load-acquire
	 * memory order here to synchronize with store-release
	 * in the runstate update functions.
	 */
	while (rte_atomic_load_explicit(&cs->runstate, rte_memory_order_acquire) ==
			RUNSTATE_RUNNING) {
		ssize_t id;

		RTE_BITSET_FOREACH_SET(id, cs->mapped_services, RTE_SERVICE_NUM_MAX) {
			/* return value ignored as no change to code flow */
			service_run(id, cs, cs->mapped_services, service_get(id), 1);
		}

		rte_atomic_store_explicit(&cs->loops, cs->loops + 1, rte_memory_order_relaxed);
	}

	/* Switch off this core for all services, to ensure that future
	 * calls to may_be_active() know this core is switched off.
	 */
	rte_bitset_clear_all(cs->service_active_on_lcore, RTE_SERVICE_NUM_MAX);

	/* Use SEQ CST memory ordering to avoid any re-ordering around
	 * this store, ensuring that once this store is visible, the service
	 * lcore thread really is done in service cores code.
	 */
	rte_atomic_store_explicit(&cs->thread_active, 0, rte_memory_order_seq_cst);
	return 0;
}

int32_t
rte_service_lcore_may_be_active(uint32_t lcore)
{
	if (lcore >= RTE_MAX_LCORE || !lcore_states[lcore].is_service_core)
		return -EINVAL;

	/* Load thread_active using ACQUIRE to avoid instructions dependent on
	 * the result being re-ordered before this load completes.
	 */
	return rte_atomic_load_explicit(&lcore_states[lcore].thread_active,
			       rte_memory_order_acquire);
}

int32_t
rte_service_lcore_count(void)
{
	int32_t count = 0;
	uint32_t i;
	for (i = 0; i < RTE_MAX_LCORE; i++)
		count += lcore_states[i].is_service_core;
	return count;
}

int32_t
rte_service_lcore_list(uint32_t array[], uint32_t n)
{
	uint32_t count = rte_service_lcore_count();
	if (count > n)
		return -ENOMEM;

	if (!array)
		return -EINVAL;

	uint32_t i;
	uint32_t idx = 0;
	for (i = 0; i < RTE_MAX_LCORE; i++) {
		struct core_state *cs = &lcore_states[i];
		if (cs->is_service_core) {
			array[idx] = i;
			idx++;
		}
	}

	return count;
}

int32_t
rte_service_lcore_count_services(uint32_t lcore)
{
	if (lcore >= RTE_MAX_LCORE)
		return -EINVAL;

	struct core_state *cs = &lcore_states[lcore];
	if (!cs->is_service_core)
		return -ENOTSUP;

	return rte_bitset_count_set(cs->mapped_services, RTE_SERVICE_NUM_MAX);
}

int32_t
rte_service_start_with_defaults(void)
{
	/* create a default mapping from cores to services, then start the
	 * services to make them transparent to unaware applications.
	 */
	uint32_t i;
	int ret;
	uint32_t count = rte_service_get_count();

	int32_t lcore_iter = 0;
	uint32_t ids[RTE_MAX_LCORE] = {0};
	int32_t lcore_count = rte_service_lcore_list(ids, RTE_MAX_LCORE);

	if (lcore_count == 0)
		return -ENOTSUP;

	for (i = 0; (int)i < lcore_count; i++)
		rte_service_lcore_start(ids[i]);

	for (i = 0; i < count; i++) {
		/* Do 1:1 core mapping here, with each service getting
		 * assigned a single core by default. With more services than
		 * service cores, multiple services multiplex onto a single
		 * core; with equal numbers, the mapping is 1:1.
		 */
		ret = rte_service_map_lcore_set(i, ids[lcore_iter], 1);
		if (ret)
			return -ENODEV;

		lcore_iter++;
		if (lcore_iter >= lcore_count)
			lcore_iter = 0;

		ret = rte_service_runstate_set(i, 1);
		if (ret)
			return -ENOEXEC;
	}

	return 0;
}
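
/*
 * Usage note (illustrative sketch, not part of the original file): an
 * application that manages service cores explicitly would typically do
 * roughly:
 *
 *	rte_service_lcore_add(lcore_id);
 *	rte_service_map_lcore_set(sid, lcore_id, 1);
 *	rte_service_runstate_set(sid, 1);
 *	rte_service_lcore_start(lcore_id);
 *
 * where 'lcore_id' and 'sid' are hypothetical values obtained elsewhere.
 * rte_service_start_with_defaults() above automates the mapping and
 * starting steps for all registered services.
 */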

static int32_t
service_update(uint32_t sid, uint32_t lcore, uint32_t *set, uint32_t *enabled)
{
	/* validate ID, or return error value */
	if (!service_valid(sid) || lcore >= RTE_MAX_LCORE ||
			!lcore_states[lcore].is_service_core)
		return -EINVAL;

	if (set) {
		uint64_t lcore_mapped = rte_bitset_test(lcore_states[lcore].mapped_services, sid);

		if (*set && !lcore_mapped) {
			rte_bitset_set(lcore_states[lcore].mapped_services, sid);
			rte_atomic_fetch_add_explicit(&rte_services[sid].num_mapped_cores,
				1, rte_memory_order_relaxed);
		}
		if (!*set && lcore_mapped) {
			rte_bitset_clear(lcore_states[lcore].mapped_services, sid);
			rte_atomic_fetch_sub_explicit(&rte_services[sid].num_mapped_cores,
				1, rte_memory_order_relaxed);
		}
	}

	if (enabled)
		*enabled = rte_bitset_test(lcore_states[lcore].mapped_services, sid);

	return 0;
}

int32_t
rte_service_map_lcore_set(uint32_t id, uint32_t lcore, uint32_t enabled)
{
	uint32_t on = enabled > 0;
	rte_eal_trace_service_map_lcore(id, lcore, enabled);
	return service_update(id, lcore, &on, 0);
}

int32_t
rte_service_map_lcore_get(uint32_t id, uint32_t lcore)
{
	uint32_t enabled;
	int ret = service_update(id, lcore, 0, &enabled);
	if (ret == 0)
		return enabled;
	return ret;
}

static void
set_lcore_state(uint32_t lcore, int32_t state)
{
	/* mark core state in hugepage backed config */
	struct rte_config *cfg = rte_eal_get_configuration();
	cfg->lcore_role[lcore] = state;

	/* mark state in process local lcore_config */
	lcore_config[lcore].core_role = state;

	/* update per-lcore optimized state tracking */
	lcore_states[lcore].is_service_core = (state == ROLE_SERVICE);

	rte_eal_trace_service_lcore_state_change(lcore, state);
}

int32_t
rte_service_lcore_reset_all(void)
{
	/* loop over cores, reset all mapped services */
	uint32_t i;
	for (i = 0; i < RTE_MAX_LCORE; i++) {
		if (lcore_states[i].is_service_core) {
			rte_bitset_clear_all(lcore_states[i].mapped_services, RTE_SERVICE_NUM_MAX);
			set_lcore_state(i, ROLE_RTE);
			/* runstate acts as the guard variable. Use
			 * store-release memory order here to synchronize
			 * with load-acquire in the runstate read functions.
			 */
			rte_atomic_store_explicit(&lcore_states[i].runstate,
				RUNSTATE_STOPPED, rte_memory_order_release);
		}
	}
	for (i = 0; i < RTE_SERVICE_NUM_MAX; i++)
		rte_atomic_store_explicit(&rte_services[i].num_mapped_cores, 0,
			rte_memory_order_relaxed);

	return 0;
}

int32_t
rte_service_lcore_add(uint32_t lcore)
{
	if (lcore >= RTE_MAX_LCORE)
		return -EINVAL;
	if (lcore_states[lcore].is_service_core)
		return -EALREADY;

	set_lcore_state(lcore, ROLE_SERVICE);

	/* ensure that after adding a core the mask and state are defaults */
	rte_bitset_clear_all(lcore_states[lcore].mapped_services, RTE_SERVICE_NUM_MAX);
	/* Use store-release memory order here to synchronize with
	 * load-acquire in runstate read functions.
	 */
	rte_atomic_store_explicit(&lcore_states[lcore].runstate, RUNSTATE_STOPPED,
		rte_memory_order_release);

	return rte_eal_wait_lcore(lcore);
}

int32_t
rte_service_lcore_del(uint32_t lcore)
{
	if (lcore >= RTE_MAX_LCORE)
		return -EINVAL;

	struct core_state *cs = &lcore_states[lcore];
	if (!cs->is_service_core)
		return -EINVAL;

	/* runstate acts as the guard variable. Use load-acquire
	 * memory order here to synchronize with store-release
	 * in the runstate update functions.
	 */
	if (rte_atomic_load_explicit(&cs->runstate, rte_memory_order_acquire) !=
			RUNSTATE_STOPPED)
		return -EBUSY;

	set_lcore_state(lcore, ROLE_RTE);

	rte_smp_wmb();
	return 0;
}

int32_t
rte_service_lcore_start(uint32_t lcore)
{
	if (lcore >= RTE_MAX_LCORE)
		return -EINVAL;

	struct core_state *cs = &lcore_states[lcore];
	if (!cs->is_service_core)
		return -EINVAL;

	/* runstate acts as the guard variable. Use load-acquire
	 * memory order here to synchronize with store-release
	 * in the runstate update functions.
	 */
	if (rte_atomic_load_explicit(&cs->runstate, rte_memory_order_acquire) ==
			RUNSTATE_RUNNING)
		return -EALREADY;

	/* Set the core to the run state first, and then launch; otherwise
	 * the worker would return immediately, as runstate is what keeps it
	 * in the service poll loop. Use store-release memory order here to
	 * synchronize with load-acquire in the runstate read functions.
	 */
	rte_atomic_store_explicit(&cs->runstate, RUNSTATE_RUNNING, rte_memory_order_release);

	rte_eal_trace_service_lcore_start(lcore);

	int ret = rte_eal_remote_launch(service_runner_func, 0, lcore);
	/* returns -EBUSY if the core is already launched, 0 on success */
	return ret;
}

int32_t
rte_service_lcore_stop(uint32_t lcore)
{
	if (lcore >= RTE_MAX_LCORE)
		return -EINVAL;

	/* runstate acts as the guard variable. Use load-acquire
	 * memory order here to synchronize with store-release
	 * in the runstate update functions.
	 */
	if (rte_atomic_load_explicit(&lcore_states[lcore].runstate, rte_memory_order_acquire) ==
			RUNSTATE_STOPPED)
		return -EALREADY;

	uint32_t i;
	struct core_state *cs = &lcore_states[lcore];

	for (i = 0; i < RTE_SERVICE_NUM_MAX; i++) {
		bool enabled = rte_bitset_test(cs->mapped_services, i);
		bool service_running = rte_service_runstate_get(i);
		bool only_core = (1 ==
			rte_atomic_load_explicit(&rte_services[i].num_mapped_cores,
				rte_memory_order_relaxed));

		/* if the core is mapped, and the service is running, and this
		 * is the only core that is mapped, the service would cease to
		 * run if this core stopped, so fail instead.
		 */
		if (enabled && service_running && only_core)
			return -EBUSY;
	}

	/* Use store-release memory order here to synchronize with
	 * load-acquire in runstate read functions.
	 */
	rte_atomic_store_explicit(&lcore_states[lcore].runstate, RUNSTATE_STOPPED,
		rte_memory_order_release);

	rte_eal_trace_service_lcore_stop(lcore);

	return 0;
}

static uint64_t
lcore_attr_get_loops(unsigned int lcore)
{
	struct core_state *cs = &lcore_states[lcore];

	return rte_atomic_load_explicit(&cs->loops, rte_memory_order_relaxed);
}

static uint64_t
lcore_attr_get_cycles(unsigned int lcore)
{
	struct core_state *cs = &lcore_states[lcore];

	return rte_atomic_load_explicit(&cs->cycles, rte_memory_order_relaxed);
}

static uint64_t
lcore_attr_get_service_calls(uint32_t service_id, unsigned int lcore)
{
	struct core_state *cs = &lcore_states[lcore];

	return rte_atomic_load_explicit(&cs->service_stats[service_id].calls,
		rte_memory_order_relaxed);
}

static uint64_t
lcore_attr_get_service_idle_calls(uint32_t service_id, unsigned int lcore)
{
	struct core_state *cs = &lcore_states[lcore];

	return rte_atomic_load_explicit(&cs->service_stats[service_id].idle_calls,
		rte_memory_order_relaxed);
}

static uint64_t
lcore_attr_get_service_error_calls(uint32_t service_id, unsigned int lcore)
{
	struct core_state *cs = &lcore_states[lcore];

	return rte_atomic_load_explicit(&cs->service_stats[service_id].error_calls,
		rte_memory_order_relaxed);
}

static uint64_t
lcore_attr_get_service_cycles(uint32_t service_id, unsigned int lcore)
{
	struct core_state *cs = &lcore_states[lcore];

	return rte_atomic_load_explicit(&cs->service_stats[service_id].cycles,
		rte_memory_order_relaxed);
}

typedef uint64_t (*lcore_attr_get_fun)(uint32_t service_id,
				       unsigned int lcore);

static uint64_t
attr_get(uint32_t id, lcore_attr_get_fun lcore_attr_get)
{
	unsigned int lcore;
	uint64_t sum = 0;

	for (lcore = 0; lcore < RTE_MAX_LCORE; lcore++) {
		if (lcore_states[lcore].is_service_core)
			sum += lcore_attr_get(id, lcore);
	}

	return sum;
}

static uint64_t
attr_get_service_calls(uint32_t service_id)
{
	return attr_get(service_id, lcore_attr_get_service_calls);
}

static uint64_t
attr_get_service_idle_calls(uint32_t service_id)
{
	return attr_get(service_id, lcore_attr_get_service_idle_calls);
}

static uint64_t
attr_get_service_error_calls(uint32_t service_id)
{
	return attr_get(service_id, lcore_attr_get_service_error_calls);
}

static uint64_t
attr_get_service_cycles(uint32_t service_id)
{
	return attr_get(service_id, lcore_attr_get_service_cycles);
}

int32_t
rte_service_attr_get(uint32_t id, uint32_t attr_id, uint64_t *attr_value)
{
	if (!service_valid(id))
		return -EINVAL;

	if (!attr_value)
		return -EINVAL;

	switch (attr_id) {
	case RTE_SERVICE_ATTR_CALL_COUNT:
		*attr_value = attr_get_service_calls(id);
		return 0;
	case RTE_SERVICE_ATTR_IDLE_CALL_COUNT:
		*attr_value = attr_get_service_idle_calls(id);
		return 0;
	case RTE_SERVICE_ATTR_ERROR_CALL_COUNT:
		*attr_value = attr_get_service_error_calls(id);
		return 0;
	case RTE_SERVICE_ATTR_CYCLES:
		*attr_value = attr_get_service_cycles(id);
		return 0;
	default:
		return -EINVAL;
	}
}
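
/*
 * Usage note (illustrative sketch, not part of the original file): reading
 * the call count of a service, assuming statistics were first enabled with
 * rte_service_set_stats_enable(sid, 1):
 *
 *	uint64_t calls;
 *	if (rte_service_attr_get(sid, RTE_SERVICE_ATTR_CALL_COUNT, &calls) == 0)
 *		printf("calls: %" PRIu64 "\n", calls);
 *
 * 'sid' is a hypothetical service id.
 */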

int32_t
rte_service_lcore_attr_get(uint32_t lcore, uint32_t attr_id,
			   uint64_t *attr_value)
{
	struct core_state *cs;

	if (lcore >= RTE_MAX_LCORE || !attr_value)
		return -EINVAL;

	cs = &lcore_states[lcore];
	if (!cs->is_service_core)
		return -ENOTSUP;

	switch (attr_id) {
	case RTE_SERVICE_LCORE_ATTR_LOOPS:
		*attr_value = lcore_attr_get_loops(lcore);
		return 0;
	case RTE_SERVICE_LCORE_ATTR_CYCLES:
		*attr_value = lcore_attr_get_cycles(lcore);
		return 0;
	default:
		return -EINVAL;
	}
}

int32_t
rte_service_attr_reset_all(uint32_t id)
{
	unsigned int lcore;

	if (!service_valid(id))
		return -EINVAL;

	for (lcore = 0; lcore < RTE_MAX_LCORE; lcore++) {
		struct core_state *cs = &lcore_states[lcore];

		cs->service_stats[id] = (struct service_stats) {};
	}

	return 0;
}

int32_t
rte_service_lcore_attr_reset_all(uint32_t lcore)
{
	struct core_state *cs;

	if (lcore >= RTE_MAX_LCORE)
		return -EINVAL;

	cs = &lcore_states[lcore];
	if (!cs->is_service_core)
		return -ENOTSUP;

	cs->loops = 0;

	return 0;
}

static void
service_dump_one(FILE *f, uint32_t id)
{
	struct rte_service_spec_impl *s;
	uint64_t service_calls;
	uint64_t service_cycles;

	service_calls = attr_get_service_calls(id);
	service_cycles = attr_get_service_cycles(id);

	/* avoid divide by zero */
	if (service_calls == 0)
		service_calls = 1;

	s = service_get(id);

	fprintf(f, "  %s: stats %d\tcalls %"PRIu64"\tcycles %"
		PRIu64"\tavg: %"PRIu64"\n",
		s->spec.name, service_stats_enabled(s), service_calls,
		service_cycles, service_cycles / service_calls);
}

static void
service_dump_calls_per_lcore(FILE *f, uint32_t lcore)
{
	uint32_t i;
	struct core_state *cs = &lcore_states[lcore];

	fprintf(f, "%02d\t", lcore);
	for (i = 0; i < RTE_SERVICE_NUM_MAX; i++) {
		if (!service_registered(i))
			continue;
		fprintf(f, "%"PRIu64"\t", cs->service_stats[i].calls);
	}
	fprintf(f, "\n");
}

int32_t
rte_service_dump(FILE *f, uint32_t id)
{
	uint32_t i;
	int print_one = (id != UINT32_MAX);

	/* print only the specified service */
	if (print_one) {
		struct rte_service_spec_impl *s;
		SERVICE_VALID_GET_OR_ERR_RET(id, s, -EINVAL);
		fprintf(f, "Service %s Summary\n", s->spec.name);
		service_dump_one(f, id);
		return 0;
	}

	/* print all services, as UINT32_MAX was passed as id */
	fprintf(f, "Services Summary\n");
	for (i = 0; i < RTE_SERVICE_NUM_MAX; i++) {
		if (!service_registered(i))
			continue;
		service_dump_one(f, i);
	}

	fprintf(f, "Service Cores Summary\n");
	for (i = 0; i < RTE_MAX_LCORE; i++) {
		if (lcore_config[i].core_role != ROLE_SERVICE)
			continue;

		service_dump_calls_per_lcore(f, i);
	}

	return 0;
}
1091