/* SPDX-License-Identifier: BSD-3-Clause
 * Copyright (C) 2021 Intel Corporation.
 * All rights reserved.
 */

#include "spdk/stdinc.h"
#include "spdk/likely.h"
#include "spdk/event.h"
#include "spdk/log.h"
#include "spdk/env.h"

#include "spdk/thread.h"
#include "spdk_internal/event.h"
#include "spdk/scheduler.h"
#include "spdk_internal/usdt.h"

static uint32_t g_main_lcore;

struct core_stats {
	uint64_t busy;
	uint64_t idle;
	uint32_t thread_count;
};

static struct core_stats *g_cores;

uint8_t g_scheduler_load_limit = 20;
uint8_t g_scheduler_core_limit = 80;
uint8_t g_scheduler_core_busy = 95;

static uint8_t
_busy_pct(uint64_t busy, uint64_t idle)
{
	if ((busy + idle) == 0) {
		return 0;
	}

	return busy * 100 / (busy + idle);
}

static uint8_t
_get_thread_load(struct spdk_scheduler_thread_info *thread_info)
{
	uint64_t busy, idle;

	busy = thread_info->current_stats.busy_tsc;
	idle = thread_info->current_stats.idle_tsc;

	/* return percentage of time thread was busy */
	return _busy_pct(busy, idle);
}

typedef void (*_foreach_fn)(struct spdk_scheduler_thread_info *thread_info);

static void
_foreach_thread(struct spdk_scheduler_core_info *cores_info, _foreach_fn fn)
{
	struct spdk_scheduler_core_info *core;
	uint32_t i, j;

	SPDK_ENV_FOREACH_CORE(i) {
		core = &cores_info[i];
		for (j = 0; j < core->threads_count; j++) {
			fn(&core->thread_infos[j]);
		}
	}
}

static void
_move_thread(struct spdk_scheduler_thread_info *thread_info, uint32_t dst_core)
{
	struct core_stats *dst = &g_cores[dst_core];
	struct core_stats *src = &g_cores[thread_info->lcore];
	uint64_t busy_tsc = thread_info->current_stats.busy_tsc;
	uint8_t busy_pct = _busy_pct(src->busy, src->idle);
	uint64_t tsc;

	SPDK_DTRACE_PROBE2(dynsched_move, thread_info, dst_core);

	if (src == dst) {
		/* Don't modify stats if thread is already on that core. */
		return;
	}

	dst->busy += spdk_min(UINT64_MAX - dst->busy, busy_tsc);
	dst->idle -= spdk_min(dst->idle, busy_tsc);
	dst->thread_count++;

	/* Adjust busy/idle from core as if thread was not present on it.
	 * Core load will reflect the sum of all remaining threads on it. */
	src->busy -= spdk_min(src->busy, busy_tsc);
	src->idle += spdk_min(UINT64_MAX - src->idle, busy_tsc);

	if (busy_pct >= g_scheduler_core_busy &&
	    _busy_pct(src->busy, src->idle) < g_scheduler_core_limit) {
		/* This core was so busy that we cannot assume all of busy_tsc
		 * consumed by the moved thread will now be idle_tsc - it's
		 * very possible the remaining threads will use these cycles
		 * as busy_tsc.
		 *
		 * So make sure we don't drop the updated estimate below
		 * g_scheduler_core_limit, so that other cores can't
		 * move threads to this core during this scheduling
		 * period.
		 */
		tsc = src->busy + src->idle;
		src->busy = tsc * g_scheduler_core_limit / 100;
		src->idle = tsc - src->busy;
	}
	assert(src->thread_count > 0);
	src->thread_count--;

	thread_info->lcore = dst_core;
}
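
/*
 * Illustrative example of the busy/idle clamping in _move_thread() above
 * (numbers are made up, not taken from any measurement): suppose the source
 * core reported busy = 960 and idle = 40 ticks (96% busy, above the default
 * g_scheduler_core_busy of 95) and the moved thread accounted for
 * busy_tsc = 600. Plain subtraction would leave busy = 360, idle = 640
 * (36% busy), inviting other threads onto a core that is likely still
 * saturated. The clamp instead pins the estimate at the default
 * g_scheduler_core_limit of 80: busy = 800, idle = 200.
 */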

static bool
_is_core_at_limit(uint32_t core_id)
{
	struct core_stats *core = &g_cores[core_id];
	uint64_t busy, idle;

	/* A core with no or a single thread cannot be over the limit. */
	if (core->thread_count <= 1) {
		return false;
	}

	busy = core->busy;
	idle = core->idle;

	/* No work was done, exit before possible division by 0. */
	if (busy == 0) {
		return false;
	}

	/* Work done was less than the limit. */
	if (_busy_pct(busy, idle) < g_scheduler_core_limit) {
		return false;
	}

	return true;
}

static bool
_can_core_fit_thread(struct spdk_scheduler_thread_info *thread_info, uint32_t dst_core)
{
	struct core_stats *dst = &g_cores[dst_core];
	uint64_t new_busy_tsc, new_idle_tsc;

	/* A thread can always fit on the core it's currently on. */
	if (thread_info->lcore == dst_core) {
		return true;
	}

	/* Reactors in interrupt mode do not update stats,
	 * so a thread can always fit on a reactor in interrupt mode. */
	if (dst->busy + dst->idle == 0) {
		return true;
	}

	/* Core has no threads. */
	if (dst->thread_count == 0) {
		return true;
	}

	/* Core doesn't have enough idle_tsc to take this thread. */
	if (dst->idle < thread_info->current_stats.busy_tsc) {
		return false;
	}

	new_busy_tsc = dst->busy + thread_info->current_stats.busy_tsc;
	new_idle_tsc = dst->idle - thread_info->current_stats.busy_tsc;

	/* Core cannot fit this thread if it would put it over the
	 * g_scheduler_core_limit. */
	return _busy_pct(new_busy_tsc, new_idle_tsc) < g_scheduler_core_limit;
}

static uint32_t
_find_optimal_core(struct spdk_scheduler_thread_info *thread_info)
{
	uint32_t i;
	uint32_t current_lcore = thread_info->lcore;
	uint32_t least_busy_lcore = thread_info->lcore;
	struct spdk_thread *thread;
	struct spdk_cpuset *cpumask;
	bool core_at_limit = _is_core_at_limit(current_lcore);

	thread = spdk_thread_get_by_id(thread_info->thread_id);
	if (thread == NULL) {
		return current_lcore;
	}
	cpumask = spdk_thread_get_cpumask(thread);

	/* Find a core that can fit the thread. */
	SPDK_ENV_FOREACH_CORE(i) {
		/* Ignore cores outside the cpumask. */
		if (!spdk_cpuset_get_cpu(cpumask, i)) {
			continue;
		}

		/* Search for the least busy core. */
		if (g_cores[i].busy < g_cores[least_busy_lcore].busy) {
			least_busy_lcore = i;
		}

		/* Skip cores that cannot fit the thread, as well as the current core. */
		if (!_can_core_fit_thread(thread_info, i) || i == current_lcore) {
			continue;
		}
		if (i == g_main_lcore) {
			/* Consider g_main_lcore first to consolidate threads on the main lcore if possible. */
			return i;
		} else if (i < current_lcore && current_lcore != g_main_lcore) {
			/* A lower core id was found, move to consolidate threads on the lowest core ids. */
			return i;
		} else if (core_at_limit) {
			/* When the core is over the limit, any core id is better than the current one. */
			return i;
		}
	}

	/* For cores over the limit, place the thread on the least busy core
	 * to balance threads. */
	if (core_at_limit) {
		return least_busy_lcore;
	}

	/* If no better core is found, remain on the same one. */
	return current_lcore;
}
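
/*
 * Summary of the selection order in _find_optimal_core() above, derived from
 * the code rather than any external spec: g_main_lcore is preferred first (to
 * consolidate threads), then any lower-numbered core that can fit the thread.
 * When the current core is over g_scheduler_core_limit, any core that can fit
 * the thread is acceptable, and failing that the least busy core within the
 * thread's cpumask is used. Otherwise the thread stays on its current core.
 */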

static int
init(void)
{
	g_main_lcore = spdk_env_get_current_core();

	if (spdk_governor_set("dpdk_governor") != 0) {
		SPDK_NOTICELOG("Unable to initialize dpdk governor\n");
	}

	g_cores = calloc(spdk_env_get_last_core() + 1, sizeof(struct core_stats));
	if (g_cores == NULL) {
		SPDK_ERRLOG("Failed to allocate memory for dynamic scheduler core stats.\n");
		return -ENOMEM;
	}

	return 0;
}

static void
deinit(void)
{
	free(g_cores);
	g_cores = NULL;
	spdk_governor_set(NULL);
}

static void
_balance_idle(struct spdk_scheduler_thread_info *thread_info)
{
	if (_get_thread_load(thread_info) >= g_scheduler_load_limit) {
		return;
	}
	/* This thread is idle, move it to the main core. */
	_move_thread(thread_info, g_main_lcore);
}

static void
_balance_active(struct spdk_scheduler_thread_info *thread_info)
{
	uint32_t target_lcore;

	if (_get_thread_load(thread_info) < g_scheduler_load_limit) {
		return;
	}

	/* This thread is active. */
	target_lcore = _find_optimal_core(thread_info);
	_move_thread(thread_info, target_lcore);
}
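
/*
 * The two helpers above implement the two balancing passes driven by
 * balance() below. As an illustration (percentages are hypothetical): with
 * the default g_scheduler_load_limit of 20, a thread that was busy 15% of
 * the last period counts as idle and is pulled back to g_main_lcore, while
 * a thread at 50% counts as active and is handed to _find_optimal_core() to
 * pick a core that stays under g_scheduler_core_limit.
 */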

static void
balance(struct spdk_scheduler_core_info *cores_info, uint32_t cores_count)
{
	struct spdk_reactor *reactor;
	struct spdk_governor *governor;
	struct spdk_scheduler_core_info *core;
	struct core_stats *main_core;
	uint32_t i;
	int rc;
	bool busy_threads_present = false;

	SPDK_DTRACE_PROBE1(dynsched_balance, cores_count);

	SPDK_ENV_FOREACH_CORE(i) {
		g_cores[i].thread_count = cores_info[i].threads_count;
		g_cores[i].busy = cores_info[i].current_busy_tsc;
		g_cores[i].idle = cores_info[i].current_idle_tsc;
		SPDK_DTRACE_PROBE2(dynsched_core_info, i, &cores_info[i]);
	}
	main_core = &g_cores[g_main_lcore];

	/* Distribute threads in two passes, to make sure updated core stats are considered on each pass.
	 * 1) Move all idle threads to the main core. */
	_foreach_thread(cores_info, _balance_idle);
	/* 2) Distribute active threads across all cores. */
	_foreach_thread(cores_info, _balance_active);

	/* Switch unused cores to interrupt mode and switch cores back to polled mode
	 * if they will be used after rebalancing. */
	SPDK_ENV_FOREACH_CORE(i) {
		reactor = spdk_reactor_get(i);
		assert(reactor != NULL);

		core = &cores_info[i];
		/* We can switch mode only if the reactor does not already have any threads. */
		if (g_cores[i].thread_count == 0 && TAILQ_EMPTY(&reactor->threads)) {
			core->interrupt_mode = true;
		} else if (g_cores[i].thread_count != 0) {
			core->interrupt_mode = false;
			if (i != g_main_lcore) {
				/* If a thread is present on a core other than g_main_lcore, it has to be busy. */
				busy_threads_present = true;
			}
		}
	}

	governor = spdk_governor_get();
	if (governor == NULL) {
		return;
	}

	/* Change the main core frequency if needed. */
	if (busy_threads_present) {
		rc = governor->set_core_freq_max(g_main_lcore);
		if (rc < 0) {
			SPDK_ERRLOG("setting default frequency for core %u failed\n", g_main_lcore);
		}
	} else if (main_core->busy > main_core->idle) {
		rc = governor->core_freq_up(g_main_lcore);
		if (rc < 0) {
			SPDK_ERRLOG("increasing frequency for core %u failed\n", g_main_lcore);
		}
	} else {
		rc = governor->core_freq_down(g_main_lcore);
		if (rc < 0) {
			SPDK_ERRLOG("lowering frequency for core %u failed\n", g_main_lcore);
		}
	}
}

struct json_scheduler_opts {
	uint8_t load_limit;
	uint8_t core_limit;
	uint8_t core_busy;
};

static const struct spdk_json_object_decoder sched_decoders[] = {
	{"load_limit", offsetof(struct json_scheduler_opts, load_limit), spdk_json_decode_uint8, true},
	{"core_limit", offsetof(struct json_scheduler_opts, core_limit), spdk_json_decode_uint8, true},
	{"core_busy", offsetof(struct json_scheduler_opts, core_busy), spdk_json_decode_uint8, true},
};

static int
set_opts(const struct spdk_json_val *opts)
{
	struct json_scheduler_opts scheduler_opts;

	scheduler_opts.load_limit = g_scheduler_load_limit;
	scheduler_opts.core_limit = g_scheduler_core_limit;
	scheduler_opts.core_busy = g_scheduler_core_busy;

	if (opts != NULL) {
		if (spdk_json_decode_object_relaxed(opts, sched_decoders,
						    SPDK_COUNTOF(sched_decoders), &scheduler_opts)) {
			SPDK_ERRLOG("Decoding scheduler opts JSON failed\n");
			return -1;
		}
	}

	SPDK_NOTICELOG("Setting scheduler load limit to %d\n", scheduler_opts.load_limit);
	g_scheduler_load_limit = scheduler_opts.load_limit;
	SPDK_NOTICELOG("Setting scheduler core limit to %d\n", scheduler_opts.core_limit);
	g_scheduler_core_limit = scheduler_opts.core_limit;
	SPDK_NOTICELOG("Setting scheduler core busy to %d\n", scheduler_opts.core_busy);
	g_scheduler_core_busy = scheduler_opts.core_busy;

	return 0;
}

static void
get_opts(struct spdk_json_write_ctx *ctx)
{
	spdk_json_write_named_uint8(ctx, "load_limit", g_scheduler_load_limit);
	spdk_json_write_named_uint8(ctx, "core_limit", g_scheduler_core_limit);
	spdk_json_write_named_uint8(ctx, "core_busy", g_scheduler_core_busy);
}

static struct spdk_scheduler scheduler_dynamic = {
	.name = "dynamic",
	.init = init,
	.deinit = deinit,
	.balance = balance,
	.set_opts = set_opts,
	.get_opts = get_opts,
};

SPDK_SCHEDULER_REGISTER(scheduler_dynamic);
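
/*
 * Example of the options object accepted by set_opts() above (values shown
 * are the defaults; how the JSON reaches the scheduler, e.g. through the
 * framework_set_scheduler RPC, is outside the scope of this file):
 *
 *   { "load_limit": 20, "core_limit": 80, "core_busy": 95 }
 *
 * All three keys are optional; omitted keys keep their current values.
 */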