xref: /spdk/module/scheduler/dynamic/scheduler_dynamic.c (revision e45c8090e9922646f14d4bf89c2cf8c443322003)
/*   SPDX-License-Identifier: BSD-3-Clause
 *   Copyright (C) 2021 Intel Corporation.
 *   All rights reserved.
 */

#include "spdk/stdinc.h"
#include "spdk/likely.h"
#include "spdk/event.h"
#include "spdk/log.h"
#include "spdk/env.h"

#include "spdk/thread.h"
#include "spdk_internal/event.h"
#include "spdk/scheduler.h"
#include "spdk_internal/usdt.h"

static uint32_t g_main_lcore;

struct core_stats {
	uint64_t busy;
	uint64_t idle;
	uint32_t thread_count;
};

static struct core_stats *g_cores;

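/* Placement thresholds, expressed as busy percentages:
 * - g_scheduler_load_limit: threads below this load are treated as idle and
 *   consolidated onto the main core.
 * - g_scheduler_core_limit: upper bound on a core's load when deciding whether
 *   it can accept another thread.
 * - g_scheduler_core_busy: load above which a core is considered saturated when
 *   estimating its remaining load after a thread is moved away from it.
 */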
uint8_t g_scheduler_load_limit = 20;
uint8_t g_scheduler_core_limit = 80;
uint8_t g_scheduler_core_busy = 95;

static uint8_t
_busy_pct(uint64_t busy, uint64_t idle)
{
	if ((busy + idle) == 0) {
		return 0;
	}

	return busy * 100 / (busy + idle);
}

static uint8_t
_get_thread_load(struct spdk_scheduler_thread_info *thread_info)
{
	uint64_t busy, idle;

	busy = thread_info->current_stats.busy_tsc;
	idle = thread_info->current_stats.idle_tsc;

	/* return percentage of time thread was busy */
	return _busy_pct(busy, idle);
}

typedef void (*_foreach_fn)(struct spdk_scheduler_thread_info *thread_info);

static void
_foreach_thread(struct spdk_scheduler_core_info *cores_info, _foreach_fn fn)
{
	struct spdk_scheduler_core_info *core;
	uint32_t i, j;

	SPDK_ENV_FOREACH_CORE(i) {
		core = &cores_info[i];
		for (j = 0; j < core->threads_count; j++) {
			fn(&core->thread_infos[j]);
		}
	}
}

static void
prepare_to_sleep(uint32_t core)
{
	struct spdk_governor *governor = spdk_governor_get();
	int rc;

	if (governor == NULL) {
		return;
	}

	rc = governor->set_core_freq_min(core);
	if (rc < 0) {
		SPDK_ERRLOG("could not set_core_freq_min(%u)\n", core);
	}
}

static void
prepare_to_wake(uint32_t core)
{
	struct spdk_governor *governor = spdk_governor_get();
	int rc;

	if (governor == NULL) {
		return;
	}

	rc = governor->set_core_freq_max(core);
	if (rc < 0) {
		SPDK_ERRLOG("could not set_core_freq_max(%u)\n", core);
	}
}

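/* Plan moving a thread to dst_core and update the per-core busy/idle estimates
 * so that later placement decisions within the same scheduling period see the
 * adjusted load. The actual thread migration is performed by the event
 * framework based on the updated thread_info->lcore. */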
static void
_move_thread(struct spdk_scheduler_thread_info *thread_info, uint32_t dst_core)
{
	struct core_stats *dst = &g_cores[dst_core];
	struct core_stats *src = &g_cores[thread_info->lcore];
	uint64_t busy_tsc = thread_info->current_stats.busy_tsc;
	uint8_t busy_pct = _busy_pct(src->busy, src->idle);
	uint64_t tsc;

	SPDK_DTRACE_PROBE2(dynsched_move, thread_info, dst_core);

	if (src == dst) {
		/* Don't modify stats if thread is already on that core. */
		return;
	}

	dst->busy += spdk_min(UINT64_MAX - dst->busy, busy_tsc);
	dst->idle -= spdk_min(dst->idle, busy_tsc);
	dst->thread_count++;

	/* Adjust busy/idle from core as if thread was not present on it.
	 * Core load will reflect the sum of all remaining threads on it. */
	src->busy -= spdk_min(src->busy, busy_tsc);
	src->idle += spdk_min(UINT64_MAX - src->idle, busy_tsc);

	if (busy_pct >= g_scheduler_core_busy &&
	    _busy_pct(src->busy, src->idle) < g_scheduler_core_limit) {
		/* This core was so busy that we cannot assume all of busy_tsc
		 * consumed by the moved thread will now be idle_tsc - it's
		 * very possible the remaining threads will use these cycles
		 * as busy_tsc.
		 *
		 * So make sure we don't drop the updated estimate below
		 * g_scheduler_core_limit, so that other cores can't
		 * move threads to this core during this scheduling
		 * period.
		 */
		tsc = src->busy + src->idle;
		src->busy = tsc * g_scheduler_core_limit / 100;
		src->idle = tsc - src->busy;
	}
	assert(src->thread_count > 0);
	src->thread_count--;

	thread_info->lcore = dst_core;
}

static bool
_is_core_at_limit(uint32_t core_id)
{
	struct core_stats *core = &g_cores[core_id];
	uint64_t busy, idle;

	/* A core with no threads or only a single thread cannot be over the limit. */
	if (core->thread_count <= 1) {
		return false;
	}

	busy = core->busy;
	idle = core->idle;

	/* No work was done, exit before possible division by 0. */
	if (busy == 0) {
		return false;
	}

	/* Work done was less than the limit. */
	if (_busy_pct(busy, idle) < g_scheduler_core_limit) {
		return false;
	}

	return true;
}

static bool
_can_core_fit_thread(struct spdk_scheduler_thread_info *thread_info, uint32_t dst_core)
{
	struct core_stats *dst = &g_cores[dst_core];
	uint64_t new_busy_tsc, new_idle_tsc;

	/* Thread can always fit on the core it's currently on. */
	if (thread_info->lcore == dst_core) {
		return true;
	}

	/* Reactors in interrupt mode do not update stats, so a thread can
	 * always fit onto a reactor in interrupt mode. */
	if (dst->busy + dst->idle == 0) {
		return true;
	}

	/* Core has no threads. */
	if (dst->thread_count == 0) {
		return true;
	}

	/* Core doesn't have enough idle_tsc to take this thread. */
	if (dst->idle < thread_info->current_stats.busy_tsc) {
		return false;
	}

	new_busy_tsc = dst->busy + thread_info->current_stats.busy_tsc;
	new_idle_tsc = dst->idle - thread_info->current_stats.busy_tsc;

	/* Core cannot fit this thread if it would put it over the
	 * g_scheduler_core_limit. */
	return _busy_pct(new_busy_tsc, new_idle_tsc) < g_scheduler_core_limit;
}

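/* Pick a destination core for an active thread, limited to the thread's
 * cpumask. Prefers g_main_lcore when it fits; otherwise a lower-numbered
 * fitting core; otherwise, when the current core is over
 * g_scheduler_core_limit, any fitting core, falling back to the least busy
 * core. If none of these apply, the thread stays on its current core. */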
static uint32_t
_find_optimal_core(struct spdk_scheduler_thread_info *thread_info)
{
	uint32_t i;
	uint32_t current_lcore = thread_info->lcore;
	uint32_t least_busy_lcore = thread_info->lcore;
	struct spdk_thread *thread;
	struct spdk_cpuset *cpumask;
	bool core_at_limit = _is_core_at_limit(current_lcore);

	thread = spdk_thread_get_by_id(thread_info->thread_id);
	if (thread == NULL) {
		return current_lcore;
	}
	cpumask = spdk_thread_get_cpumask(thread);

	/* Find a core that can fit the thread. */
	SPDK_ENV_FOREACH_CORE(i) {
		/* Ignore cores outside cpumask. */
		if (!spdk_cpuset_get_cpu(cpumask, i)) {
			continue;
		}

		/* Search for least busy core. */
		if (g_cores[i].busy < g_cores[least_busy_lcore].busy) {
			least_busy_lcore = i;
		}

		/* Skip cores that cannot fit the thread, as well as the current core. */
		if (!_can_core_fit_thread(thread_info, i) || i == current_lcore) {
			continue;
		}
		if (i == g_main_lcore) {
			/* Consider g_main_lcore first, to consolidate threads on the main lcore if possible. */
			return i;
		} else if (i < current_lcore && current_lcore != g_main_lcore) {
			/* A lower core id was found; move there to consolidate threads on the lowest core ids. */
			return i;
		} else if (core_at_limit) {
			/* When the current core is over the limit, any other core id is better. */
			return i;
		}
	}

	/* For cores over the limit, place the thread on the least busy core
	 * to balance threads. */
	if (core_at_limit) {
		return least_busy_lcore;
	}

	/* If no better core is found, remain on the same one. */
	return current_lcore;
}

static int
init(void)
{
	g_main_lcore = spdk_env_get_current_core();

	if (spdk_governor_set("dpdk_governor") != 0) {
		SPDK_NOTICELOG("Unable to initialize dpdk governor\n");
	}

	g_cores = calloc(spdk_env_get_last_core() + 1, sizeof(struct core_stats));
	if (g_cores == NULL) {
		SPDK_ERRLOG("Failed to allocate memory for dynamic scheduler core stats.\n");
		return -ENOMEM;
	}

	return 0;
}

static void
deinit(void)
{
	free(g_cores);
	g_cores = NULL;
	spdk_governor_set(NULL);
}

static void
_balance_idle(struct spdk_scheduler_thread_info *thread_info)
{
	if (_get_thread_load(thread_info) >= g_scheduler_load_limit) {
		return;
	}
	/* This thread is idle, move it to the main core. */
	_move_thread(thread_info, g_main_lcore);
}

static void
_balance_active(struct spdk_scheduler_thread_info *thread_info)
{
	uint32_t target_lcore;

	if (_get_thread_load(thread_info) < g_scheduler_load_limit) {
		return;
	}

	/* This thread is active. */
	target_lcore = _find_optimal_core(thread_info);
	_move_thread(thread_info, target_lcore);
}

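/* Scheduler entry point, called once per scheduling period with a snapshot of
 * per-core and per-thread stats. It plans thread placement, switches unused
 * cores to interrupt mode, and adjusts core frequency through the governor. */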
static void
balance(struct spdk_scheduler_core_info *cores_info, uint32_t cores_count)
{
	struct spdk_reactor *reactor;
	struct spdk_governor *governor;
	struct spdk_scheduler_core_info *core;
	struct core_stats *main_core;
	uint32_t i;
	int rc;
	bool busy_threads_present = false;

	SPDK_DTRACE_PROBE1(dynsched_balance, cores_count);

	SPDK_ENV_FOREACH_CORE(i) {
		g_cores[i].thread_count = cores_info[i].threads_count;
		g_cores[i].busy = cores_info[i].current_busy_tsc;
		g_cores[i].idle = cores_info[i].current_idle_tsc;
		SPDK_DTRACE_PROBE2(dynsched_core_info, i, &cores_info[i]);
	}
	main_core = &g_cores[g_main_lcore];

	/* Distribute threads in two passes, to make sure updated core stats are considered on each pass.
	 * 1) Move all idle threads to main core. */
	_foreach_thread(cores_info, _balance_idle);
	/* 2) Distribute active threads across all cores. */
	_foreach_thread(cores_info, _balance_active);

	/* Switch unused cores to interrupt mode, and switch cores back to polled
	 * mode if they will be used after rebalancing. */
	SPDK_ENV_FOREACH_CORE(i) {
		reactor = spdk_reactor_get(i);
		assert(reactor != NULL);

		core = &cores_info[i];
		/* We can only switch the mode if the reactor no longer has any threads. */
		if (g_cores[i].thread_count == 0 && TAILQ_EMPTY(&reactor->threads)) {
			core->interrupt_mode = true;
			prepare_to_sleep(i);
		} else if (g_cores[i].thread_count != 0) {
			core->interrupt_mode = false;
			if (i != g_main_lcore) {
				/* If a thread is present on a core other than g_main_lcore,
				 * it has to be busy. */
				busy_threads_present = true;
				prepare_to_wake(i);
			}
		}
	}

	governor = spdk_governor_get();
	if (governor == NULL) {
		return;
	}

	/* Change main core frequency if needed */
	if (busy_threads_present) {
		rc = governor->set_core_freq_max(g_main_lcore);
		if (rc < 0) {
			SPDK_ERRLOG("setting default frequency for core %u failed\n", g_main_lcore);
		}
	} else if (main_core->busy > main_core->idle) {
		rc = governor->core_freq_up(g_main_lcore);
		if (rc < 0) {
			SPDK_ERRLOG("increasing frequency for core %u failed\n", g_main_lcore);
		}
	} else {
		rc = governor->core_freq_down(g_main_lcore);
		if (rc < 0) {
			SPDK_ERRLOG("lowering frequency for core %u failed\n", g_main_lcore);
		}
	}
}

struct json_scheduler_opts {
	uint8_t load_limit;
	uint8_t core_limit;
	uint8_t core_busy;
};

static const struct spdk_json_object_decoder sched_decoders[] = {
	{"load_limit", offsetof(struct json_scheduler_opts, load_limit), spdk_json_decode_uint8, true},
	{"core_limit", offsetof(struct json_scheduler_opts, core_limit), spdk_json_decode_uint8, true},
	{"core_busy", offsetof(struct json_scheduler_opts, core_busy), spdk_json_decode_uint8, true},
};

static int
set_opts(const struct spdk_json_val *opts)
{
	struct json_scheduler_opts scheduler_opts;

	scheduler_opts.load_limit = g_scheduler_load_limit;
	scheduler_opts.core_limit = g_scheduler_core_limit;
	scheduler_opts.core_busy = g_scheduler_core_busy;

	if (opts != NULL) {
		if (spdk_json_decode_object_relaxed(opts, sched_decoders,
						    SPDK_COUNTOF(sched_decoders), &scheduler_opts)) {
			SPDK_ERRLOG("Decoding scheduler opts JSON failed\n");
			return -1;
		}
	}

	SPDK_NOTICELOG("Setting scheduler load limit to %d\n", scheduler_opts.load_limit);
	g_scheduler_load_limit = scheduler_opts.load_limit;
	SPDK_NOTICELOG("Setting scheduler core limit to %d\n", scheduler_opts.core_limit);
	g_scheduler_core_limit = scheduler_opts.core_limit;
	SPDK_NOTICELOG("Setting scheduler core busy to %d\n", scheduler_opts.core_busy);
	g_scheduler_core_busy = scheduler_opts.core_busy;

	return 0;
}
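
/* Example: these options can be supplied as scheduler-specific parameters when
 * selecting the "dynamic" scheduler over RPC. Hypothetical values:
 *   {"load_limit": 50, "core_limit": 90, "core_busy": 95}
 */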

static void
get_opts(struct spdk_json_write_ctx *ctx)
{
	spdk_json_write_named_uint8(ctx, "load_limit", g_scheduler_load_limit);
	spdk_json_write_named_uint8(ctx, "core_limit", g_scheduler_core_limit);
	spdk_json_write_named_uint8(ctx, "core_busy", g_scheduler_core_busy);
}

static struct spdk_scheduler scheduler_dynamic = {
	.name = "dynamic",
	.init = init,
	.deinit = deinit,
	.balance = balance,
	.set_opts = set_opts,
	.get_opts = get_opts,
};

SPDK_SCHEDULER_REGISTER(scheduler_dynamic);