1 /*- 2 * BSD LICENSE 3 * 4 * Copyright (c) Intel Corporation. 5 * All rights reserved. 6 * 7 * Redistribution and use in source and binary forms, with or without 8 * modification, are permitted provided that the following conditions 9 * are met: 10 * 11 * * Redistributions of source code must retain the above copyright 12 * notice, this list of conditions and the following disclaimer. 13 * * Redistributions in binary form must reproduce the above copyright 14 * notice, this list of conditions and the following disclaimer in 15 * the documentation and/or other materials provided with the 16 * distribution. 17 * * Neither the name of Intel Corporation nor the names of its 18 * contributors may be used to endorse or promote products derived 19 * from this software without specific prior written permission. 20 * 21 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 22 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 23 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR 24 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT 25 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, 26 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT 27 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, 28 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY 29 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT 30 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE 31 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 32 */ 33 34 #include "spdk/stdinc.h" 35 #include "spdk/likely.h" 36 37 #include "spdk_internal/event.h" 38 #include "spdk_internal/log.h" 39 #include "spdk_internal/thread.h" 40 41 #include "spdk/log.h" 42 #include "spdk/thread.h" 43 #include "spdk/env.h" 44 #include "spdk/util.h" 45 46 #define SPDK_EVENT_BATCH_SIZE 8 47 48 enum spdk_reactor_state { 49 SPDK_REACTOR_STATE_INVALID = 0, 50 SPDK_REACTOR_STATE_INITIALIZED = 1, 51 SPDK_REACTOR_STATE_RUNNING = 2, 52 SPDK_REACTOR_STATE_EXITING = 3, 53 SPDK_REACTOR_STATE_SHUTDOWN = 4, 54 }; 55 56 struct spdk_lw_thread { 57 TAILQ_ENTRY(spdk_lw_thread) link; 58 }; 59 60 struct spdk_reactor { 61 /* Logical core number for this reactor. */ 62 uint32_t lcore; 63 64 /* Lightweight threads running on this reactor */ 65 TAILQ_HEAD(, spdk_lw_thread) threads; 66 67 /* Poller for get the rusage for the reactor. */ 68 struct spdk_poller *rusage_poller; 69 70 /* The last known rusage values */ 71 struct rusage rusage; 72 73 struct spdk_ring *events; 74 } __attribute__((aligned(64))); 75 76 static struct spdk_reactor *g_reactors; 77 78 static enum spdk_reactor_state g_reactor_state = SPDK_REACTOR_STATE_INVALID; 79 80 static bool g_context_switch_monitor_enabled = true; 81 82 static void spdk_reactor_construct(struct spdk_reactor *w, uint32_t lcore); 83 84 static struct spdk_mempool *g_spdk_event_mempool = NULL; 85 86 static struct spdk_cpuset *g_spdk_app_core_mask; 87 88 static struct spdk_reactor * 89 spdk_reactor_get(uint32_t lcore) 90 { 91 struct spdk_reactor *reactor; 92 reactor = spdk_likely(g_reactors) ? &g_reactors[lcore] : NULL; 93 return reactor; 94 } 95 96 struct spdk_event * 97 spdk_event_allocate(uint32_t lcore, spdk_event_fn fn, void *arg1, void *arg2) 98 { 99 struct spdk_event *event = NULL; 100 struct spdk_reactor *reactor = spdk_reactor_get(lcore); 101 102 if (!reactor) { 103 assert(false); 104 return NULL; 105 } 106 107 event = spdk_mempool_get(g_spdk_event_mempool); 108 if (event == NULL) { 109 assert(false); 110 return NULL; 111 } 112 113 event->lcore = lcore; 114 event->fn = fn; 115 event->arg1 = arg1; 116 event->arg2 = arg2; 117 118 return event; 119 } 120 121 void 122 spdk_event_call(struct spdk_event *event) 123 { 124 int rc; 125 struct spdk_reactor *reactor; 126 127 reactor = spdk_reactor_get(event->lcore); 128 129 assert(reactor->events != NULL); 130 rc = spdk_ring_enqueue(reactor->events, (void **)&event, 1); 131 if (rc != 1) { 132 assert(false); 133 } 134 } 135 136 static inline uint32_t 137 _spdk_event_queue_run_batch(struct spdk_reactor *reactor) 138 { 139 unsigned count, i; 140 void *events[SPDK_EVENT_BATCH_SIZE]; 141 struct spdk_thread *thread; 142 struct spdk_lw_thread *lw_thread; 143 144 #ifdef DEBUG 145 /* 146 * spdk_ring_dequeue() fills events and returns how many entries it wrote, 147 * so we will never actually read uninitialized data from events, but just to be sure 148 * (and to silence a static analyzer false positive), initialize the array to NULL pointers. 149 */ 150 memset(events, 0, sizeof(events)); 151 #endif 152 153 count = spdk_ring_dequeue(reactor->events, events, SPDK_EVENT_BATCH_SIZE); 154 if (count == 0) { 155 return 0; 156 } 157 158 /* Execute the events. There are still some remaining events 159 * that must occur on an SPDK thread. To accomodate those, try to 160 * run them on the first thread in the list, if it exists. */ 161 lw_thread = TAILQ_FIRST(&reactor->threads); 162 if (lw_thread) { 163 thread = spdk_thread_get_from_ctx(lw_thread); 164 } else { 165 thread = NULL; 166 } 167 168 spdk_set_thread(thread); 169 170 for (i = 0; i < count; i++) { 171 struct spdk_event *event = events[i]; 172 173 assert(event != NULL); 174 event->fn(event->arg1, event->arg2); 175 } 176 177 spdk_set_thread(NULL); 178 179 spdk_mempool_put_bulk(g_spdk_event_mempool, events, count); 180 181 return count; 182 } 183 184 #define CONTEXT_SWITCH_MONITOR_PERIOD 1000000 185 186 static int 187 get_rusage(struct spdk_reactor *reactor) 188 { 189 struct rusage rusage; 190 191 if (getrusage(RUSAGE_THREAD, &rusage) != 0) { 192 return -1; 193 } 194 195 if (rusage.ru_nvcsw != reactor->rusage.ru_nvcsw || rusage.ru_nivcsw != reactor->rusage.ru_nivcsw) { 196 SPDK_INFOLOG(SPDK_LOG_REACTOR, 197 "Reactor %d: %ld voluntary context switches and %ld involuntary context switches in the last second.\n", 198 reactor->lcore, rusage.ru_nvcsw - reactor->rusage.ru_nvcsw, 199 rusage.ru_nivcsw - reactor->rusage.ru_nivcsw); 200 } 201 reactor->rusage = rusage; 202 203 return -1; 204 } 205 206 void 207 spdk_reactor_enable_context_switch_monitor(bool enable) 208 { 209 /* This global is being read by multiple threads, so this isn't 210 * strictly thread safe. However, we're toggling between true and 211 * false here, and if a thread sees the value update later than it 212 * should, it's no big deal. */ 213 g_context_switch_monitor_enabled = enable; 214 } 215 216 bool 217 spdk_reactor_context_switch_monitor_enabled(void) 218 { 219 return g_context_switch_monitor_enabled; 220 } 221 222 static int 223 _spdk_reactor_run(void *arg) 224 { 225 struct spdk_reactor *reactor = arg; 226 struct spdk_thread *thread; 227 uint64_t last_rusage = 0; 228 struct spdk_lw_thread *lw_thread, *tmp; 229 230 SPDK_NOTICELOG("Reactor started on core %u\n", reactor->lcore); 231 232 while (1) { 233 uint64_t now; 234 235 /* For each loop through the reactor, capture the time. This time 236 * is used for all threads. */ 237 now = spdk_get_ticks(); 238 239 _spdk_event_queue_run_batch(reactor); 240 241 TAILQ_FOREACH_SAFE(lw_thread, &reactor->threads, link, tmp) { 242 thread = spdk_thread_get_from_ctx(lw_thread); 243 244 spdk_thread_poll(thread, 0, now); 245 } 246 247 if (g_reactor_state != SPDK_REACTOR_STATE_RUNNING) { 248 break; 249 } 250 251 if (g_context_switch_monitor_enabled) { 252 if ((last_rusage + CONTEXT_SWITCH_MONITOR_PERIOD) < now) { 253 get_rusage(reactor); 254 last_rusage = now; 255 } 256 } 257 } 258 259 TAILQ_FOREACH_SAFE(lw_thread, &reactor->threads, link, tmp) { 260 thread = spdk_thread_get_from_ctx(lw_thread); 261 TAILQ_REMOVE(&reactor->threads, lw_thread, link); 262 spdk_thread_exit(thread); 263 } 264 265 return 0; 266 } 267 268 static void 269 spdk_reactor_construct(struct spdk_reactor *reactor, uint32_t lcore) 270 { 271 reactor->lcore = lcore; 272 273 TAILQ_INIT(&reactor->threads); 274 275 reactor->events = spdk_ring_create(SPDK_RING_TYPE_MP_SC, 65536, SPDK_ENV_SOCKET_ID_ANY); 276 assert(reactor->events != NULL); 277 } 278 279 int 280 spdk_app_parse_core_mask(const char *mask, struct spdk_cpuset *cpumask) 281 { 282 int ret; 283 struct spdk_cpuset *validmask; 284 285 ret = spdk_cpuset_parse(cpumask, mask); 286 if (ret < 0) { 287 return ret; 288 } 289 290 validmask = spdk_app_get_core_mask(); 291 spdk_cpuset_and(cpumask, validmask); 292 293 return 0; 294 } 295 296 struct spdk_cpuset * 297 spdk_app_get_core_mask(void) 298 { 299 return g_spdk_app_core_mask; 300 } 301 302 void 303 spdk_reactors_start(void) 304 { 305 struct spdk_reactor *reactor; 306 uint32_t i, current_core; 307 int rc; 308 char thread_name[32]; 309 310 g_reactor_state = SPDK_REACTOR_STATE_RUNNING; 311 g_spdk_app_core_mask = spdk_cpuset_alloc(); 312 313 current_core = spdk_env_get_current_core(); 314 SPDK_ENV_FOREACH_CORE(i) { 315 if (i != current_core) { 316 reactor = spdk_reactor_get(i); 317 rc = spdk_env_thread_launch_pinned(reactor->lcore, _spdk_reactor_run, reactor); 318 if (rc < 0) { 319 SPDK_ERRLOG("Unable to start reactor thread on core %u\n", reactor->lcore); 320 assert(false); 321 return; 322 } 323 324 /* For now, for each reactor spawn one thread. */ 325 snprintf(thread_name, sizeof(thread_name), "reactor_%u", reactor->lcore); 326 spdk_thread_create(thread_name); 327 } 328 spdk_cpuset_set_cpu(g_spdk_app_core_mask, i, true); 329 } 330 331 /* Start the master reactor */ 332 reactor = spdk_reactor_get(current_core); 333 _spdk_reactor_run(reactor); 334 335 spdk_env_thread_wait_all(); 336 337 g_reactor_state = SPDK_REACTOR_STATE_SHUTDOWN; 338 spdk_cpuset_free(g_spdk_app_core_mask); 339 g_spdk_app_core_mask = NULL; 340 } 341 342 void 343 spdk_reactors_stop(void *arg1) 344 { 345 g_reactor_state = SPDK_REACTOR_STATE_EXITING; 346 } 347 348 static pthread_mutex_t g_scheduler_mtx = PTHREAD_MUTEX_INITIALIZER; 349 static uint32_t g_next_core = UINT32_MAX; 350 351 static void 352 _schedule_thread(void *arg1, void *arg2) 353 { 354 struct spdk_lw_thread *lw_thread = arg1; 355 struct spdk_reactor *reactor; 356 357 reactor = spdk_reactor_get(spdk_env_get_current_core()); 358 359 TAILQ_INSERT_TAIL(&reactor->threads, lw_thread, link); 360 } 361 362 static void 363 spdk_reactor_schedule_thread(struct spdk_thread *thread) 364 { 365 uint32_t core; 366 struct spdk_lw_thread *lw_thread; 367 struct spdk_event *evt; 368 369 lw_thread = spdk_thread_get_ctx(thread); 370 assert(lw_thread != NULL); 371 memset(lw_thread, 0, sizeof(*lw_thread)); 372 373 pthread_mutex_lock(&g_scheduler_mtx); 374 if (g_next_core > spdk_env_get_last_core()) { 375 g_next_core = spdk_env_get_first_core(); 376 } 377 core = g_next_core; 378 g_next_core = spdk_env_get_next_core(g_next_core); 379 pthread_mutex_unlock(&g_scheduler_mtx); 380 381 evt = spdk_event_allocate(core, _schedule_thread, lw_thread, NULL); 382 spdk_event_call(evt); 383 } 384 385 int 386 spdk_reactors_init(void) 387 { 388 int rc; 389 uint32_t i, last_core; 390 struct spdk_reactor *reactor; 391 char mempool_name[32]; 392 393 snprintf(mempool_name, sizeof(mempool_name), "evtpool_%d", getpid()); 394 g_spdk_event_mempool = spdk_mempool_create(mempool_name, 395 262144 - 1, /* Power of 2 minus 1 is optimal for memory consumption */ 396 sizeof(struct spdk_event), 397 SPDK_MEMPOOL_DEFAULT_CACHE_SIZE, 398 SPDK_ENV_SOCKET_ID_ANY); 399 400 if (g_spdk_event_mempool == NULL) { 401 SPDK_ERRLOG("spdk_event_mempool creation failed\n"); 402 return -1; 403 } 404 405 /* struct spdk_reactor must be aligned on 64 byte boundary */ 406 last_core = spdk_env_get_last_core(); 407 rc = posix_memalign((void **)&g_reactors, 64, 408 (last_core + 1) * sizeof(struct spdk_reactor)); 409 if (rc != 0) { 410 SPDK_ERRLOG("Could not allocate array size=%u for g_reactors\n", 411 last_core + 1); 412 spdk_mempool_free(g_spdk_event_mempool); 413 return -1; 414 } 415 416 memset(g_reactors, 0, (last_core + 1) * sizeof(struct spdk_reactor)); 417 418 spdk_thread_lib_init(spdk_reactor_schedule_thread, sizeof(struct spdk_lw_thread)); 419 420 SPDK_ENV_FOREACH_CORE(i) { 421 reactor = spdk_reactor_get(i); 422 spdk_reactor_construct(reactor, i); 423 } 424 425 g_reactor_state = SPDK_REACTOR_STATE_INITIALIZED; 426 427 return 0; 428 } 429 430 void 431 spdk_reactors_fini(void) 432 { 433 uint32_t i; 434 struct spdk_reactor *reactor; 435 436 spdk_thread_lib_fini(); 437 438 SPDK_ENV_FOREACH_CORE(i) { 439 reactor = spdk_reactor_get(i); 440 if (spdk_likely(reactor != NULL) && reactor->events != NULL) { 441 spdk_ring_free(reactor->events); 442 } 443 } 444 445 spdk_mempool_free(g_spdk_event_mempool); 446 447 free(g_reactors); 448 g_reactors = NULL; 449 } 450 451 SPDK_LOG_REGISTER_COMPONENT("reactor", SPDK_LOG_REACTOR) 452