/*-
 *   BSD LICENSE
 *
 *   Copyright (c) Intel Corporation.
 *   All rights reserved.
 *
 *   Redistribution and use in source and binary forms, with or without
 *   modification, are permitted provided that the following conditions
 *   are met:
 *
 *     * Redistributions of source code must retain the above copyright
 *       notice, this list of conditions and the following disclaimer.
 *     * Redistributions in binary form must reproduce the above copyright
 *       notice, this list of conditions and the following disclaimer in
 *       the documentation and/or other materials provided with the
 *       distribution.
 *     * Neither the name of Intel Corporation nor the names of its
 *       contributors may be used to endorse or promote products derived
 *       from this software without specific prior written permission.
 *
 *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */

#include "spdk_cunit.h"

#include "common/lib/ut_multithread.c"
#include "unit/lib/json_mock.c"

#include "spdk/config.h"
/* HACK: disable VTune integration so the unit test doesn't need VTune headers and libs to build */
#undef SPDK_CONFIG_VTUNE

#include "bdev/bdev.c"

#define BDEV_UT_NUM_THREADS 3

DEFINE_STUB(spdk_conf_find_section, struct spdk_conf_section *, (struct spdk_conf *cp,
		const char *name), NULL);
DEFINE_STUB(spdk_conf_section_get_nmval, char *,
	    (struct spdk_conf_section *sp, const char *key, int idx1, int idx2), NULL);
DEFINE_STUB(spdk_conf_section_get_intval, int, (struct spdk_conf_section *sp, const char *key), -1);

struct spdk_trace_histories *g_trace_histories;
DEFINE_STUB_V(spdk_trace_add_register_fn, (struct spdk_trace_register_fn *reg_fn));
DEFINE_STUB_V(spdk_trace_register_owner, (uint8_t type, char id_prefix));
DEFINE_STUB_V(spdk_trace_register_object, (uint8_t type, char id_prefix));
DEFINE_STUB_V(spdk_trace_register_description, (const char *name,
		uint16_t tpoint_id, uint8_t owner_type,
		uint8_t object_type, uint8_t new_object,
		uint8_t arg1_type, const char *arg1_name));
DEFINE_STUB_V(_spdk_trace_record, (uint64_t tsc, uint16_t tpoint_id, uint16_t poller_id,
				   uint32_t size, uint64_t object_id, uint64_t arg1));
DEFINE_STUB(spdk_notify_send, uint64_t, (const char *type, const char *ctx), 0);
DEFINE_STUB(spdk_notify_type_register, struct spdk_notify_type *, (const char *type), NULL);

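/*
 * Stub bdev and per-channel context used throughout these tests.  Each
 * ut_bdev wraps an spdk_bdev and records the io_target (the io_device its
 * channels are created against).
 */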
struct ut_bdev {
	struct spdk_bdev	bdev;
	void			*io_target;
};

struct ut_bdev_channel {
	TAILQ_HEAD(, spdk_bdev_io)	outstanding_io;
	uint32_t			outstanding_cnt;
	uint32_t			avail_cnt;
};

int g_io_device;
struct ut_bdev g_bdev;
struct spdk_bdev_desc *g_desc;
bool g_teardown_done = false;
bool g_get_io_channel = true;
bool g_create_ch = true;
bool g_init_complete_called = false;
bool g_fini_start_called = true;
int g_status = 0;
int g_count = 0;
struct spdk_histogram_data *g_histogram = NULL;

static int
stub_create_ch(void *io_device, void *ctx_buf)
{
	struct ut_bdev_channel *ch = ctx_buf;

	if (g_create_ch == false) {
		return -1;
	}

	TAILQ_INIT(&ch->outstanding_io);
	ch->outstanding_cnt = 0;
	/*
	 * When avail gets to 0, the submit_request function will return ENOMEM.
	 *  Most tests do not want ENOMEM to occur, so by default set this to a
	 *  big value that won't get hit.  The ENOMEM tests can then override this
	 *  value to something much smaller to induce ENOMEM conditions.
	 */
	ch->avail_cnt = 2048;
	return 0;
}

static void
stub_destroy_ch(void *io_device, void *ctx_buf)
{
}

static struct spdk_io_channel *
stub_get_io_channel(void *ctx)
{
	struct ut_bdev *ut_bdev = ctx;

	if (g_get_io_channel == true) {
		return spdk_get_io_channel(ut_bdev->io_target);
	} else {
		return NULL;
	}
}

static int
stub_destruct(void *ctx)
{
	return 0;
}

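/*
 * Stub submit path: a RESET aborts every I/O outstanding on the channel, an
 * ABORT completes the target I/O (if it is still queued) and succeeds, and
 * any other I/O is queued until stub_complete_io() picks it up, or fails
 * with NOMEM once avail_cnt is exhausted.
 */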
static void
stub_submit_request(struct spdk_io_channel *_ch, struct spdk_bdev_io *bdev_io)
{
	struct ut_bdev_channel *ch = spdk_io_channel_get_ctx(_ch);
	struct spdk_bdev_io *io;

	if (bdev_io->type == SPDK_BDEV_IO_TYPE_RESET) {
		while (!TAILQ_EMPTY(&ch->outstanding_io)) {
			io = TAILQ_FIRST(&ch->outstanding_io);
			TAILQ_REMOVE(&ch->outstanding_io, io, module_link);
			ch->outstanding_cnt--;
			spdk_bdev_io_complete(io, SPDK_BDEV_IO_STATUS_ABORTED);
			ch->avail_cnt++;
		}
	} else if (bdev_io->type == SPDK_BDEV_IO_TYPE_ABORT) {
		TAILQ_FOREACH(io, &ch->outstanding_io, module_link) {
			if (io == bdev_io->u.abort.bio_to_abort) {
				TAILQ_REMOVE(&ch->outstanding_io, io, module_link);
				ch->outstanding_cnt--;
				spdk_bdev_io_complete(io, SPDK_BDEV_IO_STATUS_ABORTED);
				ch->avail_cnt++;

				spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_SUCCESS);
				return;
			}
		}

		spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
		return;
	}

	if (ch->avail_cnt > 0) {
		TAILQ_INSERT_TAIL(&ch->outstanding_io, bdev_io, module_link);
		ch->outstanding_cnt++;
		ch->avail_cnt--;
	} else {
		spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_NOMEM);
	}
}

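/* Complete up to num_to_complete outstanding I/O with SUCCESS; 0 means complete them all. */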
static uint32_t
stub_complete_io(void *io_target, uint32_t num_to_complete)
{
	struct spdk_io_channel *_ch = spdk_get_io_channel(io_target);
	struct ut_bdev_channel *ch = spdk_io_channel_get_ctx(_ch);
	struct spdk_bdev_io *io;
	bool complete_all = (num_to_complete == 0);
	uint32_t num_completed = 0;

	while (complete_all || num_completed < num_to_complete) {
		if (TAILQ_EMPTY(&ch->outstanding_io)) {
			break;
		}
		io = TAILQ_FIRST(&ch->outstanding_io);
		TAILQ_REMOVE(&ch->outstanding_io, io, module_link);
		ch->outstanding_cnt--;
		spdk_bdev_io_complete(io, SPDK_BDEV_IO_STATUS_SUCCESS);
		ch->avail_cnt++;
		num_completed++;
	}

	spdk_put_io_channel(_ch);
	return num_completed;
}

static bool
stub_io_type_supported(void *ctx, enum spdk_bdev_io_type type)
{
	return true;
}

static struct spdk_bdev_fn_table fn_table = {
	.get_io_channel =	stub_get_io_channel,
	.destruct =		stub_destruct,
	.submit_request =	stub_submit_request,
	.io_type_supported =	stub_io_type_supported,
};

struct spdk_bdev_module bdev_ut_if;

static int
module_init(void)
{
	spdk_bdev_module_init_done(&bdev_ut_if);
	return 0;
}

static void
module_fini(void)
{
}

static void
init_complete(void)
{
	g_init_complete_called = true;
}

static void
fini_start(void)
{
	g_fini_start_called = true;
}

struct spdk_bdev_module bdev_ut_if = {
	.name = "bdev_ut",
	.module_init = module_init,
	.module_fini = module_fini,
	.async_init = true,
	.init_complete = init_complete,
	.fini_start = fini_start,
};

SPDK_BDEV_MODULE_REGISTER(bdev_ut, &bdev_ut_if)

static void
register_bdev(struct ut_bdev *ut_bdev, char *name, void *io_target)
{
	memset(ut_bdev, 0, sizeof(*ut_bdev));

	ut_bdev->io_target = io_target;
	ut_bdev->bdev.ctxt = ut_bdev;
	ut_bdev->bdev.name = name;
	ut_bdev->bdev.fn_table = &fn_table;
	ut_bdev->bdev.module = &bdev_ut_if;
	ut_bdev->bdev.blocklen = 4096;
	ut_bdev->bdev.blockcnt = 1024;

	spdk_bdev_register(&ut_bdev->bdev);
}

static void
unregister_bdev(struct ut_bdev *ut_bdev)
{
	/* Handle any deferred messages. */
	poll_threads();
	spdk_bdev_unregister(&ut_bdev->bdev, NULL, NULL);
}

static void
bdev_init_cb(void *done, int rc)
{
	CU_ASSERT(rc == 0);
	*(bool *)done = true;
}

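/*
 * Common fixture: create BDEV_UT_NUM_THREADS threads, initialize the bdev
 * layer, register the test io_device plus ut_bdev, and open a descriptor on
 * thread 0.  teardown_test() below undoes this in the reverse order.
 */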
static void
setup_test(void)
{
	bool done = false;

	allocate_cores(BDEV_UT_NUM_THREADS);
	allocate_threads(BDEV_UT_NUM_THREADS);
	set_thread(0);
	spdk_bdev_initialize(bdev_init_cb, &done);
	spdk_io_device_register(&g_io_device, stub_create_ch, stub_destroy_ch,
				sizeof(struct ut_bdev_channel), NULL);
	register_bdev(&g_bdev, "ut_bdev", &g_io_device);
	spdk_bdev_open(&g_bdev.bdev, true, NULL, NULL, &g_desc);
}

static void
finish_cb(void *cb_arg)
{
	g_teardown_done = true;
}

static void
teardown_test(void)
{
	set_thread(0);
	g_teardown_done = false;
	spdk_bdev_close(g_desc);
	g_desc = NULL;
	unregister_bdev(&g_bdev);
	spdk_io_device_unregister(&g_io_device, NULL);
	spdk_bdev_finish(finish_cb, NULL);
	poll_threads();
	memset(&g_bdev, 0, sizeof(g_bdev));
	CU_ASSERT(g_teardown_done == true);
	g_teardown_done = false;
	free_threads();
	free_cores();
}

static uint32_t
bdev_io_tailq_cnt(bdev_io_tailq_t *tailq)
{
	struct spdk_bdev_io *io;
	uint32_t cnt = 0;

	TAILQ_FOREACH(io, tailq, internal.link) {
		cnt++;
	}

	return cnt;
}

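/* Exercise the module init/fini callbacks and I/O channel creation failures. */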
static void
basic(void)
{
	g_init_complete_called = false;
	setup_test();
	CU_ASSERT(g_init_complete_called == true);

	set_thread(0);

	g_get_io_channel = false;
	g_ut_threads[0].ch = spdk_bdev_get_io_channel(g_desc);
	CU_ASSERT(g_ut_threads[0].ch == NULL);

	g_get_io_channel = true;
	g_create_ch = false;
	g_ut_threads[0].ch = spdk_bdev_get_io_channel(g_desc);
	CU_ASSERT(g_ut_threads[0].ch == NULL);

	g_get_io_channel = true;
	g_create_ch = true;
	g_ut_threads[0].ch = spdk_bdev_get_io_channel(g_desc);
	CU_ASSERT(g_ut_threads[0].ch != NULL);
	spdk_put_io_channel(g_ut_threads[0].ch);

	g_fini_start_called = false;
	teardown_test();
	CU_ASSERT(g_fini_start_called == true);
}

static void
_bdev_removed(void *done)
{
	*(bool *)done = true;
}

static void
_bdev_unregistered(void *done, int rc)
{
	CU_ASSERT(rc == 0);
	*(bool *)done = true;
}

static void
unregister_and_close(void)
{
	bool done, remove_notify;
	struct spdk_bdev_desc *desc = NULL;

	setup_test();
	set_thread(0);

	/* setup_test() automatically opens the bdev,
	 * but this test needs to do that in a different
	 * way. */
	spdk_bdev_close(g_desc);
	poll_threads();

	/* Try hotremoving a bdev with descriptors which don't provide
	 * the notification callback */
	spdk_bdev_open(&g_bdev.bdev, true, NULL, NULL, &desc);
	SPDK_CU_ASSERT_FATAL(desc != NULL);

	/* There is an open descriptor on the device. Unregister it,
	 * which can't proceed until the descriptor is closed. */
	done = false;
	spdk_bdev_unregister(&g_bdev.bdev, _bdev_unregistered, &done);

	/* Poll the threads to allow all events to be processed */
	poll_threads();

	/* Make sure the bdev was not unregistered. We still have a
	 * descriptor open */
	CU_ASSERT(done == false);

	spdk_bdev_close(desc);
	poll_threads();
	desc = NULL;

	/* The unregister should have completed */
	CU_ASSERT(done == true);


	/* Register the bdev again */
	register_bdev(&g_bdev, "ut_bdev", &g_io_device);

	remove_notify = false;
	spdk_bdev_open(&g_bdev.bdev, true, _bdev_removed, &remove_notify, &desc);
	SPDK_CU_ASSERT_FATAL(desc != NULL);
	CU_ASSERT(remove_notify == false);

	/* There is an open descriptor on the device. Unregister it,
	 * which can't proceed until the descriptor is closed. */
	done = false;
	spdk_bdev_unregister(&g_bdev.bdev, _bdev_unregistered, &done);
	/* No polling has occurred, so neither of these should execute */
	CU_ASSERT(remove_notify == false);
	CU_ASSERT(done == false);

	/* Prior to the unregister completing, close the descriptor */
	spdk_bdev_close(desc);

	/* Poll the threads to allow all events to be processed */
	poll_threads();

	/* Remove notify should not have been called because the
	 * descriptor is already closed. */
	CU_ASSERT(remove_notify == false);

	/* The unregister should have completed */
	CU_ASSERT(done == true);

	/* Restore the original g_bdev so that we can use teardown_test(). */
	register_bdev(&g_bdev, "ut_bdev", &g_io_device);
	spdk_bdev_open(&g_bdev.bdev, true, NULL, NULL, &g_desc);
	teardown_test();
}

static void
reset_done(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg)
{
	bool *done = cb_arg;

	CU_ASSERT(success == true);
	*done = true;
	spdk_bdev_free_io(bdev_io);
}

static void
put_channel_during_reset(void)
{
	struct spdk_io_channel *io_ch;
	bool done = false;

	setup_test();

	set_thread(0);
	io_ch = spdk_bdev_get_io_channel(g_desc);
	CU_ASSERT(io_ch != NULL);

	/*
	 * Start a reset, but then put the I/O channel before
	 *  the deferred messages for the reset get a chance to
	 *  execute.
	 */
	spdk_bdev_reset(g_desc, io_ch, reset_done, &done);
	spdk_put_io_channel(io_ch);
	poll_threads();
	stub_complete_io(g_bdev.io_target, 0);

	teardown_test();
}

static void
aborted_reset_done(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg)
{
	enum spdk_bdev_io_status *status = cb_arg;

	*status = success ? SPDK_BDEV_IO_STATUS_SUCCESS : SPDK_BDEV_IO_STATUS_FAILED;
	spdk_bdev_free_io(bdev_io);
}

static void
aborted_reset(void)
{
	struct spdk_io_channel *io_ch[2];
	enum spdk_bdev_io_status status1 = SPDK_BDEV_IO_STATUS_PENDING,
				 status2 = SPDK_BDEV_IO_STATUS_PENDING;

	setup_test();

	set_thread(0);
	io_ch[0] = spdk_bdev_get_io_channel(g_desc);
	CU_ASSERT(io_ch[0] != NULL);
	spdk_bdev_reset(g_desc, io_ch[0], aborted_reset_done, &status1);
	poll_threads();
	CU_ASSERT(g_bdev.bdev.internal.reset_in_progress != NULL);

	/*
	 * First reset has been submitted on ch0.  Now submit a second
	 *  reset on ch1 which will get queued since there is already a
	 *  reset in progress.
	 */
	set_thread(1);
	io_ch[1] = spdk_bdev_get_io_channel(g_desc);
	CU_ASSERT(io_ch[1] != NULL);
	spdk_bdev_reset(g_desc, io_ch[1], aborted_reset_done, &status2);
	poll_threads();
	CU_ASSERT(g_bdev.bdev.internal.reset_in_progress != NULL);

	/*
	 * Now destroy ch1.  This will abort the queued reset.  Check that
	 *  the second reset was completed with failed status.  Also check
	 *  that bdev->internal.reset_in_progress != NULL, since the
	 *  original reset has not been completed yet.  This ensures that
	 *  the bdev code is correctly noticing that the failed reset is
	 *  *not* the one that had been submitted to the bdev module.
	 */
	set_thread(1);
	spdk_put_io_channel(io_ch[1]);
	poll_threads();
	CU_ASSERT(status2 == SPDK_BDEV_IO_STATUS_FAILED);
	CU_ASSERT(g_bdev.bdev.internal.reset_in_progress != NULL);

	/*
	 * Now complete the first reset, verify that it completed with SUCCESS
	 *  status and that bdev->internal.reset_in_progress is also set back to NULL.
	 */
	set_thread(0);
	spdk_put_io_channel(io_ch[0]);
	stub_complete_io(g_bdev.io_target, 0);
	poll_threads();
	CU_ASSERT(status1 == SPDK_BDEV_IO_STATUS_SUCCESS);
	CU_ASSERT(g_bdev.bdev.internal.reset_in_progress == NULL);

	teardown_test();
}

static void
io_during_io_done(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg)
{
	enum spdk_bdev_io_status *status = cb_arg;

	*status = bdev_io->internal.status;
	spdk_bdev_free_io(bdev_io);
}

static void
io_during_reset(void)
{
	struct spdk_io_channel *io_ch[2];
	struct spdk_bdev_channel *bdev_ch[2];
	enum spdk_bdev_io_status status0, status1, status_reset;
	int rc;

	setup_test();

	/*
	 * First test normal case - submit an I/O on each of two channels (with no resets)
	 *  and verify they complete successfully.
	 */
	set_thread(0);
	io_ch[0] = spdk_bdev_get_io_channel(g_desc);
	bdev_ch[0] = spdk_io_channel_get_ctx(io_ch[0]);
	CU_ASSERT(bdev_ch[0]->flags == 0);
	status0 = SPDK_BDEV_IO_STATUS_PENDING;
	rc = spdk_bdev_read_blocks(g_desc, io_ch[0], NULL, 0, 1, io_during_io_done, &status0);
	CU_ASSERT(rc == 0);

	set_thread(1);
	io_ch[1] = spdk_bdev_get_io_channel(g_desc);
	bdev_ch[1] = spdk_io_channel_get_ctx(io_ch[1]);
	CU_ASSERT(bdev_ch[1]->flags == 0);
	status1 = SPDK_BDEV_IO_STATUS_PENDING;
	rc = spdk_bdev_read_blocks(g_desc, io_ch[1], NULL, 0, 1, io_during_io_done, &status1);
	CU_ASSERT(rc == 0);

	poll_threads();
	CU_ASSERT(status0 == SPDK_BDEV_IO_STATUS_PENDING);
	CU_ASSERT(status1 == SPDK_BDEV_IO_STATUS_PENDING);

	set_thread(0);
	stub_complete_io(g_bdev.io_target, 0);
	CU_ASSERT(status0 == SPDK_BDEV_IO_STATUS_SUCCESS);

	set_thread(1);
	stub_complete_io(g_bdev.io_target, 0);
	CU_ASSERT(status1 == SPDK_BDEV_IO_STATUS_SUCCESS);

	/*
	 * Now submit a reset, and leave it pending while we submit I/O on two different
	 *  channels.  These I/O should be failed by the bdev layer since the reset is in
	 *  progress.
	 */
	set_thread(0);
	status_reset = SPDK_BDEV_IO_STATUS_PENDING;
	rc = spdk_bdev_reset(g_desc, io_ch[0], io_during_io_done, &status_reset);
	CU_ASSERT(rc == 0);

	CU_ASSERT(bdev_ch[0]->flags == 0);
	CU_ASSERT(bdev_ch[1]->flags == 0);
	poll_threads();
	CU_ASSERT(bdev_ch[0]->flags == BDEV_CH_RESET_IN_PROGRESS);
	CU_ASSERT(bdev_ch[1]->flags == BDEV_CH_RESET_IN_PROGRESS);

	set_thread(0);
	status0 = SPDK_BDEV_IO_STATUS_PENDING;
	rc = spdk_bdev_read_blocks(g_desc, io_ch[0], NULL, 0, 1, io_during_io_done, &status0);
	CU_ASSERT(rc == 0);

	set_thread(1);
	status1 = SPDK_BDEV_IO_STATUS_PENDING;
	rc = spdk_bdev_read_blocks(g_desc, io_ch[1], NULL, 0, 1, io_during_io_done, &status1);
	CU_ASSERT(rc == 0);

	/*
	 * A reset is in progress, so these read I/O should complete with ABORTED status.  Note
	 *  that we need to poll_threads() since I/O completed inline have their completions
	 *  deferred.
	 */
	poll_threads();
	CU_ASSERT(status_reset == SPDK_BDEV_IO_STATUS_PENDING);
	CU_ASSERT(status0 == SPDK_BDEV_IO_STATUS_ABORTED);
	CU_ASSERT(status1 == SPDK_BDEV_IO_STATUS_ABORTED);

	/*
	 * Complete the reset
	 */
	set_thread(0);
	stub_complete_io(g_bdev.io_target, 0);

	/*
	 * Only poll thread 0. We should not get a completion.
	 */
	poll_thread(0);
	CU_ASSERT(status_reset == SPDK_BDEV_IO_STATUS_PENDING);

	/*
	 * Poll both thread 0 and 1 so the messages can propagate and we
	 * get a completion.
	 */
	poll_threads();
	CU_ASSERT(status_reset == SPDK_BDEV_IO_STATUS_SUCCESS);

	spdk_put_io_channel(io_ch[0]);
	set_thread(1);
	spdk_put_io_channel(io_ch[1]);
	poll_threads();

	teardown_test();
}

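/*
 * The QoS tests below enable rate limits by filling in bdev->internal.qos
 * directly.  In these tests the QoS thread is the thread on which the first
 * I/O channel is created, which is thread 0 unless noted otherwise.
 */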
static void
basic_qos(void)
{
	struct spdk_io_channel *io_ch[2];
	struct spdk_bdev_channel *bdev_ch[2];
	struct spdk_bdev *bdev;
	enum spdk_bdev_io_status status, abort_status;
	int rc;

	setup_test();

	/* Enable QoS */
	bdev = &g_bdev.bdev;
	bdev->internal.qos = calloc(1, sizeof(*bdev->internal.qos));
	SPDK_CU_ASSERT_FATAL(bdev->internal.qos != NULL);
	TAILQ_INIT(&bdev->internal.qos->queued);
	/*
	 * Enable read/write IOPS, read only byte per second and
	 * read/write byte per second rate limits.
	 * In this case, all rate limits will take equal effect.
	 */
	/* 2000 read/write I/O per second, or 2 per millisecond */
	bdev->internal.qos->rate_limits[SPDK_BDEV_QOS_RW_IOPS_RATE_LIMIT].limit = 2000;
	/* 8K read/write byte per millisecond with 4K block size */
	bdev->internal.qos->rate_limits[SPDK_BDEV_QOS_RW_BPS_RATE_LIMIT].limit = 8192000;
	/* 8K read only byte per millisecond with 4K block size */
	bdev->internal.qos->rate_limits[SPDK_BDEV_QOS_R_BPS_RATE_LIMIT].limit = 8192000;

	g_get_io_channel = true;

	set_thread(0);
	io_ch[0] = spdk_bdev_get_io_channel(g_desc);
	bdev_ch[0] = spdk_io_channel_get_ctx(io_ch[0]);
	CU_ASSERT(bdev_ch[0]->flags == BDEV_CH_QOS_ENABLED);

	set_thread(1);
	io_ch[1] = spdk_bdev_get_io_channel(g_desc);
	bdev_ch[1] = spdk_io_channel_get_ctx(io_ch[1]);
	CU_ASSERT(bdev_ch[1]->flags == BDEV_CH_QOS_ENABLED);

	/*
	 * Send an I/O on thread 0, which is where the QoS thread is running.
	 */
	set_thread(0);
	status = SPDK_BDEV_IO_STATUS_PENDING;
	rc = spdk_bdev_read_blocks(g_desc, io_ch[0], NULL, 0, 1, io_during_io_done, &status);
	CU_ASSERT(rc == 0);
	CU_ASSERT(status == SPDK_BDEV_IO_STATUS_PENDING);
	poll_threads();
	stub_complete_io(g_bdev.io_target, 0);
	poll_threads();
	CU_ASSERT(status == SPDK_BDEV_IO_STATUS_SUCCESS);

	/* Send an I/O on thread 1. The QoS thread is not running here. */
	status = SPDK_BDEV_IO_STATUS_PENDING;
	set_thread(1);
	rc = spdk_bdev_read_blocks(g_desc, io_ch[1], NULL, 0, 1, io_during_io_done, &status);
	CU_ASSERT(rc == 0);
	CU_ASSERT(status == SPDK_BDEV_IO_STATUS_PENDING);
	poll_threads();
	/* Complete I/O on thread 1. This should not complete the I/O we submitted */
	stub_complete_io(g_bdev.io_target, 0);
	poll_threads();
	CU_ASSERT(status == SPDK_BDEV_IO_STATUS_PENDING);
	/* Now complete I/O on thread 0 */
	set_thread(0);
	poll_threads();
	stub_complete_io(g_bdev.io_target, 0);
	poll_threads();
	CU_ASSERT(status == SPDK_BDEV_IO_STATUS_SUCCESS);

	/* Reset rate limit for the next test cases. */
	spdk_delay_us(SPDK_BDEV_QOS_TIMESLICE_IN_USEC);
	poll_threads();

	/*
	 * Test abort request when QoS is enabled.
	 */

	/* Send an I/O on thread 0, which is where the QoS thread is running. */
	set_thread(0);
	status = SPDK_BDEV_IO_STATUS_PENDING;
	rc = spdk_bdev_read_blocks(g_desc, io_ch[0], NULL, 0, 1, io_during_io_done, &status);
	CU_ASSERT(rc == 0);
	CU_ASSERT(status == SPDK_BDEV_IO_STATUS_PENDING);
	/* Send an abort to the I/O on the same thread. */
	abort_status = SPDK_BDEV_IO_STATUS_PENDING;
	rc = spdk_bdev_abort(g_desc, io_ch[0], &status, io_during_io_done, &abort_status);
	CU_ASSERT(rc == 0);
	CU_ASSERT(abort_status == SPDK_BDEV_IO_STATUS_PENDING);
	poll_threads();
	CU_ASSERT(abort_status == SPDK_BDEV_IO_STATUS_SUCCESS);
	CU_ASSERT(status == SPDK_BDEV_IO_STATUS_ABORTED);

	/* Send an I/O on thread 1. The QoS thread is not running here. */
	status = SPDK_BDEV_IO_STATUS_PENDING;
	set_thread(1);
	rc = spdk_bdev_read_blocks(g_desc, io_ch[1], NULL, 0, 1, io_during_io_done, &status);
	CU_ASSERT(rc == 0);
	CU_ASSERT(status == SPDK_BDEV_IO_STATUS_PENDING);
	poll_threads();
	/* Send an abort to the I/O on the same thread. */
	abort_status = SPDK_BDEV_IO_STATUS_PENDING;
	rc = spdk_bdev_abort(g_desc, io_ch[1], &status, io_during_io_done, &abort_status);
	CU_ASSERT(rc == 0);
	CU_ASSERT(abort_status == SPDK_BDEV_IO_STATUS_PENDING);
	poll_threads();
	/* Complete the I/O with failure and the abort with success on thread 1. */
	CU_ASSERT(abort_status == SPDK_BDEV_IO_STATUS_SUCCESS);
	CU_ASSERT(status == SPDK_BDEV_IO_STATUS_ABORTED);

	set_thread(0);

	/*
	 * Close the descriptor only, which should stop the qos channel since
	 * this is the last descriptor being removed.
	 */
	spdk_bdev_close(g_desc);
	poll_threads();
	CU_ASSERT(bdev->internal.qos->ch == NULL);

	/*
	 * Open the bdev again, which should set up the qos channel again
	 * since the I/O channels are still valid.
	 */
	spdk_bdev_open(bdev, true, NULL, NULL, &g_desc);
	poll_threads();
	CU_ASSERT(bdev->internal.qos->ch != NULL);

	/* Tear down the channels */
	set_thread(0);
	spdk_put_io_channel(io_ch[0]);
	set_thread(1);
	spdk_put_io_channel(io_ch[1]);
	poll_threads();
	set_thread(0);

	/* Close the descriptor, which should stop the qos channel */
	spdk_bdev_close(g_desc);
	poll_threads();
	CU_ASSERT(bdev->internal.qos->ch == NULL);

	/* Open the bdev again, no qos channel setup without valid channels. */
	spdk_bdev_open(bdev, true, NULL, NULL, &g_desc);
	poll_threads();
	CU_ASSERT(bdev->internal.qos->ch == NULL);

	/* Create the channels in reverse order. */
	set_thread(1);
	io_ch[1] = spdk_bdev_get_io_channel(g_desc);
	bdev_ch[1] = spdk_io_channel_get_ctx(io_ch[1]);
	CU_ASSERT(bdev_ch[1]->flags == BDEV_CH_QOS_ENABLED);

	set_thread(0);
	io_ch[0] = spdk_bdev_get_io_channel(g_desc);
	bdev_ch[0] = spdk_io_channel_get_ctx(io_ch[0]);
	CU_ASSERT(bdev_ch[0]->flags == BDEV_CH_QOS_ENABLED);

	/* Confirm that the qos thread is now thread 1 */
	CU_ASSERT(bdev->internal.qos->ch == bdev_ch[1]);

	/* Tear down the channels */
	set_thread(0);
	spdk_put_io_channel(io_ch[0]);
	set_thread(1);
	spdk_put_io_channel(io_ch[1]);
	poll_threads();

	set_thread(0);

	teardown_test();
}

static void
io_during_qos_queue(void)
{
	struct spdk_io_channel *io_ch[2];
	struct spdk_bdev_channel *bdev_ch[2];
	struct spdk_bdev *bdev;
	enum spdk_bdev_io_status status0, status1, status2;
	int rc;

	setup_test();
	MOCK_SET(spdk_get_ticks, 0);

	/* Enable QoS */
	bdev = &g_bdev.bdev;
	bdev->internal.qos = calloc(1, sizeof(*bdev->internal.qos));
	SPDK_CU_ASSERT_FATAL(bdev->internal.qos != NULL);
	TAILQ_INIT(&bdev->internal.qos->queued);
	/*
	 * Enable read/write IOPS, read only byte per sec, write only
	 * byte per sec and read/write byte per sec rate limits.
	 * In this case, both read only and write only byte per sec
	 * rate limit will take effect.
	 */
	/* 4000 read/write I/O per second, or 4 per millisecond */
	bdev->internal.qos->rate_limits[SPDK_BDEV_QOS_RW_IOPS_RATE_LIMIT].limit = 4000;
	/* 8K byte per millisecond with 4K block size */
	bdev->internal.qos->rate_limits[SPDK_BDEV_QOS_RW_BPS_RATE_LIMIT].limit = 8192000;
	/* 4K byte per millisecond with 4K block size */
	bdev->internal.qos->rate_limits[SPDK_BDEV_QOS_R_BPS_RATE_LIMIT].limit = 4096000;
	/* 4K byte per millisecond with 4K block size */
	bdev->internal.qos->rate_limits[SPDK_BDEV_QOS_W_BPS_RATE_LIMIT].limit = 4096000;

	g_get_io_channel = true;

	/* Create channels */
	set_thread(0);
	io_ch[0] = spdk_bdev_get_io_channel(g_desc);
	bdev_ch[0] = spdk_io_channel_get_ctx(io_ch[0]);
	CU_ASSERT(bdev_ch[0]->flags == BDEV_CH_QOS_ENABLED);

	set_thread(1);
	io_ch[1] = spdk_bdev_get_io_channel(g_desc);
	bdev_ch[1] = spdk_io_channel_get_ctx(io_ch[1]);
	CU_ASSERT(bdev_ch[1]->flags == BDEV_CH_QOS_ENABLED);

	/* Send two read I/Os */
	status1 = SPDK_BDEV_IO_STATUS_PENDING;
	rc = spdk_bdev_read_blocks(g_desc, io_ch[1], NULL, 0, 1, io_during_io_done, &status1);
	CU_ASSERT(rc == 0);
	CU_ASSERT(status1 == SPDK_BDEV_IO_STATUS_PENDING);
	set_thread(0);
	status0 = SPDK_BDEV_IO_STATUS_PENDING;
	rc = spdk_bdev_read_blocks(g_desc, io_ch[0], NULL, 0, 1, io_during_io_done, &status0);
	CU_ASSERT(rc == 0);
	CU_ASSERT(status0 == SPDK_BDEV_IO_STATUS_PENDING);
	/* Send one write I/O */
	status2 = SPDK_BDEV_IO_STATUS_PENDING;
	rc = spdk_bdev_write_blocks(g_desc, io_ch[0], NULL, 0, 1, io_during_io_done, &status2);
	CU_ASSERT(rc == 0);
	CU_ASSERT(status2 == SPDK_BDEV_IO_STATUS_PENDING);

	/* Complete any I/O that arrived at the disk */
	poll_threads();
	set_thread(1);
	stub_complete_io(g_bdev.io_target, 0);
	set_thread(0);
	stub_complete_io(g_bdev.io_target, 0);
	poll_threads();

	/* Only one of the two read I/Os should complete. (logical XOR) */
	if (status0 == SPDK_BDEV_IO_STATUS_SUCCESS) {
		CU_ASSERT(status1 == SPDK_BDEV_IO_STATUS_PENDING);
	} else {
		CU_ASSERT(status1 == SPDK_BDEV_IO_STATUS_SUCCESS);
	}
	/* The write I/O should complete. */
	CU_ASSERT(status2 == SPDK_BDEV_IO_STATUS_SUCCESS);

	/* Advance in time by a millisecond */
	spdk_delay_us(1000);

	/* Complete more I/O */
	poll_threads();
	set_thread(1);
	stub_complete_io(g_bdev.io_target, 0);
	set_thread(0);
	stub_complete_io(g_bdev.io_target, 0);
	poll_threads();

	/* Now the second read I/O should be done */
	CU_ASSERT(status0 == SPDK_BDEV_IO_STATUS_SUCCESS);
	CU_ASSERT(status1 == SPDK_BDEV_IO_STATUS_SUCCESS);

	/* Tear down the channels */
	set_thread(1);
	spdk_put_io_channel(io_ch[1]);
	set_thread(0);
	spdk_put_io_channel(io_ch[0]);
	poll_threads();

	teardown_test();
}

static void
io_during_qos_reset(void)
{
	struct spdk_io_channel *io_ch[2];
	struct spdk_bdev_channel *bdev_ch[2];
	struct spdk_bdev *bdev;
	enum spdk_bdev_io_status status0, status1, reset_status;
	int rc;

	setup_test();
	MOCK_SET(spdk_get_ticks, 0);

	/* Enable QoS */
	bdev = &g_bdev.bdev;
	bdev->internal.qos = calloc(1, sizeof(*bdev->internal.qos));
	SPDK_CU_ASSERT_FATAL(bdev->internal.qos != NULL);
	TAILQ_INIT(&bdev->internal.qos->queued);
	/*
	 * Enable read/write IOPS, write only byte per sec and
	 * read/write byte per second rate limits.
	 * In this case, read/write byte per second rate limit will
	 * take effect first.
	 */
	/* 2000 read/write I/O per second, or 2 per millisecond */
	bdev->internal.qos->rate_limits[SPDK_BDEV_QOS_RW_IOPS_RATE_LIMIT].limit = 2000;
	/* 4K byte per millisecond with 4K block size */
	bdev->internal.qos->rate_limits[SPDK_BDEV_QOS_RW_BPS_RATE_LIMIT].limit = 4096000;
	/* 8K byte per millisecond with 4K block size */
	bdev->internal.qos->rate_limits[SPDK_BDEV_QOS_W_BPS_RATE_LIMIT].limit = 8192000;

	g_get_io_channel = true;

	/* Create channels */
	set_thread(0);
	io_ch[0] = spdk_bdev_get_io_channel(g_desc);
	bdev_ch[0] = spdk_io_channel_get_ctx(io_ch[0]);
	CU_ASSERT(bdev_ch[0]->flags == BDEV_CH_QOS_ENABLED);

	set_thread(1);
	io_ch[1] = spdk_bdev_get_io_channel(g_desc);
	bdev_ch[1] = spdk_io_channel_get_ctx(io_ch[1]);
	CU_ASSERT(bdev_ch[1]->flags == BDEV_CH_QOS_ENABLED);

	/* Send two I/O. One of these gets queued by QoS. The other is sitting at the disk. */
	status1 = SPDK_BDEV_IO_STATUS_PENDING;
	rc = spdk_bdev_write_blocks(g_desc, io_ch[1], NULL, 0, 1, io_during_io_done, &status1);
	CU_ASSERT(rc == 0);
	set_thread(0);
	status0 = SPDK_BDEV_IO_STATUS_PENDING;
	rc = spdk_bdev_write_blocks(g_desc, io_ch[0], NULL, 0, 1, io_during_io_done, &status0);
	CU_ASSERT(rc == 0);

	poll_threads();
	CU_ASSERT(status1 == SPDK_BDEV_IO_STATUS_PENDING);
	CU_ASSERT(status0 == SPDK_BDEV_IO_STATUS_PENDING);

	/* Reset the bdev. */
	reset_status = SPDK_BDEV_IO_STATUS_PENDING;
	rc = spdk_bdev_reset(g_desc, io_ch[0], io_during_io_done, &reset_status);
	CU_ASSERT(rc == 0);

	/* Complete any I/O that arrived at the disk */
	poll_threads();
	set_thread(1);
	stub_complete_io(g_bdev.io_target, 0);
	set_thread(0);
	stub_complete_io(g_bdev.io_target, 0);
	poll_threads();

	CU_ASSERT(reset_status == SPDK_BDEV_IO_STATUS_SUCCESS);
	CU_ASSERT(status0 == SPDK_BDEV_IO_STATUS_ABORTED);
	CU_ASSERT(status1 == SPDK_BDEV_IO_STATUS_ABORTED);

	/* Tear down the channels */
	set_thread(1);
	spdk_put_io_channel(io_ch[1]);
	set_thread(0);
	spdk_put_io_channel(io_ch[0]);
	poll_threads();

	teardown_test();
}

static void
enomem_done(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg)
{
	enum spdk_bdev_io_status *status = cb_arg;

	*status = success ? SPDK_BDEV_IO_STATUS_SUCCESS : SPDK_BDEV_IO_STATUS_FAILED;
	spdk_bdev_free_io(bdev_io);
}

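/*
 * Exercise the shared_resource nomem_io queue: saturate the stub channel,
 * verify that additional I/O is queued rather than failed, and check the
 * retry threshold and reset behavior.
 */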
1030 enomem(void)
1031 {
1032 	struct spdk_io_channel *io_ch;
1033 	struct spdk_bdev_channel *bdev_ch;
1034 	struct spdk_bdev_shared_resource *shared_resource;
1035 	struct ut_bdev_channel *ut_ch;
1036 	const uint32_t IO_ARRAY_SIZE = 64;
1037 	const uint32_t AVAIL = 20;
1038 	enum spdk_bdev_io_status status[IO_ARRAY_SIZE], status_reset;
1039 	uint32_t nomem_cnt, i;
1040 	struct spdk_bdev_io *first_io;
1041 	int rc;
1042 
1043 	setup_test();
1044 
1045 	set_thread(0);
1046 	io_ch = spdk_bdev_get_io_channel(g_desc);
1047 	bdev_ch = spdk_io_channel_get_ctx(io_ch);
1048 	shared_resource = bdev_ch->shared_resource;
1049 	ut_ch = spdk_io_channel_get_ctx(bdev_ch->channel);
1050 	ut_ch->avail_cnt = AVAIL;
1051 
1052 	/* First submit a number of IOs equal to what the channel can support. */
1053 	for (i = 0; i < AVAIL; i++) {
1054 		status[i] = SPDK_BDEV_IO_STATUS_PENDING;
1055 		rc = spdk_bdev_read_blocks(g_desc, io_ch, NULL, 0, 1, enomem_done, &status[i]);
1056 		CU_ASSERT(rc == 0);
1057 	}
1058 	CU_ASSERT(TAILQ_EMPTY(&shared_resource->nomem_io));
1059 
1060 	/*
1061 	 * Next, submit one additional I/O.  This one should fail with ENOMEM and then go onto
1062 	 *  the enomem_io list.
1063 	 */
1064 	status[AVAIL] = SPDK_BDEV_IO_STATUS_PENDING;
1065 	rc = spdk_bdev_read_blocks(g_desc, io_ch, NULL, 0, 1, enomem_done, &status[AVAIL]);
1066 	CU_ASSERT(rc == 0);
1067 	SPDK_CU_ASSERT_FATAL(!TAILQ_EMPTY(&shared_resource->nomem_io));
1068 	first_io = TAILQ_FIRST(&shared_resource->nomem_io);
1069 
1070 	/*
1071 	 * Now submit a bunch more I/O.  These should all fail with ENOMEM and get queued behind
1072 	 *  the first_io above.
1073 	 */
1074 	for (i = AVAIL + 1; i < IO_ARRAY_SIZE; i++) {
1075 		status[i] = SPDK_BDEV_IO_STATUS_PENDING;
1076 		rc = spdk_bdev_read_blocks(g_desc, io_ch, NULL, 0, 1, enomem_done, &status[i]);
1077 		CU_ASSERT(rc == 0);
1078 	}
1079 
1080 	/* Assert that first_io is still at the head of the list. */
1081 	CU_ASSERT(TAILQ_FIRST(&shared_resource->nomem_io) == first_io);
1082 	CU_ASSERT(bdev_io_tailq_cnt(&shared_resource->nomem_io) == (IO_ARRAY_SIZE - AVAIL));
1083 	nomem_cnt = bdev_io_tailq_cnt(&shared_resource->nomem_io);
1084 	CU_ASSERT(shared_resource->nomem_threshold == (AVAIL - NOMEM_THRESHOLD_COUNT));
1085 
1086 	/*
1087 	 * Complete 1 I/O only.  The key check here is bdev_io_tailq_cnt - this should not have
1088 	 *  changed since completing just 1 I/O should not trigger retrying the queued nomem_io
1089 	 *  list.
1090 	 */
1091 	stub_complete_io(g_bdev.io_target, 1);
1092 	CU_ASSERT(bdev_io_tailq_cnt(&shared_resource->nomem_io) == nomem_cnt);
1093 
	/*
	 * Complete enough I/O to hit the nomem_threshold.  This should trigger retrying nomem_io,
	 *  and we should see I/O get resubmitted to the test bdev module.
	 */
	stub_complete_io(g_bdev.io_target, NOMEM_THRESHOLD_COUNT - 1);
	CU_ASSERT(bdev_io_tailq_cnt(&shared_resource->nomem_io) < nomem_cnt);
	nomem_cnt = bdev_io_tailq_cnt(&shared_resource->nomem_io);

	/* Complete 1 I/O only.  This should not trigger retrying the queued nomem_io. */
	stub_complete_io(g_bdev.io_target, 1);
	CU_ASSERT(bdev_io_tailq_cnt(&shared_resource->nomem_io) == nomem_cnt);

	/*
	 * Send a reset and confirm that all I/O are completed, including the ones that
	 *  were queued on the nomem_io list.
	 */
	status_reset = SPDK_BDEV_IO_STATUS_PENDING;
	rc = spdk_bdev_reset(g_desc, io_ch, enomem_done, &status_reset);
	poll_threads();
	CU_ASSERT(rc == 0);
	/* This will complete the reset. */
	stub_complete_io(g_bdev.io_target, 0);

	CU_ASSERT(bdev_io_tailq_cnt(&shared_resource->nomem_io) == 0);
	CU_ASSERT(shared_resource->io_outstanding == 0);

	spdk_put_io_channel(io_ch);
	poll_threads();
	teardown_test();
}

static void
enomem_multi_bdev(void)
{
	struct spdk_io_channel *io_ch;
	struct spdk_bdev_channel *bdev_ch;
	struct spdk_bdev_shared_resource *shared_resource;
	struct ut_bdev_channel *ut_ch;
	const uint32_t IO_ARRAY_SIZE = 64;
	const uint32_t AVAIL = 20;
	enum spdk_bdev_io_status status[IO_ARRAY_SIZE];
	uint32_t i;
	struct ut_bdev *second_bdev;
	struct spdk_bdev_desc *second_desc = NULL;
	struct spdk_bdev_channel *second_bdev_ch;
	struct spdk_io_channel *second_ch;
	int rc;

	setup_test();

	/* Register second bdev with the same io_target  */
	second_bdev = calloc(1, sizeof(*second_bdev));
	SPDK_CU_ASSERT_FATAL(second_bdev != NULL);
	register_bdev(second_bdev, "ut_bdev2", g_bdev.io_target);
	spdk_bdev_open(&second_bdev->bdev, true, NULL, NULL, &second_desc);
	SPDK_CU_ASSERT_FATAL(second_desc != NULL);

	set_thread(0);
	io_ch = spdk_bdev_get_io_channel(g_desc);
	bdev_ch = spdk_io_channel_get_ctx(io_ch);
	shared_resource = bdev_ch->shared_resource;
	ut_ch = spdk_io_channel_get_ctx(bdev_ch->channel);
	ut_ch->avail_cnt = AVAIL;

	second_ch = spdk_bdev_get_io_channel(second_desc);
	second_bdev_ch = spdk_io_channel_get_ctx(second_ch);
	SPDK_CU_ASSERT_FATAL(shared_resource == second_bdev_ch->shared_resource);

	/* Saturate io_target through bdev A. */
	for (i = 0; i < AVAIL; i++) {
		status[i] = SPDK_BDEV_IO_STATUS_PENDING;
		rc = spdk_bdev_read_blocks(g_desc, io_ch, NULL, 0, 1, enomem_done, &status[i]);
		CU_ASSERT(rc == 0);
	}
	CU_ASSERT(TAILQ_EMPTY(&shared_resource->nomem_io));

	/*
	 * Now submit I/O through the second bdev. This should fail with ENOMEM
	 * and then go onto the nomem_io list.
	 */
	status[AVAIL] = SPDK_BDEV_IO_STATUS_PENDING;
	rc = spdk_bdev_read_blocks(second_desc, second_ch, NULL, 0, 1, enomem_done, &status[AVAIL]);
	CU_ASSERT(rc == 0);
	SPDK_CU_ASSERT_FATAL(!TAILQ_EMPTY(&shared_resource->nomem_io));

	/* Complete first bdev's I/O. This should retry sending second bdev's nomem_io */
	stub_complete_io(g_bdev.io_target, AVAIL);

	SPDK_CU_ASSERT_FATAL(TAILQ_EMPTY(&shared_resource->nomem_io));
	CU_ASSERT(shared_resource->io_outstanding == 1);

	/* Now complete our retried I/O  */
	stub_complete_io(g_bdev.io_target, 1);
	SPDK_CU_ASSERT_FATAL(shared_resource->io_outstanding == 0);

	spdk_put_io_channel(io_ch);
	spdk_put_io_channel(second_ch);
	spdk_bdev_close(second_desc);
	unregister_bdev(second_bdev);
	poll_threads();
	free(second_bdev);
	teardown_test();
}


static void
enomem_multi_io_target(void)
{
	struct spdk_io_channel *io_ch;
	struct spdk_bdev_channel *bdev_ch;
	struct ut_bdev_channel *ut_ch;
	const uint32_t IO_ARRAY_SIZE = 64;
	const uint32_t AVAIL = 20;
	enum spdk_bdev_io_status status[IO_ARRAY_SIZE];
	uint32_t i;
	int new_io_device;
	struct ut_bdev *second_bdev;
	struct spdk_bdev_desc *second_desc = NULL;
	struct spdk_bdev_channel *second_bdev_ch;
	struct spdk_io_channel *second_ch;
	int rc;

	setup_test();

	/* Create new io_target and a second bdev using it */
	spdk_io_device_register(&new_io_device, stub_create_ch, stub_destroy_ch,
				sizeof(struct ut_bdev_channel), NULL);
	second_bdev = calloc(1, sizeof(*second_bdev));
	SPDK_CU_ASSERT_FATAL(second_bdev != NULL);
	register_bdev(second_bdev, "ut_bdev2", &new_io_device);
	spdk_bdev_open(&second_bdev->bdev, true, NULL, NULL, &second_desc);
	SPDK_CU_ASSERT_FATAL(second_desc != NULL);

	set_thread(0);
	io_ch = spdk_bdev_get_io_channel(g_desc);
	bdev_ch = spdk_io_channel_get_ctx(io_ch);
	ut_ch = spdk_io_channel_get_ctx(bdev_ch->channel);
	ut_ch->avail_cnt = AVAIL;

	/* Different io_target should imply a different shared_resource */
	second_ch = spdk_bdev_get_io_channel(second_desc);
	second_bdev_ch = spdk_io_channel_get_ctx(second_ch);
	SPDK_CU_ASSERT_FATAL(bdev_ch->shared_resource != second_bdev_ch->shared_resource);

	/* Saturate io_target through bdev A. */
	for (i = 0; i < AVAIL; i++) {
		status[i] = SPDK_BDEV_IO_STATUS_PENDING;
		rc = spdk_bdev_read_blocks(g_desc, io_ch, NULL, 0, 1, enomem_done, &status[i]);
		CU_ASSERT(rc == 0);
	}
	CU_ASSERT(TAILQ_EMPTY(&bdev_ch->shared_resource->nomem_io));

	/* Issue one more I/O to fill ENOMEM list. */
	status[AVAIL] = SPDK_BDEV_IO_STATUS_PENDING;
	rc = spdk_bdev_read_blocks(g_desc, io_ch, NULL, 0, 1, enomem_done, &status[AVAIL]);
	CU_ASSERT(rc == 0);
	SPDK_CU_ASSERT_FATAL(!TAILQ_EMPTY(&bdev_ch->shared_resource->nomem_io));

	/*
	 * Now submit I/O through the second bdev. This should go through and complete
	 * successfully because we're using a different io_device underneath.
	 */
	status[AVAIL] = SPDK_BDEV_IO_STATUS_PENDING;
	rc = spdk_bdev_read_blocks(second_desc, second_ch, NULL, 0, 1, enomem_done, &status[AVAIL]);
	CU_ASSERT(rc == 0);
	SPDK_CU_ASSERT_FATAL(TAILQ_EMPTY(&second_bdev_ch->shared_resource->nomem_io));
	stub_complete_io(second_bdev->io_target, 1);

	/* Cleanup; Complete outstanding I/O. */
	stub_complete_io(g_bdev.io_target, AVAIL);
	SPDK_CU_ASSERT_FATAL(TAILQ_EMPTY(&bdev_ch->shared_resource->nomem_io));
	/* Complete the ENOMEM I/O */
	stub_complete_io(g_bdev.io_target, 1);
	CU_ASSERT(bdev_ch->shared_resource->io_outstanding == 0);

	SPDK_CU_ASSERT_FATAL(TAILQ_EMPTY(&bdev_ch->shared_resource->nomem_io));
	CU_ASSERT(bdev_ch->shared_resource->io_outstanding == 0);
	spdk_put_io_channel(io_ch);
	spdk_put_io_channel(second_ch);
	spdk_bdev_close(second_desc);
	unregister_bdev(second_bdev);
	spdk_io_device_unregister(&new_io_device, NULL);
	poll_threads();
	free(second_bdev);
	teardown_test();
}

static void
qos_dynamic_enable_done(void *cb_arg, int status)
{
	int *rc = cb_arg;
	*rc = status;
}

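/*
 * Toggle QoS on and off with spdk_bdev_set_qos_rate_limits() while I/O is
 * queued, verifying that queued I/O is resubmitted on its original thread
 * and that an enable issued while a disable is still in progress fails.
 */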
static void
qos_dynamic_enable(void)
{
	struct spdk_io_channel *io_ch[2];
	struct spdk_bdev_channel *bdev_ch[2];
	struct spdk_bdev *bdev;
	enum spdk_bdev_io_status bdev_io_status[2];
	uint64_t limits[SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES] = {};
	int status, second_status, rc, i;

	setup_test();
	MOCK_SET(spdk_get_ticks, 0);

	for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) {
		limits[i] = UINT64_MAX;
	}

	bdev = &g_bdev.bdev;

	g_get_io_channel = true;

	/* Create channels */
	set_thread(0);
	io_ch[0] = spdk_bdev_get_io_channel(g_desc);
	bdev_ch[0] = spdk_io_channel_get_ctx(io_ch[0]);
	CU_ASSERT(bdev_ch[0]->flags == 0);

	set_thread(1);
	io_ch[1] = spdk_bdev_get_io_channel(g_desc);
	bdev_ch[1] = spdk_io_channel_get_ctx(io_ch[1]);
	CU_ASSERT(bdev_ch[1]->flags == 0);

	set_thread(0);

	/*
	 * Enable QoS: Read/Write IOPS, Read/Write byte,
	 * Read only byte and Write only byte per second
	 * rate limits.
	 * More than 10 I/Os allowed per timeslice.
	 */
	status = -1;
	limits[SPDK_BDEV_QOS_RW_IOPS_RATE_LIMIT] = 10000;
	limits[SPDK_BDEV_QOS_RW_BPS_RATE_LIMIT] = 100;
	limits[SPDK_BDEV_QOS_R_BPS_RATE_LIMIT] = 100;
	limits[SPDK_BDEV_QOS_W_BPS_RATE_LIMIT] = 10;
	spdk_bdev_set_qos_rate_limits(bdev, limits, qos_dynamic_enable_done, &status);
	poll_threads();
	CU_ASSERT(status == 0);
	CU_ASSERT((bdev_ch[0]->flags & BDEV_CH_QOS_ENABLED) != 0);
	CU_ASSERT((bdev_ch[1]->flags & BDEV_CH_QOS_ENABLED) != 0);

	/*
	 * Submit and complete 10 I/O to fill the QoS allotment for this timeslice.
	 * Additional I/O will then be queued.
	 */
	set_thread(0);
	for (i = 0; i < 10; i++) {
		bdev_io_status[0] = SPDK_BDEV_IO_STATUS_PENDING;
		rc = spdk_bdev_read_blocks(g_desc, io_ch[0], NULL, 0, 1, io_during_io_done, &bdev_io_status[0]);
		CU_ASSERT(rc == 0);
		CU_ASSERT(bdev_io_status[0] == SPDK_BDEV_IO_STATUS_PENDING);
		poll_thread(0);
		stub_complete_io(g_bdev.io_target, 0);
		CU_ASSERT(bdev_io_status[0] == SPDK_BDEV_IO_STATUS_SUCCESS);
	}

	/*
	 * Send two more I/O.  These I/O will be queued since the current timeslice allotment has been
	 * filled already.  We want to test that when QoS is disabled that these two I/O:
	 *  1) are not aborted
	 *  2) are sent back to their original thread for resubmission
	 */
	bdev_io_status[0] = SPDK_BDEV_IO_STATUS_PENDING;
	rc = spdk_bdev_read_blocks(g_desc, io_ch[0], NULL, 0, 1, io_during_io_done, &bdev_io_status[0]);
	CU_ASSERT(rc == 0);
	CU_ASSERT(bdev_io_status[0] == SPDK_BDEV_IO_STATUS_PENDING);
	set_thread(1);
	bdev_io_status[1] = SPDK_BDEV_IO_STATUS_PENDING;
	rc = spdk_bdev_read_blocks(g_desc, io_ch[1], NULL, 0, 1, io_during_io_done, &bdev_io_status[1]);
	CU_ASSERT(rc == 0);
	CU_ASSERT(bdev_io_status[1] == SPDK_BDEV_IO_STATUS_PENDING);
	poll_threads();

	/*
	 * Disable QoS: Read/Write IOPS, Read/Write byte,
	 * Read only byte rate limits
	 */
	status = -1;
	limits[SPDK_BDEV_QOS_RW_IOPS_RATE_LIMIT] = 0;
	limits[SPDK_BDEV_QOS_RW_BPS_RATE_LIMIT] = 0;
	limits[SPDK_BDEV_QOS_R_BPS_RATE_LIMIT] = 0;
	spdk_bdev_set_qos_rate_limits(bdev, limits, qos_dynamic_enable_done, &status);
	poll_threads();
	CU_ASSERT(status == 0);
	CU_ASSERT((bdev_ch[0]->flags & BDEV_CH_QOS_ENABLED) != 0);
	CU_ASSERT((bdev_ch[1]->flags & BDEV_CH_QOS_ENABLED) != 0);

	/* Disable QoS: Write only Byte per second rate limit */
	status = -1;
	limits[SPDK_BDEV_QOS_W_BPS_RATE_LIMIT] = 0;
	spdk_bdev_set_qos_rate_limits(bdev, limits, qos_dynamic_enable_done, &status);
	poll_threads();
	CU_ASSERT(status == 0);
	CU_ASSERT((bdev_ch[0]->flags & BDEV_CH_QOS_ENABLED) == 0);
	CU_ASSERT((bdev_ch[1]->flags & BDEV_CH_QOS_ENABLED) == 0);

	/*
	 * All I/O should have been resubmitted back on their original thread.  Complete
	 *  all I/O on thread 0, and ensure that only the thread 0 I/O was completed.
	 */
	set_thread(0);
	stub_complete_io(g_bdev.io_target, 0);
	poll_threads();
	CU_ASSERT(bdev_io_status[0] == SPDK_BDEV_IO_STATUS_SUCCESS);
	CU_ASSERT(bdev_io_status[1] == SPDK_BDEV_IO_STATUS_PENDING);

	/* Now complete all I/O on thread 1 and ensure the thread 1 I/O was completed. */
	set_thread(1);
	stub_complete_io(g_bdev.io_target, 0);
	poll_threads();
	CU_ASSERT(bdev_io_status[1] == SPDK_BDEV_IO_STATUS_SUCCESS);

	/* Disable QoS again */
	status = -1;
	limits[SPDK_BDEV_QOS_RW_IOPS_RATE_LIMIT] = 0;
	spdk_bdev_set_qos_rate_limits(bdev, limits, qos_dynamic_enable_done, &status);
	poll_threads();
	CU_ASSERT(status == 0); /* This should succeed */
	CU_ASSERT((bdev_ch[0]->flags & BDEV_CH_QOS_ENABLED) == 0);
	CU_ASSERT((bdev_ch[1]->flags & BDEV_CH_QOS_ENABLED) == 0);

	/* Enable QoS on thread 0 */
	status = -1;
	limits[SPDK_BDEV_QOS_RW_IOPS_RATE_LIMIT] = 10000;
	spdk_bdev_set_qos_rate_limits(bdev, limits, qos_dynamic_enable_done, &status);
	poll_threads();
	CU_ASSERT(status == 0);
	CU_ASSERT((bdev_ch[0]->flags & BDEV_CH_QOS_ENABLED) != 0);
	CU_ASSERT((bdev_ch[1]->flags & BDEV_CH_QOS_ENABLED) != 0);

	/* Disable QoS on thread 1 */
	set_thread(1);
	status = -1;
	limits[SPDK_BDEV_QOS_RW_IOPS_RATE_LIMIT] = 0;
	spdk_bdev_set_qos_rate_limits(bdev, limits, qos_dynamic_enable_done, &status);
	/* Don't poll yet. This should leave the channels with QoS enabled */
	CU_ASSERT(status == -1);
	CU_ASSERT((bdev_ch[0]->flags & BDEV_CH_QOS_ENABLED) != 0);
	CU_ASSERT((bdev_ch[1]->flags & BDEV_CH_QOS_ENABLED) != 0);

	/* Enable QoS. This should immediately fail because the previous disable QoS hasn't completed. */
	second_status = 0;
	limits[SPDK_BDEV_QOS_RW_BPS_RATE_LIMIT] = 10;
	spdk_bdev_set_qos_rate_limits(bdev, limits, qos_dynamic_enable_done, &second_status);
	poll_threads();
	CU_ASSERT(status == 0); /* The disable should succeed */
	CU_ASSERT(second_status < 0); /* The enable should fail */
	CU_ASSERT((bdev_ch[0]->flags & BDEV_CH_QOS_ENABLED) == 0);
	CU_ASSERT((bdev_ch[1]->flags & BDEV_CH_QOS_ENABLED) == 0);

	/* Enable QoS on thread 1. This should succeed now that the disable has completed. */
	status = -1;
	limits[SPDK_BDEV_QOS_RW_IOPS_RATE_LIMIT] = 10000;
	spdk_bdev_set_qos_rate_limits(bdev, limits, qos_dynamic_enable_done, &status);
	poll_threads();
	CU_ASSERT(status == 0);
	CU_ASSERT((bdev_ch[0]->flags & BDEV_CH_QOS_ENABLED) != 0);
	CU_ASSERT((bdev_ch[1]->flags & BDEV_CH_QOS_ENABLED) != 0);

	/* Tear down the channels */
	set_thread(0);
	spdk_put_io_channel(io_ch[0]);
	set_thread(1);
	spdk_put_io_channel(io_ch[1]);
	poll_threads();

	set_thread(0);
	teardown_test();
}

static void
histogram_status_cb(void *cb_arg, int status)
{
	g_status = status;
}

static void
histogram_data_cb(void *cb_arg, int status, struct spdk_histogram_data *histogram)
{
	g_status = status;
	g_histogram = histogram;
}

static void
histogram_io_count(void *ctx, uint64_t start, uint64_t end, uint64_t count,
		   uint64_t total, uint64_t so_far)
{
	g_count += count;
}

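/* Verify that histogram data is gathered and aggregated across both I/O channels. */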
static void
bdev_histograms_mt(void)
{
	struct spdk_io_channel *ch[2];
	struct spdk_histogram_data *histogram;
	uint8_t buf[4096];
	int status = false;
	int rc;


	setup_test();

	set_thread(0);
	ch[0] = spdk_bdev_get_io_channel(g_desc);
	CU_ASSERT(ch[0] != NULL);

	set_thread(1);
	ch[1] = spdk_bdev_get_io_channel(g_desc);
	CU_ASSERT(ch[1] != NULL);


	/* Enable histogram */
	spdk_bdev_histogram_enable(&g_bdev.bdev, histogram_status_cb, NULL, true);
	poll_threads();
	CU_ASSERT(g_status == 0);
	CU_ASSERT(g_bdev.bdev.internal.histogram_enabled == true);

	/* Allocate histogram */
	histogram = spdk_histogram_data_alloc();

	/* Check if histogram is zeroed */
	spdk_bdev_histogram_get(&g_bdev.bdev, histogram, histogram_data_cb, NULL);
	poll_threads();
	CU_ASSERT(g_status == 0);
	SPDK_CU_ASSERT_FATAL(g_histogram != NULL);

	g_count = 0;
	spdk_histogram_data_iterate(g_histogram, histogram_io_count, NULL);

	CU_ASSERT(g_count == 0);

	set_thread(0);
	rc = spdk_bdev_write_blocks(g_desc, ch[0], &buf, 0, 1, io_during_io_done, &status);
	CU_ASSERT(rc == 0);

	spdk_delay_us(10);
	stub_complete_io(g_bdev.io_target, 1);
	poll_threads();
	CU_ASSERT(status == true);


	set_thread(1);
	rc = spdk_bdev_read_blocks(g_desc, ch[1], &buf, 0, 1, io_during_io_done, &status);
	CU_ASSERT(rc == 0);

	spdk_delay_us(10);
	stub_complete_io(g_bdev.io_target, 1);
	poll_threads();
	CU_ASSERT(status == true);

	set_thread(0);

	/* Check if histogram gathered data from all I/O channels */
	spdk_bdev_histogram_get(&g_bdev.bdev, histogram, histogram_data_cb, NULL);
	poll_threads();
	CU_ASSERT(g_status == 0);
	CU_ASSERT(g_bdev.bdev.internal.histogram_enabled == true);
	SPDK_CU_ASSERT_FATAL(g_histogram != NULL);

	g_count = 0;
	spdk_histogram_data_iterate(g_histogram, histogram_io_count, NULL);
	CU_ASSERT(g_count == 2);

	/* Disable histogram */
	spdk_bdev_histogram_enable(&g_bdev.bdev, histogram_status_cb, NULL, false);
	poll_threads();
	CU_ASSERT(g_status == 0);
	CU_ASSERT(g_bdev.bdev.internal.histogram_enabled == false);

	spdk_histogram_data_free(histogram);

	/* Tear down the channels */
	set_thread(0);
	spdk_put_io_channel(ch[0]);
	set_thread(1);
	spdk_put_io_channel(ch[1]);
	poll_threads();
	set_thread(0);
	teardown_test();

}

struct timeout_io_cb_arg {
	struct iovec iov;
	uint8_t type;
};

static int
bdev_channel_count_submitted_io(struct spdk_bdev_channel *ch)
{
	struct spdk_bdev_io *bdev_io;
	int n = 0;

	if (!ch) {
		return -1;
	}

	TAILQ_FOREACH(bdev_io, &ch->io_submitted, internal.ch_link) {
		n++;
	}

	return n;
}

static void
bdev_channel_io_timeout_cb(void *cb_arg, struct spdk_bdev_io *bdev_io)
{
	struct timeout_io_cb_arg *ctx = cb_arg;

	ctx->type = bdev_io->type;
	ctx->iov.iov_base = bdev_io->iov.iov_base;
	ctx->iov.iov_len = bdev_io->iov.iov_len;
}

static bool g_io_done;

static void
io_done(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg)
{
	g_io_done = true;
	spdk_bdev_free_io(bdev_io);
}

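/*
 * spdk_bdev_set_timeout() registers a poller on the descriptor's thread.
 * The checks below walk each channel's io_submitted list and use the
 * timeout callback to identify I/O that exceeded the timeout.
 */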
static void
bdev_set_io_timeout_mt(void)
{
	struct spdk_io_channel *ch[3];
	struct spdk_bdev_channel *bdev_ch[3];
	struct timeout_io_cb_arg cb_arg;

	setup_test();

	g_bdev.bdev.optimal_io_boundary = 16;
	g_bdev.bdev.split_on_optimal_io_boundary = true;

	set_thread(0);
	ch[0] = spdk_bdev_get_io_channel(g_desc);
	CU_ASSERT(ch[0] != NULL);

	set_thread(1);
	ch[1] = spdk_bdev_get_io_channel(g_desc);
	CU_ASSERT(ch[1] != NULL);

	set_thread(2);
	ch[2] = spdk_bdev_get_io_channel(g_desc);
	CU_ASSERT(ch[2] != NULL);

	/* Multi-thread mode
	 * 1, Check the poller was registered successfully
	 * 2, Check the timeout IO and ensure that it was submitted by the user
	 * 3, Check that the link in the bdev_ch works right.
	 * 4, Close desc and put io channel while the timeout poller is polling
	 */
1651 
1652 	/* In desc thread set the timeout */
1653 	set_thread(0);
1654 	CU_ASSERT(spdk_bdev_set_timeout(g_desc, 5, bdev_channel_io_timeout_cb, &cb_arg) == 0);
1655 	CU_ASSERT(g_desc->io_timeout_poller != NULL);
1656 	CU_ASSERT(g_desc->cb_fn == bdev_channel_io_timeout_cb);
1657 	CU_ASSERT(g_desc->cb_arg == &cb_arg);
1658 
1659 	/* check the IO submitted list and timeout handler */
1660 	CU_ASSERT(spdk_bdev_read_blocks(g_desc, ch[0], (void *)0x2000, 0, 1, io_done, NULL) == 0);
1661 	bdev_ch[0] = spdk_io_channel_get_ctx(ch[0]);
1662 	CU_ASSERT(bdev_channel_count_submitted_io(bdev_ch[0]) == 1);
1663 
1664 	set_thread(1);
1665 	CU_ASSERT(spdk_bdev_write_blocks(g_desc, ch[1], (void *)0x1000, 0, 1, io_done, NULL) == 0);
1666 	bdev_ch[1] = spdk_io_channel_get_ctx(ch[1]);
1667 	CU_ASSERT(bdev_channel_count_submitted_io(bdev_ch[1]) == 1);
1668 
1669 	/* Now test that a single-vector command is split correctly.
1670 	 * Offset 14, length 8, payload 0xF000
1671 	 *  Child - Offset 14, length 2, payload 0xF000
1672 	 *  Child - Offset 16, length 6, payload 0xF000 + 2 * 512
1673 	 *
1674 	 * The parent I/O and both children are linked on io_submitted, so the count below is 3.
1675 	 */
1676 	set_thread(2);
1677 	CU_ASSERT(spdk_bdev_read_blocks(g_desc, ch[2], (void *)0xF000, 14, 8, io_done, NULL) == 0);
1678 	bdev_ch[2] = spdk_io_channel_get_ctx(ch[2]);
1679 	CU_ASSERT(bdev_channel_count_submitted_io(bdev_ch[2]) == 3);
1680 
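	/* Advance the clock, but not far enough for the 5-second timeout to expire;
	 * no timeout callback fires yet, so cb_arg stays zeroed.
	 */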
1681 	set_thread(0);
1682 	memset(&cb_arg, 0, sizeof(cb_arg));
1683 	spdk_delay_us(3 * spdk_get_ticks_hz());
1684 	poll_threads();
1685 	CU_ASSERT(cb_arg.type == 0);
1686 	CU_ASSERT(cb_arg.iov.iov_base == (void *)0x0);
1687 	CU_ASSERT(cb_arg.iov.iov_len == 0);
1688 
1689 	/* Now the elapsed time reaches the timeout limit */
1690 	spdk_delay_us(3 * spdk_get_ticks_hz());
1691 	poll_thread(0);
1692 	CU_ASSERT(cb_arg.type == SPDK_BDEV_IO_TYPE_READ);
1693 	CU_ASSERT(cb_arg.iov.iov_base == (void *)0x2000);
1694 	CU_ASSERT(cb_arg.iov.iov_len == 1 * g_bdev.bdev.blocklen);
1695 	stub_complete_io(g_bdev.io_target, 1);
1696 	CU_ASSERT(bdev_channel_count_submitted_io(bdev_ch[0]) == 0);
1697 
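	/* The write on thread 1 also timed out; its callback runs when thread 1 polls. */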
1698 	memset(&cb_arg, 0, sizeof(cb_arg));
1699 	set_thread(1);
1700 	poll_thread(1);
1701 	CU_ASSERT(cb_arg.type == SPDK_BDEV_IO_TYPE_WRITE);
1702 	CU_ASSERT(cb_arg.iov.iov_base == (void *)0x1000);
1703 	CU_ASSERT(cb_arg.iov.iov_len == 1 * g_bdev.bdev.blocklen);
1704 	stub_complete_io(g_bdev.io_target, 1);
1705 	CU_ASSERT(bdev_channel_count_submitted_io(bdev_ch[1]) == 0);
1706 
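	/* On thread 2 the timeout callback reports the parent I/O (payload 0xF000, 8 blocks).
	 * Completing both split children also completes the parent, draining io_submitted.
	 */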
1707 	memset(&cb_arg, 0, sizeof(cb_arg));
1708 	set_thread(2);
1709 	poll_thread(2);
1710 	CU_ASSERT(cb_arg.type == SPDK_BDEV_IO_TYPE_READ);
1711 	CU_ASSERT(cb_arg.iov.iov_base == (void *)0xF000);
1712 	CU_ASSERT(cb_arg.iov.iov_len == 8 * g_bdev.bdev.blocklen);
1713 	stub_complete_io(g_bdev.io_target, 1);
1714 	CU_ASSERT(bdev_channel_count_submitted_io(bdev_ch[2]) == 2);
1715 	stub_complete_io(g_bdev.io_target, 1);
1716 	CU_ASSERT(bdev_channel_count_submitted_io(bdev_ch[2]) == 0);
1717 
1718 	/* Run poll_timeout_done(), which completes this round of the timeout poller */
1719 	set_thread(0);
1720 	poll_thread(0);
1721 	CU_ASSERT(g_desc->refs == 0);
1722 	CU_ASSERT(spdk_bdev_read_blocks(g_desc, ch[0], (void *)0x1000, 0, 1, io_done, NULL) == 0);
1723 	set_thread(1);
1724 	CU_ASSERT(spdk_bdev_write_blocks(g_desc, ch[1], (void *)0x2000, 0, 2, io_done, NULL) == 0);
1725 	set_thread(2);
1726 	CU_ASSERT(spdk_bdev_read_blocks(g_desc, ch[2], (void *)0x3000, 0, 3, io_done, NULL) == 0);
1727 
1728 	/* Trigger the timeout poller to run again; desc->refs is incremented.
1729 	 * On thread 0 we destroy the io channel before the timeout poller runs,
1730 	 * so the timeout callback is not called on thread 0.
1731 	 */
1732 	spdk_delay_us(6 * spdk_get_ticks_hz());
1733 	memset(&cb_arg, 0, sizeof(cb_arg));
1734 	set_thread(0);
1735 	stub_complete_io(g_bdev.io_target, 1);
1736 	spdk_put_io_channel(ch[0]);
1737 	poll_thread(0);
1738 	CU_ASSERT(g_desc->refs == 1);
1739 	CU_ASSERT(cb_arg.type == 0);
1740 	CU_ASSERT(cb_arg.iov.iov_base == (void *)0x0);
1741 	CU_ASSERT(cb_arg.iov.iov_len == 0);
1742 
1743 	/* On thread 1 the timeout poller runs first and then we destroy the io channel,
1744 	 * so the timeout callback is called on thread 1.
1745 	 */
1746 	memset(&cb_arg, 0, sizeof(cb_arg));
1747 	set_thread(1);
1748 	poll_thread(1);
1749 	stub_complete_io(g_bdev.io_target, 1);
1750 	spdk_put_io_channel(ch[1]);
1751 	poll_thread(1);
1752 	CU_ASSERT(cb_arg.type == SPDK_BDEV_IO_TYPE_WRITE);
1753 	CU_ASSERT(cb_arg.iov.iov_base == (void *)0x2000);
1754 	CU_ASSERT(cb_arg.iov.iov_len == 2 * g_bdev.bdev.blocklen);
1755 
1756 	/* Close the desc.
1757 	 * This unregisters the timeout poller first,
1758 	 * then decrements desc->refs; it is not zero yet, so the desc is not freed.
1759 	 */
1760 	set_thread(0);
1761 	spdk_bdev_close(g_desc);
1762 	CU_ASSERT(g_desc->refs == 1);
1763 	CU_ASSERT(g_desc->io_timeout_poller == NULL);
1764 
1765 	/* The timeout poller runs on thread 2, then we destroy the io channel.
1766 	 * The desc is already closed, so the timeout poller exits immediately and
1767 	 * the timeout callback is not called on thread 2.
1768 	 */
1769 	memset(&cb_arg, 0, sizeof(cb_arg));
1770 	set_thread(2);
1771 	poll_thread(2);
1772 	stub_complete_io(g_bdev.io_target, 1);
1773 	spdk_put_io_channel(ch[2]);
1774 	poll_thread(2);
1775 	CU_ASSERT(cb_arg.type == 0);
1776 	CU_ASSERT(cb_arg.iov.iov_base == (void *)0x0);
1777 	CU_ASSERT(cb_arg.iov.iov_len == 0);
1778 
1779 	set_thread(0);
1780 	poll_thread(0);
1781 	g_teardown_done = false;
1782 	unregister_bdev(&g_bdev);
1783 	spdk_io_device_unregister(&g_io_device, NULL);
1784 	spdk_bdev_finish(finish_cb, NULL);
1785 	poll_threads();
1786 	memset(&g_bdev, 0, sizeof(g_bdev));
1787 	CU_ASSERT(g_teardown_done == true);
1788 	g_teardown_done = false;
1789 	free_threads();
1790 	free_cores();
1791 }
1792 
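/* Completion flags used by the LBA range locking test below. */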
1793 static bool g_io_done2;
1794 static bool g_lock_lba_range_done;
1795 static bool g_unlock_lba_range_done;
1796 
1797 static void
1798 io_done2(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg)
1799 {
1800 	g_io_done2 = true;
1801 	spdk_bdev_free_io(bdev_io);
1802 }
1803 
1804 static void
1805 lock_lba_range_done(void *ctx, int status)
1806 {
1807 	g_lock_lba_range_done = true;
1808 }
1809 
1810 static void
1811 unlock_lba_range_done(void *ctx, int status)
1812 {
1813 	g_unlock_lba_range_done = true;
1814 }
1815 
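/* Return the number of I/O currently outstanding on the stub channel for io_target. */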
1816 static uint32_t
1817 stub_channel_outstanding_cnt(void *io_target)
1818 {
1819 	struct spdk_io_channel *_ch = spdk_get_io_channel(io_target);
1820 	struct ut_bdev_channel *ch = spdk_io_channel_get_ctx(_ch);
1821 	uint32_t outstanding_cnt;
1822 
1823 	outstanding_cnt = ch->outstanding_cnt;
1824 
1825 	spdk_put_io_channel(_ch);
1826 	return outstanding_cnt;
1827 }
1828 
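/* Verify that write I/O overlapping a locked LBA range is queued on io_locked until the
 * range is unlocked, while the channel that owns the lock can keep submitting I/O.
 */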
1829 static void
1830 lock_lba_range_then_submit_io(void)
1831 {
1832 	struct spdk_bdev_desc *desc = NULL;
1833 	void *io_target;
1834 	struct spdk_io_channel *io_ch[3];
1835 	struct spdk_bdev_channel *bdev_ch[3];
1836 	struct lba_range *range;
1837 	char buf[4096];
1838 	int ctx0, ctx1, ctx2;
1839 	int rc;
1840 
1841 	setup_test();
1842 
1843 	io_target = g_bdev.io_target;
1844 	desc = g_desc;
1845 
1846 	set_thread(0);
1847 	io_ch[0] = spdk_bdev_get_io_channel(desc);
1848 	bdev_ch[0] = spdk_io_channel_get_ctx(io_ch[0]);
1849 	CU_ASSERT(io_ch[0] != NULL);
1850 
1851 	set_thread(1);
1852 	io_ch[1] = spdk_bdev_get_io_channel(desc);
1853 	bdev_ch[1] = spdk_io_channel_get_ctx(io_ch[1]);
1854 	CU_ASSERT(io_ch[1] != NULL);
1855 
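	/* Lock the LBA range at offset 20, length 10, from thread 0's channel. */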
1856 	set_thread(0);
1857 	g_lock_lba_range_done = false;
1858 	rc = bdev_lock_lba_range(desc, io_ch[0], 20, 10, lock_lba_range_done, &ctx0);
1859 	CU_ASSERT(rc == 0);
1860 	poll_threads();
1861 
1862 	/* The lock should immediately become valid, since there are no outstanding
1863 	 * write I/O.
1864 	 */
1865 	CU_ASSERT(g_lock_lba_range_done == true);
1866 	range = TAILQ_FIRST(&bdev_ch[0]->locked_ranges);
1867 	SPDK_CU_ASSERT_FATAL(range != NULL);
1868 	CU_ASSERT(range->offset == 20);
1869 	CU_ASSERT(range->length == 10);
1870 	CU_ASSERT(range->owner_ch == bdev_ch[0]);
1871 
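	/* A read within the locked range from the owning channel executes immediately. */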
1872 	g_io_done = false;
1873 	CU_ASSERT(TAILQ_EMPTY(&bdev_ch[0]->io_locked));
1874 	rc = spdk_bdev_read_blocks(desc, io_ch[0], buf, 20, 1, io_done, &ctx0);
1875 	CU_ASSERT(rc == 0);
1876 	CU_ASSERT(stub_channel_outstanding_cnt(io_target) == 1);
1877 
1878 	stub_complete_io(io_target, 1);
1879 	poll_threads();
1880 	CU_ASSERT(g_io_done == true);
1881 	CU_ASSERT(TAILQ_EMPTY(&bdev_ch[0]->io_locked));
1882 
1883 	/* Try a write I/O.  This should actually be allowed to execute, since the channel
1884 	 * holding the lock is submitting the write I/O.
1885 	 */
1886 	g_io_done = false;
1887 	CU_ASSERT(TAILQ_EMPTY(&bdev_ch[0]->io_locked));
1888 	rc = spdk_bdev_write_blocks(desc, io_ch[0], buf, 20, 1, io_done, &ctx0);
1889 	CU_ASSERT(rc == 0);
1890 	CU_ASSERT(stub_channel_outstanding_cnt(io_target) == 1);
1891 
1892 	stub_complete_io(io_target, 1);
1893 	poll_threads();
1894 	CU_ASSERT(g_io_done == true);
1895 	CU_ASSERT(TAILQ_EMPTY(&bdev_ch[0]->io_locked));
1896 
1897 	/* Try a write I/O.  This should get queued in the io_locked tailq. */
1898 	set_thread(1);
1899 	g_io_done = false;
1900 	CU_ASSERT(TAILQ_EMPTY(&bdev_ch[1]->io_locked));
1901 	rc = spdk_bdev_write_blocks(desc, io_ch[1], buf, 20, 1, io_done, &ctx1);
1902 	CU_ASSERT(rc == 0);
1903 	poll_threads();
1904 	CU_ASSERT(stub_channel_outstanding_cnt(io_target) == 0);
1905 	CU_ASSERT(!TAILQ_EMPTY(&bdev_ch[1]->io_locked));
1906 	CU_ASSERT(g_io_done == false);
1907 
1908 	/* Try to unlock the lba range using thread 1's io_ch.  This should fail. */
1909 	rc = bdev_unlock_lba_range(desc, io_ch[1], 20, 10, unlock_lba_range_done, &ctx1);
1910 	CU_ASSERT(rc == -EINVAL);
1911 
1912 	/* Now create a new channel and submit a write I/O with it.  This should also be queued.
1913 	 * The new channel should inherit the active locks from the bdev's internal list.
1914 	 */
1915 	set_thread(2);
1916 	io_ch[2] = spdk_bdev_get_io_channel(desc);
1917 	bdev_ch[2] = spdk_io_channel_get_ctx(io_ch[2]);
1918 	CU_ASSERT(io_ch[2] != NULL);
1919 
1920 	g_io_done2 = false;
1921 	CU_ASSERT(TAILQ_EMPTY(&bdev_ch[2]->io_locked));
1922 	rc = spdk_bdev_write_blocks(desc, io_ch[2], buf, 22, 2, io_done2, &ctx2);
1923 	CU_ASSERT(rc == 0);
1924 	poll_threads();
1925 	CU_ASSERT(stub_channel_outstanding_cnt(io_target) == 0);
1926 	CU_ASSERT(!TAILQ_EMPTY(&bdev_ch[2]->io_locked));
1927 	CU_ASSERT(g_io_done2 == false);
1928 
1929 	set_thread(0);
1930 	rc = bdev_unlock_lba_range(desc, io_ch[0], 20, 10, unlock_lba_range_done, &ctx0);
1931 	CU_ASSERT(rc == 0);
1932 	poll_threads();
1933 	CU_ASSERT(TAILQ_EMPTY(&bdev_ch[0]->locked_ranges));
1934 
1935 	/* The LBA range is unlocked, so the write IOs should now have started execution. */
1936 	CU_ASSERT(TAILQ_EMPTY(&bdev_ch[1]->io_locked));
1937 	CU_ASSERT(TAILQ_EMPTY(&bdev_ch[2]->io_locked));
1938 
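	/* Complete the writes that were released from the io_locked queues. */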
1939 	set_thread(1);
1940 	CU_ASSERT(stub_channel_outstanding_cnt(io_target) == 1);
1941 	stub_complete_io(io_target, 1);
1942 	set_thread(2);
1943 	CU_ASSERT(stub_channel_outstanding_cnt(io_target) == 1);
1944 	stub_complete_io(io_target, 1);
1945 
1946 	poll_threads();
1947 	CU_ASSERT(g_io_done == true);
1948 	CU_ASSERT(g_io_done2 == true);
1949 
1950 	/* Tear down the channels */
1951 	set_thread(0);
1952 	spdk_put_io_channel(io_ch[0]);
1953 	set_thread(1);
1954 	spdk_put_io_channel(io_ch[1]);
1955 	set_thread(2);
1956 	spdk_put_io_channel(io_ch[2]);
1957 	poll_threads();
1958 	set_thread(0);
1959 	teardown_test();
1960 }
1961 
1962 int
1963 main(int argc, char **argv)
1964 {
1965 	CU_pSuite	suite = NULL;
1966 	unsigned int	num_failures;
1967 
1968 	CU_set_error_action(CUEA_ABORT);
1969 	CU_initialize_registry();
1970 
1971 	suite = CU_add_suite("bdev", NULL, NULL);
1972 
1973 	CU_ADD_TEST(suite, basic);
1974 	CU_ADD_TEST(suite, unregister_and_close);
1975 	CU_ADD_TEST(suite, basic_qos);
1976 	CU_ADD_TEST(suite, put_channel_during_reset);
1977 	CU_ADD_TEST(suite, aborted_reset);
1978 	CU_ADD_TEST(suite, io_during_reset);
1979 	CU_ADD_TEST(suite, io_during_qos_queue);
1980 	CU_ADD_TEST(suite, io_during_qos_reset);
1981 	CU_ADD_TEST(suite, enomem);
1982 	CU_ADD_TEST(suite, enomem_multi_bdev);
1983 	CU_ADD_TEST(suite, enomem_multi_io_target);
1984 	CU_ADD_TEST(suite, qos_dynamic_enable);
1985 	CU_ADD_TEST(suite, bdev_histograms_mt);
1986 	CU_ADD_TEST(suite, bdev_set_io_timeout_mt);
1987 	CU_ADD_TEST(suite, lock_lba_range_then_submit_io);
1988 
1989 	CU_basic_set_mode(CU_BRM_VERBOSE);
1990 	CU_basic_run_tests();
1991 	num_failures = CU_get_number_of_failures();
1992 	CU_cleanup_registry();
1993 	return num_failures;
1994 }
1995