/*   SPDX-License-Identifier: BSD-3-Clause
 *   Copyright (C) 2021 Intel Corporation.
 *   All rights reserved.
 */

#include "spdk/stdinc.h"
#include "spdk/likely.h"
#include "spdk/event.h"
#include "spdk/log.h"
#include "spdk/env.h"

#include "spdk/thread.h"
#include "spdk_internal/event.h"
#include "spdk/scheduler.h"
#include "spdk_internal/usdt.h"

static uint32_t g_main_lcore;

struct core_stats {
	uint64_t busy;
	uint64_t idle;
	uint32_t thread_count;
	bool isolated;
};

static struct core_stats *g_cores;

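/*
 * Tunable thresholds (percentages) that drive the balancing logic below:
 * - g_scheduler_load_limit: threads busy less than this are treated as idle
 *   and consolidated on the main lcore.
 * - g_scheduler_core_limit: target upper bound for a core's busy percentage
 *   when placing threads.
 * - g_scheduler_core_busy: cores at or above this busy percentage have their
 *   stats clamped back to the core limit after a thread is moved away.
 */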
uint8_t g_scheduler_load_limit = 20;
uint8_t g_scheduler_core_limit = 80;
uint8_t g_scheduler_core_busy = 95;

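/* Return the busy percentage for a busy/idle TSC pair,
 * e.g. busy = 300, idle = 700 -> 30. */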
static uint8_t
_busy_pct(uint64_t busy, uint64_t idle)
{
	if ((busy + idle) == 0) {
		return 0;
	}

	return busy * 100 / (busy + idle);
}

static uint8_t
_get_thread_load(struct spdk_scheduler_thread_info *thread_info)
{
	uint64_t busy, idle;

	busy = thread_info->current_stats.busy_tsc;
	idle = thread_info->current_stats.idle_tsc;

	/* return percentage of time thread was busy */
	return _busy_pct(busy, idle);
}

typedef void (*_foreach_fn)(struct spdk_scheduler_thread_info *thread_info);

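/* Call fn on every thread of every core, skipping isolated cores. */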
static void
_foreach_thread(struct spdk_scheduler_core_info *cores_info, _foreach_fn fn)
{
	struct spdk_scheduler_core_info *core;
	uint32_t i, j;

	SPDK_ENV_FOREACH_CORE(i) {
		core = &cores_info[i];
		/* Skip cores that are isolated */
		if (core->isolated) {
			continue;
		}
		for (j = 0; j < core->threads_count; j++) {
			fn(&core->thread_infos[j]);
		}
	}
}

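/* prepare_to_sleep()/prepare_to_wake() ask the governor (if one is registered)
 * to drop or raise a core's frequency when it is switched to interrupt mode or
 * back to polling during rebalancing. */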
static void
prepare_to_sleep(uint32_t core)
{
	struct spdk_governor *governor = spdk_governor_get();
	int rc;

	if (governor == NULL) {
		return;
	}

	rc = governor->set_core_freq_min(core);
	if (rc < 0) {
		SPDK_ERRLOG("could not set_core_freq_min(%d)\n", core);
	}
}

static void
prepare_to_wake(uint32_t core)
{
	struct spdk_governor *governor = spdk_governor_get();
	int rc;

	if (governor == NULL) {
		return;
	}

	rc = governor->set_core_freq_max(core);
	if (rc < 0) {
		SPDK_ERRLOG("could not set_core_freq_max(%d)\n", core);
	}
}

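/*
 * Account for moving thread_info to dst_core: its busy_tsc is charged to the
 * destination core and credited back to the source core as idle time.
 *
 * Illustrative numbers: with g_scheduler_core_busy = 95 and
 * g_scheduler_core_limit = 80, moving a thread with busy_tsc = 300 off a core
 * measured at busy = 960 / idle = 40 (96% busy) would leave 660/340 (66%).
 * Since the source was above the busy threshold, its stats are clamped back
 * to 800/200 (80%) so other threads are not scheduled onto it this period.
 */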
static void
_move_thread(struct spdk_scheduler_thread_info *thread_info, uint32_t dst_core)
{
	struct core_stats *dst = &g_cores[dst_core];
	struct core_stats *src = &g_cores[thread_info->lcore];
	uint64_t busy_tsc = thread_info->current_stats.busy_tsc;
	uint8_t busy_pct = _busy_pct(src->busy, src->idle);
	uint64_t tsc;

	SPDK_DTRACE_PROBE2(dynsched_move, thread_info, dst_core);

	if (src == dst) {
		/* Don't modify stats if thread is already on that core. */
		return;
	}

	dst->busy += spdk_min(UINT64_MAX - dst->busy, busy_tsc);
	dst->idle -= spdk_min(dst->idle, busy_tsc);
	dst->thread_count++;

	/* Adjust busy/idle from core as if thread was not present on it.
	 * Core load will reflect the sum of all remaining threads on it. */
	src->busy -= spdk_min(src->busy, busy_tsc);
	src->idle += spdk_min(UINT64_MAX - src->idle, busy_tsc);

	if (busy_pct >= g_scheduler_core_busy &&
	    _busy_pct(src->busy, src->idle) < g_scheduler_core_limit) {
		/* This core was so busy that we cannot assume all of busy_tsc
		 * consumed by the moved thread will now be idle_tsc - it's
		 * very possible the remaining threads will use these cycles
		 * as busy_tsc.
		 *
		 * So make sure we don't drop the updated estimate below
		 * g_scheduler_core_limit, so that other cores can't
		 * move threads to this core during this scheduling
		 * period.
		 */
		tsc = src->busy + src->idle;
		src->busy = tsc * g_scheduler_core_limit / 100;
		src->idle = tsc - src->busy;
	}
	assert(src->thread_count > 0);
	src->thread_count--;

	thread_info->lcore = dst_core;
}

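/* A core is "at the limit" when it hosts more than one thread and its busy
 * percentage is at or above g_scheduler_core_limit. */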
static bool
_is_core_at_limit(uint32_t core_id)
{
	struct core_stats *core = &g_cores[core_id];
	uint64_t busy, idle;

	/* A core with no threads or a single thread cannot be over the limit. */
	if (core->thread_count <= 1) {
		return false;
	}

	busy = core->busy;
	idle = core->idle;

	/* No work was done, exit before possible division by 0. */
	if (busy == 0) {
		return false;
	}

	/* Work done was less than the limit */
	if (_busy_pct(busy, idle) < g_scheduler_core_limit) {
		return false;
	}

	return true;
}

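/*
 * Check whether dst_core can absorb thread_info's busy_tsc without crossing
 * g_scheduler_core_limit. For example, with the default limit of 80, a core
 * measured at busy = 600 / idle = 400 could not take a thread with
 * busy_tsc = 300, since that would push it to 900/100 (90% busy).
 */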
static bool
_can_core_fit_thread(struct spdk_scheduler_thread_info *thread_info, uint32_t dst_core)
{
	struct core_stats *dst = &g_cores[dst_core];
	uint64_t new_busy_tsc, new_idle_tsc;

	/* Thread can always fit on the core it's currently on. */
	if (thread_info->lcore == dst_core) {
		return true;
	}

	/* Reactors in interrupt mode do not update stats;
	 * a thread can always fit into a reactor in interrupt mode. */
	if (dst->busy + dst->idle == 0) {
		return true;
	}

	/* Core has no threads. */
	if (dst->thread_count == 0) {
		return true;
	}

	/* Core doesn't have enough idle_tsc to take this thread. */
	if (dst->idle < thread_info->current_stats.busy_tsc) {
		return false;
	}

	new_busy_tsc = dst->busy + thread_info->current_stats.busy_tsc;
	new_idle_tsc = dst->idle - thread_info->current_stats.busy_tsc;

	/* Core cannot fit this thread if it would put it over the
	 * g_scheduler_core_limit. */
	return _busy_pct(new_busy_tsc, new_idle_tsc) < g_scheduler_core_limit;
}

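/*
 * Pick a destination core for an active thread, honoring its cpumask and
 * skipping isolated cores. Preference order: the main lcore, then a lower
 * core id than the current one (to consolidate threads on low core ids),
 * then any fitting core if the current core is over the limit; as a last
 * resort an over-the-limit core falls back to the least busy core found.
 */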
static uint32_t
_find_optimal_core(struct spdk_scheduler_thread_info *thread_info)
{
	uint32_t i;
	uint32_t current_lcore = thread_info->lcore;
	uint32_t least_busy_lcore = thread_info->lcore;
	struct spdk_thread *thread;
	struct spdk_cpuset *cpumask;
	bool core_at_limit = _is_core_at_limit(current_lcore);

	thread = spdk_thread_get_by_id(thread_info->thread_id);
	if (thread == NULL) {
		return current_lcore;
	}
	cpumask = spdk_thread_get_cpumask(thread);

	/* Find a core that can fit the thread. */
	SPDK_ENV_FOREACH_CORE(i) {
		/* Ignore cores outside cpumask. */
		if (!spdk_cpuset_get_cpu(cpumask, i)) {
			continue;
		}

		/* Skip cores that are isolated */
		if (g_cores[i].isolated) {
			continue;
		}

		/* Search for least busy core. */
		if (g_cores[i].busy < g_cores[least_busy_lcore].busy) {
			least_busy_lcore = i;
		}

		/* Skip cores that cannot fit the thread, as well as the current core. */
		if (!_can_core_fit_thread(thread_info, i) || i == current_lcore) {
			continue;
		}
		if (i == g_main_lcore) {
			/* Consider g_main_lcore first; consolidate threads on the main lcore if possible. */
			return i;
		} else if (i < current_lcore && current_lcore != g_main_lcore) {
			/* A lower core id was found; move to consolidate threads on the lowest core ids. */
			return i;
		} else if (core_at_limit) {
			/* When the core is over the limit, any other core id is better than the current one. */
			return i;
		}
	}

	/* For cores over the limit, place the thread on the least busy core
	 * to balance threads. */
	if (core_at_limit) {
		return least_busy_lcore;
	}

	/* If no better core is found, remain on the same one. */
	return current_lcore;
}

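/* Allocate per-core stats, record the scheduling lcore and try (best effort)
 * to enable the DPDK governor for frequency scaling. */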
static int
init(void)
{
	g_main_lcore = spdk_scheduler_get_scheduling_lcore();

	if (spdk_governor_set("dpdk_governor") != 0) {
		SPDK_NOTICELOG("Unable to initialize dpdk governor\n");
	}

	g_cores = calloc(spdk_env_get_last_core() + 1, sizeof(struct core_stats));
	if (g_cores == NULL) {
		SPDK_ERRLOG("Failed to allocate memory for dynamic scheduler core stats.\n");
		return -ENOMEM;
	}

	return 0;
}

static void
deinit(void)
{
	free(g_cores);
	g_cores = NULL;
	spdk_governor_set(NULL);
}

static void
_balance_idle(struct spdk_scheduler_thread_info *thread_info)
{
	if (_get_thread_load(thread_info) >= g_scheduler_load_limit) {
		return;
	}
	/* This thread is idle, move it to the main core. */
	_move_thread(thread_info, g_main_lcore);
}

static void
_balance_active(struct spdk_scheduler_thread_info *thread_info)
{
	uint32_t target_lcore;

	if (_get_thread_load(thread_info) < g_scheduler_load_limit) {
		return;
	}

	/* This thread is active. */
	target_lcore = _find_optimal_core(thread_info);
	_move_thread(thread_info, target_lcore);
}

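/*
 * Main scheduling callback, invoked once per scheduling period: snapshot the
 * per-core stats, move idle threads to the main lcore, redistribute active
 * threads, then switch empty cores to interrupt mode and adjust core
 * frequencies through the governor.
 */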
static void
balance(struct spdk_scheduler_core_info *cores_info, uint32_t cores_count)
{
	struct spdk_reactor *reactor;
	struct spdk_governor *governor;
	struct spdk_scheduler_core_info *core;
	struct core_stats *main_core;
	uint32_t i;
	int rc;
	bool busy_threads_present = false;

	SPDK_DTRACE_PROBE1(dynsched_balance, cores_count);

	SPDK_ENV_FOREACH_CORE(i) {
		g_cores[i].thread_count = cores_info[i].threads_count;
		g_cores[i].busy = cores_info[i].current_busy_tsc;
		g_cores[i].idle = cores_info[i].current_idle_tsc;
		g_cores[i].isolated = cores_info[i].isolated;
		SPDK_DTRACE_PROBE2(dynsched_core_info, i, &cores_info[i]);
	}
	main_core = &g_cores[g_main_lcore];

	/* Distribute threads in two passes, to make sure updated core stats are considered on each pass.
	 * 1) Move all idle threads to main core. */
	_foreach_thread(cores_info, _balance_idle);
	/* 2) Distribute active threads across all cores. */
	_foreach_thread(cores_info, _balance_active);

	/* Switch unused cores to interrupt mode and switch cores to polled mode
	 * if they will be used after rebalancing. */
	SPDK_ENV_FOREACH_CORE(i) {
		reactor = spdk_reactor_get(i);
		assert(reactor != NULL);

		core = &cores_info[i];
		/* We can switch the mode only if the reactor does not already have any threads. */
		if (g_cores[i].thread_count == 0 && TAILQ_EMPTY(&reactor->threads)) {
			core->interrupt_mode = true;
			prepare_to_sleep(i);
		} else if (g_cores[i].thread_count != 0) {
			core->interrupt_mode = false;
			if (i != g_main_lcore) {
				/* If a thread is present on a core other than g_main_lcore,
				 * it has to be busy. */
				busy_threads_present = true;
				prepare_to_wake(i);
			}
		}
	}

	governor = spdk_governor_get();
	if (governor == NULL) {
		return;
	}

	/* Change main core frequency if needed */
	if (busy_threads_present) {
		rc = governor->set_core_freq_max(g_main_lcore);
		if (rc < 0) {
			SPDK_ERRLOG("setting default frequency for core %u failed\n", g_main_lcore);
		}
	} else if (main_core->busy > main_core->idle) {
		rc = governor->core_freq_up(g_main_lcore);
		if (rc < 0) {
			SPDK_ERRLOG("increasing frequency for core %u failed\n", g_main_lcore);
		}
	} else {
		rc = governor->core_freq_down(g_main_lcore);
		if (rc < 0) {
			SPDK_ERRLOG("lowering frequency for core %u failed\n", g_main_lcore);
		}
	}
}

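/*
 * The thresholds above can be overridden at runtime. set_opts() decodes a JSON
 * object containing any of "load_limit", "core_limit" and "core_busy"; in
 * practice these are typically supplied through the framework_set_scheduler
 * RPC (illustrative example, assuming that RPC wiring):
 *
 *   ./scripts/rpc.py framework_set_scheduler dynamic --core-limit 90
 */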
struct json_scheduler_opts {
	uint8_t load_limit;
	uint8_t core_limit;
	uint8_t core_busy;
};

static const struct spdk_json_object_decoder sched_decoders[] = {
	{"load_limit", offsetof(struct json_scheduler_opts, load_limit), spdk_json_decode_uint8, true},
	{"core_limit", offsetof(struct json_scheduler_opts, core_limit), spdk_json_decode_uint8, true},
	{"core_busy", offsetof(struct json_scheduler_opts, core_busy), spdk_json_decode_uint8, true},
};

static int
set_opts(const struct spdk_json_val *opts)
{
	struct json_scheduler_opts scheduler_opts;

	scheduler_opts.load_limit = g_scheduler_load_limit;
	scheduler_opts.core_limit = g_scheduler_core_limit;
	scheduler_opts.core_busy = g_scheduler_core_busy;

	if (opts != NULL) {
		if (spdk_json_decode_object_relaxed(opts, sched_decoders,
						    SPDK_COUNTOF(sched_decoders), &scheduler_opts)) {
			SPDK_ERRLOG("Decoding scheduler opts JSON failed\n");
			return -1;
		}
	}

	SPDK_NOTICELOG("Setting scheduler load limit to %d\n", scheduler_opts.load_limit);
	g_scheduler_load_limit = scheduler_opts.load_limit;
	SPDK_NOTICELOG("Setting scheduler core limit to %d\n", scheduler_opts.core_limit);
	g_scheduler_core_limit = scheduler_opts.core_limit;
	SPDK_NOTICELOG("Setting scheduler core busy to %d\n", scheduler_opts.core_busy);
	g_scheduler_core_busy = scheduler_opts.core_busy;

	return 0;
}

static void
get_opts(struct spdk_json_write_ctx *ctx)
{
	spdk_json_write_named_uint8(ctx, "load_limit", g_scheduler_load_limit);
	spdk_json_write_named_uint8(ctx, "core_limit", g_scheduler_core_limit);
	spdk_json_write_named_uint8(ctx, "core_busy", g_scheduler_core_busy);
}

static struct spdk_scheduler scheduler_dynamic = {
	.name = "dynamic",
	.init = init,
	.deinit = deinit,
	.balance = balance,
	.set_opts = set_opts,
	.get_opts = get_opts,
};

SPDK_SCHEDULER_REGISTER(scheduler_dynamic);
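/* SPDK_SCHEDULER_REGISTER() adds this scheduler to the global list under its
 * .name, making "dynamic" selectable once the scheduling framework starts. */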