/*   SPDX-License-Identifier: BSD-3-Clause
 *   Copyright (C) 2016 Intel Corporation.
 *   All rights reserved.
 *   Copyright (c) 2022, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
 */

#include "spdk/stdinc.h"

#include "spdk_cunit.h"

#include "thread/thread_internal.h"

#include "thread/thread.c"
#include "common/lib/ut_multithread.c"

#define SMALL_BUFSIZE 128
#define LARGE_BUFSIZE 512

static int g_sched_rc = 0;

static int
_thread_schedule(struct spdk_thread *thread)
{
	return g_sched_rc;
}

static bool
_thread_op_supported(enum spdk_thread_op op)
{
	switch (op) {
	case SPDK_THREAD_OP_NEW:
		return true;
	default:
		return false;
	}
}

static int
_thread_op(struct spdk_thread *thread, enum spdk_thread_op op)
{
	switch (op) {
	case SPDK_THREAD_OP_NEW:
		return _thread_schedule(thread);
	default:
		return -ENOTSUP;
	}
}

static void
thread_alloc(void)
{
	struct spdk_thread *thread;

	/* No schedule callback */
	spdk_thread_lib_init(NULL, 0);
	thread = spdk_thread_create(NULL, NULL);
	SPDK_CU_ASSERT_FATAL(thread != NULL);
	spdk_set_thread(thread);
	spdk_thread_exit(thread);
	while (!spdk_thread_is_exited(thread)) {
		spdk_thread_poll(thread, 0, 0);
	}
	spdk_thread_destroy(thread);
	spdk_thread_lib_fini();

	/* Schedule callback exists */
	spdk_thread_lib_init(_thread_schedule, 0);

	/* Scheduling succeeds */
	g_sched_rc = 0;
	thread = spdk_thread_create(NULL, NULL);
	SPDK_CU_ASSERT_FATAL(thread != NULL);
	spdk_set_thread(thread);
	spdk_thread_exit(thread);
	while (!spdk_thread_is_exited(thread)) {
		spdk_thread_poll(thread, 0, 0);
	}
	spdk_thread_destroy(thread);

	/* Scheduling fails */
	g_sched_rc = -1;
	thread = spdk_thread_create(NULL, NULL);
	SPDK_CU_ASSERT_FATAL(thread == NULL);

	spdk_thread_lib_fini();

	/* Scheduling callback exists with extended thread library initialization. */
	spdk_thread_lib_init_ext(_thread_op, _thread_op_supported, 0,
				 SPDK_DEFAULT_MSG_MEMPOOL_SIZE);

	/* Scheduling succeeds */
	g_sched_rc = 0;
	thread = spdk_thread_create(NULL, NULL);
	SPDK_CU_ASSERT_FATAL(thread != NULL);
	spdk_set_thread(thread);
	spdk_thread_exit(thread);
	while (!spdk_thread_is_exited(thread)) {
		spdk_thread_poll(thread, 0, 0);
	}
	spdk_thread_destroy(thread);

	/* Scheduling fails */
	g_sched_rc = -1;
	thread = spdk_thread_create(NULL, NULL);
	SPDK_CU_ASSERT_FATAL(thread == NULL);

	spdk_thread_lib_fini();
}

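/* The exit/poll/destroy sequence used above is the canonical teardown for a
 * thread without a scheduler. A helper capturing the pattern might look like
 * the sketch below (illustrative only; ut_thread_teardown_sketch is not part
 * of the real test suite, which keeps the loop inline).
 */
static void __attribute__((unused))
ut_thread_teardown_sketch(struct spdk_thread *thread)
{
	/* Request exit, then drive the thread until it drains and exits. */
	spdk_thread_exit(thread);
	while (!spdk_thread_is_exited(thread)) {
		spdk_thread_poll(thread, 0, 0);
	}
	spdk_thread_destroy(thread);
}
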
static void
send_msg_cb(void *ctx)
{
	bool *done = ctx;

	*done = true;
}

static void
thread_send_msg(void)
{
	struct spdk_thread *thread0;
	bool done = false;

	allocate_threads(2);
	set_thread(0);
	thread0 = spdk_get_thread();

	set_thread(1);
	/* Simulate thread 1 sending a message to thread 0. */
	spdk_thread_send_msg(thread0, send_msg_cb, &done);

	/* We have not polled thread 0 yet, so done should be false. */
	CU_ASSERT(!done);

	/*
	 * Poll thread 1.  The message was sent to thread 0, so this should be
	 *  a nop and done should still be false.
	 */
	poll_thread(1);
	CU_ASSERT(!done);

	/*
	 * Poll thread 0.  This should execute the message and done should then
	 *  be true.
	 */
	poll_thread(0);
	CU_ASSERT(done);

	free_threads();
}

static int
poller_run_done(void *ctx)
{
	bool	*poller_run = ctx;

	*poller_run = true;

	return -1;
}

static void
thread_poller(void)
{
	struct spdk_poller	*poller = NULL;
	bool			poller_run = false;

	allocate_threads(1);

	set_thread(0);
	MOCK_SET(spdk_get_ticks, 0);
	/* Register a poller with no-wait time and test execution */
	poller = spdk_poller_register(poller_run_done, &poller_run, 0);
	CU_ASSERT(poller != NULL);

	poll_threads();
	CU_ASSERT(poller_run == true);

	spdk_poller_unregister(&poller);
	CU_ASSERT(poller == NULL);

	/* Register a poller with 1000us wait time and test single execution */
	poller_run = false;
	poller = spdk_poller_register(poller_run_done, &poller_run, 1000);
	CU_ASSERT(poller != NULL);

	poll_threads();
	CU_ASSERT(poller_run == false);

	spdk_delay_us(1000);
	poll_threads();
	CU_ASSERT(poller_run == true);

	poller_run = false;
	poll_threads();
	CU_ASSERT(poller_run == false);

	spdk_delay_us(1000);
	poll_threads();
	CU_ASSERT(poller_run == true);

	spdk_poller_unregister(&poller);
	CU_ASSERT(poller == NULL);

	free_threads();
}

struct poller_ctx {
	struct spdk_poller	*poller;
	bool			run;
};

static int
poller_run_pause(void *ctx)
{
	struct poller_ctx *poller_ctx = ctx;

	poller_ctx->run = true;
	spdk_poller_pause(poller_ctx->poller);

	return 0;
}

/* Verify the same poller can be switched multiple times between
 * pause and resume while it runs.
 */
static int
poller_run_pause_resume_pause(void *ctx)
{
	struct poller_ctx *poller_ctx = ctx;

	poller_ctx->run = true;

	spdk_poller_pause(poller_ctx->poller);
	spdk_poller_resume(poller_ctx->poller);
	spdk_poller_pause(poller_ctx->poller);

	return 0;
}

static void
poller_msg_pause_cb(void *ctx)
{
	struct spdk_poller *poller = ctx;

	spdk_poller_pause(poller);
}

static void
poller_msg_resume_cb(void *ctx)
{
	struct spdk_poller *poller = ctx;

	spdk_poller_resume(poller);
}

static void
poller_pause(void)
{
	struct poller_ctx poller_ctx = {};
	unsigned int delay[] = { 0, 1000 };
	unsigned int i;

	allocate_threads(1);
	set_thread(0);

	/* Register a poller that pauses itself */
	poller_ctx.poller = spdk_poller_register(poller_run_pause, &poller_ctx, 0);
	CU_ASSERT_PTR_NOT_NULL(poller_ctx.poller);

	poller_ctx.run = false;
	poll_threads();
	CU_ASSERT_EQUAL(poller_ctx.run, true);

	poller_ctx.run = false;
	poll_threads();
	CU_ASSERT_EQUAL(poller_ctx.run, false);

	spdk_poller_unregister(&poller_ctx.poller);
	CU_ASSERT_PTR_NULL(poller_ctx.poller);

	/* Register a poller that switches between pause and resume itself */
	poller_ctx.poller = spdk_poller_register(poller_run_pause_resume_pause, &poller_ctx, 0);
	CU_ASSERT_PTR_NOT_NULL(poller_ctx.poller);

	poller_ctx.run = false;
	poll_threads();
	CU_ASSERT_EQUAL(poller_ctx.run, true);

	poller_ctx.run = false;
	poll_threads();
	CU_ASSERT_EQUAL(poller_ctx.run, false);

	spdk_poller_unregister(&poller_ctx.poller);
	CU_ASSERT_PTR_NULL(poller_ctx.poller);

	/* Verify that resuming an unpaused poller doesn't do anything */
	poller_ctx.poller = spdk_poller_register(poller_run_done, &poller_ctx.run, 0);
	CU_ASSERT_PTR_NOT_NULL(poller_ctx.poller);

	spdk_poller_resume(poller_ctx.poller);

	poller_ctx.run = false;
	poll_threads();
	CU_ASSERT_EQUAL(poller_ctx.run, true);

	/* Verify that pausing the same poller twice works too */
	spdk_poller_pause(poller_ctx.poller);

	poller_ctx.run = false;
	poll_threads();
	CU_ASSERT_EQUAL(poller_ctx.run, false);

	spdk_poller_pause(poller_ctx.poller);
	poll_threads();
	CU_ASSERT_EQUAL(poller_ctx.run, false);

	spdk_poller_resume(poller_ctx.poller);
	poll_threads();
	CU_ASSERT_EQUAL(poller_ctx.run, true);

	/* Verify that a poller is run when it's resumed immediately after pausing */
	poller_ctx.run = false;
	spdk_poller_pause(poller_ctx.poller);
	spdk_poller_resume(poller_ctx.poller);
	poll_threads();
	CU_ASSERT_EQUAL(poller_ctx.run, true);

	spdk_poller_unregister(&poller_ctx.poller);
	CU_ASSERT_PTR_NULL(poller_ctx.poller);

	/* Poll the thread to make sure the previous poller gets unregistered */
	poll_threads();
	CU_ASSERT_EQUAL(spdk_thread_has_pollers(spdk_get_thread()), false);

	/* Verify that it's possible to unregister a paused poller */
	poller_ctx.poller = spdk_poller_register(poller_run_done, &poller_ctx.run, 0);
	CU_ASSERT_PTR_NOT_NULL(poller_ctx.poller);

	poller_ctx.run = false;
	poll_threads();
	CU_ASSERT_EQUAL(poller_ctx.run, true);

	spdk_poller_pause(poller_ctx.poller);

	poller_ctx.run = false;
	poll_threads();
	CU_ASSERT_EQUAL(poller_ctx.run, false);

	spdk_poller_unregister(&poller_ctx.poller);

	poll_threads();
	CU_ASSERT_EQUAL(poller_ctx.run, false);
	CU_ASSERT_EQUAL(spdk_thread_has_pollers(spdk_get_thread()), false);

	/* Register pollers with 0 and 1000us wait time and pause/resume them */
	for (i = 0; i < SPDK_COUNTOF(delay); ++i) {
		poller_ctx.poller = spdk_poller_register(poller_run_done, &poller_ctx.run, delay[i]);
		CU_ASSERT_PTR_NOT_NULL(poller_ctx.poller);

		spdk_delay_us(delay[i]);
		poller_ctx.run = false;
		poll_threads();
		CU_ASSERT_EQUAL(poller_ctx.run, true);

		spdk_poller_pause(poller_ctx.poller);

		spdk_delay_us(delay[i]);
		poller_ctx.run = false;
		poll_threads();
		CU_ASSERT_EQUAL(poller_ctx.run, false);

		spdk_poller_resume(poller_ctx.poller);

		spdk_delay_us(delay[i]);
		poll_threads();
		CU_ASSERT_EQUAL(poller_ctx.run, true);

		/* Verify that the poller can be paused/resumed from spdk_thread_send_msg */
		spdk_thread_send_msg(spdk_get_thread(), poller_msg_pause_cb, poller_ctx.poller);

		spdk_delay_us(delay[i]);
		poller_ctx.run = false;
		poll_threads();
		CU_ASSERT_EQUAL(poller_ctx.run, false);

		spdk_thread_send_msg(spdk_get_thread(), poller_msg_resume_cb, poller_ctx.poller);

		poll_threads();
		if (delay[i] > 0) {
			spdk_delay_us(delay[i]);
			poll_threads();
		}
		CU_ASSERT_EQUAL(poller_ctx.run, true);

		spdk_poller_unregister(&poller_ctx.poller);
		CU_ASSERT_PTR_NULL(poller_ctx.poller);

		/* Register a timed poller that pauses itself */
		poller_ctx.poller = spdk_poller_register(poller_run_pause, &poller_ctx, delay[i]);
		CU_ASSERT_PTR_NOT_NULL(poller_ctx.poller);

		spdk_delay_us(delay[i]);
		poller_ctx.run = false;
		poll_threads();
		CU_ASSERT_EQUAL(poller_ctx.run, true);

		poller_ctx.run = false;
		spdk_delay_us(delay[i]);
		poll_threads();
		CU_ASSERT_EQUAL(poller_ctx.run, false);

		spdk_poller_resume(poller_ctx.poller);

		CU_ASSERT_EQUAL(poller_ctx.run, false);
		spdk_delay_us(delay[i]);
		poll_threads();
		CU_ASSERT_EQUAL(poller_ctx.run, true);

		spdk_poller_unregister(&poller_ctx.poller);
		CU_ASSERT_PTR_NULL(poller_ctx.poller);

		/* Register a timed poller that switches between pause and resume itself */
		poller_ctx.poller = spdk_poller_register(poller_run_pause_resume_pause,
				    &poller_ctx, delay[i]);
		CU_ASSERT_PTR_NOT_NULL(poller_ctx.poller);

		spdk_delay_us(delay[i]);
		poller_ctx.run = false;
		poll_threads();
		CU_ASSERT_EQUAL(poller_ctx.run, true);

		poller_ctx.run = false;
		spdk_delay_us(delay[i]);
		poll_threads();
		CU_ASSERT_EQUAL(poller_ctx.run, false);

		spdk_poller_resume(poller_ctx.poller);

		CU_ASSERT_EQUAL(poller_ctx.run, false);
		spdk_delay_us(delay[i]);
		poll_threads();
		CU_ASSERT_EQUAL(poller_ctx.run, true);

		spdk_poller_unregister(&poller_ctx.poller);
		CU_ASSERT_PTR_NULL(poller_ctx.poller);
	}

	free_threads();
}

static void
for_each_cb(void *ctx)
{
	int *count = ctx;

	(*count)++;
}

static void
thread_for_each(void)
{
	int count = 0;
	int i;

	allocate_threads(3);
	set_thread(0);

	spdk_for_each_thread(for_each_cb, &count, for_each_cb);

	/* We have not polled any of the threads yet, so count should be 0 */
	CU_ASSERT(count == 0);

	/* Poll each thread to verify the message is passed to each */
	for (i = 0; i < 3; i++) {
		poll_thread(i);
		CU_ASSERT(count == (i + 1));
	}

	/*
	 * After the message has run on each thread, the completion callback
	 * runs it one more time on the calling thread.
	 */
	poll_thread(0);
	CU_ASSERT(count == 4);

	free_threads();
}

static int
channel_create(void *io_device, void *ctx_buf)
{
	int *ch_count = io_device;

	(*ch_count)++;
	return 0;
}

static void
channel_destroy(void *io_device, void *ctx_buf)
{
	int *ch_count = io_device;

	(*ch_count)--;
}

static void
channel_msg(struct spdk_io_channel_iter *i)
{
	int *msg_count = spdk_io_channel_iter_get_ctx(i);

	(*msg_count)++;
	spdk_for_each_channel_continue(i, 0);
}

static void
channel_cpl(struct spdk_io_channel_iter *i, int status)
{
	int *msg_count = spdk_io_channel_iter_get_ctx(i);

	(*msg_count)++;
}

static void
for_each_channel_remove(void)
{
	struct spdk_io_channel *ch0, *ch1, *ch2;
	int ch_count = 0;
	int msg_count = 0;

	allocate_threads(3);
	set_thread(0);
	spdk_io_device_register(&ch_count, channel_create, channel_destroy, sizeof(int), NULL);
	ch0 = spdk_get_io_channel(&ch_count);
	set_thread(1);
	ch1 = spdk_get_io_channel(&ch_count);
	set_thread(2);
	ch2 = spdk_get_io_channel(&ch_count);
	CU_ASSERT(ch_count == 3);

	/*
	 * Test that io_channel handles the case where we start to iterate through
	 *  the channels, and during the iteration, one of the channels is deleted.
	 * This is done in several different and sometimes non-intuitive orders,
	 *  because some operations are deferred and won't execute until their
	 *  threads are polled.
	 *
	 * Case #1: Put the I/O channel before spdk_for_each_channel.
	 */
	set_thread(0);
	spdk_put_io_channel(ch0);
	CU_ASSERT(ch_count == 3);
	poll_threads();
	CU_ASSERT(ch_count == 2);
	spdk_for_each_channel(&ch_count, channel_msg, &msg_count, channel_cpl);
	CU_ASSERT(msg_count == 0);
	poll_threads();
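	/* Only two channels remain after ch0's deferred destroy completed, so
	 * channel_msg runs twice and channel_cpl once: msg_count reaches 3.
	 */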
	CU_ASSERT(msg_count == 3);

	msg_count = 0;

	/*
	 * Case #2: Put the I/O channel after spdk_for_each_channel, but before
	 *  thread 0 is polled.
	 */
	ch0 = spdk_get_io_channel(&ch_count);
	CU_ASSERT(ch_count == 3);
	spdk_for_each_channel(&ch_count, channel_msg, &msg_count, channel_cpl);
	spdk_put_io_channel(ch0);
	CU_ASSERT(ch_count == 3);

	poll_threads();
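	/* The iteration was started while all three channels still existed, so
	 * channel_msg runs three times before ch0's deferred put completes, and
	 * channel_cpl brings msg_count to 4.
	 */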
	CU_ASSERT(ch_count == 2);
	CU_ASSERT(msg_count == 4);
	set_thread(1);
	spdk_put_io_channel(ch1);
	CU_ASSERT(ch_count == 2);
	set_thread(2);
	spdk_put_io_channel(ch2);
	CU_ASSERT(ch_count == 2);
	poll_threads();
	CU_ASSERT(ch_count == 0);

	spdk_io_device_unregister(&ch_count, NULL);
	poll_threads();

	free_threads();
}

struct unreg_ctx {
	bool	ch_done;
	bool	foreach_done;
};

static void
unreg_ch_done(struct spdk_io_channel_iter *i)
{
	struct unreg_ctx *ctx = spdk_io_channel_iter_get_ctx(i);

	ctx->ch_done = true;

	SPDK_CU_ASSERT_FATAL(i->cur_thread != NULL);
	spdk_for_each_channel_continue(i, 0);
}

static void
unreg_foreach_done(struct spdk_io_channel_iter *i, int status)
{
	struct unreg_ctx *ctx = spdk_io_channel_iter_get_ctx(i);

	ctx->foreach_done = true;
}

static void
for_each_channel_unreg(void)
{
	struct spdk_io_channel *ch0;
	struct io_device *dev;
	struct unreg_ctx ctx = {};
	int io_target = 0;

	allocate_threads(1);
	set_thread(0);
	CU_ASSERT(RB_EMPTY(&g_io_devices));
	spdk_io_device_register(&io_target, channel_create, channel_destroy, sizeof(int), NULL);
	CU_ASSERT(!RB_EMPTY(&g_io_devices));
	dev = RB_MIN(io_device_tree, &g_io_devices);
	SPDK_CU_ASSERT_FATAL(dev != NULL);
	CU_ASSERT(RB_NEXT(io_device_tree, &g_io_devices, dev) == NULL);
	ch0 = spdk_get_io_channel(&io_target);

	spdk_io_device_register(&io_target, channel_create, channel_destroy, sizeof(int), NULL);

	/*
	 * There is already a device registered at &io_target, so a new io_device should not
	 *  have been added to g_io_devices.
	 */
	CU_ASSERT(dev == RB_MIN(io_device_tree, &g_io_devices));
	CU_ASSERT(RB_NEXT(io_device_tree, &g_io_devices, dev) == NULL);

	spdk_for_each_channel(&io_target, unreg_ch_done, &ctx, unreg_foreach_done);
	spdk_io_device_unregister(&io_target, NULL);
	/*
	 * There is an outstanding foreach call on the io_device, so the unregister should not
	 *  have immediately removed the device.
	 */
	CU_ASSERT(dev == RB_MIN(io_device_tree, &g_io_devices));

	poll_thread(0);
	CU_ASSERT(ctx.ch_done == true);
	CU_ASSERT(ctx.foreach_done == true);

	/*
	 * There are no more foreach operations outstanding, so the device should be
	 * unregistered.
	 */
	CU_ASSERT(RB_EMPTY(&g_io_devices));

	set_thread(0);
	spdk_put_io_channel(ch0);

	poll_threads();

	free_threads();
}

static void
thread_name(void)
{
	struct spdk_thread *thread;
	const char *name;

	spdk_thread_lib_init(NULL, 0);

	/* Create thread with no name, which automatically generates one */
	thread = spdk_thread_create(NULL, NULL);
	spdk_set_thread(thread);
	thread = spdk_get_thread();
	SPDK_CU_ASSERT_FATAL(thread != NULL);
	name = spdk_thread_get_name(thread);
	CU_ASSERT(name != NULL);
	spdk_thread_exit(thread);
	while (!spdk_thread_is_exited(thread)) {
		spdk_thread_poll(thread, 0, 0);
	}
	spdk_thread_destroy(thread);

	/* Create thread named "test_thread" */
	thread = spdk_thread_create("test_thread", NULL);
	spdk_set_thread(thread);
	thread = spdk_get_thread();
	SPDK_CU_ASSERT_FATAL(thread != NULL);
	name = spdk_thread_get_name(thread);
	SPDK_CU_ASSERT_FATAL(name != NULL);
	CU_ASSERT(strcmp(name, "test_thread") == 0);
	spdk_thread_exit(thread);
	while (!spdk_thread_is_exited(thread)) {
		spdk_thread_poll(thread, 0, 0);
	}
	spdk_thread_destroy(thread);

	spdk_thread_lib_fini();
}

static uint64_t g_device1;
static uint64_t g_device2;
static uint64_t g_device3;

static uint64_t g_ctx1 = 0x1111;
static uint64_t g_ctx2 = 0x2222;

static int g_create_cb_calls = 0;
static int g_destroy_cb_calls = 0;

static int
create_cb_1(void *io_device, void *ctx_buf)
{
	CU_ASSERT(io_device == &g_device1);
	*(uint64_t *)ctx_buf = g_ctx1;
	g_create_cb_calls++;
	return 0;
}

static void
destroy_cb_1(void *io_device, void *ctx_buf)
{
	CU_ASSERT(io_device == &g_device1);
	CU_ASSERT(*(uint64_t *)ctx_buf == g_ctx1);
	g_destroy_cb_calls++;
}

static int
create_cb_2(void *io_device, void *ctx_buf)
{
	CU_ASSERT(io_device == &g_device2);
	*(uint64_t *)ctx_buf = g_ctx2;
	g_create_cb_calls++;
	return 0;
}

static void
destroy_cb_2(void *io_device, void *ctx_buf)
{
	CU_ASSERT(io_device == &g_device2);
	CU_ASSERT(*(uint64_t *)ctx_buf == g_ctx2);
	g_destroy_cb_calls++;
}

static void
channel(void)
{
	struct spdk_io_channel *ch1, *ch2;
	void *ctx;

	allocate_threads(1);
	set_thread(0);

	spdk_io_device_register(&g_device1, create_cb_1, destroy_cb_1, sizeof(g_ctx1), NULL);
	spdk_io_device_register(&g_device2, create_cb_2, destroy_cb_2, sizeof(g_ctx2), NULL);

	g_create_cb_calls = 0;
	ch1 = spdk_get_io_channel(&g_device1);
	CU_ASSERT(g_create_cb_calls == 1);
	SPDK_CU_ASSERT_FATAL(ch1 != NULL);
	CU_ASSERT(spdk_io_channel_get_io_device(ch1) == &g_device1);

	g_create_cb_calls = 0;
	ch2 = spdk_get_io_channel(&g_device1);
	CU_ASSERT(g_create_cb_calls == 0);
	CU_ASSERT(ch1 == ch2);
	SPDK_CU_ASSERT_FATAL(ch2 != NULL);
	CU_ASSERT(spdk_io_channel_get_io_device(ch2) == &g_device1);

	g_destroy_cb_calls = 0;
	spdk_put_io_channel(ch2);
	poll_threads();
	CU_ASSERT(g_destroy_cb_calls == 0);

	g_create_cb_calls = 0;
	ch2 = spdk_get_io_channel(&g_device2);
	CU_ASSERT(g_create_cb_calls == 1);
	CU_ASSERT(ch1 != ch2);
	SPDK_CU_ASSERT_FATAL(ch2 != NULL);
	CU_ASSERT(spdk_io_channel_get_io_device(ch2) == &g_device2);

	ctx = spdk_io_channel_get_ctx(ch2);
	CU_ASSERT(*(uint64_t *)ctx == g_ctx2);

	g_destroy_cb_calls = 0;
	spdk_put_io_channel(ch1);
	poll_threads();
	CU_ASSERT(g_destroy_cb_calls == 1);

	g_destroy_cb_calls = 0;
	spdk_put_io_channel(ch2);
	poll_threads();
	CU_ASSERT(g_destroy_cb_calls == 1);

	ch1 = spdk_get_io_channel(&g_device3);
	CU_ASSERT(ch1 == NULL);

	spdk_io_device_unregister(&g_device1, NULL);
	poll_threads();
	spdk_io_device_unregister(&g_device2, NULL);
	poll_threads();
	CU_ASSERT(RB_EMPTY(&g_io_devices));
	free_threads();
	CU_ASSERT(TAILQ_EMPTY(&g_threads));
}

static int
create_cb(void *io_device, void *ctx_buf)
{
	uint64_t *refcnt = (uint64_t *)ctx_buf;

	CU_ASSERT(*refcnt == 0);
	*refcnt = 1;

	return 0;
}

static void
destroy_cb(void *io_device, void *ctx_buf)
{
	uint64_t *refcnt = (uint64_t *)ctx_buf;

	CU_ASSERT(*refcnt == 1);
	*refcnt = 0;
}

/**
 * This test checks that a sequence of get, put, get, put, without allowing
 * the deferred put operation to complete in between, doesn't release the
 * memory for the channel twice.
 */
static void
channel_destroy_races(void)
{
	uint64_t device;
	struct spdk_io_channel *ch;

	allocate_threads(1);
	set_thread(0);

	spdk_io_device_register(&device, create_cb, destroy_cb, sizeof(uint64_t), NULL);

	ch = spdk_get_io_channel(&device);
	SPDK_CU_ASSERT_FATAL(ch != NULL);

	spdk_put_io_channel(ch);

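	/* The put above only deferred the destroy. Since the thread has not been
	 * polled, the channel is still alive and the following get must take a
	 * new reference to the same channel rather than race with its
	 * destruction.
	 */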
	ch = spdk_get_io_channel(&device);
	SPDK_CU_ASSERT_FATAL(ch != NULL);

	spdk_put_io_channel(ch);
	poll_threads();

	spdk_io_device_unregister(&device, NULL);
	poll_threads();

	CU_ASSERT(RB_EMPTY(&g_io_devices));
	free_threads();
	CU_ASSERT(TAILQ_EMPTY(&g_threads));
}

static void
thread_exit_test(void)
{
	struct spdk_thread *thread;
	struct spdk_io_channel *ch;
	struct spdk_poller *poller1, *poller2;
	void *ctx;
	bool done1 = false, done2 = false, poller1_run = false, poller2_run = false;
	int rc __attribute__((unused));

	MOCK_SET(spdk_get_ticks, 10);
	MOCK_SET(spdk_get_ticks_hz, 1);

	allocate_threads(4);

	/* Test if all pending messages are reaped for the exiting thread, and the
	 * thread moves to the exited state.
	 */
	set_thread(0);
	thread = spdk_get_thread();

	/* Sending a message to thread 0 will be accepted. */
	rc = spdk_thread_send_msg(thread, send_msg_cb, &done1);
	CU_ASSERT(rc == 0);
	CU_ASSERT(!done1);

	/* Move thread 0 to the exiting state. */
	spdk_thread_exit(thread);

	CU_ASSERT(spdk_thread_is_exited(thread) == false);

	/* Sending a message to thread 0 will still be accepted. */
	rc = spdk_thread_send_msg(thread, send_msg_cb, &done2);
	CU_ASSERT(rc == 0);

	/* Thread 0 will reap pending messages. */
	poll_thread(0);
	CU_ASSERT(done1 == true);
	CU_ASSERT(done2 == true);

	/* Thread 0 will move to the exited state. */
	CU_ASSERT(spdk_thread_is_exited(thread) == true);

	/* Test that releasing an I/O channel is reaped even after the thread
	 * moves to the exiting state.
	 */
	set_thread(1);

	spdk_io_device_register(&g_device1, create_cb_1, destroy_cb_1, sizeof(g_ctx1), NULL);

	g_create_cb_calls = 0;
	ch = spdk_get_io_channel(&g_device1);
	CU_ASSERT(g_create_cb_calls == 1);
	SPDK_CU_ASSERT_FATAL(ch != NULL);

	ctx = spdk_io_channel_get_ctx(ch);
	CU_ASSERT(*(uint64_t *)ctx == g_ctx1);

	g_destroy_cb_calls = 0;
	spdk_put_io_channel(ch);

	thread = spdk_get_thread();
	spdk_thread_exit(thread);

	/* Thread 1 will not move to the exited state yet because the I/O channel
	 * release has not completed yet.
	 */
	CU_ASSERT(spdk_thread_is_exited(thread) == false);

	/* Thread 1 can still get another reference to the I/O channel even after
	 * it moves to the exiting state.
	 */
	g_create_cb_calls = 0;
	ch = spdk_get_io_channel(&g_device1);

	CU_ASSERT(g_create_cb_calls == 0);
	SPDK_CU_ASSERT_FATAL(ch != NULL);

	ctx = spdk_io_channel_get_ctx(ch);
	CU_ASSERT(*(uint64_t *)ctx == g_ctx1);

	spdk_put_io_channel(ch);

	poll_threads();
	CU_ASSERT(g_destroy_cb_calls == 1);

	/* Thread 1 will move to the exited state after the I/O channel is
	 * released.
	 */
	CU_ASSERT(spdk_thread_is_exited(thread) == true);

	spdk_io_device_unregister(&g_device1, NULL);
	poll_threads();

	/* Test that a pending poller unregistration is reaped for the exiting
	 * thread, and that the thread then moves to the exited state.
	 */
	set_thread(2);
	thread = spdk_get_thread();

	poller1 = spdk_poller_register(poller_run_done, &poller1_run, 0);
	CU_ASSERT(poller1 != NULL);

	spdk_poller_unregister(&poller1);

	spdk_thread_exit(thread);

	poller2 = spdk_poller_register(poller_run_done, &poller2_run, 0);

	poll_threads();

	CU_ASSERT(poller1_run == false);
	CU_ASSERT(poller2_run == true);

	CU_ASSERT(spdk_thread_is_exited(thread) == false);

	spdk_poller_unregister(&poller2);

	poll_threads();

	CU_ASSERT(spdk_thread_is_exited(thread) == true);

	/* Test if the exiting thread is exited forcefully after timeout. */
	set_thread(3);
	thread = spdk_get_thread();

	poller1 = spdk_poller_register(poller_run_done, &poller1_run, 0);
	CU_ASSERT(poller1 != NULL);

	spdk_thread_exit(thread);

	CU_ASSERT(spdk_thread_is_exited(thread) == false);

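	/* With spdk_get_ticks_hz mocked to 1 and the exit started at tick 10, the
	 * exit timeout (presumably the default of 4 seconds, i.e. 4 ticks here)
	 * expires after tick 14, so the thread is still exiting at tick 11 but is
	 * force-exited at tick 15.
	 */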
	MOCK_SET(spdk_get_ticks, 11);

	poll_threads();

	CU_ASSERT(spdk_thread_is_exited(thread) == false);

	/* Cause timeout forcefully. */
	MOCK_SET(spdk_get_ticks, 15);

	poll_threads();

	CU_ASSERT(spdk_thread_is_exited(thread) == true);

	spdk_poller_unregister(&poller1);

	poll_threads();

	MOCK_CLEAR(spdk_get_ticks);
	MOCK_CLEAR(spdk_get_ticks_hz);

	free_threads();
}

static int
poller_run_idle(void *ctx)
{
	uint64_t delay_us = (uint64_t)ctx;

	spdk_delay_us(delay_us);

	return 0;
}

static int
poller_run_busy(void *ctx)
{
	uint64_t delay_us = (uint64_t)ctx;

	spdk_delay_us(delay_us);

	return 1;
}

static void
thread_update_stats_test(void)
{
	struct spdk_poller	*poller;
	struct spdk_thread	*thread;

	MOCK_SET(spdk_get_ticks, 10);

	allocate_threads(1);

	set_thread(0);
	thread = spdk_get_thread();

	CU_ASSERT(thread->tsc_last == 10);
	CU_ASSERT(thread->stats.idle_tsc == 0);
	CU_ASSERT(thread->stats.busy_tsc == 0);

	/* Test that idle_tsc is updated as expected. */
	poller = spdk_poller_register(poller_run_idle, (void *)1000, 0);
	CU_ASSERT(poller != NULL);

	spdk_delay_us(100);

	poll_thread_times(0, 1);

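	/* Ticks started at 10; the 100us delay plus the poller's 1000us of idle
	 * time move tsc_last to 10 + 100 + 1000 = 1110, and the poller's idle
	 * time is added to idle_tsc.
	 */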
	CU_ASSERT(thread->tsc_last == 1110);
	CU_ASSERT(thread->stats.idle_tsc == 1000);
	CU_ASSERT(thread->stats.busy_tsc == 0);

	spdk_delay_us(100);

	poll_thread_times(0, 1);

	CU_ASSERT(thread->tsc_last == 2210);
	CU_ASSERT(thread->stats.idle_tsc == 2000);
	CU_ASSERT(thread->stats.busy_tsc == 0);

	spdk_poller_unregister(&poller);

	/* Test that busy_tsc is updated as expected. */
	poller = spdk_poller_register(poller_run_busy, (void *)100000, 0);
	CU_ASSERT(poller != NULL);

	spdk_delay_us(10000);

	poll_thread_times(0, 1);

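	/* tsc_last advances to 2210 + 10000 + 100000 = 112210, and the poller's
	 * 100000us are accounted as busy time because it returned 1 (busy).
	 */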
	CU_ASSERT(thread->tsc_last == 112210);
	CU_ASSERT(thread->stats.idle_tsc == 2000);
	CU_ASSERT(thread->stats.busy_tsc == 100000);

	spdk_delay_us(10000);

	poll_thread_times(0, 1);

	CU_ASSERT(thread->tsc_last == 222210);
	CU_ASSERT(thread->stats.idle_tsc == 2000);
	CU_ASSERT(thread->stats.busy_tsc == 200000);

	spdk_poller_unregister(&poller);

	MOCK_CLEAR(spdk_get_ticks);

	free_threads();
}

struct ut_nested_ch {
	struct spdk_io_channel *child;
	struct spdk_poller *poller;
};

struct ut_nested_dev {
	struct ut_nested_dev *child;
};

static int
ut_null_poll(void *ctx)
{
	return -1;
}

static int
ut_nested_ch_create_cb(void *io_device, void *ctx_buf)
{
	struct ut_nested_ch *_ch = ctx_buf;
	struct ut_nested_dev *_dev = io_device;
	struct ut_nested_dev *_child;

	_child = _dev->child;

	if (_child != NULL) {
		_ch->child = spdk_get_io_channel(_child);
		SPDK_CU_ASSERT_FATAL(_ch->child != NULL);
	} else {
		_ch->child = NULL;
	}

	_ch->poller = spdk_poller_register(ut_null_poll, NULL, 0);
	SPDK_CU_ASSERT_FATAL(_ch->poller != NULL);

	return 0;
}

static void
ut_nested_ch_destroy_cb(void *io_device, void *ctx_buf)
{
	struct ut_nested_ch *_ch = ctx_buf;
	struct spdk_io_channel *child;

	child = _ch->child;
	if (child != NULL) {
		spdk_put_io_channel(child);
	}

	spdk_poller_unregister(&_ch->poller);
}

static void
ut_check_nested_ch_create(struct spdk_io_channel *ch, struct io_device *dev)
{
	CU_ASSERT(ch->ref == 1);
	CU_ASSERT(ch->dev == dev);
	CU_ASSERT(dev->refcnt == 1);
}

static void
ut_check_nested_ch_destroy_pre(struct spdk_io_channel *ch, struct io_device *dev)
{
	CU_ASSERT(ch->ref == 0);
	CU_ASSERT(ch->destroy_ref == 1);
	CU_ASSERT(dev->refcnt == 1);
}

static void
ut_check_nested_ch_destroy_post(struct io_device *dev)
{
	CU_ASSERT(dev->refcnt == 0);
}

static void
ut_check_nested_poller_register(struct spdk_poller *poller)
{
	SPDK_CU_ASSERT_FATAL(poller != NULL);
}

static void
nested_channel(void)
{
	struct ut_nested_dev _dev1, _dev2, _dev3;
	struct ut_nested_ch *_ch1, *_ch2, *_ch3;
	struct io_device *dev1, *dev2, *dev3;
	struct spdk_io_channel *ch1, *ch2, *ch3;
	struct spdk_poller *poller;
	struct spdk_thread *thread;

	allocate_threads(1);
	set_thread(0);

	thread = spdk_get_thread();
	SPDK_CU_ASSERT_FATAL(thread != NULL);

	_dev1.child = &_dev2;
	_dev2.child = &_dev3;
	_dev3.child = NULL;

	spdk_io_device_register(&_dev1, ut_nested_ch_create_cb, ut_nested_ch_destroy_cb,
				sizeof(struct ut_nested_ch), "dev1");
	spdk_io_device_register(&_dev2, ut_nested_ch_create_cb, ut_nested_ch_destroy_cb,
				sizeof(struct ut_nested_ch), "dev2");
	spdk_io_device_register(&_dev3, ut_nested_ch_create_cb, ut_nested_ch_destroy_cb,
				sizeof(struct ut_nested_ch), "dev3");

	dev1 = io_device_get(&_dev1);
	SPDK_CU_ASSERT_FATAL(dev1 != NULL);
	dev2 = io_device_get(&_dev2);
	SPDK_CU_ASSERT_FATAL(dev2 != NULL);
	dev3 = io_device_get(&_dev3);
	SPDK_CU_ASSERT_FATAL(dev3 != NULL);

	/* A single call to spdk_get_io_channel() for dev1 also creates the
	 * channels for dev2 and dev3 recursively. Their pollers are registered
	 * along the way.
	 */
	ch1 = spdk_get_io_channel(&_dev1);
	SPDK_CU_ASSERT_FATAL(ch1 != NULL);

	_ch1 = spdk_io_channel_get_ctx(ch1);
	ch2 = _ch1->child;
	SPDK_CU_ASSERT_FATAL(ch2 != NULL);

	_ch2 = spdk_io_channel_get_ctx(ch2);
	ch3 = _ch2->child;
	SPDK_CU_ASSERT_FATAL(ch3 != NULL);

	_ch3 = spdk_io_channel_get_ctx(ch3);
	CU_ASSERT(_ch3->child == NULL);

	ut_check_nested_ch_create(ch1, dev1);
	ut_check_nested_ch_create(ch2, dev2);
	ut_check_nested_ch_create(ch3, dev3);

	poller = spdk_poller_register(ut_null_poll, NULL, 0);

	ut_check_nested_poller_register(poller);
	ut_check_nested_poller_register(_ch1->poller);
	ut_check_nested_poller_register(_ch2->poller);
	ut_check_nested_poller_register(_ch3->poller);

	spdk_poller_unregister(&poller);
	poll_thread_times(0, 1);

	/* A single call to spdk_put_io_channel() for dev1 also destroys the
	 * channels for dev2 and dev3 recursively. Their pollers are unregistered
	 * along the way.
	 */
	spdk_put_io_channel(ch1);
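	/* Each nested channel release is deferred as a message to this thread, so
	 * each of the following poll iterations tears down exactly one level of
	 * the nesting.
	 */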

	/* Start exiting the current thread after unregistering the non-nested
	 * I/O channel.
	 */
	spdk_thread_exit(thread);

	ut_check_nested_ch_destroy_pre(ch1, dev1);
	poll_thread_times(0, 1);
	ut_check_nested_ch_destroy_post(dev1);

	CU_ASSERT(spdk_thread_is_exited(thread) == false);

	ut_check_nested_ch_destroy_pre(ch2, dev2);
	poll_thread_times(0, 1);
	ut_check_nested_ch_destroy_post(dev2);

	CU_ASSERT(spdk_thread_is_exited(thread) == false);

	ut_check_nested_ch_destroy_pre(ch3, dev3);
	poll_thread_times(0, 1);
	ut_check_nested_ch_destroy_post(dev3);

	CU_ASSERT(spdk_thread_is_exited(thread) == true);

	spdk_io_device_unregister(&_dev1, NULL);
	spdk_io_device_unregister(&_dev2, NULL);
	spdk_io_device_unregister(&_dev3, NULL);
	CU_ASSERT(RB_EMPTY(&g_io_devices));

	free_threads();
	CU_ASSERT(TAILQ_EMPTY(&g_threads));
}

static int
create_cb2(void *io_device, void *ctx_buf)
{
	uint64_t *devcnt = (uint64_t *)io_device;

	*devcnt += 1;

	return 0;
}

static void
destroy_cb2(void *io_device, void *ctx_buf)
{
	uint64_t *devcnt = (uint64_t *)io_device;

	CU_ASSERT(*devcnt > 0);
	*devcnt -= 1;
}

static void
unregister_cb2(void *io_device)
{
	uint64_t *devcnt = (uint64_t *)io_device;

	CU_ASSERT(*devcnt == 0);
}

static void
device_unregister_and_thread_exit_race(void)
{
	uint64_t device = 0;
	struct spdk_io_channel *ch1, *ch2;
	struct spdk_thread *thread1, *thread2;

	/* Create two threads and each thread gets a channel from the same device. */
	allocate_threads(2);
	set_thread(0);

	thread1 = spdk_get_thread();
	SPDK_CU_ASSERT_FATAL(thread1 != NULL);

	spdk_io_device_register(&device, create_cb2, destroy_cb2, sizeof(uint64_t), NULL);

	ch1 = spdk_get_io_channel(&device);
	SPDK_CU_ASSERT_FATAL(ch1 != NULL);

	set_thread(1);

	thread2 = spdk_get_thread();
	SPDK_CU_ASSERT_FATAL(thread2 != NULL);

	ch2 = spdk_get_io_channel(&device);
	SPDK_CU_ASSERT_FATAL(ch2 != NULL);

	set_thread(0);

	/* Move thread 0 to the exiting state; it should stay in the exiting state
	 * until both channels and the device are released.
	 */
	spdk_thread_exit(thread1);
	poll_thread(0);

	spdk_put_io_channel(ch1);

	spdk_io_device_unregister(&device, unregister_cb2);
	poll_thread(0);

	CU_ASSERT(spdk_thread_is_exited(thread1) == false);

	set_thread(1);

	/* Move thread 1 to the exiting state; it should stay in the exiting state
	 * until its channel is released.
	 */
	spdk_thread_exit(thread2);
	poll_thread(1);

	CU_ASSERT(spdk_thread_is_exited(thread2) == false);

	spdk_put_io_channel(ch2);
	poll_thread(1);

	CU_ASSERT(spdk_thread_is_exited(thread1) == false);
	CU_ASSERT(spdk_thread_is_exited(thread2) == true);

	poll_thread(0);

	CU_ASSERT(spdk_thread_is_exited(thread1) == true);

	free_threads();
}

static int
dummy_poller(void *arg)
{
	return SPDK_POLLER_IDLE;
}

static void
cache_closest_timed_poller(void)
{
	struct spdk_thread *thread;
	struct spdk_poller *poller1, *poller2, *poller3, *tmp;

	allocate_threads(1);
	set_thread(0);

	thread = spdk_get_thread();
	SPDK_CU_ASSERT_FATAL(thread != NULL);

	poller1 = spdk_poller_register(dummy_poller, NULL, 1000);
	SPDK_CU_ASSERT_FATAL(poller1 != NULL);

	poller2 = spdk_poller_register(dummy_poller, NULL, 1500);
	SPDK_CU_ASSERT_FATAL(poller2 != NULL);

	poller3 = spdk_poller_register(dummy_poller, NULL, 1800);
	SPDK_CU_ASSERT_FATAL(poller3 != NULL);

	poll_threads();

	/* When multiple timed pollers are inserted, the cache should
	 * have the closest timed poller.
	 */
	CU_ASSERT(thread->first_timed_poller == poller1);
	CU_ASSERT(RB_MIN(timed_pollers_tree, &thread->timed_pollers) == poller1);

	spdk_delay_us(1000);
	poll_threads();

	CU_ASSERT(thread->first_timed_poller == poller2);
	CU_ASSERT(RB_MIN(timed_pollers_tree, &thread->timed_pollers) == poller2);

	/* If we unregister a timed poller by spdk_poller_unregister()
	 * when it is waiting, it is marked as being unregistered and
	 * is actually unregistered when it is expired.
	 *
	 * Hence if we unregister the closest timed poller when it is waiting,
	 * the cache is not updated to the next timed poller until it is expired.
	 */
	tmp = poller2;

	spdk_poller_unregister(&poller2);
	CU_ASSERT(poller2 == NULL);

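	/* poller2 was due to expire at 1500us. One microsecond before that it is
	 * still cached; one microsecond later it is reaped and the cache moves on
	 * to poller3.
	 */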
	spdk_delay_us(499);
	poll_threads();

	CU_ASSERT(thread->first_timed_poller == tmp);
	CU_ASSERT(RB_MIN(timed_pollers_tree, &thread->timed_pollers) == tmp);

	spdk_delay_us(1);
	poll_threads();

	CU_ASSERT(thread->first_timed_poller == poller3);
	CU_ASSERT(RB_MIN(timed_pollers_tree, &thread->timed_pollers) == poller3);

	/* If we pause a timed poller by spdk_poller_pause() when it is waiting,
	 * it is marked as being paused and is actually paused when it is expired.
	 *
	 * Hence if we pause the closest timed poller when it is waiting, the cache
	 * is not updated to the next timed poller until it is expired.
	 */
	spdk_poller_pause(poller3);

	spdk_delay_us(299);
	poll_threads();

	CU_ASSERT(thread->first_timed_poller == poller3);
	CU_ASSERT(RB_MIN(timed_pollers_tree, &thread->timed_pollers) == poller3);

	spdk_delay_us(1);
	poll_threads();

	CU_ASSERT(thread->first_timed_poller == poller1);
	CU_ASSERT(RB_MIN(timed_pollers_tree, &thread->timed_pollers) == poller1);

	/* After unregistering all timed pollers, the cache should
	 * be NULL.
	 */
	spdk_poller_unregister(&poller1);
	spdk_poller_unregister(&poller3);

	spdk_delay_us(200);
	poll_threads();

	CU_ASSERT(thread->first_timed_poller == NULL);
	CU_ASSERT(RB_EMPTY(&thread->timed_pollers));

	free_threads();
}

static void
multi_timed_pollers_have_same_expiration(void)
{
	struct spdk_thread *thread;
	struct spdk_poller *poller1, *poller2, *poller3, *poller4, *tmp;
	uint64_t start_ticks;

	allocate_threads(1);
	set_thread(0);

	thread = spdk_get_thread();
	SPDK_CU_ASSERT_FATAL(thread != NULL);

	/*
	 * case 1: multiple timed pollers have the same next_run_tick.
	 */
	start_ticks = spdk_get_ticks();

	poller1 = spdk_poller_register(dummy_poller, NULL, 500);
	SPDK_CU_ASSERT_FATAL(poller1 != NULL);

	poller2 = spdk_poller_register(dummy_poller, NULL, 500);
	SPDK_CU_ASSERT_FATAL(poller2 != NULL);

	poller3 = spdk_poller_register(dummy_poller, NULL, 1000);
	SPDK_CU_ASSERT_FATAL(poller3 != NULL);

	poller4 = spdk_poller_register(dummy_poller, NULL, 1500);
	SPDK_CU_ASSERT_FATAL(poller4 != NULL);

	/* poller1 and poller2 have the same next_run_tick but cache has poller1
	 * because poller1 is registered earlier than poller2.
	 */
	CU_ASSERT(thread->first_timed_poller == poller1);
	CU_ASSERT(poller1->next_run_tick == start_ticks + 500);
	CU_ASSERT(poller2->next_run_tick == start_ticks + 500);
	CU_ASSERT(poller3->next_run_tick == start_ticks + 1000);
	CU_ASSERT(poller4->next_run_tick == start_ticks + 1500);

	/* after 500 usec, poller1 and poller2 are expired. */
	spdk_delay_us(500);
	CU_ASSERT(spdk_get_ticks() == start_ticks + 500);
	poll_threads();

	/* poller1, poller2, and poller3 have the same next_run_tick but cache
	 * has poller3 because poller3 is not expired yet.
	 */
	CU_ASSERT(thread->first_timed_poller == poller3);
	CU_ASSERT(poller1->next_run_tick == start_ticks + 1000);
	CU_ASSERT(poller2->next_run_tick == start_ticks + 1000);
	CU_ASSERT(poller3->next_run_tick == start_ticks + 1000);
	CU_ASSERT(poller4->next_run_tick == start_ticks + 1500);

	/* after 500 usec, poller1, poller2, and poller3 are expired. */
	spdk_delay_us(500);
	CU_ASSERT(spdk_get_ticks() == start_ticks + 1000);
	poll_threads();

	/* poller1, poller2, and poller4 have the same next_run_tick but cache
	 * has poller4 because poller4 is not expired yet.
	 */
	CU_ASSERT(thread->first_timed_poller == poller4);
	CU_ASSERT(poller1->next_run_tick == start_ticks + 1500);
	CU_ASSERT(poller2->next_run_tick == start_ticks + 1500);
	CU_ASSERT(poller3->next_run_tick == start_ticks + 2000);
	CU_ASSERT(poller4->next_run_tick == start_ticks + 1500);

	/* after 500 usec, poller1, poller2, and poller4 are expired. */
	spdk_delay_us(500);
	CU_ASSERT(spdk_get_ticks() == start_ticks + 1500);
	poll_threads();

	/* poller1, poller2, and poller3 have the same next_run_tick but cache
	 * has poller3 because poller3 is updated earlier than poller1 and poller2.
	 */
	CU_ASSERT(thread->first_timed_poller == poller3);
	CU_ASSERT(poller1->next_run_tick == start_ticks + 2000);
	CU_ASSERT(poller2->next_run_tick == start_ticks + 2000);
	CU_ASSERT(poller3->next_run_tick == start_ticks + 2000);
	CU_ASSERT(poller4->next_run_tick == start_ticks + 3000);

	spdk_poller_unregister(&poller1);
	spdk_poller_unregister(&poller2);
	spdk_poller_unregister(&poller3);
	spdk_poller_unregister(&poller4);

	spdk_delay_us(1500);
	CU_ASSERT(spdk_get_ticks() == start_ticks + 3000);
	poll_threads();

	CU_ASSERT(thread->first_timed_poller == NULL);
	CU_ASSERT(RB_EMPTY(&thread->timed_pollers));

	/*
	 * case 2: unregister timed pollers while multiple timed pollers are registered.
	 */
	start_ticks = spdk_get_ticks();

	poller1 = spdk_poller_register(dummy_poller, NULL, 500);
	SPDK_CU_ASSERT_FATAL(poller1 != NULL);

	CU_ASSERT(thread->first_timed_poller == poller1);
	CU_ASSERT(poller1->next_run_tick == start_ticks + 500);

	/* after 250 usec, register poller2 and poller3. */
	spdk_delay_us(250);
	CU_ASSERT(spdk_get_ticks() == start_ticks + 250);

	poller2 = spdk_poller_register(dummy_poller, NULL, 500);
	SPDK_CU_ASSERT_FATAL(poller2 != NULL);

	poller3 = spdk_poller_register(dummy_poller, NULL, 750);
	SPDK_CU_ASSERT_FATAL(poller3 != NULL);

	CU_ASSERT(thread->first_timed_poller == poller1);
	CU_ASSERT(poller1->next_run_tick == start_ticks + 500);
	CU_ASSERT(poller2->next_run_tick == start_ticks + 750);
	CU_ASSERT(poller3->next_run_tick == start_ticks + 1000);

	/* unregister poller2 which is not the closest. */
	tmp = poller2;
	spdk_poller_unregister(&poller2);

	/* after 250 usec, poller1 is expired. */
	spdk_delay_us(250);
	CU_ASSERT(spdk_get_ticks() == start_ticks + 500);
	poll_threads();

	/* poller2 is not unregistered yet because it is not expired. */
	CU_ASSERT(thread->first_timed_poller == tmp);
	CU_ASSERT(poller1->next_run_tick == start_ticks + 1000);
	CU_ASSERT(tmp->next_run_tick == start_ticks + 750);
	CU_ASSERT(poller3->next_run_tick == start_ticks + 1000);

	spdk_delay_us(250);
	CU_ASSERT(spdk_get_ticks() == start_ticks + 750);
	poll_threads();

	CU_ASSERT(thread->first_timed_poller == poller3);
	CU_ASSERT(poller1->next_run_tick == start_ticks + 1000);
	CU_ASSERT(poller3->next_run_tick == start_ticks + 1000);

	spdk_poller_unregister(&poller3);

	spdk_delay_us(250);
	CU_ASSERT(spdk_get_ticks() == start_ticks + 1000);
	poll_threads();

	CU_ASSERT(thread->first_timed_poller == poller1);
	CU_ASSERT(poller1->next_run_tick == start_ticks + 1500);

	spdk_poller_unregister(&poller1);

	spdk_delay_us(500);
	CU_ASSERT(spdk_get_ticks() == start_ticks + 1500);
	poll_threads();

	CU_ASSERT(thread->first_timed_poller == NULL);
	CU_ASSERT(RB_EMPTY(&thread->timed_pollers));

	free_threads();
}

static int
dummy_create_cb(void *io_device, void *ctx_buf)
{
	return 0;
}

static void
dummy_destroy_cb(void *io_device, void *ctx_buf)
{
}

/* We had a bug where the compare function for the io_device tree
 * did not work as expected, because subtraction overflowed when the
 * difference between two keys was more than 32 bits.
 * This test case verifies the fix for that bug.
 */
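/* A subtraction-based compare such as
 *
 *     return (int)((uintptr_t)d1->io_device - (uintptr_t)d2->io_device);
 *
 * truncates the difference to 32 bits and can flip its sign. The fixed
 * io_device_cmp() is expected to compare the keys directly instead, roughly
 * (a sketch, not the verbatim implementation):
 *
 *     if (key1 < key2) { return -1; }
 *     if (key1 > key2) { return 1; }
 *     return 0;
 */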
static void
io_device_lookup(void)
{
	struct io_device dev1, dev2, *dev;
	struct spdk_io_channel *ch;

	/* The compare function io_device_cmp() had an overflow bug.
	 * Verify the fix first.
	 */
	dev1.io_device = (void *)0x7FFFFFFF;
	dev2.io_device = NULL;
	CU_ASSERT(io_device_cmp(&dev1, &dev2) > 0);
	CU_ASSERT(io_device_cmp(&dev2, &dev1) < 0);

	/* Check that comparisons stay correct when the key difference exceeds 32 bits. */
1675 	dev1.io_device = (void *)0x80000000;
1676 	CU_ASSERT(io_device_cmp(&dev1, &dev2) > 0);
1677 	CU_ASSERT(io_device_cmp(&dev2, &dev1) < 0);
1678 
1679 	dev1.io_device = (void *)0x100000000;
1680 	CU_ASSERT(io_device_cmp(&dev1, &dev2) > 0);
1681 	CU_ASSERT(io_device_cmp(&dev2, &dev1) < 0);
1682 
1683 	dev1.io_device = (void *)0x8000000000000000;
1684 	CU_ASSERT(io_device_cmp(&dev1, &dev2) > 0);
1685 	CU_ASSERT(io_device_cmp(&dev2, &dev1) < 0);
1686 
1687 	allocate_threads(1);
1688 	set_thread(0);
1689 
1690 	spdk_io_device_register((void *)0x1, dummy_create_cb, dummy_destroy_cb, 0, NULL);
1691 	spdk_io_device_register((void *)0x7FFFFFFF, dummy_create_cb, dummy_destroy_cb, 0, NULL);
1692 	spdk_io_device_register((void *)0x80000000, dummy_create_cb, dummy_destroy_cb, 0, NULL);
1693 	spdk_io_device_register((void *)0x100000000, dummy_create_cb, dummy_destroy_cb, 0, NULL);
1694 	spdk_io_device_register((void *)0x8000000000000000, dummy_create_cb, dummy_destroy_cb, 0, NULL);
1695 	spdk_io_device_register((void *)0x8000000100000000, dummy_create_cb, dummy_destroy_cb, 0, NULL);
1696 	spdk_io_device_register((void *)UINT64_MAX, dummy_create_cb, dummy_destroy_cb, 0, NULL);
1697 
1698 	/* RB_MIN and RB_NEXT should return devs in ascending order by addresses.
1699 	 * RB_FOREACH uses RB_MIN and RB_NEXT internally.
1700 	 */
1701 	dev = RB_MIN(io_device_tree, &g_io_devices);
1702 	SPDK_CU_ASSERT_FATAL(dev != NULL);
1703 	CU_ASSERT(dev->io_device == (void *)0x1);
1704 
1705 	dev = RB_NEXT(io_device_tree, &g_io_devices, dev);
1706 	SPDK_CU_ASSERT_FATAL(dev != NULL);
1707 	CU_ASSERT(dev->io_device == (void *)0x7FFFFFFF);
1708 
1709 	dev = RB_NEXT(io_device_tree, &g_io_devices, dev);
1710 	SPDK_CU_ASSERT_FATAL(dev != NULL);
1711 	CU_ASSERT(dev->io_device == (void *)0x80000000);
1712 
1713 	dev = RB_NEXT(io_device_tree, &g_io_devices, dev);
1714 	SPDK_CU_ASSERT_FATAL(dev != NULL);
1715 	CU_ASSERT(dev->io_device == (void *)0x100000000);
1716 
1717 	dev = RB_NEXT(io_device_tree, &g_io_devices, dev);
1718 	SPDK_CU_ASSERT_FATAL(dev != NULL);
1719 	CU_ASSERT(dev->io_device == (void *)0x8000000000000000);
1720 
1721 	dev = RB_NEXT(io_device_tree, &g_io_devices, dev);
1722 	SPDK_CU_ASSERT_FATAL(dev != NULL);
1723 	CU_ASSERT(dev->io_device == (void *)0x8000000100000000);
1724 
1725 	dev = RB_NEXT(io_device_tree, &g_io_devices, dev);
1726 	SPDK_CU_ASSERT_FATAL(dev != NULL);
1727 	CU_ASSERT(dev->io_device == (void *)UINT64_MAX);
1728 
1729 	/* Verify spdk_get_io_channel() creates io_channels associated with the
1730 	 * correct io_devices.
1731 	 */
1732 	ch = spdk_get_io_channel((void *)0x1);
1733 	SPDK_CU_ASSERT_FATAL(ch != NULL);
1734 	CU_ASSERT(ch->dev->io_device == (void *)0x1);
1735 	spdk_put_io_channel(ch);
1736 
1737 	ch = spdk_get_io_channel((void *)0x7FFFFFFF);
1738 	SPDK_CU_ASSERT_FATAL(ch != NULL);
1739 	CU_ASSERT(ch->dev->io_device == (void *)0x7FFFFFFF);
1740 	spdk_put_io_channel(ch);
1741 
1742 	ch = spdk_get_io_channel((void *)0x80000000);
1743 	SPDK_CU_ASSERT_FATAL(ch != NULL);
1744 	CU_ASSERT(ch->dev->io_device == (void *)0x80000000);
1745 	spdk_put_io_channel(ch);
1746 
1747 	ch = spdk_get_io_channel((void *)0x100000000);
1748 	SPDK_CU_ASSERT_FATAL(ch != NULL);
1749 	CU_ASSERT(ch->dev->io_device == (void *)0x100000000);
1750 	spdk_put_io_channel(ch);
1751 
1752 	ch = spdk_get_io_channel((void *)0x8000000000000000);
1753 	SPDK_CU_ASSERT_FATAL(ch != NULL);
1754 	CU_ASSERT(ch->dev->io_device == (void *)0x8000000000000000);
1755 	spdk_put_io_channel(ch);
1756 
1757 	ch = spdk_get_io_channel((void *)0x8000000100000000);
1758 	SPDK_CU_ASSERT_FATAL(ch != NULL);
1759 	CU_ASSERT(ch->dev->io_device == (void *)0x8000000100000000);
1760 	spdk_put_io_channel(ch);
1761 
1762 	ch = spdk_get_io_channel((void *)UINT64_MAX);
1763 	SPDK_CU_ASSERT_FATAL(ch != NULL);
1764 	CU_ASSERT(ch->dev->io_device == (void *)UINT64_MAX);
1765 	spdk_put_io_channel(ch);
1766 
1767 	poll_threads();
1768 
1769 	spdk_io_device_unregister((void *)0x1, NULL);
1770 	spdk_io_device_unregister((void *)0x7FFFFFFF, NULL);
1771 	spdk_io_device_unregister((void *)0x80000000, NULL);
1772 	spdk_io_device_unregister((void *)0x100000000, NULL);
1773 	spdk_io_device_unregister((void *)0x8000000000000000, NULL);
1774 	spdk_io_device_unregister((void *)0x8000000100000000, NULL);
1775 	spdk_io_device_unregister((void *)UINT64_MAX, NULL);
1776 
1777 	poll_threads();
1778 
1779 	CU_ASSERT(RB_EMPTY(&g_io_devices));
1780 
1781 	free_threads();
1782 }
1783 
1784 static enum spin_error g_spin_err;
1785 static uint32_t g_spin_err_count = 0;
1786 
1787 static void
1788 ut_track_abort(enum spin_error err)
1789 {
1790 	g_spin_err = err;
1791 	g_spin_err_count++;
1792 }
1793 
1794 static void
1795 spdk_spin(void)
1796 {
1797 	struct spdk_spinlock lock;
1798 
1799 	g_spin_abort_fn = ut_track_abort;
1800 
1801 	/* Do not need to be on an SPDK thread to initialize an spdk_spinlock */
1802 	g_spin_err_count = 0;
1803 	spdk_spin_init(&lock);
1804 	CU_ASSERT(g_spin_err_count == 0);
1805 
1806 	/* Trying to take a lock while not on an SPDK thread is an error */
1807 	g_spin_err_count = 0;
1808 	spdk_spin_lock(&lock);
1809 	CU_ASSERT(g_spin_err_count == 1);
1810 	CU_ASSERT(g_spin_err == SPIN_ERR_NOT_SPDK_THREAD);
1811 
1812 	/* Trying to check if a lock is held while not on an SPDK thread is an error */
1813 	g_spin_err_count = 0;
1814 	spdk_spin_held(&lock);
1815 	CU_ASSERT(g_spin_err_count == 1);
1816 	CU_ASSERT(g_spin_err == SPIN_ERR_NOT_SPDK_THREAD);
1817 
1818 	/* Do not need to be on an SPDK thread to destroy an spdk_spinlock */
1819 	g_spin_err_count = 0;
1820 	spdk_spin_destroy(&lock);
1821 	CU_ASSERT(g_spin_err_count == 0);
1822 
1823 	allocate_threads(2);
1824 	set_thread(0);
1825 
1826 	/* Can initialize an spdk_spinlock on an SPDK thread */
1827 	g_spin_err_count = 0;
1828 	spdk_spin_init(&lock);
1829 	CU_ASSERT(g_spin_err_count == 0);
1830 
1831 	/* Can take spinlock */
1832 	g_spin_err_count = 0;
1833 	spdk_spin_lock(&lock);
1834 	CU_ASSERT(g_spin_err_count == 0);
1835 
1836 	/* Can release spinlock */
1837 	g_spin_err_count = 0;
1838 	spdk_spin_unlock(&lock);
1839 	CU_ASSERT(g_spin_err_count == 0);
1840 
1841 	/* Deadlock detected */
1842 	g_spin_err_count = 0;
1843 	g_spin_err = SPIN_ERR_NONE;
1844 	spdk_spin_lock(&lock);
1845 	CU_ASSERT(g_spin_err_count == 0);
1846 	spdk_spin_lock(&lock);
1847 	CU_ASSERT(g_spin_err_count == 1);
1848 	CU_ASSERT(g_spin_err == SPIN_ERR_DEADLOCK);
1849 
1850 	/* Cannot unlock from wrong thread */
1851 	set_thread(1);
1852 	g_spin_err_count = 0;
1853 	spdk_spin_unlock(&lock);
1854 	CU_ASSERT(g_spin_err_count == 1);
1855 	CU_ASSERT(g_spin_err == SPIN_ERR_WRONG_THREAD);
1856 
1857 	/* Get back to a known good state */
1858 	set_thread(0);
1859 	g_spin_err_count = 0;
1860 	spdk_spin_unlock(&lock);
1861 	CU_ASSERT(g_spin_err_count == 0);
1862 
1863 	/* Cannot release the same lock twice */
1864 	g_spin_err_count = 0;
1865 	spdk_spin_lock(&lock);
1866 	CU_ASSERT(g_spin_err_count == 0);
1867 	spdk_spin_unlock(&lock);
1868 	CU_ASSERT(g_spin_err_count == 0);
1869 	spdk_spin_unlock(&lock);
1870 	CU_ASSERT(g_spin_err_count == 1);
1871 	CU_ASSERT(g_spin_err == SPIN_ERR_WRONG_THREAD);
1872 
1873 	/* A lock that is not held is properly recognized */
1874 	g_spin_err_count = 0;
1875 	CU_ASSERT(!spdk_spin_held(&lock));
1876 	CU_ASSERT(g_spin_err_count == 0);
1877 
1878 	/* A lock that is held is recognized as held by only the thread that holds it. */
1879 	set_thread(1);
1880 	g_spin_err_count = 0;
1881 	spdk_spin_lock(&lock);
1882 	CU_ASSERT(g_spin_err_count == 0);
1883 	CU_ASSERT(spdk_spin_held(&lock));
1884 	CU_ASSERT(g_spin_err_count == 0);
1885 	set_thread(0);
1886 	CU_ASSERT(!spdk_spin_held(&lock));
1887 	CU_ASSERT(g_spin_err_count == 0);
1888 
1889 	/* After releasing, no one thinks it is held */
1890 	set_thread(1);
1891 	spdk_spin_unlock(&lock);
1892 	CU_ASSERT(g_spin_err_count == 0);
1893 	CU_ASSERT(!spdk_spin_held(&lock));
1894 	CU_ASSERT(g_spin_err_count == 0);
1895 	set_thread(0);
1896 	CU_ASSERT(!spdk_spin_held(&lock));
1897 	CU_ASSERT(g_spin_err_count == 0);
1898 
1899 	/* Destroying a lock that is held is an error. */
1900 	set_thread(0);
1901 	g_spin_err_count = 0;
1902 	spdk_spin_lock(&lock);
1903 	CU_ASSERT(g_spin_err_count == 0);
1904 	spdk_spin_destroy(&lock);
1905 	CU_ASSERT(g_spin_err_count == 1);
1906 	CU_ASSERT(g_spin_err == SPIN_ERR_LOCK_HELD);
1907 	g_spin_err_count = 0;
1908 	spdk_spin_unlock(&lock);
1909 	CU_ASSERT(g_spin_err_count == 0);
1910 
1911 	/* Clean up */
1912 	g_spin_err_count = 0;
1913 	spdk_spin_destroy(&lock);
1914 	CU_ASSERT(g_spin_err_count == 0);
1915 	free_threads();
1916 	g_spin_abort_fn = __posix_abort;
1917 }
1918 
1919 struct ut_iobuf_entry {
1920 	struct spdk_iobuf_channel	*ioch;
1921 	struct spdk_iobuf_entry		iobuf;
1922 	void				*buf;
1923 	uint32_t			thread_id;
1924 	const char			*module;
1925 };
1926 
1927 static void
1928 ut_iobuf_finish_cb(void *ctx)
1929 {
1930 	*(int *)ctx = 1;
1931 }
1932 
1933 static void
1934 ut_iobuf_get_buf_cb(struct spdk_iobuf_entry *entry, void *buf)
1935 {
1936 	struct ut_iobuf_entry *ut_entry = SPDK_CONTAINEROF(entry, struct ut_iobuf_entry, iobuf);
1937 
1938 	ut_entry->buf = buf;
1939 }
1940 
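/* spdk_iobuf_for_each_entry() callback: tags each visited entry with cb_arg. */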
1941 static int
1942 ut_iobuf_foreach_cb(struct spdk_iobuf_channel *ch, struct spdk_iobuf_entry *entry, void *cb_arg)
1943 {
1944 	struct ut_iobuf_entry *ut_entry = SPDK_CONTAINEROF(entry, struct ut_iobuf_entry, iobuf);
1945 
1946 	ut_entry->buf = cb_arg;
1947 
1948 	return 0;
1949 }
1950 
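/*
 * Exercise pool exhaustion, wait queues, and aborts across two modules and two
 * threads, with pools of only two small and two large buffers.  Throughout the
 * test, buffers follow the usual acquire/release pattern, sketched below
 * (illustrative only; ctx and do_io are hypothetical names):
 *
 *	buf = spdk_iobuf_get(ch, len, &ctx->iobuf, get_buf_cb);
 *	if (buf != NULL) {
 *		do_io(ctx, buf);	// satisfied immediately
 *	}				// else get_buf_cb() fires after a later put
 *	...
 *	spdk_iobuf_put(ch, buf, len);	// may hand the buffer to a queued request
 */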
1951 static void
1952 iobuf(void)
1953 {
1954 	struct spdk_iobuf_opts opts = {
1955 		.small_pool_count = 2,
1956 		.large_pool_count = 2,
1957 		.small_bufsize = SMALL_BUFSIZE,
1958 		.large_bufsize = LARGE_BUFSIZE,
1959 	};
1960 	struct ut_iobuf_entry *entry;
1961 	struct spdk_iobuf_channel mod0_ch[2], mod1_ch[2];
1962 	struct ut_iobuf_entry mod0_entries[] = {
1963 		{ .thread_id = 0, .module = "ut_module0", },
1964 		{ .thread_id = 0, .module = "ut_module0", },
1965 		{ .thread_id = 0, .module = "ut_module0", },
1966 		{ .thread_id = 0, .module = "ut_module0", },
1967 		{ .thread_id = 1, .module = "ut_module0", },
1968 		{ .thread_id = 1, .module = "ut_module0", },
1969 		{ .thread_id = 1, .module = "ut_module0", },
1970 		{ .thread_id = 1, .module = "ut_module0", },
1971 	};
1972 	struct ut_iobuf_entry mod1_entries[] = {
1973 		{ .thread_id = 0, .module = "ut_module1", },
1974 		{ .thread_id = 0, .module = "ut_module1", },
1975 		{ .thread_id = 0, .module = "ut_module1", },
1976 		{ .thread_id = 0, .module = "ut_module1", },
1977 		{ .thread_id = 1, .module = "ut_module1", },
1978 		{ .thread_id = 1, .module = "ut_module1", },
1979 		{ .thread_id = 1, .module = "ut_module1", },
1980 		{ .thread_id = 1, .module = "ut_module1", },
1981 	};
1982 	int rc, finish = 0;
1983 	uint32_t i;
1984 
1985 	allocate_cores(2);
1986 	allocate_threads(2);
1987 
1988 	set_thread(0);
1989 
1990 	/* We cannot use spdk_iobuf_set_opts(), as it won't allow us to use such small pools */
1991 	g_iobuf.opts = opts;
1992 	rc = spdk_iobuf_initialize();
1993 	CU_ASSERT_EQUAL(rc, 0);
1994 
1995 	rc = spdk_iobuf_register_module("ut_module0");
1996 	CU_ASSERT_EQUAL(rc, 0);
1997 
1998 	rc = spdk_iobuf_register_module("ut_module1");
1999 	CU_ASSERT_EQUAL(rc, 0);
2000 
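	/* Create one channel per module on each thread.  The last two arguments of
	 * spdk_iobuf_channel_init() are the small and large per-channel cache sizes;
	 * they are zero here so that every buffer comes from the shared pools.
	 */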
2001 	set_thread(0);
2002 	rc = spdk_iobuf_channel_init(&mod0_ch[0], "ut_module0", 0, 0);
2003 	CU_ASSERT_EQUAL(rc, 0);
2004 	set_thread(1);
2005 	rc = spdk_iobuf_channel_init(&mod0_ch[1], "ut_module0", 0, 0);
2006 	CU_ASSERT_EQUAL(rc, 0);
2007 	for (i = 0; i < SPDK_COUNTOF(mod0_entries); ++i) {
2008 		mod0_entries[i].ioch = &mod0_ch[mod0_entries[i].thread_id];
2009 	}
2010 	set_thread(0);
2011 	rc = spdk_iobuf_channel_init(&mod1_ch[0], "ut_module1", 0, 0);
2012 	CU_ASSERT_EQUAL(rc, 0);
2013 	set_thread(1);
2014 	rc = spdk_iobuf_channel_init(&mod1_ch[1], "ut_module1", 0, 0);
2015 	CU_ASSERT_EQUAL(rc, 0);
2016 	for (i = 0; i < SPDK_COUNTOF(mod1_entries); ++i) {
2017 		mod1_entries[i].ioch = &mod1_ch[mod1_entries[i].thread_id];
2018 	}
2019 
2020 	/* First check that it's possible to drain both pools completely from a single module */
2021 	set_thread(0);
2022 	entry = &mod0_entries[0];
2023 	entry->buf = spdk_iobuf_get(entry->ioch, LARGE_BUFSIZE, &entry->iobuf, ut_iobuf_get_buf_cb);
2024 	CU_ASSERT_PTR_NOT_NULL(entry->buf);
2025 	entry = &mod0_entries[1];
2026 	entry->buf = spdk_iobuf_get(entry->ioch, LARGE_BUFSIZE, &entry->iobuf, ut_iobuf_get_buf_cb);
2027 	CU_ASSERT_PTR_NOT_NULL(entry->buf);
2028 	/* The next two should be put onto the large buf wait queue */
2029 	entry = &mod0_entries[2];
2030 	entry->buf = spdk_iobuf_get(entry->ioch, LARGE_BUFSIZE, &entry->iobuf, ut_iobuf_get_buf_cb);
2031 	CU_ASSERT_PTR_NULL(entry->buf);
2032 	entry = &mod0_entries[3];
2033 	entry->buf = spdk_iobuf_get(entry->ioch, LARGE_BUFSIZE, &entry->iobuf, ut_iobuf_get_buf_cb);
2034 	CU_ASSERT_PTR_NULL(entry->buf);
2035 	/* Pick the next two buffers from the small pool */
2036 	set_thread(1);
2037 	entry = &mod0_entries[4];
2038 	entry->buf = spdk_iobuf_get(entry->ioch, SMALL_BUFSIZE, &entry->iobuf, ut_iobuf_get_buf_cb);
2039 	CU_ASSERT_PTR_NOT_NULL(entry->buf);
2040 	entry = &mod0_entries[5];
2041 	entry->buf = spdk_iobuf_get(entry->ioch, SMALL_BUFSIZE, &entry->iobuf, ut_iobuf_get_buf_cb);
2042 	CU_ASSERT_PTR_NOT_NULL(entry->buf);
2043 	/* The next two should be put onto the small buf wait queue */
2044 	entry = &mod0_entries[6];
2045 	entry->buf = spdk_iobuf_get(entry->ioch, SMALL_BUFSIZE, &entry->iobuf, ut_iobuf_get_buf_cb);
2046 	CU_ASSERT_PTR_NULL(entry->buf);
2047 	entry = &mod0_entries[7];
2048 	entry->buf = spdk_iobuf_get(entry->ioch, SMALL_BUFSIZE, &entry->iobuf, ut_iobuf_get_buf_cb);
2049 	CU_ASSERT_PTR_NULL(entry->buf);
2050 
2051 	/* Now return one of the large buffers to the pool and verify that the first request's
2052 	 * (entry 2) callback was executed and that it was removed from the wait queue.
2053 	 */
2054 	set_thread(0);
2055 	entry = &mod0_entries[0];
2056 	spdk_iobuf_put(entry->ioch, entry->buf, LARGE_BUFSIZE);
2057 	entry = &mod0_entries[2];
2058 	CU_ASSERT_PTR_NOT_NULL(entry->buf);
2059 	entry = &mod0_entries[3];
2060 	CU_ASSERT_PTR_NULL(entry->buf);
2061 
2062 	/* Return the second buffer and check that the other request is satisfied */
2063 	entry = &mod0_entries[1];
2064 	spdk_iobuf_put(entry->ioch, entry->buf, LARGE_BUFSIZE);
2065 	entry = &mod0_entries[3];
2066 	CU_ASSERT_PTR_NOT_NULL(entry->buf);
2067 
2068 	/* Return the remaining two buffers */
2069 	entry = &mod0_entries[2];
2070 	spdk_iobuf_put(entry->ioch, entry->buf, LARGE_BUFSIZE);
2071 	entry = &mod0_entries[3];
2072 	spdk_iobuf_put(entry->ioch, entry->buf, LARGE_BUFSIZE);
2073 
2074 	/* Check that this didn't affect the requests waiting for the small buffers */
2075 	entry = &mod0_entries[6];
2076 	CU_ASSERT_PTR_NULL(entry->buf);
2077 	entry = &mod0_entries[7];
2078 	CU_ASSERT_PTR_NULL(entry->buf);
2079 
2080 	/* Do the same test as above, this time using the small pool */
2081 	set_thread(1);
2082 	entry = &mod0_entries[4];
2083 	spdk_iobuf_put(entry->ioch, entry->buf, SMALL_BUFSIZE);
2084 	entry = &mod0_entries[6];
2085 	CU_ASSERT_PTR_NOT_NULL(entry->buf);
2086 	entry = &mod0_entries[7];
2087 	CU_ASSERT_PTR_NULL(entry->buf);
2088 
2089 	/* Return the second buffer and check that the other request is satisfied */
2090 	entry = &mod0_entries[5];
2091 	spdk_iobuf_put(entry->ioch, entry->buf, SMALL_BUFSIZE);
2092 	entry = &mod0_entries[7];
2093 	CU_ASSERT_PTR_NOT_NULL(entry->buf);
2094 
2095 	/* Return the remaining two buffers */
2096 	entry = &mod0_entries[6];
2097 	spdk_iobuf_put(entry->ioch, entry->buf, SMALL_BUFSIZE);
2098 	entry = &mod0_entries[7];
2099 	spdk_iobuf_put(entry->ioch, entry->buf, SMALL_BUFSIZE);
2100 
2101 	/* Now check requesting buffers from different modules: first request all of them from one
2102 	 * module, starting from the large pool
2103 	 */
2104 	set_thread(0);
2105 	entry = &mod0_entries[0];
2106 	entry->buf = spdk_iobuf_get(entry->ioch, LARGE_BUFSIZE, &entry->iobuf, ut_iobuf_get_buf_cb);
2107 	CU_ASSERT_PTR_NOT_NULL(entry->buf);
2108 	entry = &mod0_entries[1];
2109 	entry->buf = spdk_iobuf_get(entry->ioch, LARGE_BUFSIZE, &entry->iobuf, ut_iobuf_get_buf_cb);
2110 	CU_ASSERT_PTR_NOT_NULL(entry->buf);
2111 	/* Then request all of the buffers from the small pool */
2112 	set_thread(1);
2113 	entry = &mod0_entries[4];
2114 	entry->buf = spdk_iobuf_get(entry->ioch, SMALL_BUFSIZE, &entry->iobuf, ut_iobuf_get_buf_cb);
2115 	CU_ASSERT_PTR_NOT_NULL(entry->buf);
2116 	entry = &mod0_entries[5];
2117 	entry->buf = spdk_iobuf_get(entry->ioch, SMALL_BUFSIZE, &entry->iobuf, ut_iobuf_get_buf_cb);
2118 	CU_ASSERT_PTR_NOT_NULL(entry->buf);
2119 
2120 	/* Request one buffer per module from each pool */
2121 	set_thread(0);
2122 	entry = &mod1_entries[0];
2123 	entry->buf = spdk_iobuf_get(entry->ioch, LARGE_BUFSIZE, &entry->iobuf, ut_iobuf_get_buf_cb);
2124 	CU_ASSERT_PTR_NULL(entry->buf);
2125 	entry = &mod0_entries[3];
2126 	entry->buf = spdk_iobuf_get(entry->ioch, LARGE_BUFSIZE, &entry->iobuf, ut_iobuf_get_buf_cb);
2127 	CU_ASSERT_PTR_NULL(entry->buf);
2128 	/* For the small pool, reverse the order and request a buffer from mod0 first */
2129 	set_thread(1);
2130 	entry = &mod0_entries[6];
2131 	entry->buf = spdk_iobuf_get(entry->ioch, SMALL_BUFSIZE, &entry->iobuf, ut_iobuf_get_buf_cb);
2132 	CU_ASSERT_PTR_NULL(entry->buf);
2133 	entry = &mod1_entries[4];
2134 	entry->buf = spdk_iobuf_get(entry->ioch, SMALL_BUFSIZE, &entry->iobuf, ut_iobuf_get_buf_cb);
2135 	CU_ASSERT_PTR_NULL(entry->buf);
2136 
2137 	/* Now return one buffer to the large pool */
2138 	set_thread(0);
2139 	entry = &mod0_entries[0];
2140 	spdk_iobuf_put(entry->ioch, entry->buf, LARGE_BUFSIZE);
2141 
2142 	/* Make sure the request from mod1 got the buffer, as it was the first to request it */
2143 	entry = &mod1_entries[0];
2144 	CU_ASSERT_PTR_NOT_NULL(entry->buf);
2145 	entry = &mod0_entries[3];
2146 	CU_ASSERT_PTR_NULL(entry->buf);
2147 
2148 	/* Return the second buffer to the large pool and check the outstanding mod0 request */
2149 	entry = &mod0_entries[1];
2150 	spdk_iobuf_put(entry->ioch, entry->buf, LARGE_BUFSIZE);
2151 	entry = &mod0_entries[3];
2152 	CU_ASSERT_PTR_NOT_NULL(entry->buf);
2153 
2154 	/* Return the remaining two buffers */
2155 	entry = &mod1_entries[0];
2156 	spdk_iobuf_put(entry->ioch, entry->buf, LARGE_BUFSIZE);
2157 	entry = &mod0_entries[3];
2158 	spdk_iobuf_put(entry->ioch, entry->buf, LARGE_BUFSIZE);
2159 
2160 	/* Check the same for the small pool, but this time the order of the requests is reversed
2161 	 * (mod0 before mod1)
2162 	 */
2163 	set_thread(1);
2164 	entry = &mod0_entries[4];
2165 	spdk_iobuf_put(entry->ioch, entry->buf, SMALL_BUFSIZE);
2166 	entry = &mod0_entries[6];
2167 	CU_ASSERT_PTR_NOT_NULL(entry->buf);
2168 	/* mod1 request was second in this case, so it still needs to wait */
2169 	entry = &mod1_entries[4];
2170 	CU_ASSERT_PTR_NULL(entry->buf);
2171 
2172 	/* Return the second requested buffer */
2173 	entry = &mod0_entries[5];
2174 	spdk_iobuf_put(entry->ioch, entry->buf, SMALL_BUFSIZE);
2175 	entry = &mod1_entries[4];
2176 	CU_ASSERT_PTR_NOT_NULL(entry->buf);
2177 
2178 	/* Return the remaining two buffers */
2179 	entry = &mod0_entries[6];
2180 	spdk_iobuf_put(entry->ioch, entry->buf, SMALL_BUFSIZE);
2181 	entry = &mod1_entries[4];
2182 	spdk_iobuf_put(entry->ioch, entry->buf, SMALL_BUFSIZE);
2183 
2184 	/* Request buffers to make the pools empty */
2185 	set_thread(0);
2186 	entry = &mod0_entries[0];
2187 	entry->buf = spdk_iobuf_get(entry->ioch, LARGE_BUFSIZE, &entry->iobuf, ut_iobuf_get_buf_cb);
2188 	CU_ASSERT_PTR_NOT_NULL(entry->buf);
2189 	entry = &mod1_entries[0];
2190 	entry->buf = spdk_iobuf_get(entry->ioch, LARGE_BUFSIZE, &entry->iobuf, ut_iobuf_get_buf_cb);
2191 	CU_ASSERT_PTR_NOT_NULL(entry->buf);
2192 	entry = &mod0_entries[1];
2193 	entry->buf = spdk_iobuf_get(entry->ioch, SMALL_BUFSIZE, &entry->iobuf, ut_iobuf_get_buf_cb);
2194 	CU_ASSERT_PTR_NOT_NULL(entry->buf);
2195 	entry = &mod1_entries[1];
2196 	entry->buf = spdk_iobuf_get(entry->ioch, SMALL_BUFSIZE, &entry->iobuf, ut_iobuf_get_buf_cb);
2197 	CU_ASSERT_PTR_NOT_NULL(entry->buf);
2198 
2199 	/* Queue more requests from both modules */
2200 	entry = &mod0_entries[2];
2201 	entry->buf = spdk_iobuf_get(entry->ioch, LARGE_BUFSIZE, &entry->iobuf, ut_iobuf_get_buf_cb);
2202 	CU_ASSERT_PTR_NULL(entry->buf);
2203 	entry = &mod1_entries[2];
2204 	entry->buf = spdk_iobuf_get(entry->ioch, LARGE_BUFSIZE, &entry->iobuf, ut_iobuf_get_buf_cb);
2205 	CU_ASSERT_PTR_NULL(entry->buf);
2206 	entry = &mod1_entries[3];
2207 	entry->buf = spdk_iobuf_get(entry->ioch, SMALL_BUFSIZE, &entry->iobuf, ut_iobuf_get_buf_cb);
2208 	CU_ASSERT_PTR_NULL(entry->buf);
2209 	entry = &mod0_entries[3];
2210 	entry->buf = spdk_iobuf_get(entry->ioch, SMALL_BUFSIZE, &entry->iobuf, ut_iobuf_get_buf_cb);
2211 	CU_ASSERT_PTR_NULL(entry->buf);
2212 
2213 	/* Check that abort correctly removes entries from the wait queues */
2214 	entry = &mod0_entries[2];
2215 	spdk_iobuf_entry_abort(entry->ioch, &entry->iobuf, LARGE_BUFSIZE);
2216 	entry = &mod1_entries[3];
2217 	spdk_iobuf_entry_abort(entry->ioch, &entry->iobuf, SMALL_BUFSIZE);
2218 
2219 	entry = &mod0_entries[0];
2220 	spdk_iobuf_put(entry->ioch, entry->buf, LARGE_BUFSIZE);
2221 	CU_ASSERT_PTR_NOT_NULL(mod1_entries[2].buf);
2222 	entry = &mod0_entries[1];
2223 	spdk_iobuf_put(entry->ioch, entry->buf, SMALL_BUFSIZE);
2224 	CU_ASSERT_PTR_NOT_NULL(mod0_entries[3].buf);
2225 
2226 	/* Clean up */
2227 	entry = &mod1_entries[0];
2228 	spdk_iobuf_put(entry->ioch, entry->buf, LARGE_BUFSIZE);
2229 	entry = &mod1_entries[2];
2230 	spdk_iobuf_put(entry->ioch, entry->buf, LARGE_BUFSIZE);
2231 	entry = &mod1_entries[1];
2232 	spdk_iobuf_put(entry->ioch, entry->buf, SMALL_BUFSIZE);
2233 	entry = &mod0_entries[3];
2234 	spdk_iobuf_put(entry->ioch, entry->buf, SMALL_BUFSIZE);
2235 
2236 	/* Request buffers to make the pools empty */
2237 	set_thread(0);
2238 	entry = &mod0_entries[0];
2239 	entry->buf = spdk_iobuf_get(entry->ioch, LARGE_BUFSIZE, &entry->iobuf, ut_iobuf_get_buf_cb);
2240 	CU_ASSERT_PTR_NOT_NULL(entry->buf);
2241 	entry = &mod1_entries[0];
2242 	entry->buf = spdk_iobuf_get(entry->ioch, LARGE_BUFSIZE, &entry->iobuf, ut_iobuf_get_buf_cb);
2243 	CU_ASSERT_PTR_NOT_NULL(entry->buf);
2244 	entry = &mod0_entries[1];
2245 	entry->buf = spdk_iobuf_get(entry->ioch, SMALL_BUFSIZE, &entry->iobuf, ut_iobuf_get_buf_cb);
2246 	CU_ASSERT_PTR_NOT_NULL(entry->buf);
2247 	entry = &mod1_entries[1];
2248 	entry->buf = spdk_iobuf_get(entry->ioch, SMALL_BUFSIZE, &entry->iobuf, ut_iobuf_get_buf_cb);
2249 	CU_ASSERT_PTR_NOT_NULL(entry->buf);
2250 
2251 	/* Request a buffer from each pool for each module on thread 0 */
2252 	set_thread(0);
2253 	entry = &mod0_entries[2];
2254 	entry->buf = spdk_iobuf_get(entry->ioch, LARGE_BUFSIZE, &entry->iobuf, ut_iobuf_get_buf_cb);
2255 	CU_ASSERT_PTR_NULL(entry->buf);
2256 	entry = &mod1_entries[2];
2257 	entry->buf = spdk_iobuf_get(entry->ioch, LARGE_BUFSIZE, &entry->iobuf, ut_iobuf_get_buf_cb);
2258 	CU_ASSERT_PTR_NULL(entry->buf);
2259 	entry = &mod0_entries[3];
2260 	entry->buf = spdk_iobuf_get(entry->ioch, SMALL_BUFSIZE, &entry->iobuf, ut_iobuf_get_buf_cb);
2261 	CU_ASSERT_PTR_NULL(entry->buf);
2262 	entry = &mod1_entries[3];
2263 	entry->buf = spdk_iobuf_get(entry->ioch, SMALL_BUFSIZE, &entry->iobuf, ut_iobuf_get_buf_cb);
2264 	CU_ASSERT_PTR_NULL(entry->buf);
2265 
2266 	/* Do the same on thread 1 */
2267 	set_thread(1);
2268 	entry = &mod0_entries[6];
2269 	entry->buf = spdk_iobuf_get(entry->ioch, LARGE_BUFSIZE, &entry->iobuf, ut_iobuf_get_buf_cb);
2270 	CU_ASSERT_PTR_NULL(entry->buf);
2271 	entry = &mod1_entries[6];
2272 	entry->buf = spdk_iobuf_get(entry->ioch, LARGE_BUFSIZE, &entry->iobuf, ut_iobuf_get_buf_cb);
2273 	CU_ASSERT_PTR_NULL(entry->buf);
2274 	entry = &mod0_entries[7];
2275 	entry->buf = spdk_iobuf_get(entry->ioch, SMALL_BUFSIZE, &entry->iobuf, ut_iobuf_get_buf_cb);
2276 	CU_ASSERT_PTR_NULL(entry->buf);
2277 	entry = &mod1_entries[7];
2278 	entry->buf = spdk_iobuf_get(entry->ioch, SMALL_BUFSIZE, &entry->iobuf, ut_iobuf_get_buf_cb);
2279 	CU_ASSERT_PTR_NULL(entry->buf);
2280 
2281 	/* Now do the foreach and check that the correct entries are iterated over by assigning their
2282 	 * ->buf pointers to different values.
2283 	 */
2284 	set_thread(0);
2285 	rc = spdk_iobuf_for_each_entry(&mod0_ch[0], &mod0_ch[0].large,
2286 				       ut_iobuf_foreach_cb, (void *)0xdeadbeef);
2287 	CU_ASSERT_EQUAL(rc, 0);
2288 	rc = spdk_iobuf_for_each_entry(&mod0_ch[0], &mod0_ch[0].small,
2289 				       ut_iobuf_foreach_cb, (void *)0xbeefdead);
2290 	CU_ASSERT_EQUAL(rc, 0);
2291 	rc = spdk_iobuf_for_each_entry(&mod1_ch[0], &mod1_ch[0].large,
2292 				       ut_iobuf_foreach_cb, (void *)0xfeedbeef);
2293 	CU_ASSERT_EQUAL(rc, 0);
2294 	rc = spdk_iobuf_for_each_entry(&mod1_ch[0], &mod1_ch[0].small,
2295 				       ut_iobuf_foreach_cb, (void *)0xbeeffeed);
2296 	CU_ASSERT_EQUAL(rc, 0);
2297 	set_thread(1);
2298 	rc = spdk_iobuf_for_each_entry(&mod0_ch[1], &mod0_ch[1].large,
2299 				       ut_iobuf_foreach_cb, (void *)0xcafebabe);
2300 	CU_ASSERT_EQUAL(rc, 0);
2301 	rc = spdk_iobuf_for_each_entry(&mod0_ch[1], &mod0_ch[1].small,
2302 				       ut_iobuf_foreach_cb, (void *)0xbabecafe);
2303 	CU_ASSERT_EQUAL(rc, 0);
2304 	rc = spdk_iobuf_for_each_entry(&mod1_ch[1], &mod1_ch[1].large,
2305 				       ut_iobuf_foreach_cb, (void *)0xbeefcafe);
2306 	CU_ASSERT_EQUAL(rc, 0);
2307 	rc = spdk_iobuf_for_each_entry(&mod1_ch[1], &mod1_ch[1].small,
2308 				       ut_iobuf_foreach_cb, (void *)0xcafebeef);
2309 	CU_ASSERT_EQUAL(rc, 0);
2310 
2311 	/* thread 0 */
2312 	CU_ASSERT_PTR_EQUAL(mod0_entries[2].buf, (void *)0xdeadbeef);
2313 	CU_ASSERT_PTR_EQUAL(mod0_entries[3].buf, (void *)0xbeefdead);
2314 	CU_ASSERT_PTR_EQUAL(mod1_entries[2].buf, (void *)0xfeedbeef);
2315 	CU_ASSERT_PTR_EQUAL(mod1_entries[3].buf, (void *)0xbeeffeed);
2316 	/* thread 1 */
2317 	CU_ASSERT_PTR_EQUAL(mod0_entries[6].buf, (void *)0xcafebabe);
2318 	CU_ASSERT_PTR_EQUAL(mod0_entries[7].buf, (void *)0xbabecafe);
2319 	CU_ASSERT_PTR_EQUAL(mod1_entries[6].buf, (void *)0xbeefcafe);
2320 	CU_ASSERT_PTR_EQUAL(mod1_entries[7].buf, (void *)0xcafebeef);
2321 
2322 	/* Clean everything up */
2323 	set_thread(0);
2324 	entry = &mod0_entries[2];
2325 	spdk_iobuf_entry_abort(entry->ioch, &entry->iobuf, LARGE_BUFSIZE);
2326 	entry = &mod0_entries[3];
2327 	spdk_iobuf_entry_abort(entry->ioch, &entry->iobuf, SMALL_BUFSIZE);
2328 	entry = &mod1_entries[2];
2329 	spdk_iobuf_entry_abort(entry->ioch, &entry->iobuf, LARGE_BUFSIZE);
2330 	entry = &mod1_entries[3];
2331 	spdk_iobuf_entry_abort(entry->ioch, &entry->iobuf, SMALL_BUFSIZE);
2332 
2333 	entry = &mod0_entries[0];
2334 	spdk_iobuf_put(entry->ioch, entry->buf, LARGE_BUFSIZE);
2335 	entry = &mod1_entries[0];
2336 	spdk_iobuf_put(entry->ioch, entry->buf, LARGE_BUFSIZE);
2337 	entry = &mod0_entries[1];
2338 	spdk_iobuf_put(entry->ioch, entry->buf, SMALL_BUFSIZE);
2339 	entry = &mod1_entries[1];
2340 	spdk_iobuf_put(entry->ioch, entry->buf, SMALL_BUFSIZE);
2341 
2342 	set_thread(1);
2343 	entry = &mod0_entries[6];
2344 	spdk_iobuf_entry_abort(entry->ioch, &entry->iobuf, LARGE_BUFSIZE);
2345 	entry = &mod0_entries[7];
2346 	spdk_iobuf_entry_abort(entry->ioch, &entry->iobuf, SMALL_BUFSIZE);
2347 	entry = &mod1_entries[6];
2348 	spdk_iobuf_entry_abort(entry->ioch, &entry->iobuf, LARGE_BUFSIZE);
2349 	entry = &mod1_entries[7];
2350 	spdk_iobuf_entry_abort(entry->ioch, &entry->iobuf, SMALL_BUFSIZE);
2351 
2352 	set_thread(0);
2353 	spdk_iobuf_channel_fini(&mod0_ch[0]);
2354 	poll_threads();
2355 	spdk_iobuf_channel_fini(&mod1_ch[0]);
2356 	poll_threads();
2357 	set_thread(1);
2358 	spdk_iobuf_channel_fini(&mod0_ch[1]);
2359 	poll_threads();
2360 	spdk_iobuf_channel_fini(&mod1_ch[1]);
2361 	poll_threads();
2362 
2363 	spdk_iobuf_finish(ut_iobuf_finish_cb, &finish);
2364 	poll_threads();
2365 
2366 	CU_ASSERT_EQUAL(finish, 1);
2367 
2368 	free_threads();
2369 	free_cores();
2370 }
2371 
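/*
 * Exercise the per-channel buffer cache: channel creation must fail when the
 * pools cannot populate the requested cache, cached buffers must be used before
 * dipping into the shared pool, and queued requests must take precedence over
 * refilling the cache.
 */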
2372 static void
2373 iobuf_cache(void)
2374 {
2375 	struct spdk_iobuf_opts opts = {
2376 		.small_pool_count = 4,
2377 		.large_pool_count = 4,
2378 		.small_bufsize = SMALL_BUFSIZE,
2379 		.large_bufsize = LARGE_BUFSIZE,
2380 	};
2381 	struct spdk_iobuf_channel iobuf_ch[2];
2382 	struct ut_iobuf_entry *entry;
2383 	struct ut_iobuf_entry mod0_entries[] = {
2384 		{ .thread_id = 0, .module = "ut_module0", },
2385 		{ .thread_id = 0, .module = "ut_module0", },
2386 		{ .thread_id = 0, .module = "ut_module0", },
2387 		{ .thread_id = 0, .module = "ut_module0", },
2388 	};
2389 	struct ut_iobuf_entry mod1_entries[] = {
2390 		{ .thread_id = 0, .module = "ut_module1", },
2391 		{ .thread_id = 0, .module = "ut_module1", },
2392 	};
2393 	int rc, finish = 0;
2394 	uint32_t i, j, bufsize;
2395 
2396 	allocate_cores(1);
2397 	allocate_threads(1);
2398 
2399 	set_thread(0);
2400 
2401 	/* We cannot use spdk_iobuf_set_opts(), as it won't allow us to use such small pools */
2402 	g_iobuf.opts = opts;
2403 	rc = spdk_iobuf_initialize();
2404 	CU_ASSERT_EQUAL(rc, 0);
2405 
2406 	rc = spdk_iobuf_register_module("ut_module0");
2407 	CU_ASSERT_EQUAL(rc, 0);
2408 
2409 	rc = spdk_iobuf_register_module("ut_module1");
2410 	CU_ASSERT_EQUAL(rc, 0);
2411 
2412 	/* First check that channel initialization fails when it's not possible to fill the cache
2413 	 * from the pool.
2414 	 */
2415 	rc = spdk_iobuf_channel_init(&iobuf_ch[0], "ut_module0", 5, 1);
2416 	CU_ASSERT_EQUAL(rc, -ENOMEM);
2417 	rc = spdk_iobuf_channel_init(&iobuf_ch[0], "ut_module0", 1, 5);
2418 	CU_ASSERT_EQUAL(rc, -ENOMEM);
2419 
2420 	rc = spdk_iobuf_channel_init(&iobuf_ch[0], "ut_module0", 4, 4);
2421 	CU_ASSERT_EQUAL(rc, 0);
2422 	rc = spdk_iobuf_channel_init(&iobuf_ch[1], "ut_module1", 4, 4);
2423 	CU_ASSERT_EQUAL(rc, -ENOMEM);
2424 
2425 	spdk_iobuf_channel_fini(&iobuf_ch[0]);
2426 	poll_threads();
2427 
2428 	/* Initialize one channel with cache, acquire buffers, and check that a second one can be
2429 	 * created once the buffers acquired from the first one are returned to the pool
2430 	 */
2431 	rc = spdk_iobuf_channel_init(&iobuf_ch[0], "ut_module0", 2, 2);
2432 	CU_ASSERT_EQUAL(rc, 0);
2433 
2434 	for (i = 0; i < 3; ++i) {
2435 		mod0_entries[i].buf = spdk_iobuf_get(&iobuf_ch[0], LARGE_BUFSIZE, &mod0_entries[i].iobuf,
2436 						     ut_iobuf_get_buf_cb);
2437 		CU_ASSERT_PTR_NOT_NULL(mod0_entries[i].buf);
2438 	}
2439 
2440 	/* It should be possible to create a channel with a single entry in the cache */
2441 	rc = spdk_iobuf_channel_init(&iobuf_ch[1], "ut_module1", 2, 1);
2442 	CU_ASSERT_EQUAL(rc, 0);
2443 	spdk_iobuf_channel_fini(&iobuf_ch[1]);
2444 	poll_threads();
2445 
2446 	/* But not with two entries */
2447 	rc = spdk_iobuf_channel_init(&iobuf_ch[1], "ut_module1", 2, 2);
2448 	CU_ASSERT_EQUAL(rc, -ENOMEM);
2449 
2450 	for (i = 0; i < 2; ++i) {
2451 		spdk_iobuf_put(&iobuf_ch[0], mod0_entries[i].buf, LARGE_BUFSIZE);
2452 		rc = spdk_iobuf_channel_init(&iobuf_ch[1], "ut_module1", 2, 2);
2453 		CU_ASSERT_EQUAL(rc, -ENOMEM);
2454 	}
2455 
2456 	spdk_iobuf_put(&iobuf_ch[0], mod0_entries[2].buf, LARGE_BUFSIZE);
2457 
2458 	/* The last buffer should be released back to the pool, so we should be able to create a new
2459 	 * channel
2460 	 */
2461 	rc = spdk_iobuf_channel_init(&iobuf_ch[1], "ut_module1", 2, 2);
2462 	CU_ASSERT_EQUAL(rc, 0);
2463 
2464 	spdk_iobuf_channel_fini(&iobuf_ch[0]);
2465 	spdk_iobuf_channel_fini(&iobuf_ch[1]);
2466 	poll_threads();
2467 
2468 	/* Check that the pool is only used when the cache is empty and that the cache
2469 	 * guarantees the availability of a certain number of buffers
2470 	 */
2471 	rc = spdk_iobuf_channel_init(&iobuf_ch[0], "ut_module0", 2, 2);
2472 	CU_ASSERT_EQUAL(rc, 0);
2473 	rc = spdk_iobuf_channel_init(&iobuf_ch[1], "ut_module1", 1, 1);
2474 	CU_ASSERT_EQUAL(rc, 0);
2475 
2476 	uint32_t buffer_sizes[] = { SMALL_BUFSIZE, LARGE_BUFSIZE };
2477 	for (i = 0; i < SPDK_COUNTOF(buffer_sizes); ++i) {
2478 		bufsize = buffer_sizes[i];
2479 
2480 		for (j = 0; j < 3; ++j) {
2481 			entry = &mod0_entries[j];
2482 			entry->buf = spdk_iobuf_get(&iobuf_ch[0], bufsize, &entry->iobuf,
2483 						    ut_iobuf_get_buf_cb);
2484 			CU_ASSERT_PTR_NOT_NULL(entry->buf);
2485 		}
2486 
2487 		mod1_entries[0].buf = spdk_iobuf_get(&iobuf_ch[1], bufsize, &mod1_entries[0].iobuf,
2488 						     ut_iobuf_get_buf_cb);
2489 		CU_ASSERT_PTR_NOT_NULL(mod1_entries[0].buf);
2490 
2491 		/* The whole pool is exhausted now */
2492 		mod1_entries[1].buf = spdk_iobuf_get(&iobuf_ch[1], bufsize, &mod1_entries[1].iobuf,
2493 						     ut_iobuf_get_buf_cb);
2494 		CU_ASSERT_PTR_NULL(mod1_entries[1].buf);
2495 		mod0_entries[3].buf = spdk_iobuf_get(&iobuf_ch[0], bufsize, &mod0_entries[3].iobuf,
2496 						     ut_iobuf_get_buf_cb);
2497 		CU_ASSERT_PTR_NULL(mod0_entries[3].buf);
2498 
2499 		/* If there are outstanding requests waiting for a buffer, they should have priority
2500 	 * over refilling the cache, even if they're from different modules.
2501 		 */
2502 		spdk_iobuf_put(&iobuf_ch[0], mod0_entries[2].buf, bufsize);
2503 		/* Also make sure the queue is FIFO and doesn't care about which module requested
2504 		 * and which module released the buffer.
2505 		 */
2506 		CU_ASSERT_PTR_NOT_NULL(mod1_entries[1].buf);
2507 		CU_ASSERT_PTR_NULL(mod0_entries[3].buf);
2508 
2509 		/* Return the buffers back */
2510 		spdk_iobuf_entry_abort(&iobuf_ch[0], &mod0_entries[3].iobuf, bufsize);
2511 		for (j = 0; j < 2; ++j) {
2512 			spdk_iobuf_put(&iobuf_ch[0], mod0_entries[j].buf, bufsize);
2513 			spdk_iobuf_put(&iobuf_ch[1], mod1_entries[j].buf, bufsize);
2514 		}
2515 	}
2516 
2517 	spdk_iobuf_channel_fini(&iobuf_ch[0]);
2518 	spdk_iobuf_channel_fini(&iobuf_ch[1]);
2519 	poll_threads();
2520 
2521 	spdk_iobuf_finish(ut_iobuf_finish_cb, &finish);
2522 	poll_threads();
2523 
2524 	CU_ASSERT_EQUAL(finish, 1);
2525 
2526 	free_threads();
2527 	free_cores();
2528 }
2529 
2530 int
2531 main(int argc, char **argv)
2532 {
2533 	CU_pSuite	suite = NULL;
2534 	unsigned int	num_failures;
2535 
2536 	CU_set_error_action(CUEA_ABORT);
2537 	CU_initialize_registry();
2538 
2539 	suite = CU_add_suite("io_channel", NULL, NULL);
2540 
2541 	CU_ADD_TEST(suite, thread_alloc);
2542 	CU_ADD_TEST(suite, thread_send_msg);
2543 	CU_ADD_TEST(suite, thread_poller);
2544 	CU_ADD_TEST(suite, poller_pause);
2545 	CU_ADD_TEST(suite, thread_for_each);
2546 	CU_ADD_TEST(suite, for_each_channel_remove);
2547 	CU_ADD_TEST(suite, for_each_channel_unreg);
2548 	CU_ADD_TEST(suite, thread_name);
2549 	CU_ADD_TEST(suite, channel);
2550 	CU_ADD_TEST(suite, channel_destroy_races);
2551 	CU_ADD_TEST(suite, thread_exit_test);
2552 	CU_ADD_TEST(suite, thread_update_stats_test);
2553 	CU_ADD_TEST(suite, nested_channel);
2554 	CU_ADD_TEST(suite, device_unregister_and_thread_exit_race);
2555 	CU_ADD_TEST(suite, cache_closest_timed_poller);
2556 	CU_ADD_TEST(suite, multi_timed_pollers_have_same_expiration);
2557 	CU_ADD_TEST(suite, io_device_lookup);
2558 	CU_ADD_TEST(suite, spdk_spin);
2559 	CU_ADD_TEST(suite, iobuf);
2560 	CU_ADD_TEST(suite, iobuf_cache);
2561 
2562 	CU_basic_set_mode(CU_BRM_VERBOSE);
2563 	CU_basic_run_tests();
2564 	num_failures = CU_get_number_of_failures();
2565 	CU_cleanup_registry();
2566 	return num_failures;
2567 }
2568