xref: /spdk/test/unit/lib/bdev/mt/bdev.c/bdev_ut.c (revision 7506a7aa53d239f533af3bc768f0d2af55e735fe)
1 /*-
2  *   BSD LICENSE
3  *
4  *   Copyright (c) Intel Corporation.
5  *   All rights reserved.
6  *
7  *   Redistribution and use in source and binary forms, with or without
8  *   modification, are permitted provided that the following conditions
9  *   are met:
10  *
11  *     * Redistributions of source code must retain the above copyright
12  *       notice, this list of conditions and the following disclaimer.
13  *     * Redistributions in binary form must reproduce the above copyright
14  *       notice, this list of conditions and the following disclaimer in
15  *       the documentation and/or other materials provided with the
16  *       distribution.
17  *     * Neither the name of Intel Corporation nor the names of its
18  *       contributors may be used to endorse or promote products derived
19  *       from this software without specific prior written permission.
20  *
21  *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22  *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23  *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
24  *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
25  *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
26  *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
27  *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
28  *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
29  *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
30  *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
31  *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
32  */
33 
34 #include "spdk_cunit.h"
35 
36 #include "common/lib/ut_multithread.c"
37 #include "unit/lib/json_mock.c"
38 
39 #include "spdk/config.h"
40 /* HACK: disable VTune integration so the unit test doesn't need VTune headers and libs to build */
41 #undef SPDK_CONFIG_VTUNE
42 
43 #include "bdev/bdev.c"
44 
45 #define BDEV_UT_NUM_THREADS 3
46 
47 DEFINE_STUB(spdk_notify_send, uint64_t, (const char *type, const char *ctx), 0);
48 DEFINE_STUB(spdk_notify_type_register, struct spdk_notify_type *, (const char *type), NULL);
49 DEFINE_STUB_V(spdk_scsi_nvme_translate, (const struct spdk_bdev_io *bdev_io, int *sc, int *sk,
50 		int *asc, int *ascq));
51 DEFINE_STUB(spdk_memory_domain_get_dma_device_id, const char *, (struct spdk_memory_domain *domain),
52 	    "test_domain");
53 DEFINE_STUB(spdk_memory_domain_get_dma_device_type, enum spdk_dma_device_type,
54 	    (struct spdk_memory_domain *domain), 0);
55 
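/*
 * Memory domain pull/push mocks.  Unless a return value is forced via MOCK_SET,
 * they invoke the completion callback immediately with status 0 and return success.
 */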
56 DEFINE_RETURN_MOCK(spdk_memory_domain_pull_data, int);
57 int
58 spdk_memory_domain_pull_data(struct spdk_memory_domain *src_domain, void *src_domain_ctx,
59 			     struct iovec *src_iov, uint32_t src_iov_cnt, struct iovec *dst_iov, uint32_t dst_iov_cnt,
60 			     spdk_memory_domain_data_cpl_cb cpl_cb, void *cpl_cb_arg)
61 {
62 	HANDLE_RETURN_MOCK(spdk_memory_domain_pull_data);
63 
64 	cpl_cb(cpl_cb_arg, 0);
65 	return 0;
66 }
67 
68 DEFINE_RETURN_MOCK(spdk_memory_domain_push_data, int);
69 int
70 spdk_memory_domain_push_data(struct spdk_memory_domain *dst_domain, void *dst_domain_ctx,
71 			     struct iovec *dst_iov, uint32_t dst_iovcnt, struct iovec *src_iov, uint32_t src_iovcnt,
72 			     spdk_memory_domain_data_cpl_cb cpl_cb, void *cpl_cb_arg)
73 {
74 	HANDLE_RETURN_MOCK(spdk_memory_domain_push_data);
75 
76 	cpl_cb(cpl_cb_arg, 0);
77 	return 0;
78 }
79 
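/* Test bdev: wraps an spdk_bdev plus the io_target pointer registered as its io_device. */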
80 struct ut_bdev {
81 	struct spdk_bdev	bdev;
82 	void			*io_target;
83 };
84 
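/*
 * Per-channel state for the stub io_device: the list of I/O queued at the "disk"
 * and a counter of how many more I/O the channel can accept before hitting NOMEM.
 */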
85 struct ut_bdev_channel {
86 	TAILQ_HEAD(, spdk_bdev_io)	outstanding_io;
87 	uint32_t			outstanding_cnt;
88 	uint32_t			avail_cnt;
89 };
90 
91 int g_io_device;
92 struct ut_bdev g_bdev;
93 struct spdk_bdev_desc *g_desc;
94 bool g_teardown_done = false;
95 bool g_get_io_channel = true;
96 bool g_create_ch = true;
97 bool g_init_complete_called = false;
98 bool g_fini_start_called = true;
99 int g_status = 0;
100 int g_count = 0;
101 struct spdk_histogram_data *g_histogram = NULL;
102 
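/*
 * I/O channel create callback for the stub io_device.  Fails when g_create_ch is
 * false so tests can exercise channel creation errors.
 */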
103 static int
104 stub_create_ch(void *io_device, void *ctx_buf)
105 {
106 	struct ut_bdev_channel *ch = ctx_buf;
107 
108 	if (g_create_ch == false) {
109 		return -1;
110 	}
111 
112 	TAILQ_INIT(&ch->outstanding_io);
113 	ch->outstanding_cnt = 0;
114 	/*
115 	 * When avail_cnt drops to 0, stub_submit_request completes new I/O with
116 	 *  SPDK_BDEV_IO_STATUS_NOMEM.  Most tests do not want ENOMEM to occur, so by
117 	 *  default set this to a big value that won't get hit.  The ENOMEM tests can
118 	 *  then override this value to something much smaller to induce ENOMEM conditions.
119 	 */
120 	ch->avail_cnt = 2048;
121 	return 0;
122 }
123 
124 static void
125 stub_destroy_ch(void *io_device, void *ctx_buf)
126 {
127 }
128 
129 static struct spdk_io_channel *
130 stub_get_io_channel(void *ctx)
131 {
132 	struct ut_bdev *ut_bdev = ctx;
133 
134 	if (g_get_io_channel == true) {
135 		return spdk_get_io_channel(ut_bdev->io_target);
136 	} else {
137 		return NULL;
138 	}
139 }
140 
141 static int
142 stub_destruct(void *ctx)
143 {
144 	return 0;
145 }
146 
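/*
 * Stub submit path.  An ABORT completes immediately: SUCCESS if the target I/O was
 * found and aborted, FAILED otherwise.  A RESET first aborts every outstanding I/O
 * and then, like any other request, is queued on the channel if capacity remains,
 * or completed with NOMEM if it does not.
 */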
147 static void
148 stub_submit_request(struct spdk_io_channel *_ch, struct spdk_bdev_io *bdev_io)
149 {
150 	struct ut_bdev_channel *ch = spdk_io_channel_get_ctx(_ch);
151 	struct spdk_bdev_io *io;
152 
153 	if (bdev_io->type == SPDK_BDEV_IO_TYPE_RESET) {
154 		while (!TAILQ_EMPTY(&ch->outstanding_io)) {
155 			io = TAILQ_FIRST(&ch->outstanding_io);
156 			TAILQ_REMOVE(&ch->outstanding_io, io, module_link);
157 			ch->outstanding_cnt--;
158 			spdk_bdev_io_complete(io, SPDK_BDEV_IO_STATUS_ABORTED);
159 			ch->avail_cnt++;
160 		}
161 	} else if (bdev_io->type == SPDK_BDEV_IO_TYPE_ABORT) {
162 		TAILQ_FOREACH(io, &ch->outstanding_io, module_link) {
163 			if (io == bdev_io->u.abort.bio_to_abort) {
164 				TAILQ_REMOVE(&ch->outstanding_io, io, module_link);
165 				ch->outstanding_cnt--;
166 				spdk_bdev_io_complete(io, SPDK_BDEV_IO_STATUS_ABORTED);
167 				ch->avail_cnt++;
168 
169 				spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_SUCCESS);
170 				return;
171 			}
172 		}
173 
174 		spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
175 		return;
176 	}
177 
178 	if (ch->avail_cnt > 0) {
179 		TAILQ_INSERT_TAIL(&ch->outstanding_io, bdev_io, module_link);
180 		ch->outstanding_cnt++;
181 		ch->avail_cnt--;
182 	} else {
183 		spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_NOMEM);
184 	}
185 }
186 
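/*
 * Complete up to num_to_complete I/O queued on the current thread's stub channel
 * with SUCCESS status.  Passing 0 completes everything.  Returns the number completed.
 */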
187 static uint32_t
188 stub_complete_io(void *io_target, uint32_t num_to_complete)
189 {
190 	struct spdk_io_channel *_ch = spdk_get_io_channel(io_target);
191 	struct ut_bdev_channel *ch = spdk_io_channel_get_ctx(_ch);
192 	struct spdk_bdev_io *io;
193 	bool complete_all = (num_to_complete == 0);
194 	uint32_t num_completed = 0;
195 
196 	while (complete_all || num_completed < num_to_complete) {
197 		if (TAILQ_EMPTY(&ch->outstanding_io)) {
198 			break;
199 		}
200 		io = TAILQ_FIRST(&ch->outstanding_io);
201 		TAILQ_REMOVE(&ch->outstanding_io, io, module_link);
202 		ch->outstanding_cnt--;
203 		spdk_bdev_io_complete(io, SPDK_BDEV_IO_STATUS_SUCCESS);
204 		ch->avail_cnt++;
205 		num_completed++;
206 	}
207 	spdk_put_io_channel(_ch);
208 	return num_completed;
209 }
210 
211 static bool
212 stub_io_type_supported(void *ctx, enum spdk_bdev_io_type type)
213 {
214 	return true;
215 }
216 
217 static struct spdk_bdev_fn_table fn_table = {
218 	.get_io_channel =	stub_get_io_channel,
219 	.destruct =		stub_destruct,
220 	.submit_request =	stub_submit_request,
221 	.io_type_supported =	stub_io_type_supported,
222 };
223 
224 struct spdk_bdev_module bdev_ut_if;
225 
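/* The module registers with async_init, so init completion is signaled explicitly here. */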
226 static int
227 module_init(void)
228 {
229 	spdk_bdev_module_init_done(&bdev_ut_if);
230 	return 0;
231 }
232 
233 static void
234 module_fini(void)
235 {
236 }
237 
238 static void
239 init_complete(void)
240 {
241 	g_init_complete_called = true;
242 }
243 
244 static void
245 fini_start(void)
246 {
247 	g_fini_start_called = true;
248 }
249 
250 struct spdk_bdev_module bdev_ut_if = {
251 	.name = "bdev_ut",
252 	.module_init = module_init,
253 	.module_fini = module_fini,
254 	.async_init = true,
255 	.init_complete = init_complete,
256 	.fini_start = fini_start,
257 };
258 
259 SPDK_BDEV_MODULE_REGISTER(bdev_ut, &bdev_ut_if)
260 
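/* Initialize a ut_bdev (1024 blocks of 4096 bytes) backed by io_target and register it. */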
261 static void
262 register_bdev(struct ut_bdev *ut_bdev, char *name, void *io_target)
263 {
264 	memset(ut_bdev, 0, sizeof(*ut_bdev));
265 
266 	ut_bdev->io_target = io_target;
267 	ut_bdev->bdev.ctxt = ut_bdev;
268 	ut_bdev->bdev.name = name;
269 	ut_bdev->bdev.fn_table = &fn_table;
270 	ut_bdev->bdev.module = &bdev_ut_if;
271 	ut_bdev->bdev.blocklen = 4096;
272 	ut_bdev->bdev.blockcnt = 1024;
273 
274 	spdk_bdev_register(&ut_bdev->bdev);
275 }
276 
277 static void
278 unregister_bdev(struct ut_bdev *ut_bdev)
279 {
280 	/* Handle any deferred messages. */
281 	poll_threads();
282 	spdk_bdev_unregister(&ut_bdev->bdev, NULL, NULL);
283 	/* Handle the async bdev unregister. */
284 	poll_threads();
285 }
286 
287 static void
288 bdev_init_cb(void *done, int rc)
289 {
290 	CU_ASSERT(rc == 0);
291 	*(bool *)done = true;
292 }
293 
294 static void
295 _bdev_event_cb(enum spdk_bdev_event_type type, struct spdk_bdev *bdev,
296 	       void *event_ctx)
297 {
298 	switch (type) {
299 	case SPDK_BDEV_EVENT_REMOVE:
300 		if (event_ctx != NULL) {
301 			*(bool *)event_ctx = true;
302 		}
303 		break;
304 	default:
305 		CU_ASSERT(false);
306 		break;
307 	}
308 }
309 
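/*
 * Common test setup: allocate BDEV_UT_NUM_THREADS threads, initialize the bdev layer
 * on thread 0, register the stub io_device and "ut_bdev", and open a descriptor in g_desc.
 */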
310 static void
311 setup_test(void)
312 {
313 	bool done = false;
314 
315 	allocate_cores(BDEV_UT_NUM_THREADS);
316 	allocate_threads(BDEV_UT_NUM_THREADS);
317 	set_thread(0);
318 	spdk_bdev_initialize(bdev_init_cb, &done);
319 	spdk_io_device_register(&g_io_device, stub_create_ch, stub_destroy_ch,
320 				sizeof(struct ut_bdev_channel), NULL);
321 	register_bdev(&g_bdev, "ut_bdev", &g_io_device);
322 	spdk_bdev_open_ext("ut_bdev", true, _bdev_event_cb, NULL, &g_desc);
323 }
324 
325 static void
326 finish_cb(void *cb_arg)
327 {
328 	g_teardown_done = true;
329 }
330 
331 static void
332 teardown_test(void)
333 {
334 	set_thread(0);
335 	g_teardown_done = false;
336 	spdk_bdev_close(g_desc);
337 	g_desc = NULL;
338 	unregister_bdev(&g_bdev);
339 	spdk_io_device_unregister(&g_io_device, NULL);
340 	spdk_bdev_finish(finish_cb, NULL);
341 	poll_threads();
342 	memset(&g_bdev, 0, sizeof(g_bdev));
343 	CU_ASSERT(g_teardown_done == true);
344 	g_teardown_done = false;
345 	free_threads();
346 	free_cores();
347 }
348 
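/* Count the bdev_io entries currently linked on the given tailq. */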
349 static uint32_t
350 bdev_io_tailq_cnt(bdev_io_tailq_t *tailq)
351 {
352 	struct spdk_bdev_io *io;
353 	uint32_t cnt = 0;
354 
355 	TAILQ_FOREACH(io, tailq, internal.link) {
356 		cnt++;
357 	}
358 
359 	return cnt;
360 }
361 
362 static void
363 basic(void)
364 {
365 	g_init_complete_called = false;
366 	setup_test();
367 	CU_ASSERT(g_init_complete_called == true);
368 
369 	set_thread(0);
370 
371 	g_get_io_channel = false;
372 	g_ut_threads[0].ch = spdk_bdev_get_io_channel(g_desc);
373 	CU_ASSERT(g_ut_threads[0].ch == NULL);
374 
375 	g_get_io_channel = true;
376 	g_create_ch = false;
377 	g_ut_threads[0].ch = spdk_bdev_get_io_channel(g_desc);
378 	CU_ASSERT(g_ut_threads[0].ch == NULL);
379 
380 	g_get_io_channel = true;
381 	g_create_ch = true;
382 	g_ut_threads[0].ch = spdk_bdev_get_io_channel(g_desc);
383 	CU_ASSERT(g_ut_threads[0].ch != NULL);
384 	spdk_put_io_channel(g_ut_threads[0].ch);
385 
386 	g_fini_start_called = false;
387 	teardown_test();
388 	CU_ASSERT(g_fini_start_called == true);
389 }
390 
391 static void
392 _bdev_unregistered(void *done, int rc)
393 {
394 	CU_ASSERT(rc == 0);
395 	*(bool *)done = true;
396 }
397 
398 static void
399 unregister_and_close(void)
400 {
401 	bool done, remove_notify;
402 	struct spdk_bdev_desc *desc = NULL;
403 
404 	setup_test();
405 	set_thread(0);
406 
407 	/* setup_test() automatically opens the bdev,
408 	 * but this test needs to do that in a different
409 	 * way. */
410 	spdk_bdev_close(g_desc);
411 	poll_threads();
412 
413 	/* Try hotremoving a bdev with descriptors which don't provide
414 	 * any context to the notification callback */
415 	spdk_bdev_open_ext("ut_bdev", true, _bdev_event_cb, NULL, &desc);
416 	SPDK_CU_ASSERT_FATAL(desc != NULL);
417 
418 	/* There is an open descriptor on the device. Unregister it,
419 	 * which can't proceed until the descriptor is closed. */
420 	done = false;
421 	spdk_bdev_unregister(&g_bdev.bdev, _bdev_unregistered, &done);
422 
423 	/* Poll the threads to allow all events to be processed */
424 	poll_threads();
425 
426 	/* Make sure the bdev was not unregistered. We still have a
427 	 * descriptor open */
428 	CU_ASSERT(done == false);
429 
430 	spdk_bdev_close(desc);
431 	poll_threads();
432 	desc = NULL;
433 
434 	/* The unregister should have completed */
435 	CU_ASSERT(done == true);
436 
437 
438 	/* Register the bdev again */
439 	register_bdev(&g_bdev, "ut_bdev", &g_io_device);
440 
441 	remove_notify = false;
442 	spdk_bdev_open_ext("ut_bdev", true, _bdev_event_cb, &remove_notify, &desc);
443 	SPDK_CU_ASSERT_FATAL(desc != NULL);
444 	CU_ASSERT(remove_notify == false);
445 
446 	/* There is an open descriptor on the device. Unregister it,
447 	 * which can't proceed until the descriptor is closed. */
448 	done = false;
449 	spdk_bdev_unregister(&g_bdev.bdev, _bdev_unregistered, &done);
450 	/* No polling has occurred, so neither of these should execute */
451 	CU_ASSERT(remove_notify == false);
452 	CU_ASSERT(done == false);
453 
454 	/* Prior to the unregister completing, close the descriptor */
455 	spdk_bdev_close(desc);
456 
457 	/* Poll the threads to allow all events to be processed */
458 	poll_threads();
459 
460 	/* Remove notify should not have been called because the
461 	 * descriptor is already closed. */
462 	CU_ASSERT(remove_notify == false);
463 
464 	/* The unregister should have completed */
465 	CU_ASSERT(done == true);
466 
467 	/* Restore the original g_bdev so that we can use teardown_test(). */
468 	register_bdev(&g_bdev, "ut_bdev", &g_io_device);
469 	spdk_bdev_open_ext("ut_bdev", true, _bdev_event_cb, NULL, &g_desc);
470 	teardown_test();
471 }
472 
473 static void
474 reset_done(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg)
475 {
476 	bool *done = cb_arg;
477 
478 	CU_ASSERT(success == true);
479 	*done = true;
480 	spdk_bdev_free_io(bdev_io);
481 }
482 
483 static void
484 put_channel_during_reset(void)
485 {
486 	struct spdk_io_channel *io_ch;
487 	bool done = false;
488 
489 	setup_test();
490 
491 	set_thread(0);
492 	io_ch = spdk_bdev_get_io_channel(g_desc);
493 	CU_ASSERT(io_ch != NULL);
494 
495 	/*
496 	 * Start a reset, but then put the I/O channel before
497 	 *  the deferred messages for the reset get a chance to
498 	 *  execute.
499 	 */
500 	spdk_bdev_reset(g_desc, io_ch, reset_done, &done);
501 	spdk_put_io_channel(io_ch);
502 	poll_threads();
503 	stub_complete_io(g_bdev.io_target, 0);
504 
505 	teardown_test();
506 }
507 
508 static void
509 aborted_reset_done(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg)
510 {
511 	enum spdk_bdev_io_status *status = cb_arg;
512 
513 	*status = success ? SPDK_BDEV_IO_STATUS_SUCCESS : SPDK_BDEV_IO_STATUS_FAILED;
514 	spdk_bdev_free_io(bdev_io);
515 }
516 
517 static void
518 aborted_reset(void)
519 {
520 	struct spdk_io_channel *io_ch[2];
521 	enum spdk_bdev_io_status status1 = SPDK_BDEV_IO_STATUS_PENDING,
522 				 status2 = SPDK_BDEV_IO_STATUS_PENDING;
523 
524 	setup_test();
525 
526 	set_thread(0);
527 	io_ch[0] = spdk_bdev_get_io_channel(g_desc);
528 	CU_ASSERT(io_ch[0] != NULL);
529 	spdk_bdev_reset(g_desc, io_ch[0], aborted_reset_done, &status1);
530 	poll_threads();
531 	CU_ASSERT(g_bdev.bdev.internal.reset_in_progress != NULL);
532 
533 	/*
534 	 * First reset has been submitted on ch0.  Now submit a second
535 	 *  reset on ch1 which will get queued since there is already a
536 	 *  reset in progress.
537 	 */
538 	set_thread(1);
539 	io_ch[1] = spdk_bdev_get_io_channel(g_desc);
540 	CU_ASSERT(io_ch[1] != NULL);
541 	spdk_bdev_reset(g_desc, io_ch[1], aborted_reset_done, &status2);
542 	poll_threads();
543 	CU_ASSERT(g_bdev.bdev.internal.reset_in_progress != NULL);
544 
545 	/*
546 	 * Now destroy ch1.  This will abort the queued reset.  Check that
547 	 *  the second reset was completed with failed status.  Also check
548 	 *  that bdev->internal.reset_in_progress != NULL, since the
549 	 *  original reset has not been completed yet.  This ensures that
550 	 *  the bdev code is correctly noticing that the failed reset is
551 	 *  *not* the one that had been submitted to the bdev module.
552 	 */
553 	set_thread(1);
554 	spdk_put_io_channel(io_ch[1]);
555 	poll_threads();
556 	CU_ASSERT(status2 == SPDK_BDEV_IO_STATUS_FAILED);
557 	CU_ASSERT(g_bdev.bdev.internal.reset_in_progress != NULL);
558 
559 	/*
560 	 * Now complete the first reset, verify that it completed with SUCCESS
561 	 *  status and that bdev->internal.reset_in_progress is also set back to NULL.
562 	 */
563 	set_thread(0);
564 	spdk_put_io_channel(io_ch[0]);
565 	stub_complete_io(g_bdev.io_target, 0);
566 	poll_threads();
567 	CU_ASSERT(status1 == SPDK_BDEV_IO_STATUS_SUCCESS);
568 	CU_ASSERT(g_bdev.bdev.internal.reset_in_progress == NULL);
569 
570 	teardown_test();
571 }
572 
573 static void
574 io_during_io_done(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg)
575 {
576 	enum spdk_bdev_io_status *status = cb_arg;
577 
578 	*status = bdev_io->internal.status;
579 	spdk_bdev_free_io(bdev_io);
580 }
581 
582 static void
583 io_during_reset(void)
584 {
585 	struct spdk_io_channel *io_ch[2];
586 	struct spdk_bdev_channel *bdev_ch[2];
587 	enum spdk_bdev_io_status status0, status1, status_reset;
588 	int rc;
589 
590 	setup_test();
591 
592 	/*
593 	 * First test normal case - submit an I/O on each of two channels (with no resets)
594 	 *  and verify they complete successfully.
595 	 */
596 	set_thread(0);
597 	io_ch[0] = spdk_bdev_get_io_channel(g_desc);
598 	bdev_ch[0] = spdk_io_channel_get_ctx(io_ch[0]);
599 	CU_ASSERT(bdev_ch[0]->flags == 0);
600 	status0 = SPDK_BDEV_IO_STATUS_PENDING;
601 	rc = spdk_bdev_read_blocks(g_desc, io_ch[0], NULL, 0, 1, io_during_io_done, &status0);
602 	CU_ASSERT(rc == 0);
603 
604 	set_thread(1);
605 	io_ch[1] = spdk_bdev_get_io_channel(g_desc);
606 	bdev_ch[1] = spdk_io_channel_get_ctx(io_ch[1]);
607 	CU_ASSERT(bdev_ch[1]->flags == 0);
608 	status1 = SPDK_BDEV_IO_STATUS_PENDING;
609 	rc = spdk_bdev_read_blocks(g_desc, io_ch[1], NULL, 0, 1, io_during_io_done, &status1);
610 	CU_ASSERT(rc == 0);
611 
612 	poll_threads();
613 	CU_ASSERT(status0 == SPDK_BDEV_IO_STATUS_PENDING);
614 	CU_ASSERT(status1 == SPDK_BDEV_IO_STATUS_PENDING);
615 
616 	set_thread(0);
617 	stub_complete_io(g_bdev.io_target, 0);
618 	CU_ASSERT(status0 == SPDK_BDEV_IO_STATUS_SUCCESS);
619 
620 	set_thread(1);
621 	stub_complete_io(g_bdev.io_target, 0);
622 	CU_ASSERT(status1 == SPDK_BDEV_IO_STATUS_SUCCESS);
623 
624 	/*
625 	 * Now submit a reset, and leave it pending while we submit I/O on two different
626 	 *  channels.  These I/O should be aborted by the bdev layer since the reset is in
627 	 *  progress.
628 	 */
629 	set_thread(0);
630 	status_reset = SPDK_BDEV_IO_STATUS_PENDING;
631 	rc = spdk_bdev_reset(g_desc, io_ch[0], io_during_io_done, &status_reset);
632 	CU_ASSERT(rc == 0);
633 
634 	CU_ASSERT(bdev_ch[0]->flags == 0);
635 	CU_ASSERT(bdev_ch[1]->flags == 0);
636 	poll_threads();
637 	CU_ASSERT(bdev_ch[0]->flags == BDEV_CH_RESET_IN_PROGRESS);
638 	CU_ASSERT(bdev_ch[1]->flags == BDEV_CH_RESET_IN_PROGRESS);
639 
640 	set_thread(0);
641 	status0 = SPDK_BDEV_IO_STATUS_PENDING;
642 	rc = spdk_bdev_read_blocks(g_desc, io_ch[0], NULL, 0, 1, io_during_io_done, &status0);
643 	CU_ASSERT(rc == 0);
644 
645 	set_thread(1);
646 	status1 = SPDK_BDEV_IO_STATUS_PENDING;
647 	rc = spdk_bdev_read_blocks(g_desc, io_ch[1], NULL, 0, 1, io_during_io_done, &status1);
648 	CU_ASSERT(rc == 0);
649 
650 	/*
651 	 * A reset is in progress, so these read I/O should complete as aborted.  Note that we
652 	 *  need to poll_threads() since inline-completed I/O have their completions deferred.
653 	 */
654 	poll_threads();
655 	CU_ASSERT(status_reset == SPDK_BDEV_IO_STATUS_PENDING);
656 	CU_ASSERT(status0 == SPDK_BDEV_IO_STATUS_ABORTED);
657 	CU_ASSERT(status1 == SPDK_BDEV_IO_STATUS_ABORTED);
658 
659 	/*
660 	 * Complete the reset
661 	 */
662 	set_thread(0);
663 	stub_complete_io(g_bdev.io_target, 0);
664 
665 	/*
666 	 * Only poll thread 0. We should not get a completion.
667 	 */
668 	poll_thread(0);
669 	CU_ASSERT(status_reset == SPDK_BDEV_IO_STATUS_PENDING);
670 
671 	/*
672 	 * Poll both threads 0 and 1 so the messages can propagate and we
673 	 * get a completion.
674 	 */
675 	poll_threads();
676 	CU_ASSERT(status_reset == SPDK_BDEV_IO_STATUS_SUCCESS);
677 
678 	spdk_put_io_channel(io_ch[0]);
679 	set_thread(1);
680 	spdk_put_io_channel(io_ch[1]);
681 	poll_threads();
682 
683 	teardown_test();
684 }
685 
686 static void
687 basic_qos(void)
688 {
689 	struct spdk_io_channel *io_ch[2];
690 	struct spdk_bdev_channel *bdev_ch[2];
691 	struct spdk_bdev *bdev;
692 	enum spdk_bdev_io_status status, abort_status;
693 	int rc;
694 
695 	setup_test();
696 
697 	/* Enable QoS */
698 	bdev = &g_bdev.bdev;
699 	bdev->internal.qos = calloc(1, sizeof(*bdev->internal.qos));
700 	SPDK_CU_ASSERT_FATAL(bdev->internal.qos != NULL);
701 	TAILQ_INIT(&bdev->internal.qos->queued);
702 	/*
703 	 * Enable read/write IOPS, read only byte per second and
704 	 * read/write byte per second rate limits.
705 	 * In this case, all rate limits are equivalent (each allows 2000 4K I/Os per second).
706 	 */
707 	/* 2000 read/write I/O per second, or 2 per millisecond */
708 	bdev->internal.qos->rate_limits[SPDK_BDEV_QOS_RW_IOPS_RATE_LIMIT].limit = 2000;
709 	/* 8K read/write byte per millisecond with 4K block size */
710 	bdev->internal.qos->rate_limits[SPDK_BDEV_QOS_RW_BPS_RATE_LIMIT].limit = 8192000;
711 	/* 8K read only byte per millisecond with 4K block size */
712 	bdev->internal.qos->rate_limits[SPDK_BDEV_QOS_R_BPS_RATE_LIMIT].limit = 8192000;
713 
714 	g_get_io_channel = true;
715 
716 	set_thread(0);
717 	io_ch[0] = spdk_bdev_get_io_channel(g_desc);
718 	bdev_ch[0] = spdk_io_channel_get_ctx(io_ch[0]);
719 	CU_ASSERT(bdev_ch[0]->flags == BDEV_CH_QOS_ENABLED);
720 
721 	set_thread(1);
722 	io_ch[1] = spdk_bdev_get_io_channel(g_desc);
723 	bdev_ch[1] = spdk_io_channel_get_ctx(io_ch[1]);
724 	CU_ASSERT(bdev_ch[1]->flags == BDEV_CH_QOS_ENABLED);
725 
726 	/*
727 	 * Send an I/O on thread 0, which is where the QoS thread is running.
728 	 */
729 	set_thread(0);
730 	status = SPDK_BDEV_IO_STATUS_PENDING;
731 	rc = spdk_bdev_read_blocks(g_desc, io_ch[0], NULL, 0, 1, io_during_io_done, &status);
732 	CU_ASSERT(rc == 0);
733 	CU_ASSERT(status == SPDK_BDEV_IO_STATUS_PENDING);
734 	poll_threads();
735 	stub_complete_io(g_bdev.io_target, 0);
736 	poll_threads();
737 	CU_ASSERT(status == SPDK_BDEV_IO_STATUS_SUCCESS);
738 
739 	/* Send an I/O on thread 1. The QoS thread is not running here. */
740 	status = SPDK_BDEV_IO_STATUS_PENDING;
741 	set_thread(1);
742 	rc = spdk_bdev_read_blocks(g_desc, io_ch[1], NULL, 0, 1, io_during_io_done, &status);
743 	CU_ASSERT(rc == 0);
744 	CU_ASSERT(status == SPDK_BDEV_IO_STATUS_PENDING);
745 	poll_threads();
746 	/* Complete I/O on thread 1. This should not complete the I/O we submitted */
747 	stub_complete_io(g_bdev.io_target, 0);
748 	poll_threads();
749 	CU_ASSERT(status == SPDK_BDEV_IO_STATUS_PENDING);
750 	/* Now complete I/O on thread 0 */
751 	set_thread(0);
752 	poll_threads();
753 	stub_complete_io(g_bdev.io_target, 0);
754 	poll_threads();
755 	CU_ASSERT(status == SPDK_BDEV_IO_STATUS_SUCCESS);
756 
757 	/* Advance one QoS timeslice to replenish the rate limits for the next test cases. */
758 	spdk_delay_us(SPDK_BDEV_QOS_TIMESLICE_IN_USEC);
759 	poll_threads();
760 
761 	/*
762 	 * Test abort request when QoS is enabled.
763 	 */
764 
765 	/* Send an I/O on thread 0, which is where the QoS thread is running. */
766 	set_thread(0);
767 	status = SPDK_BDEV_IO_STATUS_PENDING;
768 	rc = spdk_bdev_read_blocks(g_desc, io_ch[0], NULL, 0, 1, io_during_io_done, &status);
769 	CU_ASSERT(rc == 0);
770 	CU_ASSERT(status == SPDK_BDEV_IO_STATUS_PENDING);
771 	/* Send an abort to the I/O on the same thread. */
772 	abort_status = SPDK_BDEV_IO_STATUS_PENDING;
773 	rc = spdk_bdev_abort(g_desc, io_ch[0], &status, io_during_io_done, &abort_status);
774 	CU_ASSERT(rc == 0);
775 	CU_ASSERT(abort_status == SPDK_BDEV_IO_STATUS_PENDING);
776 	poll_threads();
777 	CU_ASSERT(abort_status == SPDK_BDEV_IO_STATUS_SUCCESS);
778 	CU_ASSERT(status == SPDK_BDEV_IO_STATUS_ABORTED);
779 
780 	/* Send an I/O on thread 1. The QoS thread is not running here. */
781 	status = SPDK_BDEV_IO_STATUS_PENDING;
782 	set_thread(1);
783 	rc = spdk_bdev_read_blocks(g_desc, io_ch[1], NULL, 0, 1, io_during_io_done, &status);
784 	CU_ASSERT(rc == 0);
785 	CU_ASSERT(status == SPDK_BDEV_IO_STATUS_PENDING);
786 	poll_threads();
787 	/* Send an abort to the I/O on the same thread. */
788 	abort_status = SPDK_BDEV_IO_STATUS_PENDING;
789 	rc = spdk_bdev_abort(g_desc, io_ch[1], &status, io_during_io_done, &abort_status);
790 	CU_ASSERT(rc == 0);
791 	CU_ASSERT(abort_status == SPDK_BDEV_IO_STATUS_PENDING);
792 	poll_threads();
793 	/* The abort should have completed with success and the I/O as aborted on thread 1. */
794 	CU_ASSERT(abort_status == SPDK_BDEV_IO_STATUS_SUCCESS);
795 	CU_ASSERT(status == SPDK_BDEV_IO_STATUS_ABORTED);
796 
797 	set_thread(0);
798 
799 	/*
800 	 * Close the descriptor only, which should stop the qos channel since
801 	 * the last descriptor is being removed.
802 	 */
803 	spdk_bdev_close(g_desc);
804 	poll_threads();
805 	CU_ASSERT(bdev->internal.qos->ch == NULL);
806 
807 	/*
808 	 * Open the bdev again, which should set up the qos channel since the
809 	 * I/O channels are still valid.
810 	 */
811 	spdk_bdev_open_ext("ut_bdev", true, _bdev_event_cb, NULL, &g_desc);
812 	poll_threads();
813 	CU_ASSERT(bdev->internal.qos->ch != NULL);
814 
815 	/* Tear down the channels */
816 	set_thread(0);
817 	spdk_put_io_channel(io_ch[0]);
818 	set_thread(1);
819 	spdk_put_io_channel(io_ch[1]);
820 	poll_threads();
821 	set_thread(0);
822 
823 	/* Close the descriptor, which should stop the qos channel */
824 	spdk_bdev_close(g_desc);
825 	poll_threads();
826 	CU_ASSERT(bdev->internal.qos->ch == NULL);
827 
828 	/* Open the bdev again; no qos channel is set up since there are no valid I/O channels. */
829 	spdk_bdev_open_ext("ut_bdev", true, _bdev_event_cb, NULL, &g_desc);
830 	poll_threads();
831 	CU_ASSERT(bdev->internal.qos->ch == NULL);
832 
833 	/* Create the channels in reverse order. */
834 	set_thread(1);
835 	io_ch[1] = spdk_bdev_get_io_channel(g_desc);
836 	bdev_ch[1] = spdk_io_channel_get_ctx(io_ch[1]);
837 	CU_ASSERT(bdev_ch[1]->flags == BDEV_CH_QOS_ENABLED);
838 
839 	set_thread(0);
840 	io_ch[0] = spdk_bdev_get_io_channel(g_desc);
841 	bdev_ch[0] = spdk_io_channel_get_ctx(io_ch[0]);
842 	CU_ASSERT(bdev_ch[0]->flags == BDEV_CH_QOS_ENABLED);
843 
844 	/* Confirm that the qos thread is now thread 1 */
845 	CU_ASSERT(bdev->internal.qos->ch == bdev_ch[1]);
846 
847 	/* Tear down the channels */
848 	set_thread(0);
849 	spdk_put_io_channel(io_ch[0]);
850 	set_thread(1);
851 	spdk_put_io_channel(io_ch[1]);
852 	poll_threads();
853 
854 	set_thread(0);
855 
856 	teardown_test();
857 }
858 
859 static void
860 io_during_qos_queue(void)
861 {
862 	struct spdk_io_channel *io_ch[2];
863 	struct spdk_bdev_channel *bdev_ch[2];
864 	struct spdk_bdev *bdev;
865 	enum spdk_bdev_io_status status0, status1, status2;
866 	int rc;
867 
868 	setup_test();
869 	MOCK_SET(spdk_get_ticks, 0);
870 
871 	/* Enable QoS */
872 	bdev = &g_bdev.bdev;
873 	bdev->internal.qos = calloc(1, sizeof(*bdev->internal.qos));
874 	SPDK_CU_ASSERT_FATAL(bdev->internal.qos != NULL);
875 	TAILQ_INIT(&bdev->internal.qos->queued);
876 	/*
877 	 * Enable read/write IOPS, read only byte per sec, write only
878 	 * byte per sec and read/write byte per sec rate limits.
879 	 * In this case, both read only and write only byte per sec
880 	 * rate limits will take effect.
881 	 */
882 	/* 4000 read/write I/O per second, or 4 per millisecond */
883 	bdev->internal.qos->rate_limits[SPDK_BDEV_QOS_RW_IOPS_RATE_LIMIT].limit = 4000;
884 	/* 8K byte per millisecond with 4K block size */
885 	bdev->internal.qos->rate_limits[SPDK_BDEV_QOS_RW_BPS_RATE_LIMIT].limit = 8192000;
886 	/* 4K byte per millisecond with 4K block size */
887 	bdev->internal.qos->rate_limits[SPDK_BDEV_QOS_R_BPS_RATE_LIMIT].limit = 4096000;
888 	/* 4K byte per millisecond with 4K block size */
889 	bdev->internal.qos->rate_limits[SPDK_BDEV_QOS_W_BPS_RATE_LIMIT].limit = 4096000;
890 
891 	g_get_io_channel = true;
892 
893 	/* Create channels */
894 	set_thread(0);
895 	io_ch[0] = spdk_bdev_get_io_channel(g_desc);
896 	bdev_ch[0] = spdk_io_channel_get_ctx(io_ch[0]);
897 	CU_ASSERT(bdev_ch[0]->flags == BDEV_CH_QOS_ENABLED);
898 
899 	set_thread(1);
900 	io_ch[1] = spdk_bdev_get_io_channel(g_desc);
901 	bdev_ch[1] = spdk_io_channel_get_ctx(io_ch[1]);
902 	CU_ASSERT(bdev_ch[1]->flags == BDEV_CH_QOS_ENABLED);
903 
904 	/* Send two read I/Os */
905 	status1 = SPDK_BDEV_IO_STATUS_PENDING;
906 	rc = spdk_bdev_read_blocks(g_desc, io_ch[1], NULL, 0, 1, io_during_io_done, &status1);
907 	CU_ASSERT(rc == 0);
908 	CU_ASSERT(status1 == SPDK_BDEV_IO_STATUS_PENDING);
909 	set_thread(0);
910 	status0 = SPDK_BDEV_IO_STATUS_PENDING;
911 	rc = spdk_bdev_read_blocks(g_desc, io_ch[0], NULL, 0, 1, io_during_io_done, &status0);
912 	CU_ASSERT(rc == 0);
913 	CU_ASSERT(status0 == SPDK_BDEV_IO_STATUS_PENDING);
914 	/* Send one write I/O */
915 	status2 = SPDK_BDEV_IO_STATUS_PENDING;
916 	rc = spdk_bdev_write_blocks(g_desc, io_ch[0], NULL, 0, 1, io_during_io_done, &status2);
917 	CU_ASSERT(rc == 0);
918 	CU_ASSERT(status2 == SPDK_BDEV_IO_STATUS_PENDING);
919 
920 	/* Complete any I/O that arrived at the disk */
921 	poll_threads();
922 	set_thread(1);
923 	stub_complete_io(g_bdev.io_target, 0);
924 	set_thread(0);
925 	stub_complete_io(g_bdev.io_target, 0);
926 	poll_threads();
927 
928 	/* Only one of the two read I/Os should complete. (logical XOR) */
929 	if (status0 == SPDK_BDEV_IO_STATUS_SUCCESS) {
930 		CU_ASSERT(status1 == SPDK_BDEV_IO_STATUS_PENDING);
931 	} else {
932 		CU_ASSERT(status1 == SPDK_BDEV_IO_STATUS_SUCCESS);
933 	}
934 	/* The write I/O should complete. */
935 	CU_ASSERT(status2 == SPDK_BDEV_IO_STATUS_SUCCESS);
936 
937 	/* Advance in time by a millisecond */
938 	spdk_delay_us(1000);
939 
940 	/* Complete more I/O */
941 	poll_threads();
942 	set_thread(1);
943 	stub_complete_io(g_bdev.io_target, 0);
944 	set_thread(0);
945 	stub_complete_io(g_bdev.io_target, 0);
946 	poll_threads();
947 
948 	/* Now the second read I/O should be done */
949 	CU_ASSERT(status0 == SPDK_BDEV_IO_STATUS_SUCCESS);
950 	CU_ASSERT(status1 == SPDK_BDEV_IO_STATUS_SUCCESS);
951 
952 	/* Tear down the channels */
953 	set_thread(1);
954 	spdk_put_io_channel(io_ch[1]);
955 	set_thread(0);
956 	spdk_put_io_channel(io_ch[0]);
957 	poll_threads();
958 
959 	teardown_test();
960 }
961 
962 static void
963 io_during_qos_reset(void)
964 {
965 	struct spdk_io_channel *io_ch[2];
966 	struct spdk_bdev_channel *bdev_ch[2];
967 	struct spdk_bdev *bdev;
968 	enum spdk_bdev_io_status status0, status1, reset_status;
969 	int rc;
970 
971 	setup_test();
972 	MOCK_SET(spdk_get_ticks, 0);
973 
974 	/* Enable QoS */
975 	bdev = &g_bdev.bdev;
976 	bdev->internal.qos = calloc(1, sizeof(*bdev->internal.qos));
977 	SPDK_CU_ASSERT_FATAL(bdev->internal.qos != NULL);
978 	TAILQ_INIT(&bdev->internal.qos->queued);
979 	/*
980 	 * Enable read/write IOPS, write only byte per sec and
981 	 * read/write byte per second rate limits.
982 	 * In this case, read/write byte per second rate limit will
983 	 * take effect first.
984 	 */
985 	/* 2000 read/write I/O per second, or 2 per millisecond */
986 	bdev->internal.qos->rate_limits[SPDK_BDEV_QOS_RW_IOPS_RATE_LIMIT].limit = 2000;
987 	/* 4K byte per millisecond with 4K block size */
988 	bdev->internal.qos->rate_limits[SPDK_BDEV_QOS_RW_BPS_RATE_LIMIT].limit = 4096000;
989 	/* 8K byte per millisecond with 4K block size */
990 	bdev->internal.qos->rate_limits[SPDK_BDEV_QOS_W_BPS_RATE_LIMIT].limit = 8192000;
991 
992 	g_get_io_channel = true;
993 
994 	/* Create channels */
995 	set_thread(0);
996 	io_ch[0] = spdk_bdev_get_io_channel(g_desc);
997 	bdev_ch[0] = spdk_io_channel_get_ctx(io_ch[0]);
998 	CU_ASSERT(bdev_ch[0]->flags == BDEV_CH_QOS_ENABLED);
999 
1000 	set_thread(1);
1001 	io_ch[1] = spdk_bdev_get_io_channel(g_desc);
1002 	bdev_ch[1] = spdk_io_channel_get_ctx(io_ch[1]);
1003 	CU_ASSERT(bdev_ch[1]->flags == BDEV_CH_QOS_ENABLED);
1004 
1005 	/* Send two I/O. One of these gets queued by QoS. The other is sitting at the disk. */
1006 	status1 = SPDK_BDEV_IO_STATUS_PENDING;
1007 	rc = spdk_bdev_write_blocks(g_desc, io_ch[1], NULL, 0, 1, io_during_io_done, &status1);
1008 	CU_ASSERT(rc == 0);
1009 	set_thread(0);
1010 	status0 = SPDK_BDEV_IO_STATUS_PENDING;
1011 	rc = spdk_bdev_write_blocks(g_desc, io_ch[0], NULL, 0, 1, io_during_io_done, &status0);
1012 	CU_ASSERT(rc == 0);
1013 
1014 	poll_threads();
1015 	CU_ASSERT(status1 == SPDK_BDEV_IO_STATUS_PENDING);
1016 	CU_ASSERT(status0 == SPDK_BDEV_IO_STATUS_PENDING);
1017 
1018 	/* Reset the bdev. */
1019 	reset_status = SPDK_BDEV_IO_STATUS_PENDING;
1020 	rc = spdk_bdev_reset(g_desc, io_ch[0], io_during_io_done, &reset_status);
1021 	CU_ASSERT(rc == 0);
1022 
1023 	/* Complete any I/O that arrived at the disk */
1024 	poll_threads();
1025 	set_thread(1);
1026 	stub_complete_io(g_bdev.io_target, 0);
1027 	set_thread(0);
1028 	stub_complete_io(g_bdev.io_target, 0);
1029 	poll_threads();
1030 
1031 	CU_ASSERT(reset_status == SPDK_BDEV_IO_STATUS_SUCCESS);
1032 	CU_ASSERT(status0 == SPDK_BDEV_IO_STATUS_ABORTED);
1033 	CU_ASSERT(status1 == SPDK_BDEV_IO_STATUS_ABORTED);
1034 
1035 	/* Tear down the channels */
1036 	set_thread(1);
1037 	spdk_put_io_channel(io_ch[1]);
1038 	set_thread(0);
1039 	spdk_put_io_channel(io_ch[0]);
1040 	poll_threads();
1041 
1042 	teardown_test();
1043 }
1044 
1045 static void
1046 enomem_done(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg)
1047 {
1048 	enum spdk_bdev_io_status *status = cb_arg;
1049 
1050 	*status = success ? SPDK_BDEV_IO_STATUS_SUCCESS : SPDK_BDEV_IO_STATUS_FAILED;
1051 	spdk_bdev_free_io(bdev_io);
1052 }
1053 
1054 static void
1055 enomem(void)
1056 {
1057 	struct spdk_io_channel *io_ch;
1058 	struct spdk_bdev_channel *bdev_ch;
1059 	struct spdk_bdev_shared_resource *shared_resource;
1060 	struct ut_bdev_channel *ut_ch;
1061 	const uint32_t IO_ARRAY_SIZE = 64;
1062 	const uint32_t AVAIL = 20;
1063 	enum spdk_bdev_io_status status[IO_ARRAY_SIZE], status_reset;
1064 	uint32_t nomem_cnt, i;
1065 	struct spdk_bdev_io *first_io;
1066 	int rc;
1067 
1068 	setup_test();
1069 
1070 	set_thread(0);
1071 	io_ch = spdk_bdev_get_io_channel(g_desc);
1072 	bdev_ch = spdk_io_channel_get_ctx(io_ch);
1073 	shared_resource = bdev_ch->shared_resource;
1074 	ut_ch = spdk_io_channel_get_ctx(bdev_ch->channel);
1075 	ut_ch->avail_cnt = AVAIL;
1076 
1077 	/* First submit a number of IOs equal to what the channel can support. */
1078 	for (i = 0; i < AVAIL; i++) {
1079 		status[i] = SPDK_BDEV_IO_STATUS_PENDING;
1080 		rc = spdk_bdev_read_blocks(g_desc, io_ch, NULL, 0, 1, enomem_done, &status[i]);
1081 		CU_ASSERT(rc == 0);
1082 	}
1083 	CU_ASSERT(TAILQ_EMPTY(&shared_resource->nomem_io));
1084 
1085 	/*
1086 	 * Next, submit one additional I/O.  This one should fail with ENOMEM and then go onto
1087 	 *  the nomem_io list.
1088 	 */
1089 	status[AVAIL] = SPDK_BDEV_IO_STATUS_PENDING;
1090 	rc = spdk_bdev_read_blocks(g_desc, io_ch, NULL, 0, 1, enomem_done, &status[AVAIL]);
1091 	CU_ASSERT(rc == 0);
1092 	SPDK_CU_ASSERT_FATAL(!TAILQ_EMPTY(&shared_resource->nomem_io));
1093 	first_io = TAILQ_FIRST(&shared_resource->nomem_io);
1094 
1095 	/*
1096 	 * Now submit a bunch more I/O.  These should all fail with ENOMEM and get queued behind
1097 	 *  the first_io above.
1098 	 */
1099 	for (i = AVAIL + 1; i < IO_ARRAY_SIZE; i++) {
1100 		status[i] = SPDK_BDEV_IO_STATUS_PENDING;
1101 		rc = spdk_bdev_read_blocks(g_desc, io_ch, NULL, 0, 1, enomem_done, &status[i]);
1102 		CU_ASSERT(rc == 0);
1103 	}
1104 
1105 	/* Assert that first_io is still at the head of the list. */
1106 	CU_ASSERT(TAILQ_FIRST(&shared_resource->nomem_io) == first_io);
1107 	CU_ASSERT(bdev_io_tailq_cnt(&shared_resource->nomem_io) == (IO_ARRAY_SIZE - AVAIL));
1108 	nomem_cnt = bdev_io_tailq_cnt(&shared_resource->nomem_io);
1109 	CU_ASSERT(shared_resource->nomem_threshold == (AVAIL - NOMEM_THRESHOLD_COUNT));
1110 
1111 	/*
1112 	 * Complete 1 I/O only.  The key check here is bdev_io_tailq_cnt - this should not have
1113 	 *  changed since completing just 1 I/O should not trigger retrying the queued nomem_io
1114 	 *  list.
1115 	 */
1116 	stub_complete_io(g_bdev.io_target, 1);
1117 	CU_ASSERT(bdev_io_tailq_cnt(&shared_resource->nomem_io) == nomem_cnt);
1118 
1119 	/*
1120 	 * Complete enough I/O to hit the nomem_threshold.  This should trigger retrying nomem_io,
1121 	 *  and we should see I/O get resubmitted to the test bdev module.
1122 	 */
1123 	stub_complete_io(g_bdev.io_target, NOMEM_THRESHOLD_COUNT - 1);
1124 	CU_ASSERT(bdev_io_tailq_cnt(&shared_resource->nomem_io) < nomem_cnt);
1125 	nomem_cnt = bdev_io_tailq_cnt(&shared_resource->nomem_io);
1126 
1127 	/* Complete 1 I/O only.  This should not trigger retrying the queued nomem_io. */
1128 	stub_complete_io(g_bdev.io_target, 1);
1129 	CU_ASSERT(bdev_io_tailq_cnt(&shared_resource->nomem_io) == nomem_cnt);
1130 
1131 	/*
1132 	 * Send a reset and confirm that all I/O are completed, including the ones that
1133 	 *  were queued on the nomem_io list.
1134 	 */
1135 	status_reset = SPDK_BDEV_IO_STATUS_PENDING;
1136 	rc = spdk_bdev_reset(g_desc, io_ch, enomem_done, &status_reset);
1137 	poll_threads();
1138 	CU_ASSERT(rc == 0);
1139 	/* This will complete the reset. */
1140 	stub_complete_io(g_bdev.io_target, 0);
1141 
1142 	CU_ASSERT(bdev_io_tailq_cnt(&shared_resource->nomem_io) == 0);
1143 	CU_ASSERT(shared_resource->io_outstanding == 0);
1144 
1145 	spdk_put_io_channel(io_ch);
1146 	poll_threads();
1147 	teardown_test();
1148 }
1149 
1150 static void
1151 enomem_multi_bdev(void)
1152 {
1153 	struct spdk_io_channel *io_ch;
1154 	struct spdk_bdev_channel *bdev_ch;
1155 	struct spdk_bdev_shared_resource *shared_resource;
1156 	struct ut_bdev_channel *ut_ch;
1157 	const uint32_t IO_ARRAY_SIZE = 64;
1158 	const uint32_t AVAIL = 20;
1159 	enum spdk_bdev_io_status status[IO_ARRAY_SIZE];
1160 	uint32_t i;
1161 	struct ut_bdev *second_bdev;
1162 	struct spdk_bdev_desc *second_desc = NULL;
1163 	struct spdk_bdev_channel *second_bdev_ch;
1164 	struct spdk_io_channel *second_ch;
1165 	int rc;
1166 
1167 	setup_test();
1168 
1169 	/* Register a second bdev with the same io_target */
1170 	second_bdev = calloc(1, sizeof(*second_bdev));
1171 	SPDK_CU_ASSERT_FATAL(second_bdev != NULL);
1172 	register_bdev(second_bdev, "ut_bdev2", g_bdev.io_target);
1173 	spdk_bdev_open_ext("ut_bdev2", true, _bdev_event_cb, NULL, &second_desc);
1174 	SPDK_CU_ASSERT_FATAL(second_desc != NULL);
1175 
1176 	set_thread(0);
1177 	io_ch = spdk_bdev_get_io_channel(g_desc);
1178 	bdev_ch = spdk_io_channel_get_ctx(io_ch);
1179 	shared_resource = bdev_ch->shared_resource;
1180 	ut_ch = spdk_io_channel_get_ctx(bdev_ch->channel);
1181 	ut_ch->avail_cnt = AVAIL;
1182 
1183 	second_ch = spdk_bdev_get_io_channel(second_desc);
1184 	second_bdev_ch = spdk_io_channel_get_ctx(second_ch);
1185 	SPDK_CU_ASSERT_FATAL(shared_resource == second_bdev_ch->shared_resource);
1186 
1187 	/* Saturate io_target through bdev A. */
1188 	for (i = 0; i < AVAIL; i++) {
1189 		status[i] = SPDK_BDEV_IO_STATUS_PENDING;
1190 		rc = spdk_bdev_read_blocks(g_desc, io_ch, NULL, 0, 1, enomem_done, &status[i]);
1191 		CU_ASSERT(rc == 0);
1192 	}
1193 	CU_ASSERT(TAILQ_EMPTY(&shared_resource->nomem_io));
1194 
1195 	/*
1196 	 * Now submit I/O through the second bdev. This should fail with ENOMEM
1197 	 * and then go onto the nomem_io list.
1198 	 */
1199 	status[AVAIL] = SPDK_BDEV_IO_STATUS_PENDING;
1200 	rc = spdk_bdev_read_blocks(second_desc, second_ch, NULL, 0, 1, enomem_done, &status[AVAIL]);
1201 	CU_ASSERT(rc == 0);
1202 	SPDK_CU_ASSERT_FATAL(!TAILQ_EMPTY(&shared_resource->nomem_io));
1203 
1204 	/* Complete first bdev's I/O. This should retry sending second bdev's nomem_io */
1205 	stub_complete_io(g_bdev.io_target, AVAIL);
1206 
1207 	SPDK_CU_ASSERT_FATAL(TAILQ_EMPTY(&shared_resource->nomem_io));
1208 	CU_ASSERT(shared_resource->io_outstanding == 1);
1209 
1210 	/* Now complete our retried I/O  */
1211 	stub_complete_io(g_bdev.io_target, 1);
1212 	SPDK_CU_ASSERT_FATAL(shared_resource->io_outstanding == 0);
1213 
1214 	spdk_put_io_channel(io_ch);
1215 	spdk_put_io_channel(second_ch);
1216 	spdk_bdev_close(second_desc);
1217 	unregister_bdev(second_bdev);
1218 	poll_threads();
1219 	free(second_bdev);
1220 	teardown_test();
1221 }
1222 
1223 static void
1224 enomem_multi_bdev_unregister(void)
1225 {
1226 	struct spdk_io_channel *io_ch;
1227 	struct spdk_bdev_channel *bdev_ch;
1228 	struct spdk_bdev_shared_resource *shared_resource;
1229 	struct ut_bdev_channel *ut_ch;
1230 	const uint32_t IO_ARRAY_SIZE = 64;
1231 	const uint32_t AVAIL = 20;
1232 	enum spdk_bdev_io_status status[IO_ARRAY_SIZE];
1233 	uint32_t i;
1234 	int rc;
1235 
1236 	setup_test();
1237 
1238 	set_thread(0);
1239 	io_ch = spdk_bdev_get_io_channel(g_desc);
1240 	bdev_ch = spdk_io_channel_get_ctx(io_ch);
1241 	shared_resource = bdev_ch->shared_resource;
1242 	ut_ch = spdk_io_channel_get_ctx(bdev_ch->channel);
1243 	ut_ch->avail_cnt = AVAIL;
1244 
1245 	/* Saturate io_target through the bdev. */
1246 	for (i = 0; i < AVAIL; i++) {
1247 		status[i] = SPDK_BDEV_IO_STATUS_PENDING;
1248 		rc = spdk_bdev_read_blocks(g_desc, io_ch, NULL, 0, 1, enomem_done, &status[i]);
1249 		CU_ASSERT(rc == 0);
1250 	}
1251 	CU_ASSERT(TAILQ_EMPTY(&shared_resource->nomem_io));
1252 
1253 	/*
1254 	 * Now submit I/O through the bdev. This should fail with ENOMEM
1255 	 * and then go onto the nomem_io list.
1256 	 */
1257 	status[AVAIL] = SPDK_BDEV_IO_STATUS_PENDING;
1258 	rc = spdk_bdev_read_blocks(g_desc, io_ch, NULL, 0, 1, enomem_done, &status[AVAIL]);
1259 	CU_ASSERT(rc == 0);
1260 	SPDK_CU_ASSERT_FATAL(!TAILQ_EMPTY(&shared_resource->nomem_io));
1261 
1262 	/* Unregister the bdev to abort the I/Os on the nomem_io queue. */
1263 	unregister_bdev(&g_bdev);
1264 	CU_ASSERT(status[AVAIL] == SPDK_BDEV_IO_STATUS_FAILED);
1265 	SPDK_CU_ASSERT_FATAL(TAILQ_EMPTY(&shared_resource->nomem_io));
1266 	SPDK_CU_ASSERT_FATAL(shared_resource->io_outstanding == AVAIL);
1267 
1268 	/* Complete the bdev's I/O. */
1269 	stub_complete_io(g_bdev.io_target, AVAIL);
1270 	SPDK_CU_ASSERT_FATAL(shared_resource->io_outstanding == 0);
1271 
1272 	spdk_put_io_channel(io_ch);
1273 	poll_threads();
1274 	teardown_test();
1275 }
1276 
1277 static void
1278 enomem_multi_io_target(void)
1279 {
1280 	struct spdk_io_channel *io_ch;
1281 	struct spdk_bdev_channel *bdev_ch;
1282 	struct ut_bdev_channel *ut_ch;
1283 	const uint32_t IO_ARRAY_SIZE = 64;
1284 	const uint32_t AVAIL = 20;
1285 	enum spdk_bdev_io_status status[IO_ARRAY_SIZE];
1286 	uint32_t i;
1287 	int new_io_device;
1288 	struct ut_bdev *second_bdev;
1289 	struct spdk_bdev_desc *second_desc = NULL;
1290 	struct spdk_bdev_channel *second_bdev_ch;
1291 	struct spdk_io_channel *second_ch;
1292 	int rc;
1293 
1294 	setup_test();
1295 
1296 	/* Create new io_target and a second bdev using it */
1297 	spdk_io_device_register(&new_io_device, stub_create_ch, stub_destroy_ch,
1298 				sizeof(struct ut_bdev_channel), NULL);
1299 	second_bdev = calloc(1, sizeof(*second_bdev));
1300 	SPDK_CU_ASSERT_FATAL(second_bdev != NULL);
1301 	register_bdev(second_bdev, "ut_bdev2", &new_io_device);
1302 	spdk_bdev_open_ext("ut_bdev2", true, _bdev_event_cb, NULL, &second_desc);
1303 	SPDK_CU_ASSERT_FATAL(second_desc != NULL);
1304 
1305 	set_thread(0);
1306 	io_ch = spdk_bdev_get_io_channel(g_desc);
1307 	bdev_ch = spdk_io_channel_get_ctx(io_ch);
1308 	ut_ch = spdk_io_channel_get_ctx(bdev_ch->channel);
1309 	ut_ch->avail_cnt = AVAIL;
1310 
1311 	/* Different io_target should imply a different shared_resource */
1312 	second_ch = spdk_bdev_get_io_channel(second_desc);
1313 	second_bdev_ch = spdk_io_channel_get_ctx(second_ch);
1314 	SPDK_CU_ASSERT_FATAL(bdev_ch->shared_resource != second_bdev_ch->shared_resource);
1315 
1316 	/* Saturate io_target through bdev A. */
1317 	for (i = 0; i < AVAIL; i++) {
1318 		status[i] = SPDK_BDEV_IO_STATUS_PENDING;
1319 		rc = spdk_bdev_read_blocks(g_desc, io_ch, NULL, 0, 1, enomem_done, &status[i]);
1320 		CU_ASSERT(rc == 0);
1321 	}
1322 	CU_ASSERT(TAILQ_EMPTY(&bdev_ch->shared_resource->nomem_io));
1323 
1324 	/* Issue one more I/O to fill the nomem_io list. */
1325 	status[AVAIL] = SPDK_BDEV_IO_STATUS_PENDING;
1326 	rc = spdk_bdev_read_blocks(g_desc, io_ch, NULL, 0, 1, enomem_done, &status[AVAIL]);
1327 	CU_ASSERT(rc == 0);
1328 	SPDK_CU_ASSERT_FATAL(!TAILQ_EMPTY(&bdev_ch->shared_resource->nomem_io));
1329 
1330 	/*
1331 	 * Now submit I/O through the second bdev. This should go through and complete
1332 	 * successfully because we're using a different io_device underneath.
1333 	 */
1334 	status[AVAIL] = SPDK_BDEV_IO_STATUS_PENDING;
1335 	rc = spdk_bdev_read_blocks(second_desc, second_ch, NULL, 0, 1, enomem_done, &status[AVAIL]);
1336 	CU_ASSERT(rc == 0);
1337 	SPDK_CU_ASSERT_FATAL(TAILQ_EMPTY(&second_bdev_ch->shared_resource->nomem_io));
1338 	stub_complete_io(second_bdev->io_target, 1);
1339 
1340 	/* Cleanup: complete the outstanding I/O. */
1341 	stub_complete_io(g_bdev.io_target, AVAIL);
1342 	SPDK_CU_ASSERT_FATAL(TAILQ_EMPTY(&bdev_ch->shared_resource->nomem_io));
1343 	/* Complete the ENOMEM I/O */
1344 	stub_complete_io(g_bdev.io_target, 1);
1345 	CU_ASSERT(bdev_ch->shared_resource->io_outstanding == 0);
1346 
1347 	SPDK_CU_ASSERT_FATAL(TAILQ_EMPTY(&bdev_ch->shared_resource->nomem_io));
1348 	CU_ASSERT(bdev_ch->shared_resource->io_outstanding == 0);
1349 	spdk_put_io_channel(io_ch);
1350 	spdk_put_io_channel(second_ch);
1351 	spdk_bdev_close(second_desc);
1352 	unregister_bdev(second_bdev);
1353 	spdk_io_device_unregister(&new_io_device, NULL);
1354 	poll_threads();
1355 	free(second_bdev);
1356 	teardown_test();
1357 }
1358 
1359 static void
1360 qos_dynamic_enable_done(void *cb_arg, int status)
1361 {
1362 	int *rc = cb_arg;
1363 	*rc = status;
1364 }
1365 
1366 static void
1367 qos_dynamic_enable(void)
1368 {
1369 	struct spdk_io_channel *io_ch[2];
1370 	struct spdk_bdev_channel *bdev_ch[2];
1371 	struct spdk_bdev *bdev;
1372 	enum spdk_bdev_io_status bdev_io_status[2];
1373 	uint64_t limits[SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES] = {};
1374 	int status, second_status, rc, i;
1375 
1376 	setup_test();
1377 	MOCK_SET(spdk_get_ticks, 0);
1378 
1379 	for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) {
1380 		limits[i] = UINT64_MAX;
1381 	}
1382 
1383 	bdev = &g_bdev.bdev;
1384 
1385 	g_get_io_channel = true;
1386 
1387 	/* Create channels */
1388 	set_thread(0);
1389 	io_ch[0] = spdk_bdev_get_io_channel(g_desc);
1390 	bdev_ch[0] = spdk_io_channel_get_ctx(io_ch[0]);
1391 	CU_ASSERT(bdev_ch[0]->flags == 0);
1392 
1393 	set_thread(1);
1394 	io_ch[1] = spdk_bdev_get_io_channel(g_desc);
1395 	bdev_ch[1] = spdk_io_channel_get_ctx(io_ch[1]);
1396 	CU_ASSERT(bdev_ch[1]->flags == 0);
1397 
1398 	set_thread(0);
1399 
1400 	/*
1401 	 * Enable QoS: Read/Write IOPS, Read/Write byte,
1402 	 * Read only byte and Write only byte per second
1403 	 * rate limits.
1404 	 * More than 10 I/Os allowed per timeslice.
1405 	 */
1406 	status = -1;
1407 	limits[SPDK_BDEV_QOS_RW_IOPS_RATE_LIMIT] = 10000;
1408 	limits[SPDK_BDEV_QOS_RW_BPS_RATE_LIMIT] = 100;
1409 	limits[SPDK_BDEV_QOS_R_BPS_RATE_LIMIT] = 100;
1410 	limits[SPDK_BDEV_QOS_W_BPS_RATE_LIMIT] = 10;
1411 	spdk_bdev_set_qos_rate_limits(bdev, limits, qos_dynamic_enable_done, &status);
1412 	poll_threads();
1413 	CU_ASSERT(status == 0);
1414 	CU_ASSERT((bdev_ch[0]->flags & BDEV_CH_QOS_ENABLED) != 0);
1415 	CU_ASSERT((bdev_ch[1]->flags & BDEV_CH_QOS_ENABLED) != 0);
1416 
1417 	/*
1418 	 * Submit and complete 10 I/O to fill the QoS allotment for this timeslice.
1419 	 * Additional I/O will then be queued.
1420 	 */
1421 	set_thread(0);
1422 	for (i = 0; i < 10; i++) {
1423 		bdev_io_status[0] = SPDK_BDEV_IO_STATUS_PENDING;
1424 		rc = spdk_bdev_read_blocks(g_desc, io_ch[0], NULL, 0, 1, io_during_io_done, &bdev_io_status[0]);
1425 		CU_ASSERT(rc == 0);
1426 		CU_ASSERT(bdev_io_status[0] == SPDK_BDEV_IO_STATUS_PENDING);
1427 		poll_thread(0);
1428 		stub_complete_io(g_bdev.io_target, 0);
1429 		CU_ASSERT(bdev_io_status[0] == SPDK_BDEV_IO_STATUS_SUCCESS);
1430 	}
1431 
1432 	/*
1433 	 * Send two more I/O.  These I/O will be queued since the current timeslice allotment has been
1434 	 * filled already.  We want to test that when QoS is disabled, these two I/O:
1435 	 *  1) are not aborted
1436 	 *  2) are sent back to their original thread for resubmission
1437 	 */
1438 	bdev_io_status[0] = SPDK_BDEV_IO_STATUS_PENDING;
1439 	rc = spdk_bdev_read_blocks(g_desc, io_ch[0], NULL, 0, 1, io_during_io_done, &bdev_io_status[0]);
1440 	CU_ASSERT(rc == 0);
1441 	CU_ASSERT(bdev_io_status[0] == SPDK_BDEV_IO_STATUS_PENDING);
1442 	set_thread(1);
1443 	bdev_io_status[1] = SPDK_BDEV_IO_STATUS_PENDING;
1444 	rc = spdk_bdev_read_blocks(g_desc, io_ch[1], NULL, 0, 1, io_during_io_done, &bdev_io_status[1]);
1445 	CU_ASSERT(rc == 0);
1446 	CU_ASSERT(bdev_io_status[1] == SPDK_BDEV_IO_STATUS_PENDING);
1447 	poll_threads();
1448 
1449 	/*
1450 	 * Disable QoS: Read/Write IOPS, Read/Write byte,
1451 	 * Read only byte rate limits
1452 	 */
1453 	status = -1;
1454 	limits[SPDK_BDEV_QOS_RW_IOPS_RATE_LIMIT] = 0;
1455 	limits[SPDK_BDEV_QOS_RW_BPS_RATE_LIMIT] = 0;
1456 	limits[SPDK_BDEV_QOS_R_BPS_RATE_LIMIT] = 0;
1457 	spdk_bdev_set_qos_rate_limits(bdev, limits, qos_dynamic_enable_done, &status);
1458 	poll_threads();
1459 	CU_ASSERT(status == 0);
1460 	CU_ASSERT((bdev_ch[0]->flags & BDEV_CH_QOS_ENABLED) != 0);
1461 	CU_ASSERT((bdev_ch[1]->flags & BDEV_CH_QOS_ENABLED) != 0);
1462 
1463 	/* Disable QoS: Write only Byte per second rate limit */
1464 	status = -1;
1465 	limits[SPDK_BDEV_QOS_W_BPS_RATE_LIMIT] = 0;
1466 	spdk_bdev_set_qos_rate_limits(bdev, limits, qos_dynamic_enable_done, &status);
1467 	poll_threads();
1468 	CU_ASSERT(status == 0);
1469 	CU_ASSERT((bdev_ch[0]->flags & BDEV_CH_QOS_ENABLED) == 0);
1470 	CU_ASSERT((bdev_ch[1]->flags & BDEV_CH_QOS_ENABLED) == 0);
1471 
1472 	/*
1473 	 * All I/O should have been resubmitted back on their original thread.  Complete
1474 	 *  all I/O on thread 0, and ensure that only the thread 0 I/O was completed.
1475 	 */
1476 	set_thread(0);
1477 	stub_complete_io(g_bdev.io_target, 0);
1478 	poll_threads();
1479 	CU_ASSERT(bdev_io_status[0] == SPDK_BDEV_IO_STATUS_SUCCESS);
1480 	CU_ASSERT(bdev_io_status[1] == SPDK_BDEV_IO_STATUS_PENDING);
1481 
1482 	/* Now complete all I/O on thread 1 and ensure the thread 1 I/O was completed. */
1483 	set_thread(1);
1484 	stub_complete_io(g_bdev.io_target, 0);
1485 	poll_threads();
1486 	CU_ASSERT(bdev_io_status[1] == SPDK_BDEV_IO_STATUS_SUCCESS);
1487 
1488 	/* Disable QoS again */
1489 	status = -1;
1490 	limits[SPDK_BDEV_QOS_RW_IOPS_RATE_LIMIT] = 0;
1491 	spdk_bdev_set_qos_rate_limits(bdev, limits, qos_dynamic_enable_done, &status);
1492 	poll_threads();
1493 	CU_ASSERT(status == 0); /* This should succeed */
1494 	CU_ASSERT((bdev_ch[0]->flags & BDEV_CH_QOS_ENABLED) == 0);
1495 	CU_ASSERT((bdev_ch[1]->flags & BDEV_CH_QOS_ENABLED) == 0);
1496 
1497 	/* Enable QoS on thread 0 */
1498 	status = -1;
1499 	limits[SPDK_BDEV_QOS_RW_IOPS_RATE_LIMIT] = 10000;
1500 	spdk_bdev_set_qos_rate_limits(bdev, limits, qos_dynamic_enable_done, &status);
1501 	poll_threads();
1502 	CU_ASSERT(status == 0);
1503 	CU_ASSERT((bdev_ch[0]->flags & BDEV_CH_QOS_ENABLED) != 0);
1504 	CU_ASSERT((bdev_ch[1]->flags & BDEV_CH_QOS_ENABLED) != 0);
1505 
1506 	/* Disable QoS on thread 1 */
1507 	set_thread(1);
1508 	status = -1;
1509 	limits[SPDK_BDEV_QOS_RW_IOPS_RATE_LIMIT] = 0;
1510 	spdk_bdev_set_qos_rate_limits(bdev, limits, qos_dynamic_enable_done, &status);
1511 	/* Don't poll yet. This should leave the channels with QoS enabled */
1512 	CU_ASSERT(status == -1);
1513 	CU_ASSERT((bdev_ch[0]->flags & BDEV_CH_QOS_ENABLED) != 0);
1514 	CU_ASSERT((bdev_ch[1]->flags & BDEV_CH_QOS_ENABLED) != 0);
1515 
1516 	/* Enable QoS. This should immediately fail because the previous disable QoS hasn't completed. */
1517 	second_status = 0;
1518 	limits[SPDK_BDEV_QOS_RW_BPS_RATE_LIMIT] = 10;
1519 	spdk_bdev_set_qos_rate_limits(bdev, limits, qos_dynamic_enable_done, &second_status);
1520 	poll_threads();
1521 	CU_ASSERT(status == 0); /* The disable should succeed */
1522 	CU_ASSERT(second_status < 0); /* The enable should fail */
1523 	CU_ASSERT((bdev_ch[0]->flags & BDEV_CH_QOS_ENABLED) == 0);
1524 	CU_ASSERT((bdev_ch[1]->flags & BDEV_CH_QOS_ENABLED) == 0);
1525 
1526 	/* Enable QoS on thread 1. This should succeed now that the disable has completed. */
1527 	status = -1;
1528 	limits[SPDK_BDEV_QOS_RW_IOPS_RATE_LIMIT] = 10000;
1529 	spdk_bdev_set_qos_rate_limits(bdev, limits, qos_dynamic_enable_done, &status);
1530 	poll_threads();
1531 	CU_ASSERT(status == 0);
1532 	CU_ASSERT((bdev_ch[0]->flags & BDEV_CH_QOS_ENABLED) != 0);
1533 	CU_ASSERT((bdev_ch[1]->flags & BDEV_CH_QOS_ENABLED) != 0);
1534 
1535 	/* Tear down the channels */
1536 	set_thread(0);
1537 	spdk_put_io_channel(io_ch[0]);
1538 	set_thread(1);
1539 	spdk_put_io_channel(io_ch[1]);
1540 	poll_threads();
1541 
1542 	set_thread(0);
1543 	teardown_test();
1544 }
1545 
1546 static void
1547 histogram_status_cb(void *cb_arg, int status)
1548 {
1549 	g_status = status;
1550 }
1551 
1552 static void
1553 histogram_data_cb(void *cb_arg, int status, struct spdk_histogram_data *histogram)
1554 {
1555 	g_status = status;
1556 	g_histogram = histogram;
1557 }
1558 
1559 static void
1560 histogram_io_count(void *ctx, uint64_t start, uint64_t end, uint64_t count,
1561 		   uint64_t total, uint64_t so_far)
1562 {
1563 	g_count += count;
1564 }
1565 
1566 static void
1567 bdev_histograms_mt(void)
1568 {
1569 	struct spdk_io_channel *ch[2];
1570 	struct spdk_histogram_data *histogram;
1571 	uint8_t buf[4096];
1572 	int status = false;
1573 	int rc;
1574 
1575 
1576 	setup_test();
1577 
1578 	set_thread(0);
1579 	ch[0] = spdk_bdev_get_io_channel(g_desc);
1580 	CU_ASSERT(ch[0] != NULL);
1581 
1582 	set_thread(1);
1583 	ch[1] = spdk_bdev_get_io_channel(g_desc);
1584 	CU_ASSERT(ch[1] != NULL);
1585 
1586 
1587 	/* Enable histogram */
1588 	spdk_bdev_histogram_enable(&g_bdev.bdev, histogram_status_cb, NULL, true);
1589 	poll_threads();
1590 	CU_ASSERT(g_status == 0);
1591 	CU_ASSERT(g_bdev.bdev.internal.histogram_enabled == true);
1592 
1593 	/* Allocate histogram */
1594 	histogram = spdk_histogram_data_alloc();
1595 
1596 	/* Check if histogram is zeroed */
1597 	spdk_bdev_histogram_get(&g_bdev.bdev, histogram, histogram_data_cb, NULL);
1598 	poll_threads();
1599 	CU_ASSERT(g_status == 0);
1600 	SPDK_CU_ASSERT_FATAL(g_histogram != NULL);
1601 
1602 	g_count = 0;
1603 	spdk_histogram_data_iterate(g_histogram, histogram_io_count, NULL);
1604 
1605 	CU_ASSERT(g_count == 0);
1606 
1607 	set_thread(0);
1608 	rc = spdk_bdev_write_blocks(g_desc, ch[0], &buf, 0, 1, io_during_io_done, &status);
1609 	CU_ASSERT(rc == 0);
1610 
1611 	spdk_delay_us(10);
1612 	stub_complete_io(g_bdev.io_target, 1);
1613 	poll_threads();
1614 	CU_ASSERT(status == true);
1615 
1616 
1617 	set_thread(1);
1618 	rc = spdk_bdev_read_blocks(g_desc, ch[1], &buf, 0, 1, io_during_io_done, &status);
1619 	CU_ASSERT(rc == 0);
1620 
1621 	spdk_delay_us(10);
1622 	stub_complete_io(g_bdev.io_target, 1);
1623 	poll_threads();
1624 	CU_ASSERT(status == true);
1625 
1626 	set_thread(0);
1627 
1628 	/* Check if histogram gathered data from all I/O channels */
1629 	spdk_bdev_histogram_get(&g_bdev.bdev, histogram, histogram_data_cb, NULL);
1630 	poll_threads();
1631 	CU_ASSERT(g_status == 0);
1632 	CU_ASSERT(g_bdev.bdev.internal.histogram_enabled == true);
1633 	SPDK_CU_ASSERT_FATAL(g_histogram != NULL);
1634 
1635 	g_count = 0;
1636 	spdk_histogram_data_iterate(g_histogram, histogram_io_count, NULL);
1637 	CU_ASSERT(g_count == 2);
1638 
1639 	/* Disable histogram */
1640 	spdk_bdev_histogram_enable(&g_bdev.bdev, histogram_status_cb, NULL, false);
1641 	poll_threads();
1642 	CU_ASSERT(g_status == 0);
1643 	CU_ASSERT(g_bdev.bdev.internal.histogram_enabled == false);
1644 
1645 	spdk_histogram_data_free(histogram);
1646 
1647 	/* Tear down the channels */
1648 	set_thread(0);
1649 	spdk_put_io_channel(ch[0]);
1650 	set_thread(1);
1651 	spdk_put_io_channel(ch[1]);
1652 	poll_threads();
1653 	set_thread(0);
1654 	teardown_test();
1655 
1656 }
1657 
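/* Filled in by the I/O timeout callback so the test can verify which I/O timed out. */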
1658 struct timeout_io_cb_arg {
1659 	struct iovec iov;
1660 	uint8_t type;
1661 };
1662 
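/* Count the I/O still linked on the channel's io_submitted list, or return -1 if the
 * channel is NULL.
 */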
1663 static int
1664 bdev_channel_count_submitted_io(struct spdk_bdev_channel *ch)
1665 {
1666 	struct spdk_bdev_io *bdev_io;
1667 	int n = 0;
1668 
1669 	if (!ch) {
1670 		return -1;
1671 	}
1672 
1673 	TAILQ_FOREACH(bdev_io, &ch->io_submitted, internal.ch_link) {
1674 		n++;
1675 	}
1676 
1677 	return n;
1678 }
1679 
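/* Timeout callback registered via spdk_bdev_set_timeout(): record the type and iovec
 * of the timed-out I/O for later assertions.
 */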
1680 static void
1681 bdev_channel_io_timeout_cb(void *cb_arg, struct spdk_bdev_io *bdev_io)
1682 {
1683 	struct timeout_io_cb_arg *ctx = cb_arg;
1684 
1685 	ctx->type = bdev_io->type;
1686 	ctx->iov.iov_base = bdev_io->iov.iov_base;
1687 	ctx->iov.iov_len = bdev_io->iov.iov_len;
1688 }
1689 
1690 static bool g_io_done;
1691 
1692 static void
1693 io_done(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg)
1694 {
1695 	g_io_done = true;
1696 	spdk_bdev_free_io(bdev_io);
1697 }
1698 
1699 static void
1700 bdev_set_io_timeout_mt(void)
1701 {
1702 	struct spdk_io_channel *ch[3];
1703 	struct spdk_bdev_channel *bdev_ch[3];
1704 	struct timeout_io_cb_arg cb_arg;
1705 
1706 	setup_test();
1707 
1708 	g_bdev.bdev.optimal_io_boundary = 16;
1709 	g_bdev.bdev.split_on_optimal_io_boundary = true;
1710 
1711 	set_thread(0);
1712 	ch[0] = spdk_bdev_get_io_channel(g_desc);
1713 	CU_ASSERT(ch[0] != NULL);
1714 
1715 	set_thread(1);
1716 	ch[1] = spdk_bdev_get_io_channel(g_desc);
1717 	CU_ASSERT(ch[1] != NULL);
1718 
1719 	set_thread(2);
1720 	ch[2] = spdk_bdev_get_io_channel(g_desc);
1721 	CU_ASSERT(ch[2] != NULL);
1722 
1723 	/* Multi-thread mode
1724 	 * 1. Check that the timeout poller was registered successfully.
1725 	 * 2. Check the timed-out I/O and ensure it is the one submitted by the user.
1726 	 * 3. Check that the I/O link in the bdev_ch (the io_submitted list) works correctly.
1727 	 * 4. Close the desc and put the io channels while the timeout poller is polling.
1728 	 */
1729 
1730 	/* On the desc's thread, set the timeout */
1731 	set_thread(0);
1732 	CU_ASSERT(spdk_bdev_set_timeout(g_desc, 5, bdev_channel_io_timeout_cb, &cb_arg) == 0);
1733 	CU_ASSERT(g_desc->io_timeout_poller != NULL);
1734 	CU_ASSERT(g_desc->cb_fn == bdev_channel_io_timeout_cb);
1735 	CU_ASSERT(g_desc->cb_arg == &cb_arg);
1736 
1737 	/* Check the I/O submitted list and the timeout handler */
1738 	CU_ASSERT(spdk_bdev_read_blocks(g_desc, ch[0], (void *)0x2000, 0, 1, io_done, NULL) == 0);
1739 	bdev_ch[0] = spdk_io_channel_get_ctx(ch[0]);
1740 	CU_ASSERT(bdev_channel_count_submitted_io(bdev_ch[0]) == 1);
1741 
1742 	set_thread(1);
1743 	CU_ASSERT(spdk_bdev_write_blocks(g_desc, ch[1], (void *)0x1000, 0, 1, io_done, NULL) == 0);
1744 	bdev_ch[1] = spdk_io_channel_get_ctx(ch[1]);
1745 	CU_ASSERT(bdev_channel_count_submitted_io(bdev_ch[1]) == 1);
1746 
1747 	/* Now test that a single-vector command is split correctly.
1748 	 * Offset 14, length 8, payload 0xF000
1749 	 *  Child - Offset 14, length 2, payload 0xF000
1750 	 *  Child - Offset 16, length 6, payload 0xF000 + 2 * 512
1751 	 *
1752 	 * Set up the expected values before calling spdk_bdev_read_blocks
1753 	 */
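	/* optimal_io_boundary is 16, so an 8-block read starting at block 14 crosses the
	 * boundary and splits into a 2-block child (blocks 14-15) and a 6-block child
	 * (blocks 16-21).
	 */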
1754 	set_thread(2);
1755 	CU_ASSERT(spdk_bdev_read_blocks(g_desc, ch[2], (void *)0xF000, 14, 8, io_done, NULL) == 0);
1756 	bdev_ch[2] = spdk_io_channel_get_ctx(ch[2]);
1757 	CU_ASSERT(bdev_channel_count_submitted_io(bdev_ch[2]) == 3);
1758 
1759 	set_thread(0);
1760 	memset(&cb_arg, 0, sizeof(cb_arg));
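	/* Advance time, but not past the 5-second timeout yet; the timeout callback
	 * should not have fired, so cb_arg stays zeroed.
	 */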
1761 	spdk_delay_us(3 * spdk_get_ticks_hz());
1762 	poll_threads();
1763 	CU_ASSERT(cb_arg.type == 0);
1764 	CU_ASSERT(cb_arg.iov.iov_base == (void *)0x0);
1765 	CU_ASSERT(cb_arg.iov.iov_len == 0);
1766 
1767 	/* Now the elapsed time reaches the timeout limit */
1768 	spdk_delay_us(3 * spdk_get_ticks_hz());
1769 	poll_thread(0);
1770 	CU_ASSERT(cb_arg.type == SPDK_BDEV_IO_TYPE_READ);
1771 	CU_ASSERT(cb_arg.iov.iov_base == (void *)0x2000);
1772 	CU_ASSERT(cb_arg.iov.iov_len == 1 * g_bdev.bdev.blocklen);
1773 	stub_complete_io(g_bdev.io_target, 1);
1774 	CU_ASSERT(bdev_channel_count_submitted_io(bdev_ch[0]) == 0);
1775 
1776 	memset(&cb_arg, 0, sizeof(cb_arg));
1777 	set_thread(1);
1778 	poll_thread(1);
1779 	CU_ASSERT(cb_arg.type == SPDK_BDEV_IO_TYPE_WRITE);
1780 	CU_ASSERT(cb_arg.iov.iov_base == (void *)0x1000);
1781 	CU_ASSERT(cb_arg.iov.iov_len == 1 * g_bdev.bdev.blocklen);
1782 	stub_complete_io(g_bdev.io_target, 1);
1783 	CU_ASSERT(bdev_channel_count_submitted_io(bdev_ch[1]) == 0);
1784 
1785 	memset(&cb_arg, 0, sizeof(cb_arg));
1786 	set_thread(2);
1787 	poll_thread(2);
1788 	CU_ASSERT(cb_arg.type == SPDK_BDEV_IO_TYPE_READ);
1789 	CU_ASSERT(cb_arg.iov.iov_base == (void *)0xF000);
1790 	CU_ASSERT(cb_arg.iov.iov_len == 8 * g_bdev.bdev.blocklen);
1791 	stub_complete_io(g_bdev.io_target, 1);
1792 	CU_ASSERT(bdev_channel_count_submitted_io(bdev_ch[2]) == 2);
1793 	stub_complete_io(g_bdev.io_target, 1);
1794 	CU_ASSERT(bdev_channel_count_submitted_io(bdev_ch[2]) == 0);
1795 
1796 	/* Run poll_timeout_done(), i.e. complete the timeout poller */
1797 	set_thread(0);
1798 	poll_thread(0);
1799 	CU_ASSERT(g_desc->refs == 0);
1800 	CU_ASSERT(spdk_bdev_read_blocks(g_desc, ch[0], (void *)0x1000, 0, 1, io_done, NULL) == 0);
1801 	set_thread(1);
1802 	CU_ASSERT(spdk_bdev_write_blocks(g_desc, ch[1], (void *)0x2000, 0, 2, io_done, NULL) == 0);
1803 	set_thread(2);
1804 	CU_ASSERT(spdk_bdev_read_blocks(g_desc, ch[2], (void *)0x3000, 0, 3, io_done, NULL) == 0);
1805 
1806 	/* Trigger the timeout poller to run again; desc->refs is incremented.
1807 	 * On thread 0 we destroy the io channel before the timeout poller runs,
1808 	 * so the timeout callback is not called on thread 0.
1809 	 */
1810 	spdk_delay_us(6 * spdk_get_ticks_hz());
1811 	memset(&cb_arg, 0, sizeof(cb_arg));
1812 	set_thread(0);
1813 	stub_complete_io(g_bdev.io_target, 1);
1814 	spdk_put_io_channel(ch[0]);
1815 	poll_thread(0);
1816 	CU_ASSERT(g_desc->refs == 1);
1817 	CU_ASSERT(cb_arg.type == 0);
1818 	CU_ASSERT(cb_arg.iov.iov_base == (void *)0x0);
1819 	CU_ASSERT(cb_arg.iov.iov_len == 0);
1820 
1821 	/* In thread 1 timeout poller runs then we destroy the io channel
1822 	 * Timeout callback is called on thread 1.
1823 	 */
1824 	memset(&cb_arg, 0, sizeof(cb_arg));
1825 	set_thread(1);
1826 	poll_thread(1);
1827 	stub_complete_io(g_bdev.io_target, 1);
1828 	spdk_put_io_channel(ch[1]);
1829 	poll_thread(1);
1830 	CU_ASSERT(cb_arg.type == SPDK_BDEV_IO_TYPE_WRITE);
1831 	CU_ASSERT(cb_arg.iov.iov_base == (void *)0x2000);
1832 	CU_ASSERT(cb_arg.iov.iov_len == 2 * g_bdev.bdev.blocklen);
1833 
1834 	/* Close the desc.
1835 	 * This unregisters the timeout poller first, then decrements desc->refs;
1836 	 * the refcount is not zero yet, so the desc is not freed.
1837 	 */
1838 	set_thread(0);
1839 	spdk_bdev_close(g_desc);
1840 	CU_ASSERT(g_desc->refs == 1);
1841 	CU_ASSERT(g_desc->io_timeout_poller == NULL);
1842 
1843 	/* The timeout poller runs on thread 2, then we destroy the io channel.
1844 	 * The desc is already closed, so the timeout poller exits immediately and
1845 	 * the timeout callback is not called on thread 2.
1846 	 */
1847 	memset(&cb_arg, 0, sizeof(cb_arg));
1848 	set_thread(2);
1849 	poll_thread(2);
1850 	stub_complete_io(g_bdev.io_target, 1);
1851 	spdk_put_io_channel(ch[2]);
1852 	poll_thread(2);
1853 	CU_ASSERT(cb_arg.type == 0);
1854 	CU_ASSERT(cb_arg.iov.iov_base == (void *)0x0);
1855 	CU_ASSERT(cb_arg.iov.iov_len == 0);
1856 
1857 	set_thread(0);
1858 	poll_thread(0);
1859 	g_teardown_done = false;
1860 	unregister_bdev(&g_bdev);
1861 	spdk_io_device_unregister(&g_io_device, NULL);
1862 	spdk_bdev_finish(finish_cb, NULL);
1863 	poll_threads();
1864 	memset(&g_bdev, 0, sizeof(g_bdev));
1865 	CU_ASSERT(g_teardown_done == true);
1866 	g_teardown_done = false;
1867 	free_threads();
1868 	free_cores();
1869 }
1870 
1871 static bool g_io_done2;
1872 static bool g_lock_lba_range_done;
1873 static bool g_unlock_lba_range_done;
1874 
1875 static void
1876 io_done2(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg)
1877 {
1878 	g_io_done2 = true;
1879 	spdk_bdev_free_io(bdev_io);
1880 }
1881 
1882 static void
1883 lock_lba_range_done(void *ctx, int status)
1884 {
1885 	g_lock_lba_range_done = true;
1886 }
1887 
1888 static void
1889 unlock_lba_range_done(void *ctx, int status)
1890 {
1891 	g_unlock_lba_range_done = true;
1892 }
1893 
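/* Return the number of I/O currently outstanding on the stub bdev's channel for the
 * calling thread.
 */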
1894 static uint32_t
1895 stub_channel_outstanding_cnt(void *io_target)
1896 {
1897 	struct spdk_io_channel *_ch = spdk_get_io_channel(io_target);
1898 	struct ut_bdev_channel *ch = spdk_io_channel_get_ctx(_ch);
1899 	uint32_t outstanding_cnt;
1900 
1901 	outstanding_cnt = ch->outstanding_cnt;
1902 
1903 	spdk_put_io_channel(_ch);
1904 	return outstanding_cnt;
1905 }
1906 
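/*
 * Lock an LBA range from one channel, then verify that reads and writes submitted on
 * the owning channel execute immediately, while writes from other channels are queued
 * on io_locked until bdev_unlock_lba_range() releases the range.
 */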
1907 static void
1908 lock_lba_range_then_submit_io(void)
1909 {
1910 	struct spdk_bdev_desc *desc = NULL;
1911 	void *io_target;
1912 	struct spdk_io_channel *io_ch[3];
1913 	struct spdk_bdev_channel *bdev_ch[3];
1914 	struct lba_range *range;
1915 	char buf[4096];
1916 	int ctx0, ctx1, ctx2;
1917 	int rc;
1918 
1919 	setup_test();
1920 
1921 	io_target = g_bdev.io_target;
1922 	desc = g_desc;
1923 
1924 	set_thread(0);
1925 	io_ch[0] = spdk_bdev_get_io_channel(desc);
1926 	bdev_ch[0] = spdk_io_channel_get_ctx(io_ch[0]);
1927 	CU_ASSERT(io_ch[0] != NULL);
1928 
1929 	set_thread(1);
1930 	io_ch[1] = spdk_bdev_get_io_channel(desc);
1931 	bdev_ch[1] = spdk_io_channel_get_ctx(io_ch[1]);
1932 	CU_ASSERT(io_ch[1] != NULL);
1933 
1934 	set_thread(0);
1935 	g_lock_lba_range_done = false;
1936 	rc = bdev_lock_lba_range(desc, io_ch[0], 20, 10, lock_lba_range_done, &ctx0);
1937 	CU_ASSERT(rc == 0);
1938 	poll_threads();
1939 
1940 	/* The lock should immediately become valid, since there are no outstanding
1941 	 * write I/O.
1942 	 */
1943 	CU_ASSERT(g_lock_lba_range_done == true);
1944 	range = TAILQ_FIRST(&bdev_ch[0]->locked_ranges);
1945 	SPDK_CU_ASSERT_FATAL(range != NULL);
1946 	CU_ASSERT(range->offset == 20);
1947 	CU_ASSERT(range->length == 10);
1948 	CU_ASSERT(range->owner_ch == bdev_ch[0]);
1949 
1950 	g_io_done = false;
1951 	CU_ASSERT(TAILQ_EMPTY(&bdev_ch[0]->io_locked));
1952 	rc = spdk_bdev_read_blocks(desc, io_ch[0], buf, 20, 1, io_done, &ctx0);
1953 	CU_ASSERT(rc == 0);
1954 	CU_ASSERT(stub_channel_outstanding_cnt(io_target) == 1);
1955 
1956 	stub_complete_io(io_target, 1);
1957 	poll_threads();
1958 	CU_ASSERT(g_io_done == true);
1959 	CU_ASSERT(TAILQ_EMPTY(&bdev_ch[0]->io_locked));
1960 
1961 	/* Try a write I/O.  This should actually be allowed to execute, since the channel
1962 	 * holding the lock is submitting the write I/O.
1963 	 */
1964 	g_io_done = false;
1965 	CU_ASSERT(TAILQ_EMPTY(&bdev_ch[0]->io_locked));
1966 	rc = spdk_bdev_write_blocks(desc, io_ch[0], buf, 20, 1, io_done, &ctx0);
1967 	CU_ASSERT(rc == 0);
1968 	CU_ASSERT(stub_channel_outstanding_cnt(io_target) == 1);
1969 
1970 	stub_complete_io(io_target, 1);
1971 	poll_threads();
1972 	CU_ASSERT(g_io_done == true);
1973 	CU_ASSERT(TAILQ_EMPTY(&bdev_ch[0]->io_locked));
1974 
1975 	/* Try a write I/O.  This should get queued in the io_locked tailq. */
1976 	set_thread(1);
1977 	g_io_done = false;
1978 	CU_ASSERT(TAILQ_EMPTY(&bdev_ch[1]->io_locked));
1979 	rc = spdk_bdev_write_blocks(desc, io_ch[1], buf, 20, 1, io_done, &ctx1);
1980 	CU_ASSERT(rc == 0);
1981 	poll_threads();
1982 	CU_ASSERT(stub_channel_outstanding_cnt(io_target) == 0);
1983 	CU_ASSERT(!TAILQ_EMPTY(&bdev_ch[1]->io_locked));
1984 	CU_ASSERT(g_io_done == false);
1985 
1986 	/* Try to unlock the lba range using thread 1's io_ch.  This should fail. */
1987 	rc = bdev_unlock_lba_range(desc, io_ch[1], 20, 10, unlock_lba_range_done, &ctx1);
1988 	CU_ASSERT(rc == -EINVAL);
1989 
1990 	/* Now create a new channel and submit a write I/O with it.  This should also be queued.
1991 	 * The new channel should inherit the active locks from the bdev's internal list.
1992 	 */
1993 	set_thread(2);
1994 	io_ch[2] = spdk_bdev_get_io_channel(desc);
1995 	bdev_ch[2] = spdk_io_channel_get_ctx(io_ch[2]);
1996 	CU_ASSERT(io_ch[2] != NULL);
1997 
1998 	g_io_done2 = false;
1999 	CU_ASSERT(TAILQ_EMPTY(&bdev_ch[2]->io_locked));
2000 	rc = spdk_bdev_write_blocks(desc, io_ch[2], buf, 22, 2, io_done2, &ctx2);
2001 	CU_ASSERT(rc == 0);
2002 	poll_threads();
2003 	CU_ASSERT(stub_channel_outstanding_cnt(io_target) == 0);
2004 	CU_ASSERT(!TAILQ_EMPTY(&bdev_ch[2]->io_locked));
2005 	CU_ASSERT(g_io_done2 == false);
2006 
2007 	set_thread(0);
2008 	rc = bdev_unlock_lba_range(desc, io_ch[0], 20, 10, unlock_lba_range_done, &ctx0);
2009 	CU_ASSERT(rc == 0);
2010 	poll_threads();
2011 	CU_ASSERT(TAILQ_EMPTY(&bdev_ch[0]->locked_ranges));
2012 
2013 	/* The LBA range is unlocked, so the write IOs should now have started execution. */
2014 	CU_ASSERT(TAILQ_EMPTY(&bdev_ch[1]->io_locked));
2015 	CU_ASSERT(TAILQ_EMPTY(&bdev_ch[2]->io_locked));
2016 
2017 	set_thread(1);
2018 	CU_ASSERT(stub_channel_outstanding_cnt(io_target) == 1);
2019 	stub_complete_io(io_target, 1);
2020 	set_thread(2);
2021 	CU_ASSERT(stub_channel_outstanding_cnt(io_target) == 1);
2022 	stub_complete_io(io_target, 1);
2023 
2024 	poll_threads();
2025 	CU_ASSERT(g_io_done == true);
2026 	CU_ASSERT(g_io_done2 == true);
2027 
2028 	/* Tear down the channels */
2029 	set_thread(0);
2030 	spdk_put_io_channel(io_ch[0]);
2031 	set_thread(1);
2032 	spdk_put_io_channel(io_ch[1]);
2033 	set_thread(2);
2034 	spdk_put_io_channel(io_ch[2]);
2035 	poll_threads();
2036 	set_thread(0);
2037 	teardown_test();
2038 }
2039 
2040 /* spdk_bdev_reset() freezes and unfreezes I/O channels by using spdk_for_each_channel().
2041  * spdk_bdev_unregister() calls spdk_io_device_unregister() at the end. However,
2042  * spdk_io_device_unregister() fails if it is called while executing spdk_for_each_channel().
2043  * Hence, in this case, spdk_io_device_unregister() is deferred until spdk_bdev_reset()
2044  * completes. Test this behavior.
2045  */
2046 static void
2047 unregister_during_reset(void)
2048 {
2049 	struct spdk_io_channel *io_ch[2];
2050 	bool done_reset = false, done_unregister = false;
2051 	int rc;
2052 
2053 	setup_test();
2054 	set_thread(0);
2055 
2056 	io_ch[0] = spdk_bdev_get_io_channel(g_desc);
2057 	SPDK_CU_ASSERT_FATAL(io_ch[0] != NULL);
2058 
2059 	set_thread(1);
2060 
2061 	io_ch[1] = spdk_bdev_get_io_channel(g_desc);
2062 	SPDK_CU_ASSERT_FATAL(io_ch[1] != NULL);
2063 
2064 	set_thread(0);
2065 
2066 	CU_ASSERT(g_bdev.bdev.internal.reset_in_progress == NULL);
2067 
2068 	rc = spdk_bdev_reset(g_desc, io_ch[0], reset_done, &done_reset);
2069 	CU_ASSERT(rc == 0);
2070 
2071 	set_thread(0);
2072 
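	/* Poll thread 0 just once so the reset's spdk_for_each_channel() is still in
	 * progress when the bdev is unregistered below (see the comment above this test).
	 */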
2073 	poll_thread_times(0, 1);
2074 
2075 	spdk_bdev_close(g_desc);
2076 	spdk_bdev_unregister(&g_bdev.bdev, _bdev_unregistered, &done_unregister);
2077 
2078 	CU_ASSERT(done_reset == false);
2079 	CU_ASSERT(done_unregister == false);
2080 
2081 	poll_threads();
2082 
2083 	stub_complete_io(g_bdev.io_target, 0);
2084 
2085 	poll_threads();
2086 
2087 	CU_ASSERT(done_reset == true);
2088 	CU_ASSERT(done_unregister == false);
2089 
2090 	spdk_put_io_channel(io_ch[0]);
2091 
2092 	set_thread(1);
2093 
2094 	spdk_put_io_channel(io_ch[1]);
2095 
2096 	poll_threads();
2097 
2098 	CU_ASSERT(done_unregister == true);
2099 
2100 	/* Restore the original g_bdev so that we can use teardown_test(). */
2101 	set_thread(0);
2102 	register_bdev(&g_bdev, "ut_bdev", &g_io_device);
2103 	spdk_bdev_open_ext("ut_bdev", true, _bdev_event_cb, NULL, &g_desc);
2104 	teardown_test();
2105 }
2106 
2107 int
2108 main(int argc, char **argv)
2109 {
2110 	CU_pSuite	suite = NULL;
2111 	unsigned int	num_failures;
2112 
2113 	CU_set_error_action(CUEA_ABORT);
2114 	CU_initialize_registry();
2115 
2116 	suite = CU_add_suite("bdev", NULL, NULL);
2117 
2118 	CU_ADD_TEST(suite, basic);
2119 	CU_ADD_TEST(suite, unregister_and_close);
2120 	CU_ADD_TEST(suite, basic_qos);
2121 	CU_ADD_TEST(suite, put_channel_during_reset);
2122 	CU_ADD_TEST(suite, aborted_reset);
2123 	CU_ADD_TEST(suite, io_during_reset);
2124 	CU_ADD_TEST(suite, io_during_qos_queue);
2125 	CU_ADD_TEST(suite, io_during_qos_reset);
2126 	CU_ADD_TEST(suite, enomem);
2127 	CU_ADD_TEST(suite, enomem_multi_bdev);
2128 	CU_ADD_TEST(suite, enomem_multi_bdev_unregister);
2129 	CU_ADD_TEST(suite, enomem_multi_io_target);
2130 	CU_ADD_TEST(suite, qos_dynamic_enable);
2131 	CU_ADD_TEST(suite, bdev_histograms_mt);
2132 	CU_ADD_TEST(suite, bdev_set_io_timeout_mt);
2133 	CU_ADD_TEST(suite, lock_lba_range_then_submit_io);
2134 	CU_ADD_TEST(suite, unregister_during_reset);
2135 
2136 	CU_basic_set_mode(CU_BRM_VERBOSE);
2137 	CU_basic_run_tests();
2138 	num_failures = CU_get_number_of_failures();
2139 	CU_cleanup_registry();
2140 	return num_failures;
2141 }
2142