/*   SPDX-License-Identifier: BSD-3-Clause
 *   Copyright (C) 2021 Intel Corporation.
 *   All rights reserved.
 */

#include "spdk/stdinc.h"
#include "spdk/likely.h"
#include "spdk/event.h"
#include "spdk/log.h"
#include "spdk/env.h"

#include "spdk/thread.h"
#include "spdk_internal/event.h"
#include "spdk/scheduler.h"
#include "spdk_internal/usdt.h"

static uint32_t g_main_lcore;

struct core_stats {
	uint64_t busy;
	uint64_t idle;
	uint32_t thread_count;
};

static struct core_stats *g_cores;

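/* Tunable thresholds, all expressed as percentages of (busy + idle) TSC.
 * They can be changed at runtime through this scheduler's set_opts callback:
 * - load_limit: threads below this load are considered idle and are pulled
 *   back to the main core;
 * - core_limit: a core at or above this load is considered full and sheds
 *   threads;
 * - core_busy: a core this busy keeps a conservative load estimate after a
 *   thread is moved off it (see _move_thread()).
 */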
uint8_t g_scheduler_load_limit = 20;
uint8_t g_scheduler_core_limit = 80;
uint8_t g_scheduler_core_busy = 95;

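/* Return the percentage (0-100) of cycles spent busy; returning 0 when no
 * cycles were recorded also guards the division below. */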
static uint8_t
_busy_pct(uint64_t busy, uint64_t idle)
{
	if ((busy + idle) == 0) {
		return 0;
	}

	return busy * 100 / (busy + idle);
}

static uint8_t
_get_thread_load(struct spdk_scheduler_thread_info *thread_info)
{
	uint64_t busy, idle;

	busy = thread_info->current_stats.busy_tsc;
	idle = thread_info->current_stats.idle_tsc;

	/* return percentage of time thread was busy */
	return _busy_pct(busy, idle);
}

typedef void (*_foreach_fn)(struct spdk_scheduler_thread_info *thread_info);

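/* Apply fn to every thread on every core described in cores_info. */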
static void
_foreach_thread(struct spdk_scheduler_core_info *cores_info, _foreach_fn fn)
{
	struct spdk_scheduler_core_info *core;
	uint32_t i, j;

	SPDK_ENV_FOREACH_CORE(i) {
		core = &cores_info[i];
		for (j = 0; j < core->threads_count; j++) {
			fn(&core->thread_infos[j]);
		}
	}
}

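/* Reassign the thread to dst_core and transfer its busy_tsc between the
 * two cores' stats, so that placement decisions made later in the same
 * scheduling period see the updated per-core load. */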
static void
_move_thread(struct spdk_scheduler_thread_info *thread_info, uint32_t dst_core)
{
	struct core_stats *dst = &g_cores[dst_core];
	struct core_stats *src = &g_cores[thread_info->lcore];
	uint64_t busy_tsc = thread_info->current_stats.busy_tsc;
	uint8_t busy_pct = _busy_pct(src->busy, src->idle);
	uint64_t tsc;

	SPDK_DTRACE_PROBE2(dynsched_move, thread_info, dst_core);

	if (src == dst) {
		/* Don't modify stats if the thread is already on that core. */
		return;
	}

	dst->busy += spdk_min(UINT64_MAX - dst->busy, busy_tsc);
	dst->idle -= spdk_min(dst->idle, busy_tsc);
	dst->thread_count++;

	/* Adjust the source core's busy/idle as if the thread were not present
	 * on it. The core load will then reflect the sum of all remaining
	 * threads on it. */
	src->busy -= spdk_min(src->busy, busy_tsc);
	src->idle += spdk_min(UINT64_MAX - src->idle, busy_tsc);

	if (busy_pct >= g_scheduler_core_busy &&
	    _busy_pct(src->busy, src->idle) < g_scheduler_core_limit) {
		/* This core was so busy that we cannot assume all of the busy_tsc
		 * consumed by the moved thread will now become idle_tsc - it is
		 * very possible the remaining threads will use these cycles
		 * as busy_tsc.
		 *
		 * So make sure the updated estimate does not drop below
		 * g_scheduler_core_limit; otherwise other cores could
		 * move threads to this core during this scheduling
		 * period.
		 */
		tsc = src->busy + src->idle;
		src->busy = tsc * g_scheduler_core_limit / 100;
		src->idle = tsc - src->busy;
	}
	assert(src->thread_count > 0);
	src->thread_count--;

	thread_info->lcore = dst_core;
}

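/* Check whether a core's measured load has reached g_scheduler_core_limit
 * and it should therefore shed threads. */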
static bool
_is_core_at_limit(uint32_t core_id)
{
	struct core_stats *core = &g_cores[core_id];
	uint64_t busy, idle;

	/* A core with no threads or a single thread cannot be over the limit. */
	if (core->thread_count <= 1) {
		return false;
	}

	busy = core->busy;
	idle = core->idle;

	/* No work was done, so the core cannot be over the limit. */
	if (busy == 0) {
		return false;
	}

	/* Work done was less than the limit. */
	if (_busy_pct(busy, idle) < g_scheduler_core_limit) {
		return false;
	}

	return true;
}

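/* Check whether dst_core can absorb the thread's busy_tsc without going
 * over g_scheduler_core_limit. */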
static bool
_can_core_fit_thread(struct spdk_scheduler_thread_info *thread_info, uint32_t dst_core)
{
	struct core_stats *dst = &g_cores[dst_core];
	uint64_t new_busy_tsc, new_idle_tsc;

	/* A thread can always fit on the core it is currently on. */
	if (thread_info->lcore == dst_core) {
		return true;
	}

	/* Reactors in interrupt mode do not update stats, so a thread can
	 * always fit on a reactor in interrupt mode. */
	if (dst->busy + dst->idle == 0) {
		return true;
	}

	/* Core has no threads. */
	if (dst->thread_count == 0) {
		return true;
	}

	/* Core doesn't have enough idle_tsc to take this thread. */
	if (dst->idle < thread_info->current_stats.busy_tsc) {
		return false;
	}

	new_busy_tsc = dst->busy + thread_info->current_stats.busy_tsc;
	new_idle_tsc = dst->idle - thread_info->current_stats.busy_tsc;

	/* Core cannot fit this thread if doing so would put it over the
	 * g_scheduler_core_limit. */
	return _busy_pct(new_busy_tsc, new_idle_tsc) < g_scheduler_core_limit;
}

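/* Pick the best destination core for an active thread: prefer the main
 * core, then lower-numbered cores within the thread's cpumask to keep
 * threads consolidated; when the current core is over the limit and no
 * such core fits, fall back to the least busy core. */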
static uint32_t
_find_optimal_core(struct spdk_scheduler_thread_info *thread_info)
{
	uint32_t i;
	uint32_t current_lcore = thread_info->lcore;
	uint32_t least_busy_lcore = thread_info->lcore;
	struct spdk_thread *thread;
	struct spdk_cpuset *cpumask;
	bool core_at_limit = _is_core_at_limit(current_lcore);

	thread = spdk_thread_get_by_id(thread_info->thread_id);
	if (thread == NULL) {
		return current_lcore;
	}
	cpumask = spdk_thread_get_cpumask(thread);

	/* Find a core that can fit the thread. */
	SPDK_ENV_FOREACH_CORE(i) {
		/* Ignore cores outside the cpumask. */
		if (!spdk_cpuset_get_cpu(cpumask, i)) {
			continue;
		}

		/* Search for the least busy core. */
		if (g_cores[i].busy < g_cores[least_busy_lcore].busy) {
			least_busy_lcore = i;
		}

		/* Skip cores that cannot fit the thread, as well as the current core. */
		if (!_can_core_fit_thread(thread_info, i) || i == current_lcore) {
			continue;
		}
		if (i == g_main_lcore) {
			/* Consider g_main_lcore first, to consolidate threads on the main lcore when possible. */
			return i;
		} else if (i < current_lcore && current_lcore != g_main_lcore) {
			/* A lower core id was found; move there to consolidate threads on the lowest core ids. */
			return i;
		} else if (core_at_limit) {
			/* When the current core is over the limit, any other core id is better than the current one. */
			return i;
		}
	}

	/* If the current core is over the limit and nothing else fits, place
	 * the thread on the least busy core to balance the load. */
	if (core_at_limit) {
		return least_busy_lcore;
	}

	/* If no better core was found, remain on the same one. */
	return current_lcore;
}

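/* Scheduler init callback: remember the main lcore, try to enable the
 * dpdk_governor for frequency scaling, allocate per-core stats, and make
 * sure a scheduling period is set. */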
static int
init(void)
{
	g_main_lcore = spdk_env_get_current_core();

	if (spdk_governor_set("dpdk_governor") != 0) {
		SPDK_NOTICELOG("Unable to initialize dpdk governor\n");
	}

	g_cores = calloc(spdk_env_get_last_core() + 1, sizeof(struct core_stats));
	if (g_cores == NULL) {
		SPDK_ERRLOG("Failed to allocate memory for dynamic scheduler core stats.\n");
		return -ENOMEM;
	}

	if (spdk_scheduler_get_period() == 0) {
		/* set default scheduling period to one second */
		spdk_scheduler_set_period(SPDK_SEC_TO_USEC);
	}

	return 0;
}

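/* Scheduler deinit callback: release per-core stats and detach the governor. */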
static void
deinit(void)
{
	free(g_cores);
	g_cores = NULL;
	spdk_governor_set(NULL);
}

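/* First balancing pass: collect threads below g_scheduler_load_limit on
 * the main core. */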
static void
_balance_idle(struct spdk_scheduler_thread_info *thread_info)
{
	if (_get_thread_load(thread_info) >= g_scheduler_load_limit) {
		return;
	}
	/* This thread is idle, move it to the main core. */
	_move_thread(thread_info, g_main_lcore);
}

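/* Second balancing pass: spread threads at or above g_scheduler_load_limit
 * across the cores chosen by _find_optimal_core(). */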
static void
_balance_active(struct spdk_scheduler_thread_info *thread_info)
{
	uint32_t target_lcore;

	if (_get_thread_load(thread_info) < g_scheduler_load_limit) {
		return;
	}

	/* This thread is active. */
	target_lcore = _find_optimal_core(thread_info);
	_move_thread(thread_info, target_lcore);
}

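/* Main balancing callback, invoked by the framework once per scheduling
 * period: snapshot per-core stats, redistribute threads in two passes,
 * flip unused reactors into interrupt mode, and ask the governor to adjust
 * the main core's frequency. */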
static void
balance(struct spdk_scheduler_core_info *cores_info, uint32_t cores_count)
{
	struct spdk_reactor *reactor;
	struct spdk_governor *governor;
	struct spdk_scheduler_core_info *core;
	struct core_stats *main_core;
	uint32_t i;
	int rc;
	bool busy_threads_present = false;

	SPDK_DTRACE_PROBE1(dynsched_balance, cores_count);

	SPDK_ENV_FOREACH_CORE(i) {
		g_cores[i].thread_count = cores_info[i].threads_count;
		g_cores[i].busy = cores_info[i].current_busy_tsc;
		g_cores[i].idle = cores_info[i].current_idle_tsc;
		SPDK_DTRACE_PROBE2(dynsched_core_info, i, &cores_info[i]);
	}
	main_core = &g_cores[g_main_lcore];

	/* Distribute threads in two passes, to make sure updated core stats are considered on each pass.
	 * 1) Move all idle threads to the main core. */
	_foreach_thread(cores_info, _balance_idle);
	/* 2) Distribute active threads across all cores. */
	_foreach_thread(cores_info, _balance_active);

	/* Switch unused cores to interrupt mode and switch cores back to polled
	 * mode if they will be used after rebalancing. */
	SPDK_ENV_FOREACH_CORE(i) {
		reactor = spdk_reactor_get(i);
		core = &cores_info[i];
		/* We can switch mode only if the reactor does not currently have any threads. */
		if (g_cores[i].thread_count == 0 && TAILQ_EMPTY(&reactor->threads)) {
			core->interrupt_mode = true;
		} else if (g_cores[i].thread_count != 0) {
			core->interrupt_mode = false;
			if (i != g_main_lcore) {
				/* If a thread is present on a core other than
				 * g_main_lcore, it has to be busy. */
				busy_threads_present = true;
			}
		}
	}

	governor = spdk_governor_get();
	if (governor == NULL) {
		return;
	}

	/* Change the main core frequency if needed. */
	if (busy_threads_present) {
		rc = governor->set_core_freq_max(g_main_lcore);
		if (rc < 0) {
			SPDK_ERRLOG("setting maximum frequency for core %u failed\n", g_main_lcore);
		}
	} else if (main_core->busy > main_core->idle) {
		rc = governor->core_freq_up(g_main_lcore);
		if (rc < 0) {
			SPDK_ERRLOG("increasing frequency for core %u failed\n", g_main_lcore);
		}
	} else {
		rc = governor->core_freq_down(g_main_lcore);
		if (rc < 0) {
			SPDK_ERRLOG("lowering frequency for core %u failed\n", g_main_lcore);
		}
	}
}

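/* Options accepted as JSON by set_opts(); each field maps to one of the
 * g_scheduler_* thresholds above. */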
struct json_scheduler_opts {
	uint8_t load_limit;
	uint8_t core_limit;
	uint8_t core_busy;
};

static const struct spdk_json_object_decoder sched_decoders[] = {
	{"load_limit", offsetof(struct json_scheduler_opts, load_limit), spdk_json_decode_uint8, true},
	{"core_limit", offsetof(struct json_scheduler_opts, core_limit), spdk_json_decode_uint8, true},
	{"core_busy", offsetof(struct json_scheduler_opts, core_busy), spdk_json_decode_uint8, true},
};

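/* Apply user-provided options. All decoders are marked optional, so keys
 * absent from the JSON keep their current values. */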
static int
set_opts(const struct spdk_json_val *opts)
{
	struct json_scheduler_opts scheduler_opts;

	scheduler_opts.load_limit = g_scheduler_load_limit;
	scheduler_opts.core_limit = g_scheduler_core_limit;
	scheduler_opts.core_busy = g_scheduler_core_busy;

	if (opts != NULL) {
		if (spdk_json_decode_object_relaxed(opts, sched_decoders,
						    SPDK_COUNTOF(sched_decoders), &scheduler_opts)) {
			SPDK_ERRLOG("Decoding scheduler opts JSON failed\n");
			return -1;
		}
	}

	SPDK_NOTICELOG("Setting scheduler load limit to %d\n", scheduler_opts.load_limit);
	g_scheduler_load_limit = scheduler_opts.load_limit;
	SPDK_NOTICELOG("Setting scheduler core limit to %d\n", scheduler_opts.core_limit);
	g_scheduler_core_limit = scheduler_opts.core_limit;
	SPDK_NOTICELOG("Setting scheduler core busy to %d\n", scheduler_opts.core_busy);
	g_scheduler_core_busy = scheduler_opts.core_busy;

	return 0;
}

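/* Report the current option values as JSON. */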
static void
get_opts(struct spdk_json_write_ctx *ctx)
{
	spdk_json_write_named_uint8(ctx, "load_limit", g_scheduler_load_limit);
	spdk_json_write_named_uint8(ctx, "core_limit", g_scheduler_core_limit);
	spdk_json_write_named_uint8(ctx, "core_busy", g_scheduler_core_busy);
}

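/* Callback table registered below; the scheduler can be selected at
 * runtime by its name, "dynamic". */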
static struct spdk_scheduler scheduler_dynamic = {
	.name = "dynamic",
	.init = init,
	.deinit = deinit,
	.balance = balance,
	.set_opts = set_opts,
	.get_opts = get_opts,
};

SPDK_SCHEDULER_REGISTER(scheduler_dynamic);