xref: /spdk/test/unit/lib/event/reactor.c/reactor_ut.c (revision f6866117acb32c78d5ea7bd76ba330284655af35)
1 /*   SPDX-License-Identifier: BSD-3-Clause
2  *   Copyright (C) 2020 Intel Corporation.
3  *   All rights reserved.
4  */
5 
6 #include "spdk/stdinc.h"
7 
8 #include "spdk_cunit.h"
9 #include "common/lib/test_env.c"
10 #include "event/reactor.c"
11 #include "spdk/thread.h"
12 #include "spdk_internal/thread.h"
13 #include "event/scheduler_static.c"
14 #include "../module/scheduler/dynamic/scheduler_dynamic.c"
15 
16 static void
17 test_create_reactor(void)
18 {
19 	struct spdk_reactor reactor = {};
20 
21 	g_reactors = &reactor;
22 	g_reactor_count = 1;
23 
24 	reactor_construct(&reactor, 0);
25 
26 	CU_ASSERT(spdk_reactor_get(0) == &reactor);
27 
28 	spdk_ring_free(reactor.events);
29 	reactor_interrupt_fini(&reactor);
30 	g_reactors = NULL;
31 }
32 
33 static void
34 test_init_reactors(void)
35 {
36 	uint32_t core;
37 
38 	MOCK_SET(spdk_env_get_current_core, 0);
39 
40 	allocate_cores(3);
41 
42 	CU_ASSERT(spdk_reactors_init(SPDK_DEFAULT_MSG_MEMPOOL_SIZE) == 0);
43 
44 	CU_ASSERT(g_reactor_state == SPDK_REACTOR_STATE_INITIALIZED);
45 	for (core = 0; core < 3; core++) {
46 		CU_ASSERT(spdk_reactor_get(core) != NULL);
47 	}
48 
49 	spdk_reactors_fini();
50 
51 	free_cores();
52 
53 	MOCK_CLEAR(spdk_env_get_current_core);
54 }
55 
56 static void
57 ut_event_fn(void *arg1, void *arg2)
58 {
59 	uint8_t *test1 = arg1;
60 	uint8_t *test2 = arg2;
61 
62 	*test1 = 1;
63 	*test2 = 0xFF;
64 }
65 
66 static void
67 test_event_call(void)
68 {
69 	uint8_t test1 = 0, test2 = 0;
70 	struct spdk_event *evt;
71 	struct spdk_reactor *reactor;
72 
73 	MOCK_SET(spdk_env_get_current_core, 0);
74 
75 	allocate_cores(1);
76 
77 	CU_ASSERT(spdk_reactors_init(SPDK_DEFAULT_MSG_MEMPOOL_SIZE) == 0);
78 
79 	evt = spdk_event_allocate(0, ut_event_fn, &test1, &test2);
80 	CU_ASSERT(evt != NULL);
81 
82 	MOCK_SET(spdk_env_get_current_core, 0);
83 
84 	spdk_event_call(evt);
85 
86 	reactor = spdk_reactor_get(0);
87 	CU_ASSERT(reactor != NULL);
88 
89 	CU_ASSERT(event_queue_run_batch(reactor) == 1);
90 	CU_ASSERT(test1 == 1);
91 	CU_ASSERT(test2 == 0xFF);
92 
93 	MOCK_CLEAR(spdk_env_get_current_core);
94 
95 	spdk_reactors_fini();
96 
97 	free_cores();
98 
99 	MOCK_CLEAR(spdk_env_get_current_core);
100 }
101 
102 static void
103 test_schedule_thread(void)
104 {
105 	struct spdk_cpuset cpuset = {};
106 	struct spdk_thread *thread;
107 	struct spdk_reactor *reactor;
108 	struct spdk_lw_thread *lw_thread;
109 
110 	MOCK_SET(spdk_env_get_current_core, 0);
111 
112 	allocate_cores(5);
113 
114 	CU_ASSERT(spdk_reactors_init(SPDK_DEFAULT_MSG_MEMPOOL_SIZE) == 0);
115 
116 	spdk_cpuset_set_cpu(&cpuset, 3, true);
117 	g_next_core = 4;
118 
119 	MOCK_SET(spdk_env_get_current_core, 3);
120 
121 	/* _reactor_schedule_thread() will be called in spdk_thread_create()
122 	 * at its end because it is passed to SPDK thread library by
123 	 * spdk_thread_lib_init().
124 	 */
125 	thread = spdk_thread_create(NULL, &cpuset);
126 	CU_ASSERT(thread != NULL);
127 
128 	reactor = spdk_reactor_get(3);
129 	CU_ASSERT(reactor != NULL);
130 
131 	CU_ASSERT(event_queue_run_batch(reactor) == 1);
132 
133 	MOCK_CLEAR(spdk_env_get_current_core);
134 
135 	lw_thread = TAILQ_FIRST(&reactor->threads);
136 	CU_ASSERT(lw_thread != NULL);
137 	CU_ASSERT(spdk_thread_get_from_ctx(lw_thread) == thread);
138 
139 	TAILQ_REMOVE(&reactor->threads, lw_thread, link);
140 	reactor->thread_count--;
141 	spdk_set_thread(thread);
142 	spdk_thread_exit(thread);
143 	while (!spdk_thread_is_exited(thread)) {
144 		spdk_thread_poll(thread, 0, 0);
145 	}
146 	spdk_thread_destroy(thread);
147 	spdk_set_thread(NULL);
148 
149 	spdk_reactors_fini();
150 
151 	free_cores();
152 }
153 
154 static void
155 test_reschedule_thread(void)
156 {
157 	struct spdk_cpuset cpuset = {};
158 	struct spdk_thread *thread;
159 	struct spdk_reactor *reactor;
160 	struct spdk_lw_thread *lw_thread;
161 
162 	MOCK_SET(spdk_env_get_current_core, 0);
163 
164 	allocate_cores(3);
165 
166 	CU_ASSERT(spdk_reactors_init(SPDK_DEFAULT_MSG_MEMPOOL_SIZE) == 0);
167 
168 	spdk_cpuset_set_cpu(&g_reactor_core_mask, 0, true);
169 	spdk_cpuset_set_cpu(&g_reactor_core_mask, 1, true);
170 	spdk_cpuset_set_cpu(&g_reactor_core_mask, 2, true);
171 	g_next_core = 0;
172 
173 	MOCK_SET(spdk_env_get_current_core, 1);
174 	/* Create and schedule the thread to core 1. */
175 	spdk_cpuset_set_cpu(&cpuset, 1, true);
176 
177 	thread = spdk_thread_create(NULL, &cpuset);
178 	CU_ASSERT(thread != NULL);
179 	lw_thread = spdk_thread_get_ctx(thread);
180 
181 	reactor = spdk_reactor_get(1);
182 	CU_ASSERT(reactor != NULL);
183 
184 	CU_ASSERT(event_queue_run_batch(reactor) == 1);
185 	CU_ASSERT(TAILQ_FIRST(&reactor->threads) == lw_thread);
186 
187 	spdk_set_thread(thread);
188 
189 	/* Call spdk_thread_set_cpumask() twice with different cpumask values.
190 	 * The cpumask of the 2nd call will be used in reschedule operation.
191 	 */
192 
193 	spdk_cpuset_zero(&cpuset);
194 	spdk_cpuset_set_cpu(&cpuset, 0, true);
195 	CU_ASSERT(spdk_thread_set_cpumask(&cpuset) == 0);
196 
197 	spdk_cpuset_zero(&cpuset);
198 	spdk_cpuset_set_cpu(&cpuset, 2, true);
199 	CU_ASSERT(spdk_thread_set_cpumask(&cpuset) == 0);
200 
201 	CU_ASSERT(lw_thread->resched == true);
202 
203 	reactor_run(reactor);
204 
205 	CU_ASSERT(lw_thread->resched == false);
206 	CU_ASSERT(TAILQ_EMPTY(&reactor->threads));
207 
208 	reactor = spdk_reactor_get(0);
209 	CU_ASSERT(reactor != NULL);
210 	MOCK_SET(spdk_env_get_current_core, 0);
211 
212 	CU_ASSERT(event_queue_run_batch(reactor) == 0);
213 
214 	reactor = spdk_reactor_get(2);
215 	CU_ASSERT(reactor != NULL);
216 	MOCK_SET(spdk_env_get_current_core, 2);
217 
218 	CU_ASSERT(event_queue_run_batch(reactor) == 1);
219 
220 	CU_ASSERT(TAILQ_FIRST(&reactor->threads) == lw_thread);
221 
222 	MOCK_CLEAR(spdk_env_get_current_core);
223 
224 	TAILQ_REMOVE(&reactor->threads, lw_thread, link);
225 	reactor->thread_count--;
226 	spdk_set_thread(thread);
227 	spdk_thread_exit(thread);
228 	while (!spdk_thread_is_exited(thread)) {
229 		spdk_thread_poll(thread, 0, 0);
230 	}
231 	spdk_thread_destroy(thread);
232 	spdk_set_thread(NULL);
233 
234 	spdk_reactors_fini();
235 
236 	free_cores();
237 }
238 
239 static void
240 for_each_reactor_done(void *arg1, void *arg2)
241 {
242 	uint32_t *count = arg1;
243 	bool *done = arg2;
244 
245 	(*count)++;
246 	*done = true;
247 }
248 
249 static void
250 for_each_reactor_cb(void *arg1, void *arg2)
251 {
252 	uint32_t *count = arg1;
253 
254 	(*count)++;
255 }
256 
257 static void
258 test_for_each_reactor(void)
259 {
260 	uint32_t count = 0, i;
261 	bool done = false;
262 	struct spdk_reactor *reactor;
263 
264 	MOCK_SET(spdk_env_get_current_core, 0);
265 
266 	allocate_cores(5);
267 
268 	CU_ASSERT(spdk_reactors_init(SPDK_DEFAULT_MSG_MEMPOOL_SIZE) == 0);
269 
270 	spdk_for_each_reactor(for_each_reactor_cb, &count, &done, for_each_reactor_done);
271 
272 	MOCK_CLEAR(spdk_env_get_current_core);
273 
274 	/* We have not processed any event yet, so count and done should be 0 and false,
275 	 *  respectively.
276 	 */
277 	CU_ASSERT(count == 0);
278 
279 	/* Poll each reactor to verify the event is passed to each */
280 	for (i = 0; i < 5; i++) {
281 		reactor = spdk_reactor_get(i);
282 		CU_ASSERT(reactor != NULL);
283 		MOCK_SET(spdk_env_get_current_core, i);
284 
285 		event_queue_run_batch(reactor);
286 		CU_ASSERT(count == (i + 1));
287 		CU_ASSERT(done == false);
288 		MOCK_CLEAR(spdk_env_get_current_core);
289 	}
290 
291 	MOCK_SET(spdk_env_get_current_core, 0);
292 	/* After each reactor is called, the completion calls it one more time. */
293 	reactor = spdk_reactor_get(0);
294 	CU_ASSERT(reactor != NULL);
295 
296 	event_queue_run_batch(reactor);
297 	CU_ASSERT(count == 6);
298 	CU_ASSERT(done == true);
299 	MOCK_CLEAR(spdk_env_get_current_core);
300 
301 	spdk_reactors_fini();
302 
303 	free_cores();
304 }
305 
306 static int
307 poller_run_idle(void *ctx)
308 {
309 	uint64_t delay_us = (uint64_t)ctx;
310 
311 	spdk_delay_us(delay_us);
312 
313 	return 0;
314 }
315 
316 static int
317 poller_run_busy(void *ctx)
318 {
319 	uint64_t delay_us = (uint64_t)ctx;
320 
321 	spdk_delay_us(delay_us);
322 
323 	return 1;
324 }
325 
326 static void
327 test_reactor_stats(void)
328 {
329 	struct spdk_cpuset cpuset = {};
330 	struct spdk_thread *thread1, *thread2;
331 	struct spdk_reactor *reactor;
332 	struct spdk_poller *busy1, *idle1, *busy2, *idle2;
333 	struct spdk_thread_stats stats;
334 	int rc __attribute__((unused));
335 
336 	/* Test case is the following:
337 	 * Create a reactor on CPU core0.
338 	 * Create thread1 and thread2 simultaneously on reactor0 at TSC = 100.
339 	 * Reactor runs
340 	 * - thread1 for 100 with busy
341 	 * - thread2 for 200 with idle
342 	 * - thread1 for 300 with idle
343 	 * - thread2 for 400 with busy.
344 	 * Then,
345 	 * - both elapsed TSC of thread1 and thread2 should be 1100 (= 100 + 1000).
346 	 * - busy TSC of reactor should be 500 (= 100 + 400).
347 	 * - idle TSC of reactor should be 500 (= 200 + 300).
348 	 *
349 	 * After that reactor0 runs with no threads for 900 TSC.
350 	 * Create thread1 on reactor0 at TSC = 2000.
351 	 * Reactor runs
352 	 * - thread1 for 100 with busy
353 	 * Then,
354 	 * - elapsed TSC of thread1 should be 2100 (= 2000+ 100).
355 	 * - busy TSC of reactor should be 600 (= 500 + 100).
356 	 * - idle TSC of reactor should be 500 (= 500 + 900).
357 	 */
358 
359 	MOCK_SET(spdk_env_get_current_core, 0);
360 
361 	allocate_cores(1);
362 
363 	CU_ASSERT(spdk_reactors_init(SPDK_DEFAULT_MSG_MEMPOOL_SIZE) == 0);
364 
365 	spdk_cpuset_set_cpu(&cpuset, 0, true);
366 
367 	reactor = spdk_reactor_get(0);
368 	SPDK_CU_ASSERT_FATAL(reactor != NULL);
369 
370 	/* First reactor_run() sets the tsc_last. */
371 	MOCK_SET(spdk_get_ticks, 100);
372 	reactor->tsc_last = spdk_get_ticks();
373 
374 	thread1 = spdk_thread_create(NULL, &cpuset);
375 	SPDK_CU_ASSERT_FATAL(thread1 != NULL);
376 
377 	thread2 = spdk_thread_create(NULL, &cpuset);
378 	SPDK_CU_ASSERT_FATAL(thread2 != NULL);
379 
380 	spdk_set_thread(thread1);
381 	busy1 = spdk_poller_register(poller_run_busy, (void *)100, 0);
382 	CU_ASSERT(busy1 != NULL);
383 
384 	spdk_set_thread(thread2);
385 	idle2 = spdk_poller_register(poller_run_idle, (void *)300, 0);
386 	CU_ASSERT(idle2 != NULL);
387 
388 	_reactor_run(reactor);
389 
390 	spdk_set_thread(thread1);
391 	CU_ASSERT(spdk_thread_get_last_tsc(thread1) == 200);
392 	CU_ASSERT(spdk_thread_get_stats(&stats) == 0);
393 	CU_ASSERT(stats.busy_tsc == 100);
394 	CU_ASSERT(stats.idle_tsc == 0);
395 	spdk_set_thread(thread2);
396 	CU_ASSERT(spdk_thread_get_last_tsc(thread2) == 500);
397 	CU_ASSERT(spdk_thread_get_stats(&stats) == 0);
398 	CU_ASSERT(stats.busy_tsc == 0);
399 	CU_ASSERT(stats.idle_tsc == 300);
400 
401 	CU_ASSERT(reactor->busy_tsc == 100);
402 	CU_ASSERT(reactor->idle_tsc == 300);
403 
404 	/* 100 + 100 + 300 = 500 ticks elapsed */
405 	CU_ASSERT(reactor->tsc_last == 500);
406 
407 	spdk_set_thread(thread1);
408 	spdk_poller_unregister(&busy1);
409 	idle1 = spdk_poller_register(poller_run_idle, (void *)200, 0);
410 	CU_ASSERT(idle1 != NULL);
411 
412 	spdk_set_thread(thread2);
413 	spdk_poller_unregister(&idle2);
414 	busy2 = spdk_poller_register(poller_run_busy, (void *)400, 0);
415 	CU_ASSERT(busy2 != NULL);
416 
417 	_reactor_run(reactor);
418 
419 	spdk_set_thread(thread1);
420 	CU_ASSERT(spdk_thread_get_last_tsc(thread1) == 700);
421 	CU_ASSERT(spdk_thread_get_stats(&stats) == 0);
422 	CU_ASSERT(stats.busy_tsc == 100);
423 	CU_ASSERT(stats.idle_tsc == 200);
424 	spdk_set_thread(thread2);
425 	CU_ASSERT(spdk_thread_get_last_tsc(thread2) == 1100);
426 	CU_ASSERT(spdk_thread_get_stats(&stats) == 0);
427 	CU_ASSERT(stats.busy_tsc == 400);
428 	CU_ASSERT(stats.idle_tsc == 300);
429 
430 	CU_ASSERT(reactor->busy_tsc == 500);
431 	CU_ASSERT(reactor->idle_tsc == 500);
432 
433 	/* 500 + 200 + 400 = 1100 ticks elapsed */
434 	CU_ASSERT(reactor->tsc_last == 1100);
435 
436 	spdk_set_thread(thread1);
437 	spdk_poller_unregister(&idle1);
438 	spdk_thread_exit(thread1);
439 
440 	spdk_set_thread(thread2);
441 	spdk_poller_unregister(&busy2);
442 	spdk_thread_exit(thread2);
443 
444 	_reactor_run(reactor);
445 
446 	/* After 900 ticks new thread is created. */
447 	/* 1100 + 900 = 2000 ticks elapsed */
448 	MOCK_SET(spdk_get_ticks, 2000);
449 	_reactor_run(reactor);
450 	CU_ASSERT(reactor->tsc_last == 2000);
451 
452 	thread1 = spdk_thread_create(NULL, &cpuset);
453 	SPDK_CU_ASSERT_FATAL(thread1 != NULL);
454 
455 	spdk_set_thread(thread1);
456 	busy1 = spdk_poller_register(poller_run_busy, (void *)100, 0);
457 	CU_ASSERT(busy1 != NULL);
458 
459 	_reactor_run(reactor);
460 
461 	spdk_set_thread(thread1);
462 	CU_ASSERT(spdk_thread_get_last_tsc(thread1) == 2100);
463 	CU_ASSERT(spdk_thread_get_stats(&stats) == 0);
464 	CU_ASSERT(stats.busy_tsc == 100);
465 	CU_ASSERT(stats.idle_tsc == 0);
466 
467 	CU_ASSERT(reactor->busy_tsc == 600);
468 	CU_ASSERT(reactor->idle_tsc == 1400);
469 
470 	/* 2000 + 100 = 2100 ticks elapsed */
471 	CU_ASSERT(reactor->tsc_last == 2100);
472 
473 	spdk_set_thread(thread1);
474 	spdk_poller_unregister(&busy1);
475 	spdk_thread_exit(thread1);
476 
477 	_reactor_run(reactor);
478 
479 	CU_ASSERT(TAILQ_EMPTY(&reactor->threads));
480 
481 	/* No further than 2100 ticks elapsed */
482 	CU_ASSERT(reactor->tsc_last == 2100);
483 
484 	spdk_reactors_fini();
485 
486 	free_cores();
487 
488 	MOCK_CLEAR(spdk_env_get_current_core);
489 }
490 
491 static uint32_t
492 _run_events_till_completion(uint32_t reactor_count)
493 {
494 	struct spdk_reactor *reactor;
495 	struct spdk_thread *app_thread = spdk_thread_get_app_thread();
496 	uint32_t i, events;
497 	uint32_t total_events = 0;
498 
499 	do {
500 		events = 0;
501 		for (i = 0; i < reactor_count; i++) {
502 			reactor = spdk_reactor_get(i);
503 			CU_ASSERT(reactor != NULL);
504 			MOCK_SET(spdk_env_get_current_core, i);
505 			events += event_queue_run_batch(reactor);
506 
507 			/* Some events still require app_thread to run */
508 			MOCK_SET(spdk_env_get_current_core, g_scheduling_reactor->lcore);
509 			spdk_thread_poll(app_thread, 0, 0);
510 
511 			MOCK_CLEAR(spdk_env_get_current_core);
512 		}
513 		total_events += events;
514 	} while (events > 0);
515 
516 	return total_events;
517 }
518 
519 static void
520 test_scheduler(void)
521 {
522 	struct spdk_cpuset cpuset = {};
523 	struct spdk_thread *thread[3];
524 	struct spdk_reactor *reactor;
525 	struct spdk_poller *busy, *idle;
526 	uint64_t reactor_busy_tsc[3], reactor_idle_tsc[3];
527 	uint64_t thread_busy_tsc[3], thread_idle_tsc[3];
528 	uint64_t current_time, end_time, busy_time, idle_time;
529 	struct spdk_thread_stats stats;
530 	int i;
531 
532 	MOCK_SET(spdk_env_get_current_core, 0);
533 
534 	allocate_cores(3);
535 
536 	CU_ASSERT(spdk_reactors_init(SPDK_DEFAULT_MSG_MEMPOOL_SIZE) == 0);
537 
538 	spdk_scheduler_set("dynamic");
539 
540 	for (i = 0; i < 3; i++) {
541 		spdk_cpuset_set_cpu(&g_reactor_core_mask, i, true);
542 	}
543 	g_next_core = 0;
544 
545 	/* Create threads. */
546 	for (i = 0; i < 3; i++) {
547 		spdk_cpuset_zero(&cpuset);
548 		spdk_cpuset_set_cpu(&cpuset, i, true);
549 		thread[i] = spdk_thread_create(NULL, &cpuset);
550 		CU_ASSERT(thread[i] != NULL);
551 		thread_busy_tsc[i] = 0;
552 		thread_idle_tsc[i] = 0;
553 	}
554 
555 	for (i = 0; i < 3; i++) {
556 		reactor = spdk_reactor_get(i);
557 		CU_ASSERT(reactor != NULL);
558 		MOCK_SET(spdk_env_get_current_core, i);
559 		event_queue_run_batch(reactor);
560 		CU_ASSERT(!TAILQ_EMPTY(&reactor->threads));
561 		reactor_busy_tsc[i] = 0;
562 		reactor_idle_tsc[i] = 0;
563 	}
564 
565 	g_reactor_state = SPDK_REACTOR_STATE_RUNNING;
566 
567 	MOCK_SET(spdk_env_get_current_core, 0);
568 
569 	/* Init threads stats (low load) */
570 	/* Each reactor starts at 100 tsc,
571 	 * ends at 100 + 100 = 200 tsc. */
572 	current_time = 100;
573 	idle_time = 100;
574 	busy_time = 0;
575 	end_time = current_time + idle_time + busy_time;
576 	for (i = 0; i < 3; i++) {
577 		spdk_set_thread(thread[i]);
578 		idle = spdk_poller_register(poller_run_idle, (void *)idle_time, 0);
579 		reactor = spdk_reactor_get(i);
580 		CU_ASSERT(reactor != NULL);
581 		MOCK_SET(spdk_get_ticks, current_time);
582 		reactor->tsc_last = spdk_get_ticks();
583 		_reactor_run(reactor);
584 		CU_ASSERT(reactor->tsc_last == end_time);
585 		spdk_poller_unregister(&idle);
586 
587 		CU_ASSERT(spdk_thread_get_last_tsc(thread[i]) == end_time);
588 		CU_ASSERT(spdk_thread_get_stats(&stats) == 0);
589 		CU_ASSERT(stats.busy_tsc == busy_time);
590 		thread_busy_tsc[i] = stats.busy_tsc;
591 		CU_ASSERT(stats.idle_tsc == idle_time);
592 		thread_idle_tsc[i] = stats.idle_tsc;
593 		CU_ASSERT(reactor->busy_tsc == busy_time);
594 		reactor_busy_tsc[i] = reactor->busy_tsc;
595 		CU_ASSERT(reactor->idle_tsc == idle_time);
596 		reactor_idle_tsc[i] = reactor->idle_tsc;
597 	}
598 	CU_ASSERT(spdk_get_ticks() == end_time);
599 	current_time = 200;
600 
601 	MOCK_SET(spdk_env_get_current_core, 0);
602 	_reactors_scheduler_gather_metrics(NULL, NULL);
603 
604 	_run_events_till_completion(3);
605 	MOCK_SET(spdk_env_get_current_core, 0);
606 
607 	/* Threads were idle, so all of them should be placed on core 0.
608 	 * All reactors start and end at 200 tsc, since for this iteration
609 	 * the threads have no pollers (so they consume no idle or busy tsc).
610 	 */
611 	for (i = 0; i < 3; i++) {
612 		reactor = spdk_reactor_get(i);
613 		CU_ASSERT(reactor != NULL);
614 		MOCK_SET(spdk_get_ticks, current_time);
615 		_reactor_run(reactor);
616 		CU_ASSERT(reactor->tsc_last == current_time);
617 		CU_ASSERT(reactor->busy_tsc == reactor_busy_tsc[i]);
618 		CU_ASSERT(reactor->idle_tsc == reactor_idle_tsc[i]);
619 		spdk_set_thread(thread[i]);
620 		CU_ASSERT(spdk_thread_get_last_tsc(thread[i]) == current_time);
621 		CU_ASSERT(spdk_thread_get_stats(&stats) == 0);
622 		CU_ASSERT(stats.busy_tsc == thread_busy_tsc[i]);
623 		CU_ASSERT(stats.idle_tsc == thread_idle_tsc[i]);
624 	}
625 	CU_ASSERT(spdk_get_ticks() == current_time);
626 
627 	/* 2 threads should be scheduled to core 0 */
628 	reactor = spdk_reactor_get(0);
629 	CU_ASSERT(reactor != NULL);
630 	MOCK_SET(spdk_env_get_current_core, 0);
631 	event_queue_run_batch(reactor);
632 
633 	reactor = spdk_reactor_get(0);
634 	CU_ASSERT(reactor != NULL);
635 	CU_ASSERT(!TAILQ_EMPTY(&reactor->threads));
636 	reactor = spdk_reactor_get(1);
637 	CU_ASSERT(reactor != NULL);
638 	CU_ASSERT(TAILQ_EMPTY(&reactor->threads));
639 	reactor = spdk_reactor_get(2);
640 	CU_ASSERT(reactor != NULL);
641 	CU_ASSERT(TAILQ_EMPTY(&reactor->threads));
642 
643 	/* Make threads busy */
644 	reactor = spdk_reactor_get(0);
645 	CU_ASSERT(reactor != NULL);
646 
647 	/* All threads run on single reactor,
648 	 * reactor 0 starts at 200 tsc,
649 	 * ending at 200 + (100 * 3) = 500 tsc. */
650 	MOCK_SET(spdk_get_ticks, current_time);
651 	busy_time = 100;
652 	idle_time = 0;
653 	for (i = 0; i < 3; i++) {
654 		spdk_set_thread(thread[i]);
655 		busy = spdk_poller_register(poller_run_busy, (void *)busy_time, 0);
656 		_reactor_run(reactor);
657 		spdk_poller_unregister(&busy);
658 		current_time += busy_time;
659 
660 		CU_ASSERT(reactor->tsc_last == current_time);
661 		CU_ASSERT(spdk_thread_get_last_tsc(thread[i]) == current_time);
662 		CU_ASSERT(spdk_thread_get_stats(&stats) == 0);
663 		CU_ASSERT(stats.busy_tsc == thread_busy_tsc[i] + busy_time);
664 		CU_ASSERT(stats.idle_tsc == thread_idle_tsc[i] + idle_time);;
665 	}
666 	CU_ASSERT(reactor->busy_tsc == reactor_busy_tsc[0] + 3 * busy_time);
667 	CU_ASSERT(reactor->idle_tsc == reactor_idle_tsc[0] + 3 * idle_time);
668 	CU_ASSERT(spdk_get_ticks() == current_time);
669 
670 	/* Run scheduler again, this time all threads are busy */
671 	MOCK_SET(spdk_env_get_current_core, 0);
672 	_reactors_scheduler_gather_metrics(NULL, NULL);
673 
674 	_run_events_till_completion(3);
675 	MOCK_SET(spdk_env_get_current_core, 0);
676 
677 	/* Threads were busy, 2 will stay on core 0, 1 will move to core 1 */
678 	for (i = 0; i < 3; i++) {
679 		MOCK_SET(spdk_env_get_current_core, i);
680 		reactor = spdk_reactor_get(i);
681 		CU_ASSERT(reactor != NULL);
682 		_reactor_run(reactor);
683 	}
684 
685 	for (i = 0; i < 3; i++) {
686 		reactor = spdk_reactor_get(i);
687 		CU_ASSERT(reactor != NULL);
688 		CU_ASSERT(!TAILQ_EMPTY(&reactor->threads));
689 	}
690 
691 	g_reactor_state = SPDK_REACTOR_STATE_INITIALIZED;
692 
693 	/* Destroy threads */
694 	for (i = 0; i < 3; i++) {
695 		spdk_set_thread(thread[i]);
696 		spdk_thread_exit(thread[i]);
697 	}
698 	for (i = 0; i < 3; i++) {
699 		reactor = spdk_reactor_get(i);
700 		CU_ASSERT(reactor != NULL);
701 		reactor_run(reactor);
702 	}
703 
704 	spdk_set_thread(NULL);
705 
706 	MOCK_CLEAR(spdk_env_get_current_core);
707 
708 	spdk_reactors_fini();
709 
710 	free_cores();
711 }
712 
713 uint8_t g_curr_freq;
714 
715 static int
716 core_freq_up(uint32_t lcore)
717 {
718 	if (g_curr_freq != UINT8_MAX) {
719 		g_curr_freq++;
720 	}
721 
722 	return 0;
723 }
724 
725 static int
726 core_freq_down(uint32_t lcore)
727 {
728 	if (g_curr_freq != 0) {
729 		g_curr_freq--;
730 	}
731 
732 	return 0;
733 }
734 
735 static int
736 core_freq_max(uint32_t lcore)
737 {
738 	g_curr_freq = UINT8_MAX;
739 
740 	return 0;
741 }
742 
743 DEFINE_STUB(core_freq_min, int, (uint32_t lcore_id), 0);
744 DEFINE_STUB(core_caps, int,
745 	    (uint32_t lcore_id, struct spdk_governor_capabilities *capabilities), 0);
746 DEFINE_STUB(governor_init, int, (void), 0);
747 DEFINE_STUB_V(governor_deinit, (void));
748 
749 static struct spdk_governor governor = {
750 	.name = "dpdk_governor",
751 	.get_core_curr_freq = NULL,
752 	.core_freq_up = core_freq_up,
753 	.core_freq_down = core_freq_down,
754 	.set_core_freq_max = core_freq_max,
755 	.set_core_freq_min = core_freq_min,
756 	.get_core_capabilities = core_caps,
757 	.init = governor_init,
758 	.deinit = governor_deinit,
759 };
760 
761 static void
762 test_governor(void)
763 {
764 	struct spdk_cpuset cpuset = {};
765 	struct spdk_thread *thread[2];
766 	struct spdk_lw_thread *lw_thread;
767 	struct spdk_reactor *reactor;
768 	struct spdk_poller *busy, *idle;
769 	uint8_t last_freq = 100;
770 	int i;
771 
772 	MOCK_SET(spdk_env_get_current_core, 0);
773 
774 	g_curr_freq = last_freq;
775 	spdk_governor_register(&governor);
776 
777 	allocate_cores(2);
778 
779 	CU_ASSERT(spdk_reactors_init(SPDK_DEFAULT_MSG_MEMPOOL_SIZE) == 0);
780 
781 	spdk_scheduler_set("dynamic");
782 	spdk_governor_set("dpdk_governor");
783 
784 	for (i = 0; i < 2; i++) {
785 		spdk_cpuset_set_cpu(&g_reactor_core_mask, i, true);
786 	}
787 
788 	/* Create threads. */
789 	for (i = 0; i < 2; i++) {
790 		spdk_cpuset_zero(&cpuset);
791 		spdk_cpuset_set_cpu(&cpuset, i, true);
792 		thread[i] = spdk_thread_create(NULL, &cpuset);
793 		CU_ASSERT(thread[i] != NULL);
794 	}
795 
796 	for (i = 0; i < 2; i++) {
797 		reactor = spdk_reactor_get(i);
798 		CU_ASSERT(reactor != NULL);
799 		MOCK_SET(spdk_env_get_current_core, i);
800 		CU_ASSERT(event_queue_run_batch(reactor) == 1);
801 		CU_ASSERT(!TAILQ_EMPTY(&reactor->threads));
802 	}
803 
804 	reactor = spdk_reactor_get(0);
805 	CU_ASSERT(reactor != NULL);
806 	MOCK_SET(spdk_env_get_current_core, 0);
807 
808 	g_reactor_state = SPDK_REACTOR_STATE_RUNNING;
809 
810 	/* TEST 1 */
811 	/* Init thread stats (low load) */
812 	MOCK_SET(spdk_get_ticks, 100);
813 	reactor->tsc_last = 100;
814 
815 	for (i = 0; i < 2; i++) {
816 		spdk_set_thread(thread[i]);
817 		idle = spdk_poller_register(poller_run_idle, (void *)200, 0);
818 		reactor = spdk_reactor_get(i);
819 		CU_ASSERT(reactor != NULL);
820 		MOCK_SET(spdk_env_get_current_core, i);
821 		_reactor_run(reactor);
822 		spdk_poller_unregister(&idle);
823 
824 		/* Update last stats so that we don't have to call scheduler twice */
825 		lw_thread = spdk_thread_get_ctx(thread[i]);
826 		lw_thread->current_stats.idle_tsc = 1;
827 	}
828 
829 	MOCK_SET(spdk_env_get_current_core, 0);
830 	_reactors_scheduler_gather_metrics(NULL, NULL);
831 
832 	CU_ASSERT(_run_events_till_completion(2) == 2);
833 	MOCK_SET(spdk_env_get_current_core, 0);
834 
835 	/* Threads were idle, so all of them should be placed on core 0 */
836 	for (i = 0; i < 2; i++) {
837 		reactor = spdk_reactor_get(i);
838 		CU_ASSERT(reactor != NULL);
839 		_reactor_run(reactor);
840 	}
841 
842 	/* 1 thread should be scheduled to core 0 */
843 	reactor = spdk_reactor_get(0);
844 	CU_ASSERT(reactor != NULL);
845 	MOCK_SET(spdk_env_get_current_core, 0);
846 	CU_ASSERT(event_queue_run_batch(reactor) == 1);
847 
848 	/* Main core should be busy less than 50% time now - frequency should be lowered */
849 	CU_ASSERT(g_curr_freq == last_freq - 1);
850 
851 	last_freq = g_curr_freq;
852 
853 	/* TEST 2 */
854 	/* Make first threads busy - both threads will be still on core 0, but frequency will have to be raised */
855 	spdk_set_thread(thread[0]);
856 	busy = spdk_poller_register(poller_run_busy, (void *)1000, 0);
857 	_reactor_run(reactor);
858 	spdk_poller_unregister(&busy);
859 
860 	spdk_set_thread(thread[1]);
861 	idle = spdk_poller_register(poller_run_idle, (void *)100, 0);
862 	_reactor_run(reactor);
863 	spdk_poller_unregister(&idle);
864 
865 	/* Run scheduler again */
866 	MOCK_SET(spdk_env_get_current_core, 0);
867 	_reactors_scheduler_gather_metrics(NULL, NULL);
868 
869 	i = _run_events_till_completion(2);
870 	/* Six runs when interrupt mode is supported, two if not. */
871 	CU_ASSERT(i == 6 || i == 2);
872 	MOCK_SET(spdk_env_get_current_core, 0);
873 
874 	/* Main core should be busy more than 50% time now - frequency should be raised */
875 	CU_ASSERT(g_curr_freq == last_freq + 1);
876 
877 	/* TEST 3 */
878 	/* Make second thread very busy so that it will be moved to second core */
879 	spdk_set_thread(thread[1]);
880 	busy = spdk_poller_register(poller_run_busy, (void *)2000, 0);
881 	_reactor_run(reactor);
882 	spdk_poller_unregister(&busy);
883 
884 	/* Update first thread stats */
885 	spdk_set_thread(thread[0]);
886 	idle = spdk_poller_register(poller_run_idle, (void *)100, 0);
887 	_reactor_run(reactor);
888 	spdk_poller_unregister(&idle);
889 
890 	/* Run scheduler again */
891 	MOCK_SET(spdk_env_get_current_core, 0);
892 	_reactors_scheduler_gather_metrics(NULL, NULL);
893 
894 	i = _run_events_till_completion(2);
895 	/* Six runs when interrupt mode is supported, two if not. */
896 	CU_ASSERT(i == 6 || i == 2);
897 	MOCK_SET(spdk_env_get_current_core, 0);
898 
899 	for (i = 0; i < 2; i++) {
900 		reactor = spdk_reactor_get(i);
901 		CU_ASSERT(reactor != NULL);
902 		_reactor_run(reactor);
903 	}
904 
905 	/* Main core frequency should be set to max when we have busy threads on other cores */
906 	CU_ASSERT(g_curr_freq == UINT8_MAX);
907 
908 	g_reactor_state = SPDK_REACTOR_STATE_INITIALIZED;
909 
910 	/* Destroy threads */
911 	for (i = 0; i < 2; i++) {
912 		spdk_set_thread(thread[i]);
913 		spdk_thread_exit(thread[i]);
914 	}
915 	for (i = 0; i < 2; i++) {
916 		reactor = spdk_reactor_get(i);
917 		CU_ASSERT(reactor != NULL);
918 		reactor_run(reactor);
919 	}
920 
921 	spdk_set_thread(NULL);
922 
923 	MOCK_CLEAR(spdk_env_get_current_core);
924 
925 	spdk_reactors_fini();
926 
927 	free_cores();
928 }
929 
930 int
931 main(int argc, char **argv)
932 {
933 	CU_pSuite suite = NULL;
934 	unsigned int num_failures;
935 
936 	CU_set_error_action(CUEA_ABORT);
937 	CU_initialize_registry();
938 
939 	suite = CU_add_suite("app_suite", NULL, NULL);
940 
941 	CU_ADD_TEST(suite, test_create_reactor);
942 	CU_ADD_TEST(suite, test_init_reactors);
943 	CU_ADD_TEST(suite, test_event_call);
944 	CU_ADD_TEST(suite, test_schedule_thread);
945 	CU_ADD_TEST(suite, test_reschedule_thread);
946 	CU_ADD_TEST(suite, test_for_each_reactor);
947 	CU_ADD_TEST(suite, test_reactor_stats);
948 	CU_ADD_TEST(suite, test_scheduler);
949 	CU_ADD_TEST(suite, test_governor);
950 
951 	CU_basic_set_mode(CU_BRM_VERBOSE);
952 	CU_basic_run_tests();
953 	num_failures = CU_get_number_of_failures();
954 	CU_cleanup_registry();
955 
956 	return num_failures;
957 }
958