xref: /spdk/test/unit/lib/bdev/mt/bdev.c/bdev_ut.c (revision fc3e40618c36027f4e30d6f46e8cf2a55654390e)
1 /*-
2  *   BSD LICENSE
3  *
4  *   Copyright (c) Intel Corporation.
5  *   All rights reserved.
6  *
7  *   Redistribution and use in source and binary forms, with or without
8  *   modification, are permitted provided that the following conditions
9  *   are met:
10  *
11  *     * Redistributions of source code must retain the above copyright
12  *       notice, this list of conditions and the following disclaimer.
13  *     * Redistributions in binary form must reproduce the above copyright
14  *       notice, this list of conditions and the following disclaimer in
15  *       the documentation and/or other materials provided with the
16  *       distribution.
17  *     * Neither the name of Intel Corporation nor the names of its
18  *       contributors may be used to endorse or promote products derived
19  *       from this software without specific prior written permission.
20  *
21  *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22  *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23  *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
24  *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
25  *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
26  *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
27  *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
28  *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
29  *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
30  *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
31  *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
32  */
33 
34 #include "spdk_cunit.h"
35 
36 #include "common/lib/ut_multithread.c"
37 #include "unit/lib/json_mock.c"
38 
39 #include "spdk/config.h"
40 /* HACK: disable VTune integration so the unit test doesn't need VTune headers and libs to build */
41 #undef SPDK_CONFIG_VTUNE
42 
43 #include "bdev/bdev.c"
44 
45 #define BDEV_UT_NUM_THREADS 3
46 
47 DEFINE_STUB(spdk_conf_find_section, struct spdk_conf_section *, (struct spdk_conf *cp,
48 		const char *name), NULL);
49 DEFINE_STUB(spdk_conf_section_get_nmval, char *,
50 	    (struct spdk_conf_section *sp, const char *key, int idx1, int idx2), NULL);
51 DEFINE_STUB(spdk_conf_section_get_intval, int, (struct spdk_conf_section *sp, const char *key), -1);
52 
53 struct spdk_trace_histories *g_trace_histories;
54 DEFINE_STUB_V(spdk_trace_add_register_fn, (struct spdk_trace_register_fn *reg_fn));
55 DEFINE_STUB_V(spdk_trace_register_owner, (uint8_t type, char id_prefix));
56 DEFINE_STUB_V(spdk_trace_register_object, (uint8_t type, char id_prefix));
57 DEFINE_STUB_V(spdk_trace_register_description, (const char *name,
58 		uint16_t tpoint_id, uint8_t owner_type,
59 		uint8_t object_type, uint8_t new_object,
60 		uint8_t arg1_type, const char *arg1_name));
61 DEFINE_STUB_V(_spdk_trace_record, (uint64_t tsc, uint16_t tpoint_id, uint16_t poller_id,
62 				   uint32_t size, uint64_t object_id, uint64_t arg1));
63 DEFINE_STUB(spdk_notify_send, uint64_t, (const char *type, const char *ctx), 0);
64 DEFINE_STUB(spdk_notify_type_register, struct spdk_notify_type *, (const char *type), NULL);
65 
66 struct ut_bdev {
67 	struct spdk_bdev	bdev;
68 	void			*io_target;
69 };
70 
71 struct ut_bdev_channel {
72 	TAILQ_HEAD(, spdk_bdev_io)	outstanding_io;
73 	uint32_t			outstanding_cnt;
74 	uint32_t			avail_cnt;
75 };
76 
77 int g_io_device;
78 struct ut_bdev g_bdev;
79 struct spdk_bdev_desc *g_desc;
80 bool g_teardown_done = false;
81 bool g_get_io_channel = true;
82 bool g_create_ch = true;
83 bool g_init_complete_called = false;
84 bool g_fini_start_called = true;
85 int g_status = 0;
86 int g_count = 0;
87 struct spdk_histogram_data *g_histogram = NULL;
88 
89 static int
90 stub_create_ch(void *io_device, void *ctx_buf)
91 {
92 	struct ut_bdev_channel *ch = ctx_buf;
93 
94 	if (g_create_ch == false) {
95 		return -1;
96 	}
97 
98 	TAILQ_INIT(&ch->outstanding_io);
99 	ch->outstanding_cnt = 0;
100 	/*
101 	 * When avail gets to 0, the submit_request function will return ENOMEM.
102 	 *  Most tests do not want ENOMEM to occur, so by default set this to a
103 	 *  big value that won't get hit.  The ENOMEM tests can then override this
104 	 *  value to something much smaller to induce ENOMEM conditions.
105 	 */
106 	ch->avail_cnt = 2048;
107 	return 0;
108 }
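/*
 * Note: tests that need to exercise the ENOMEM path shrink avail_cnt after the
 * channel is created - for example, enomem() below fetches the ut_bdev_channel
 * context and sets avail_cnt to a small value (20) so that the 21st submission
 * hits the SPDK_BDEV_IO_STATUS_NOMEM branch in stub_submit_request().
 */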
109 
110 static void
111 stub_destroy_ch(void *io_device, void *ctx_buf)
112 {
113 }
114 
115 static struct spdk_io_channel *
116 stub_get_io_channel(void *ctx)
117 {
118 	struct ut_bdev *ut_bdev = ctx;
119 
120 	if (g_get_io_channel == true) {
121 		return spdk_get_io_channel(ut_bdev->io_target);
122 	} else {
123 		return NULL;
124 	}
125 }
126 
127 static int
128 stub_destruct(void *ctx)
129 {
130 	return 0;
131 }
132 
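/*
 * Stub submit_request handler: a RESET fails every outstanding I/O on the channel,
 * an ABORT completes the matching outstanding I/O as ABORTED (or fails the abort
 * if no match is found), and any other I/O is queued on outstanding_io while
 * avail_cnt permits, otherwise it is completed with SPDK_BDEV_IO_STATUS_NOMEM.
 */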
133 static void
134 stub_submit_request(struct spdk_io_channel *_ch, struct spdk_bdev_io *bdev_io)
135 {
136 	struct ut_bdev_channel *ch = spdk_io_channel_get_ctx(_ch);
137 	struct spdk_bdev_io *io;
138 
139 	if (bdev_io->type == SPDK_BDEV_IO_TYPE_RESET) {
140 		while (!TAILQ_EMPTY(&ch->outstanding_io)) {
141 			io = TAILQ_FIRST(&ch->outstanding_io);
142 			TAILQ_REMOVE(&ch->outstanding_io, io, module_link);
143 			ch->outstanding_cnt--;
144 			spdk_bdev_io_complete(io, SPDK_BDEV_IO_STATUS_FAILED);
145 			ch->avail_cnt++;
146 		}
147 	} else if (bdev_io->type == SPDK_BDEV_IO_TYPE_ABORT) {
148 		TAILQ_FOREACH(io, &ch->outstanding_io, module_link) {
149 			if (io == bdev_io->u.abort.bio_to_abort) {
150 				TAILQ_REMOVE(&ch->outstanding_io, io, module_link);
151 				ch->outstanding_cnt--;
152 				spdk_bdev_io_complete(io, SPDK_BDEV_IO_STATUS_ABORTED);
153 				ch->avail_cnt++;
154 
155 				spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_SUCCESS);
156 				return;
157 			}
158 		}
159 
160 		spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
161 		return;
162 	}
163 
164 	if (ch->avail_cnt > 0) {
165 		TAILQ_INSERT_TAIL(&ch->outstanding_io, bdev_io, module_link);
166 		ch->outstanding_cnt++;
167 		ch->avail_cnt--;
168 	} else {
169 		spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_NOMEM);
170 	}
171 }
172 
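/*
 * Complete up to num_to_complete outstanding I/Os on the given io_target with
 * SUCCESS status; passing 0 completes everything that is outstanding.
 */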
173 static uint32_t
174 stub_complete_io(void *io_target, uint32_t num_to_complete)
175 {
176 	struct spdk_io_channel *_ch = spdk_get_io_channel(io_target);
177 	struct ut_bdev_channel *ch = spdk_io_channel_get_ctx(_ch);
178 	struct spdk_bdev_io *io;
179 	bool complete_all = (num_to_complete == 0);
180 	uint32_t num_completed = 0;
181 
182 	while (complete_all || num_completed < num_to_complete) {
183 		if (TAILQ_EMPTY(&ch->outstanding_io)) {
184 			break;
185 		}
186 		io = TAILQ_FIRST(&ch->outstanding_io);
187 		TAILQ_REMOVE(&ch->outstanding_io, io, module_link);
188 		ch->outstanding_cnt--;
189 		spdk_bdev_io_complete(io, SPDK_BDEV_IO_STATUS_SUCCESS);
190 		ch->avail_cnt++;
191 		num_completed++;
192 	}
193 
194 	spdk_put_io_channel(_ch);
195 	return num_completed;
196 }
197 
198 static bool
199 stub_io_type_supported(void *ctx, enum spdk_bdev_io_type type)
200 {
201 	return true;
202 }
203 
204 static struct spdk_bdev_fn_table fn_table = {
205 	.get_io_channel =	stub_get_io_channel,
206 	.destruct =		stub_destruct,
207 	.submit_request =	stub_submit_request,
208 	.io_type_supported =	stub_io_type_supported,
209 };
210 
211 struct spdk_bdev_module bdev_ut_if;
212 
213 static int
214 module_init(void)
215 {
216 	spdk_bdev_module_init_done(&bdev_ut_if);
217 	return 0;
218 }
219 
220 static void
221 module_fini(void)
222 {
223 }
224 
225 static void
226 init_complete(void)
227 {
228 	g_init_complete_called = true;
229 }
230 
231 static void
232 fini_start(void)
233 {
234 	g_fini_start_called = true;
235 }
236 
237 struct spdk_bdev_module bdev_ut_if = {
238 	.name = "bdev_ut",
239 	.module_init = module_init,
240 	.module_fini = module_fini,
241 	.async_init = true,
242 	.init_complete = init_complete,
243 	.fini_start = fini_start,
244 };
245 
246 SPDK_BDEV_MODULE_REGISTER(bdev_ut, &bdev_ut_if)
247 
248 static void
249 register_bdev(struct ut_bdev *ut_bdev, char *name, void *io_target)
250 {
251 	memset(ut_bdev, 0, sizeof(*ut_bdev));
252 
253 	ut_bdev->io_target = io_target;
254 	ut_bdev->bdev.ctxt = ut_bdev;
255 	ut_bdev->bdev.name = name;
256 	ut_bdev->bdev.fn_table = &fn_table;
257 	ut_bdev->bdev.module = &bdev_ut_if;
258 	ut_bdev->bdev.blocklen = 4096;
259 	ut_bdev->bdev.blockcnt = 1024;
260 
261 	spdk_bdev_register(&ut_bdev->bdev);
262 }
263 
264 static void
265 unregister_bdev(struct ut_bdev *ut_bdev)
266 {
267 	/* Handle any deferred messages. */
268 	poll_threads();
269 	spdk_bdev_unregister(&ut_bdev->bdev, NULL, NULL);
270 }
271 
272 static void
273 bdev_init_cb(void *done, int rc)
274 {
275 	CU_ASSERT(rc == 0);
276 	*(bool *)done = true;
277 }
278 
279 static void
280 setup_test(void)
281 {
282 	bool done = false;
283 
284 	allocate_threads(BDEV_UT_NUM_THREADS);
285 	set_thread(0);
286 	spdk_bdev_initialize(bdev_init_cb, &done);
287 	spdk_io_device_register(&g_io_device, stub_create_ch, stub_destroy_ch,
288 				sizeof(struct ut_bdev_channel), NULL);
289 	register_bdev(&g_bdev, "ut_bdev", &g_io_device);
290 	spdk_bdev_open(&g_bdev.bdev, true, NULL, NULL, &g_desc);
291 }
292 
293 static void
294 finish_cb(void *cb_arg)
295 {
296 	g_teardown_done = true;
297 }
298 
299 static void
300 teardown_test(void)
301 {
302 	set_thread(0);
303 	g_teardown_done = false;
304 	spdk_bdev_close(g_desc);
305 	g_desc = NULL;
306 	unregister_bdev(&g_bdev);
307 	spdk_io_device_unregister(&g_io_device, NULL);
308 	spdk_bdev_finish(finish_cb, NULL);
309 	poll_threads();
310 	memset(&g_bdev, 0, sizeof(g_bdev));
311 	CU_ASSERT(g_teardown_done == true);
312 	g_teardown_done = false;
313 	free_threads();
314 }
315 
316 static uint32_t
317 bdev_io_tailq_cnt(bdev_io_tailq_t *tailq)
318 {
319 	struct spdk_bdev_io *io;
320 	uint32_t cnt = 0;
321 
322 	TAILQ_FOREACH(io, tailq, internal.link) {
323 		cnt++;
324 	}
325 
326 	return cnt;
327 }
328 
329 static void
330 basic(void)
331 {
332 	g_init_complete_called = false;
333 	setup_test();
334 	CU_ASSERT(g_init_complete_called == true);
335 
336 	set_thread(0);
337 
338 	g_get_io_channel = false;
339 	g_ut_threads[0].ch = spdk_bdev_get_io_channel(g_desc);
340 	CU_ASSERT(g_ut_threads[0].ch == NULL);
341 
342 	g_get_io_channel = true;
343 	g_create_ch = false;
344 	g_ut_threads[0].ch = spdk_bdev_get_io_channel(g_desc);
345 	CU_ASSERT(g_ut_threads[0].ch == NULL);
346 
347 	g_get_io_channel = true;
348 	g_create_ch = true;
349 	g_ut_threads[0].ch = spdk_bdev_get_io_channel(g_desc);
350 	CU_ASSERT(g_ut_threads[0].ch != NULL);
351 	spdk_put_io_channel(g_ut_threads[0].ch);
352 
353 	g_fini_start_called = false;
354 	teardown_test();
355 	CU_ASSERT(g_fini_start_called == true);
356 }
357 
358 static void
359 _bdev_removed(void *done)
360 {
361 	*(bool *)done = true;
362 }
363 
364 static void
365 _bdev_unregistered(void *done, int rc)
366 {
367 	CU_ASSERT(rc == 0);
368 	*(bool *)done = true;
369 }
370 
371 static void
372 unregister_and_close(void)
373 {
374 	bool done, remove_notify;
375 	struct spdk_bdev_desc *desc = NULL;
376 
377 	setup_test();
378 	set_thread(0);
379 
380 	/* setup_test() automatically opens the bdev,
381 	 * but this test needs to do that in a different
382 	 * way. */
383 	spdk_bdev_close(g_desc);
384 	poll_threads();
385 
386 	/* Try hotremoving a bdev with descriptors which don't provide
387 	 * the notification callback */
388 	spdk_bdev_open(&g_bdev.bdev, true, NULL, NULL, &desc);
389 	SPDK_CU_ASSERT_FATAL(desc != NULL);
390 
391 	/* There is an open descriptor on the device. Unregister it,
392 	 * which can't proceed until the descriptor is closed. */
393 	done = false;
394 	spdk_bdev_unregister(&g_bdev.bdev, _bdev_unregistered, &done);
395 
396 	/* Poll the threads to allow all events to be processed */
397 	poll_threads();
398 
399 	/* Make sure the bdev was not unregistered. We still have a
400 	 * descriptor open */
401 	CU_ASSERT(done == false);
402 
403 	spdk_bdev_close(desc);
404 	poll_threads();
405 	desc = NULL;
406 
407 	/* The unregister should have completed */
408 	CU_ASSERT(done == true);
409 
410 
411 	/* Register the bdev again */
412 	register_bdev(&g_bdev, "ut_bdev", &g_io_device);
413 
414 	remove_notify = false;
415 	spdk_bdev_open(&g_bdev.bdev, true, _bdev_removed, &remove_notify, &desc);
416 	SPDK_CU_ASSERT_FATAL(desc != NULL);
417 	CU_ASSERT(remove_notify == false);
418 
419 	/* There is an open descriptor on the device. Unregister it,
420 	 * which can't proceed until the descriptor is closed. */
421 	done = false;
422 	spdk_bdev_unregister(&g_bdev.bdev, _bdev_unregistered, &done);
423 	/* No polling has occurred, so neither of these should execute */
424 	CU_ASSERT(remove_notify == false);
425 	CU_ASSERT(done == false);
426 
427 	/* Prior to the unregister completing, close the descriptor */
428 	spdk_bdev_close(desc);
429 
430 	/* Poll the threads to allow all events to be processed */
431 	poll_threads();
432 
433 	/* Remove notify should not have been called because the
434 	 * descriptor is already closed. */
435 	CU_ASSERT(remove_notify == false);
436 
437 	/* The unregister should have completed */
438 	CU_ASSERT(done == true);
439 
440 	/* Restore the original g_bdev so that we can use teardown_test(). */
441 	register_bdev(&g_bdev, "ut_bdev", &g_io_device);
442 	spdk_bdev_open(&g_bdev.bdev, true, NULL, NULL, &g_desc);
443 	teardown_test();
444 }
445 
446 static void
447 reset_done(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg)
448 {
449 	bool *done = cb_arg;
450 
451 	CU_ASSERT(success == true);
452 	*done = true;
453 	spdk_bdev_free_io(bdev_io);
454 }
455 
456 static void
457 put_channel_during_reset(void)
458 {
459 	struct spdk_io_channel *io_ch;
460 	bool done = false;
461 
462 	setup_test();
463 
464 	set_thread(0);
465 	io_ch = spdk_bdev_get_io_channel(g_desc);
466 	CU_ASSERT(io_ch != NULL);
467 
468 	/*
469 	 * Start a reset, but then put the I/O channel before
470 	 *  the deferred messages for the reset get a chance to
471 	 *  execute.
472 	 */
473 	spdk_bdev_reset(g_desc, io_ch, reset_done, &done);
474 	spdk_put_io_channel(io_ch);
475 	poll_threads();
476 	stub_complete_io(g_bdev.io_target, 0);
477 
478 	teardown_test();
479 }
480 
481 static void
482 aborted_reset_done(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg)
483 {
484 	enum spdk_bdev_io_status *status = cb_arg;
485 
486 	*status = success ? SPDK_BDEV_IO_STATUS_SUCCESS : SPDK_BDEV_IO_STATUS_FAILED;
487 	spdk_bdev_free_io(bdev_io);
488 }
489 
490 static void
491 aborted_reset(void)
492 {
493 	struct spdk_io_channel *io_ch[2];
494 	enum spdk_bdev_io_status status1 = SPDK_BDEV_IO_STATUS_PENDING,
495 				 status2 = SPDK_BDEV_IO_STATUS_PENDING;
496 
497 	setup_test();
498 
499 	set_thread(0);
500 	io_ch[0] = spdk_bdev_get_io_channel(g_desc);
501 	CU_ASSERT(io_ch[0] != NULL);
502 	spdk_bdev_reset(g_desc, io_ch[0], aborted_reset_done, &status1);
503 	poll_threads();
504 	CU_ASSERT(g_bdev.bdev.internal.reset_in_progress != NULL);
505 
506 	/*
507 	 * First reset has been submitted on ch0.  Now submit a second
508 	 *  reset on ch1 which will get queued since there is already a
509 	 *  reset in progress.
510 	 */
511 	set_thread(1);
512 	io_ch[1] = spdk_bdev_get_io_channel(g_desc);
513 	CU_ASSERT(io_ch[1] != NULL);
514 	spdk_bdev_reset(g_desc, io_ch[1], aborted_reset_done, &status2);
515 	poll_threads();
516 	CU_ASSERT(g_bdev.bdev.internal.reset_in_progress != NULL);
517 
518 	/*
519 	 * Now destroy ch1.  This will abort the queued reset.  Check that
520 	 *  the second reset was completed with failed status.  Also check
521 	 *  that bdev->internal.reset_in_progress != NULL, since the
522 	 *  original reset has not been completed yet.  This ensures that
523 	 *  the bdev code is correctly noticing that the failed reset is
524 	 *  *not* the one that had been submitted to the bdev module.
525 	 */
526 	set_thread(1);
527 	spdk_put_io_channel(io_ch[1]);
528 	poll_threads();
529 	CU_ASSERT(status2 == SPDK_BDEV_IO_STATUS_FAILED);
530 	CU_ASSERT(g_bdev.bdev.internal.reset_in_progress != NULL);
531 
532 	/*
533 	 * Now complete the first reset, verify that it completed with SUCCESS
534 	 *  status and that bdev->internal.reset_in_progress is also set back to NULL.
535 	 */
536 	set_thread(0);
537 	spdk_put_io_channel(io_ch[0]);
538 	stub_complete_io(g_bdev.io_target, 0);
539 	poll_threads();
540 	CU_ASSERT(status1 == SPDK_BDEV_IO_STATUS_SUCCESS);
541 	CU_ASSERT(g_bdev.bdev.internal.reset_in_progress == NULL);
542 
543 	teardown_test();
544 }
545 
546 static void
547 io_during_io_done(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg)
548 {
549 	enum spdk_bdev_io_status *status = cb_arg;
550 
551 	*status = bdev_io->internal.status;
552 	spdk_bdev_free_io(bdev_io);
553 }
554 
555 static void
556 io_during_reset(void)
557 {
558 	struct spdk_io_channel *io_ch[2];
559 	struct spdk_bdev_channel *bdev_ch[2];
560 	enum spdk_bdev_io_status status0, status1, status_reset;
561 	int rc;
562 
563 	setup_test();
564 
565 	/*
566 	 * First test normal case - submit an I/O on each of two channels (with no resets)
567 	 *  and verify they complete successfully.
568 	 */
569 	set_thread(0);
570 	io_ch[0] = spdk_bdev_get_io_channel(g_desc);
571 	bdev_ch[0] = spdk_io_channel_get_ctx(io_ch[0]);
572 	CU_ASSERT(bdev_ch[0]->flags == 0);
573 	status0 = SPDK_BDEV_IO_STATUS_PENDING;
574 	rc = spdk_bdev_read_blocks(g_desc, io_ch[0], NULL, 0, 1, io_during_io_done, &status0);
575 	CU_ASSERT(rc == 0);
576 
577 	set_thread(1);
578 	io_ch[1] = spdk_bdev_get_io_channel(g_desc);
579 	bdev_ch[1] = spdk_io_channel_get_ctx(io_ch[1]);
580 	CU_ASSERT(bdev_ch[1]->flags == 0);
581 	status1 = SPDK_BDEV_IO_STATUS_PENDING;
582 	rc = spdk_bdev_read_blocks(g_desc, io_ch[1], NULL, 0, 1, io_during_io_done, &status1);
583 	CU_ASSERT(rc == 0);
584 
585 	poll_threads();
586 	CU_ASSERT(status0 == SPDK_BDEV_IO_STATUS_PENDING);
587 	CU_ASSERT(status1 == SPDK_BDEV_IO_STATUS_PENDING);
588 
589 	set_thread(0);
590 	stub_complete_io(g_bdev.io_target, 0);
591 	CU_ASSERT(status0 == SPDK_BDEV_IO_STATUS_SUCCESS);
592 
593 	set_thread(1);
594 	stub_complete_io(g_bdev.io_target, 0);
595 	CU_ASSERT(status1 == SPDK_BDEV_IO_STATUS_SUCCESS);
596 
597 	/*
598 	 * Now submit a reset, and leave it pending while we submit I/O on two different
599 	 *  channels.  These I/O should be failed by the bdev layer since the reset is in
600 	 *  progress.
601 	 */
602 	set_thread(0);
603 	status_reset = SPDK_BDEV_IO_STATUS_PENDING;
604 	rc = spdk_bdev_reset(g_desc, io_ch[0], io_during_io_done, &status_reset);
605 	CU_ASSERT(rc == 0);
606 
607 	CU_ASSERT(bdev_ch[0]->flags == 0);
608 	CU_ASSERT(bdev_ch[1]->flags == 0);
609 	poll_threads();
610 	CU_ASSERT(bdev_ch[0]->flags == BDEV_CH_RESET_IN_PROGRESS);
611 	CU_ASSERT(bdev_ch[1]->flags == BDEV_CH_RESET_IN_PROGRESS);
612 
613 	set_thread(0);
614 	status0 = SPDK_BDEV_IO_STATUS_PENDING;
615 	rc = spdk_bdev_read_blocks(g_desc, io_ch[0], NULL, 0, 1, io_during_io_done, &status0);
616 	CU_ASSERT(rc == 0);
617 
618 	set_thread(1);
619 	status1 = SPDK_BDEV_IO_STATUS_PENDING;
620 	rc = spdk_bdev_read_blocks(g_desc, io_ch[1], NULL, 0, 1, io_during_io_done, &status1);
621 	CU_ASSERT(rc == 0);
622 
623 	/*
624 	 * A reset is in progress so these read I/O should complete with failure.  Note that we
625 	 *  need to poll_threads() since I/O completed inline have their completion deferred.
626 	 */
627 	poll_threads();
628 	CU_ASSERT(status_reset == SPDK_BDEV_IO_STATUS_PENDING);
629 	CU_ASSERT(status0 == SPDK_BDEV_IO_STATUS_FAILED);
630 	CU_ASSERT(status1 == SPDK_BDEV_IO_STATUS_FAILED);
631 
632 	/*
633 	 * Complete the reset
634 	 */
635 	set_thread(0);
636 	stub_complete_io(g_bdev.io_target, 0);
637 
638 	/*
639 	 * Only poll thread 0. We should not get a completion.
640 	 */
641 	poll_thread(0);
642 	CU_ASSERT(status_reset == SPDK_BDEV_IO_STATUS_PENDING);
643 
644 	/*
645 	 * Poll both thread 0 and 1 so the messages can propagate and we
646 	 * get a completion.
647 	 */
648 	poll_threads();
649 	CU_ASSERT(status_reset == SPDK_BDEV_IO_STATUS_SUCCESS);
650 
651 	spdk_put_io_channel(io_ch[0]);
652 	set_thread(1);
653 	spdk_put_io_channel(io_ch[1]);
654 	poll_threads();
655 
656 	teardown_test();
657 }
658 
659 static void
660 basic_qos(void)
661 {
662 	struct spdk_io_channel *io_ch[2];
663 	struct spdk_bdev_channel *bdev_ch[2];
664 	struct spdk_bdev *bdev;
665 	enum spdk_bdev_io_status status, abort_status;
666 	int rc;
667 
668 	setup_test();
669 
670 	/* Enable QoS */
671 	bdev = &g_bdev.bdev;
672 	bdev->internal.qos = calloc(1, sizeof(*bdev->internal.qos));
673 	SPDK_CU_ASSERT_FATAL(bdev->internal.qos != NULL);
674 	TAILQ_INIT(&bdev->internal.qos->queued);
675 	/*
676 	 * Enable read/write IOPS, read only byte per second and
677 	 * read/write byte per second rate limits.
678 	 * In this case, all rate limits will take equal effect.
679 	 */
680 	/* 2000 read/write I/O per second, or 2 per millisecond */
681 	bdev->internal.qos->rate_limits[SPDK_BDEV_QOS_RW_IOPS_RATE_LIMIT].limit = 2000;
682 	/* 8K read/write byte per millisecond with 4K block size */
683 	bdev->internal.qos->rate_limits[SPDK_BDEV_QOS_RW_BPS_RATE_LIMIT].limit = 8192000;
684 	/* 8K read only byte per millisecond with 4K block size */
685 	bdev->internal.qos->rate_limits[SPDK_BDEV_QOS_R_BPS_RATE_LIMIT].limit = 8192000;
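	/*
	 * Worked example of why the limits above are equivalent, assuming the default
	 * 1 ms QoS timeslice (SPDK_BDEV_QOS_TIMESLICE_IN_USEC) in bdev.c:
	 *   2000 IOPS            -> 2 I/O per timeslice
	 *   8192000 B/s (RW BPS) -> 8192 bytes = two 4K blocks per timeslice
	 *   8192000 B/s (R BPS)  -> 8192 bytes = two 4K blocks per timeslice
	 * so each limit allows exactly two 4K reads per millisecond.
	 */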
686 
687 	g_get_io_channel = true;
688 
689 	set_thread(0);
690 	io_ch[0] = spdk_bdev_get_io_channel(g_desc);
691 	bdev_ch[0] = spdk_io_channel_get_ctx(io_ch[0]);
692 	CU_ASSERT(bdev_ch[0]->flags == BDEV_CH_QOS_ENABLED);
693 
694 	set_thread(1);
695 	io_ch[1] = spdk_bdev_get_io_channel(g_desc);
696 	bdev_ch[1] = spdk_io_channel_get_ctx(io_ch[1]);
697 	CU_ASSERT(bdev_ch[1]->flags == BDEV_CH_QOS_ENABLED);
698 
699 	/*
700 	 * Send an I/O on thread 0, which is where the QoS thread is running.
701 	 */
702 	set_thread(0);
703 	status = SPDK_BDEV_IO_STATUS_PENDING;
704 	rc = spdk_bdev_read_blocks(g_desc, io_ch[0], NULL, 0, 1, io_during_io_done, &status);
705 	CU_ASSERT(rc == 0);
706 	CU_ASSERT(status == SPDK_BDEV_IO_STATUS_PENDING);
707 	poll_threads();
708 	stub_complete_io(g_bdev.io_target, 0);
709 	poll_threads();
710 	CU_ASSERT(status == SPDK_BDEV_IO_STATUS_SUCCESS);
711 
712 	/* Send an I/O on thread 1. The QoS thread is not running here. */
713 	status = SPDK_BDEV_IO_STATUS_PENDING;
714 	set_thread(1);
715 	rc = spdk_bdev_read_blocks(g_desc, io_ch[1], NULL, 0, 1, io_during_io_done, &status);
716 	CU_ASSERT(rc == 0);
717 	CU_ASSERT(status == SPDK_BDEV_IO_STATUS_PENDING);
718 	poll_threads();
719 	/* Complete I/O on thread 1. This should not complete the I/O we submitted */
720 	stub_complete_io(g_bdev.io_target, 0);
721 	poll_threads();
722 	CU_ASSERT(status == SPDK_BDEV_IO_STATUS_PENDING);
723 	/* Now complete I/O on thread 0 */
724 	set_thread(0);
725 	poll_threads();
726 	stub_complete_io(g_bdev.io_target, 0);
727 	poll_threads();
728 	CU_ASSERT(status == SPDK_BDEV_IO_STATUS_SUCCESS);
729 
730 	/* Advance a full QoS timeslice so the rate limit allotments are refreshed for the next test cases. */
731 	spdk_delay_us(SPDK_BDEV_QOS_TIMESLICE_IN_USEC);
732 	poll_threads();
733 
734 	/*
735 	 * Test abort request when QoS is enabled.
736 	 */
737 
738 	/* Send an I/O on thread 0, which is where the QoS thread is running. */
739 	set_thread(0);
740 	status = SPDK_BDEV_IO_STATUS_PENDING;
741 	rc = spdk_bdev_read_blocks(g_desc, io_ch[0], NULL, 0, 1, io_during_io_done, &status);
742 	CU_ASSERT(rc == 0);
743 	CU_ASSERT(status == SPDK_BDEV_IO_STATUS_PENDING);
744 	/* Send an abort to the I/O on the same thread. */
745 	abort_status = SPDK_BDEV_IO_STATUS_PENDING;
746 	rc = spdk_bdev_abort(g_desc, io_ch[0], &status, io_during_io_done, &abort_status);
747 	CU_ASSERT(rc == 0);
748 	CU_ASSERT(abort_status == SPDK_BDEV_IO_STATUS_PENDING);
749 	poll_threads();
750 	CU_ASSERT(abort_status == SPDK_BDEV_IO_STATUS_SUCCESS);
751 	CU_ASSERT(status == SPDK_BDEV_IO_STATUS_ABORTED);
752 
753 	/* Send an I/O on thread 1. The QoS thread is not running here. */
754 	status = SPDK_BDEV_IO_STATUS_PENDING;
755 	set_thread(1);
756 	rc = spdk_bdev_read_blocks(g_desc, io_ch[1], NULL, 0, 1, io_during_io_done, &status);
757 	CU_ASSERT(rc == 0);
758 	CU_ASSERT(status == SPDK_BDEV_IO_STATUS_PENDING);
759 	poll_threads();
760 	/* Send an abort to the I/O on the same thread. */
761 	abort_status = SPDK_BDEV_IO_STATUS_PENDING;
762 	rc = spdk_bdev_abort(g_desc, io_ch[1], &status, io_during_io_done, &abort_status);
763 	CU_ASSERT(rc == 0);
764 	CU_ASSERT(abort_status == SPDK_BDEV_IO_STATUS_PENDING);
765 	poll_threads();
766 	/* The abort should have completed with success and the aborted I/O with ABORTED status on thread 1. */
767 	CU_ASSERT(abort_status == SPDK_BDEV_IO_STATUS_SUCCESS);
768 	CU_ASSERT(status == SPDK_BDEV_IO_STATUS_ABORTED);
769 
770 	set_thread(0);
771 
772 	/*
773 	 * Close only the descriptor, which should stop the QoS channel since
774 	 * the last descriptor is being removed.
775 	 */
776 	spdk_bdev_close(g_desc);
777 	poll_threads();
778 	CU_ASSERT(bdev->internal.qos->ch == NULL);
779 
780 	/*
781 	 * Open the bdev again, which should set up the QoS channel again since
782 	 * the I/O channels are still valid.
783 	 */
784 	spdk_bdev_open(bdev, true, NULL, NULL, &g_desc);
785 	poll_threads();
786 	CU_ASSERT(bdev->internal.qos->ch != NULL);
787 
788 	/* Tear down the channels */
789 	set_thread(0);
790 	spdk_put_io_channel(io_ch[0]);
791 	set_thread(1);
792 	spdk_put_io_channel(io_ch[1]);
793 	poll_threads();
794 	set_thread(0);
795 
796 	/* Close the descriptor, which should stop the qos channel */
797 	spdk_bdev_close(g_desc);
798 	poll_threads();
799 	CU_ASSERT(bdev->internal.qos->ch == NULL);
800 
801 	/* Open the bdev again; no QoS channel is set up because there are no valid I/O channels. */
802 	spdk_bdev_open(bdev, true, NULL, NULL, &g_desc);
803 	poll_threads();
804 	CU_ASSERT(bdev->internal.qos->ch == NULL);
805 
806 	/* Create the channels in reverse order. */
807 	set_thread(1);
808 	io_ch[1] = spdk_bdev_get_io_channel(g_desc);
809 	bdev_ch[1] = spdk_io_channel_get_ctx(io_ch[1]);
810 	CU_ASSERT(bdev_ch[1]->flags == BDEV_CH_QOS_ENABLED);
811 
812 	set_thread(0);
813 	io_ch[0] = spdk_bdev_get_io_channel(g_desc);
814 	bdev_ch[0] = spdk_io_channel_get_ctx(io_ch[0]);
815 	CU_ASSERT(bdev_ch[0]->flags == BDEV_CH_QOS_ENABLED);
816 
817 	/* Confirm that the qos thread is now thread 1 */
818 	CU_ASSERT(bdev->internal.qos->ch == bdev_ch[1]);
819 
820 	/* Tear down the channels */
821 	set_thread(0);
822 	spdk_put_io_channel(io_ch[0]);
823 	set_thread(1);
824 	spdk_put_io_channel(io_ch[1]);
825 	poll_threads();
826 
827 	set_thread(0);
828 
829 	teardown_test();
830 }
831 
832 static void
833 io_during_qos_queue(void)
834 {
835 	struct spdk_io_channel *io_ch[2];
836 	struct spdk_bdev_channel *bdev_ch[2];
837 	struct spdk_bdev *bdev;
838 	enum spdk_bdev_io_status status0, status1, status2;
839 	int rc;
840 
841 	setup_test();
842 	MOCK_SET(spdk_get_ticks, 0);
843 
844 	/* Enable QoS */
845 	bdev = &g_bdev.bdev;
846 	bdev->internal.qos = calloc(1, sizeof(*bdev->internal.qos));
847 	SPDK_CU_ASSERT_FATAL(bdev->internal.qos != NULL);
848 	TAILQ_INIT(&bdev->internal.qos->queued);
849 	/*
850 	 * Enable read/write IOPS, read only byte per sec, write only
851 	 * byte per sec and read/write byte per sec rate limits.
852 	 * In this case, both read only and write only byte per sec
853 	 * rate limits will take effect.
854 	 */
855 	/* 4000 read/write I/O per second, or 4 per millisecond */
856 	bdev->internal.qos->rate_limits[SPDK_BDEV_QOS_RW_IOPS_RATE_LIMIT].limit = 4000;
857 	/* 8K byte per millisecond with 4K block size */
858 	bdev->internal.qos->rate_limits[SPDK_BDEV_QOS_RW_BPS_RATE_LIMIT].limit = 8192000;
859 	/* 4K byte per millisecond with 4K block size */
860 	bdev->internal.qos->rate_limits[SPDK_BDEV_QOS_R_BPS_RATE_LIMIT].limit = 4096000;
861 	/* 4K byte per millisecond with 4K block size */
862 	bdev->internal.qos->rate_limits[SPDK_BDEV_QOS_W_BPS_RATE_LIMIT].limit = 4096000;
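	/*
	 * Worked example, assuming the default 1 ms QoS timeslice: the read-only and
	 * write-only limits of 4096000 B/s each allow one 4K block per timeslice, while
	 * the read/write limits (4 IOPS, 8192 bytes) would allow two.  The per-direction
	 * byte limits are therefore the binding ones: one read and one write can go to
	 * the disk in the first millisecond, and the second read stays queued until the
	 * clock advances.
	 */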
863 
864 	g_get_io_channel = true;
865 
866 	/* Create channels */
867 	set_thread(0);
868 	io_ch[0] = spdk_bdev_get_io_channel(g_desc);
869 	bdev_ch[0] = spdk_io_channel_get_ctx(io_ch[0]);
870 	CU_ASSERT(bdev_ch[0]->flags == BDEV_CH_QOS_ENABLED);
871 
872 	set_thread(1);
873 	io_ch[1] = spdk_bdev_get_io_channel(g_desc);
874 	bdev_ch[1] = spdk_io_channel_get_ctx(io_ch[1]);
875 	CU_ASSERT(bdev_ch[1]->flags == BDEV_CH_QOS_ENABLED);
876 
877 	/* Send two read I/Os */
878 	status1 = SPDK_BDEV_IO_STATUS_PENDING;
879 	rc = spdk_bdev_read_blocks(g_desc, io_ch[1], NULL, 0, 1, io_during_io_done, &status1);
880 	CU_ASSERT(rc == 0);
881 	CU_ASSERT(status1 == SPDK_BDEV_IO_STATUS_PENDING);
882 	set_thread(0);
883 	status0 = SPDK_BDEV_IO_STATUS_PENDING;
884 	rc = spdk_bdev_read_blocks(g_desc, io_ch[0], NULL, 0, 1, io_during_io_done, &status0);
885 	CU_ASSERT(rc == 0);
886 	CU_ASSERT(status0 == SPDK_BDEV_IO_STATUS_PENDING);
887 	/* Send one write I/O */
888 	status2 = SPDK_BDEV_IO_STATUS_PENDING;
889 	rc = spdk_bdev_write_blocks(g_desc, io_ch[0], NULL, 0, 1, io_during_io_done, &status2);
890 	CU_ASSERT(rc == 0);
891 	CU_ASSERT(status2 == SPDK_BDEV_IO_STATUS_PENDING);
892 
893 	/* Complete any I/O that arrived at the disk */
894 	poll_threads();
895 	set_thread(1);
896 	stub_complete_io(g_bdev.io_target, 0);
897 	set_thread(0);
898 	stub_complete_io(g_bdev.io_target, 0);
899 	poll_threads();
900 
901 	/* Only one of the two read I/Os should complete. (logical XOR) */
902 	if (status0 == SPDK_BDEV_IO_STATUS_SUCCESS) {
903 		CU_ASSERT(status1 == SPDK_BDEV_IO_STATUS_PENDING);
904 	} else {
905 		CU_ASSERT(status1 == SPDK_BDEV_IO_STATUS_SUCCESS);
906 	}
907 	/* The write I/O should complete. */
908 	CU_ASSERT(status2 == SPDK_BDEV_IO_STATUS_SUCCESS);
909 
910 	/* Advance in time by a millisecond */
911 	spdk_delay_us(1000);
912 
913 	/* Complete more I/O */
914 	poll_threads();
915 	set_thread(1);
916 	stub_complete_io(g_bdev.io_target, 0);
917 	set_thread(0);
918 	stub_complete_io(g_bdev.io_target, 0);
919 	poll_threads();
920 
921 	/* Now the second read I/O should be done */
922 	CU_ASSERT(status0 == SPDK_BDEV_IO_STATUS_SUCCESS);
923 	CU_ASSERT(status1 == SPDK_BDEV_IO_STATUS_SUCCESS);
924 
925 	/* Tear down the channels */
926 	set_thread(1);
927 	spdk_put_io_channel(io_ch[1]);
928 	set_thread(0);
929 	spdk_put_io_channel(io_ch[0]);
930 	poll_threads();
931 
932 	teardown_test();
933 }
934 
935 static void
936 io_during_qos_reset(void)
937 {
938 	struct spdk_io_channel *io_ch[2];
939 	struct spdk_bdev_channel *bdev_ch[2];
940 	struct spdk_bdev *bdev;
941 	enum spdk_bdev_io_status status0, status1, reset_status;
942 	int rc;
943 
944 	setup_test();
945 	MOCK_SET(spdk_get_ticks, 0);
946 
947 	/* Enable QoS */
948 	bdev = &g_bdev.bdev;
949 	bdev->internal.qos = calloc(1, sizeof(*bdev->internal.qos));
950 	SPDK_CU_ASSERT_FATAL(bdev->internal.qos != NULL);
951 	TAILQ_INIT(&bdev->internal.qos->queued);
952 	/*
953 	 * Enable read/write IOPS, write only byte per sec and
954 	 * read/write byte per second rate limits.
955 	 * In this case, the read/write byte per second rate limit will
956 	 * take effect first.
957 	 */
958 	/* 2000 read/write I/O per second, or 2 per millisecond */
959 	bdev->internal.qos->rate_limits[SPDK_BDEV_QOS_RW_IOPS_RATE_LIMIT].limit = 2000;
960 	/* 4K byte per millisecond with 4K block size */
961 	bdev->internal.qos->rate_limits[SPDK_BDEV_QOS_RW_BPS_RATE_LIMIT].limit = 4096000;
962 	/* 8K byte per millisecond with 4K block size */
963 	bdev->internal.qos->rate_limits[SPDK_BDEV_QOS_W_BPS_RATE_LIMIT].limit = 8192000;
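	/*
	 * Worked example, assuming the default 1 ms QoS timeslice: 4096000 B/s for
	 * read/write allows only one 4K block per timeslice, while the IOPS limit (2)
	 * and the write-only byte limit (two 4K blocks) would each allow two.  The
	 * read/write byte limit is therefore the one that takes effect first: of the
	 * two writes below, one reaches the disk and the other is queued by QoS.
	 */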
964 
965 	g_get_io_channel = true;
966 
967 	/* Create channels */
968 	set_thread(0);
969 	io_ch[0] = spdk_bdev_get_io_channel(g_desc);
970 	bdev_ch[0] = spdk_io_channel_get_ctx(io_ch[0]);
971 	CU_ASSERT(bdev_ch[0]->flags == BDEV_CH_QOS_ENABLED);
972 
973 	set_thread(1);
974 	io_ch[1] = spdk_bdev_get_io_channel(g_desc);
975 	bdev_ch[1] = spdk_io_channel_get_ctx(io_ch[1]);
976 	CU_ASSERT(bdev_ch[1]->flags == BDEV_CH_QOS_ENABLED);
977 
978 	/* Send two I/O. One of these gets queued by QoS. The other is sitting at the disk. */
979 	status1 = SPDK_BDEV_IO_STATUS_PENDING;
980 	rc = spdk_bdev_write_blocks(g_desc, io_ch[1], NULL, 0, 1, io_during_io_done, &status1);
981 	CU_ASSERT(rc == 0);
982 	set_thread(0);
983 	status0 = SPDK_BDEV_IO_STATUS_PENDING;
984 	rc = spdk_bdev_write_blocks(g_desc, io_ch[0], NULL, 0, 1, io_during_io_done, &status0);
985 	CU_ASSERT(rc == 0);
986 
987 	poll_threads();
988 	CU_ASSERT(status1 == SPDK_BDEV_IO_STATUS_PENDING);
989 	CU_ASSERT(status0 == SPDK_BDEV_IO_STATUS_PENDING);
990 
991 	/* Reset the bdev. */
992 	reset_status = SPDK_BDEV_IO_STATUS_PENDING;
993 	rc = spdk_bdev_reset(g_desc, io_ch[0], io_during_io_done, &reset_status);
994 	CU_ASSERT(rc == 0);
995 
996 	/* Complete any I/O that arrived at the disk */
997 	poll_threads();
998 	set_thread(1);
999 	stub_complete_io(g_bdev.io_target, 0);
1000 	set_thread(0);
1001 	stub_complete_io(g_bdev.io_target, 0);
1002 	poll_threads();
1003 
1004 	CU_ASSERT(reset_status == SPDK_BDEV_IO_STATUS_SUCCESS);
1005 	CU_ASSERT(status0 == SPDK_BDEV_IO_STATUS_FAILED);
1006 	CU_ASSERT(status1 == SPDK_BDEV_IO_STATUS_FAILED);
1007 
1008 	/* Tear down the channels */
1009 	set_thread(1);
1010 	spdk_put_io_channel(io_ch[1]);
1011 	set_thread(0);
1012 	spdk_put_io_channel(io_ch[0]);
1013 	poll_threads();
1014 
1015 	teardown_test();
1016 }
1017 
1018 static void
1019 enomem_done(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg)
1020 {
1021 	enum spdk_bdev_io_status *status = cb_arg;
1022 
1023 	*status = success ? SPDK_BDEV_IO_STATUS_SUCCESS : SPDK_BDEV_IO_STATUS_FAILED;
1024 	spdk_bdev_free_io(bdev_io);
1025 }
1026 
1027 static void
1028 enomem(void)
1029 {
1030 	struct spdk_io_channel *io_ch;
1031 	struct spdk_bdev_channel *bdev_ch;
1032 	struct spdk_bdev_shared_resource *shared_resource;
1033 	struct ut_bdev_channel *ut_ch;
1034 	const uint32_t IO_ARRAY_SIZE = 64;
1035 	const uint32_t AVAIL = 20;
1036 	enum spdk_bdev_io_status status[IO_ARRAY_SIZE], status_reset;
1037 	uint32_t nomem_cnt, i;
1038 	struct spdk_bdev_io *first_io;
1039 	int rc;
1040 
1041 	setup_test();
1042 
1043 	set_thread(0);
1044 	io_ch = spdk_bdev_get_io_channel(g_desc);
1045 	bdev_ch = spdk_io_channel_get_ctx(io_ch);
1046 	shared_resource = bdev_ch->shared_resource;
1047 	ut_ch = spdk_io_channel_get_ctx(bdev_ch->channel);
1048 	ut_ch->avail_cnt = AVAIL;
1049 
1050 	/* First submit a number of IOs equal to what the channel can support. */
1051 	for (i = 0; i < AVAIL; i++) {
1052 		status[i] = SPDK_BDEV_IO_STATUS_PENDING;
1053 		rc = spdk_bdev_read_blocks(g_desc, io_ch, NULL, 0, 1, enomem_done, &status[i]);
1054 		CU_ASSERT(rc == 0);
1055 	}
1056 	CU_ASSERT(TAILQ_EMPTY(&shared_resource->nomem_io));
1057 
1058 	/*
1059 	 * Next, submit one additional I/O.  This one should fail with ENOMEM and then go onto
1060 	 *  the nomem_io list.
1061 	 */
1062 	status[AVAIL] = SPDK_BDEV_IO_STATUS_PENDING;
1063 	rc = spdk_bdev_read_blocks(g_desc, io_ch, NULL, 0, 1, enomem_done, &status[AVAIL]);
1064 	CU_ASSERT(rc == 0);
1065 	SPDK_CU_ASSERT_FATAL(!TAILQ_EMPTY(&shared_resource->nomem_io));
1066 	first_io = TAILQ_FIRST(&shared_resource->nomem_io);
1067 
1068 	/*
1069 	 * Now submit a bunch more I/O.  These should all fail with ENOMEM and get queued behind
1070 	 *  the first_io above.
1071 	 */
1072 	for (i = AVAIL + 1; i < IO_ARRAY_SIZE; i++) {
1073 		status[i] = SPDK_BDEV_IO_STATUS_PENDING;
1074 		rc = spdk_bdev_read_blocks(g_desc, io_ch, NULL, 0, 1, enomem_done, &status[i]);
1075 		CU_ASSERT(rc == 0);
1076 	}
1077 
1078 	/* Assert that first_io is still at the head of the list. */
1079 	CU_ASSERT(TAILQ_FIRST(&shared_resource->nomem_io) == first_io);
1080 	CU_ASSERT(bdev_io_tailq_cnt(&shared_resource->nomem_io) == (IO_ARRAY_SIZE - AVAIL));
1081 	nomem_cnt = bdev_io_tailq_cnt(&shared_resource->nomem_io);
1082 	CU_ASSERT(shared_resource->nomem_threshold == (AVAIL - NOMEM_THRESHOLD_COUNT));
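	/*
	 * A sketch of the threshold math, under the assumption that NOMEM_THRESHOLD_COUNT
	 * in bdev.c is 8: nomem_threshold = AVAIL - NOMEM_THRESHOLD_COUNT = 20 - 8 = 12,
	 * so queued I/O is only retried once outstanding I/O drains down to 12.  Completing
	 * a single I/O (19 outstanding) is not enough; completing NOMEM_THRESHOLD_COUNT of
	 * them is, which is exactly what the next two steps verify.
	 */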
1083 
1084 	/*
1085 	 * Complete 1 I/O only.  The key check here is bdev_io_tailq_cnt - this should not have
1086 	 *  changed since completing just 1 I/O should not trigger retrying the queued nomem_io
1087 	 *  list.
1088 	 */
1089 	stub_complete_io(g_bdev.io_target, 1);
1090 	CU_ASSERT(bdev_io_tailq_cnt(&shared_resource->nomem_io) == nomem_cnt);
1091 
1092 	/*
1093 	 * Complete enough I/O to hit the nomem_threshold.  This should trigger retrying nomem_io,
1094 	 *  and we should see I/O get resubmitted to the test bdev module.
1095 	 */
1096 	stub_complete_io(g_bdev.io_target, NOMEM_THRESHOLD_COUNT - 1);
1097 	CU_ASSERT(bdev_io_tailq_cnt(&shared_resource->nomem_io) < nomem_cnt);
1098 	nomem_cnt = bdev_io_tailq_cnt(&shared_resource->nomem_io);
1099 
1100 	/* Complete 1 I/O only.  This should not trigger retrying the queued nomem_io. */
1101 	stub_complete_io(g_bdev.io_target, 1);
1102 	CU_ASSERT(bdev_io_tailq_cnt(&shared_resource->nomem_io) == nomem_cnt);
1103 
1104 	/*
1105 	 * Send a reset and confirm that all I/O are completed, including the ones that
1106 	 *  were queued on the nomem_io list.
1107 	 */
1108 	status_reset = SPDK_BDEV_IO_STATUS_PENDING;
1109 	rc = spdk_bdev_reset(g_desc, io_ch, enomem_done, &status_reset);
1110 	poll_threads();
1111 	CU_ASSERT(rc == 0);
1112 	/* This will complete the reset. */
1113 	stub_complete_io(g_bdev.io_target, 0);
1114 
1115 	CU_ASSERT(bdev_io_tailq_cnt(&shared_resource->nomem_io) == 0);
1116 	CU_ASSERT(shared_resource->io_outstanding == 0);
1117 
1118 	spdk_put_io_channel(io_ch);
1119 	poll_threads();
1120 	teardown_test();
1121 }
1122 
1123 static void
1124 enomem_multi_bdev(void)
1125 {
1126 	struct spdk_io_channel *io_ch;
1127 	struct spdk_bdev_channel *bdev_ch;
1128 	struct spdk_bdev_shared_resource *shared_resource;
1129 	struct ut_bdev_channel *ut_ch;
1130 	const uint32_t IO_ARRAY_SIZE = 64;
1131 	const uint32_t AVAIL = 20;
1132 	enum spdk_bdev_io_status status[IO_ARRAY_SIZE];
1133 	uint32_t i;
1134 	struct ut_bdev *second_bdev;
1135 	struct spdk_bdev_desc *second_desc = NULL;
1136 	struct spdk_bdev_channel *second_bdev_ch;
1137 	struct spdk_io_channel *second_ch;
1138 	int rc;
1139 
1140 	setup_test();
1141 
1142 	/* Register second bdev with the same io_target  */
1143 	second_bdev = calloc(1, sizeof(*second_bdev));
1144 	SPDK_CU_ASSERT_FATAL(second_bdev != NULL);
1145 	register_bdev(second_bdev, "ut_bdev2", g_bdev.io_target);
1146 	spdk_bdev_open(&second_bdev->bdev, true, NULL, NULL, &second_desc);
1147 	SPDK_CU_ASSERT_FATAL(second_desc != NULL);
1148 
1149 	set_thread(0);
1150 	io_ch = spdk_bdev_get_io_channel(g_desc);
1151 	bdev_ch = spdk_io_channel_get_ctx(io_ch);
1152 	shared_resource = bdev_ch->shared_resource;
1153 	ut_ch = spdk_io_channel_get_ctx(bdev_ch->channel);
1154 	ut_ch->avail_cnt = AVAIL;
1155 
1156 	second_ch = spdk_bdev_get_io_channel(second_desc);
1157 	second_bdev_ch = spdk_io_channel_get_ctx(second_ch);
1158 	SPDK_CU_ASSERT_FATAL(shared_resource == second_bdev_ch->shared_resource);
1159 
1160 	/* Saturate io_target through bdev A. */
1161 	for (i = 0; i < AVAIL; i++) {
1162 		status[i] = SPDK_BDEV_IO_STATUS_PENDING;
1163 		rc = spdk_bdev_read_blocks(g_desc, io_ch, NULL, 0, 1, enomem_done, &status[i]);
1164 		CU_ASSERT(rc == 0);
1165 	}
1166 	CU_ASSERT(TAILQ_EMPTY(&shared_resource->nomem_io));
1167 
1168 	/*
1169 	 * Now submit I/O through the second bdev. This should fail with ENOMEM
1170 	 * and then go onto the nomem_io list.
1171 	 */
1172 	status[AVAIL] = SPDK_BDEV_IO_STATUS_PENDING;
1173 	rc = spdk_bdev_read_blocks(second_desc, second_ch, NULL, 0, 1, enomem_done, &status[AVAIL]);
1174 	CU_ASSERT(rc == 0);
1175 	SPDK_CU_ASSERT_FATAL(!TAILQ_EMPTY(&shared_resource->nomem_io));
1176 
1177 	/* Complete first bdev's I/O. This should retry sending second bdev's nomem_io */
1178 	stub_complete_io(g_bdev.io_target, AVAIL);
1179 
1180 	SPDK_CU_ASSERT_FATAL(TAILQ_EMPTY(&shared_resource->nomem_io));
1181 	CU_ASSERT(shared_resource->io_outstanding == 1);
1182 
1183 	/* Now complete our retried I/O  */
1184 	stub_complete_io(g_bdev.io_target, 1);
1185 	SPDK_CU_ASSERT_FATAL(shared_resource->io_outstanding == 0);
1186 
1187 	spdk_put_io_channel(io_ch);
1188 	spdk_put_io_channel(second_ch);
1189 	spdk_bdev_close(second_desc);
1190 	unregister_bdev(second_bdev);
1191 	poll_threads();
1192 	free(second_bdev);
1193 	teardown_test();
1194 }
1195 
1196 
1197 static void
1198 enomem_multi_io_target(void)
1199 {
1200 	struct spdk_io_channel *io_ch;
1201 	struct spdk_bdev_channel *bdev_ch;
1202 	struct ut_bdev_channel *ut_ch;
1203 	const uint32_t IO_ARRAY_SIZE = 64;
1204 	const uint32_t AVAIL = 20;
1205 	enum spdk_bdev_io_status status[IO_ARRAY_SIZE];
1206 	uint32_t i;
1207 	int new_io_device;
1208 	struct ut_bdev *second_bdev;
1209 	struct spdk_bdev_desc *second_desc = NULL;
1210 	struct spdk_bdev_channel *second_bdev_ch;
1211 	struct spdk_io_channel *second_ch;
1212 	int rc;
1213 
1214 	setup_test();
1215 
1216 	/* Create new io_target and a second bdev using it */
1217 	spdk_io_device_register(&new_io_device, stub_create_ch, stub_destroy_ch,
1218 				sizeof(struct ut_bdev_channel), NULL);
1219 	second_bdev = calloc(1, sizeof(*second_bdev));
1220 	SPDK_CU_ASSERT_FATAL(second_bdev != NULL);
1221 	register_bdev(second_bdev, "ut_bdev2", &new_io_device);
1222 	spdk_bdev_open(&second_bdev->bdev, true, NULL, NULL, &second_desc);
1223 	SPDK_CU_ASSERT_FATAL(second_desc != NULL);
1224 
1225 	set_thread(0);
1226 	io_ch = spdk_bdev_get_io_channel(g_desc);
1227 	bdev_ch = spdk_io_channel_get_ctx(io_ch);
1228 	ut_ch = spdk_io_channel_get_ctx(bdev_ch->channel);
1229 	ut_ch->avail_cnt = AVAIL;
1230 
1231 	/* Different io_target should imply a different shared_resource */
1232 	second_ch = spdk_bdev_get_io_channel(second_desc);
1233 	second_bdev_ch = spdk_io_channel_get_ctx(second_ch);
1234 	SPDK_CU_ASSERT_FATAL(bdev_ch->shared_resource != second_bdev_ch->shared_resource);
1235 
1236 	/* Saturate io_target through bdev A. */
1237 	for (i = 0; i < AVAIL; i++) {
1238 		status[i] = SPDK_BDEV_IO_STATUS_PENDING;
1239 		rc = spdk_bdev_read_blocks(g_desc, io_ch, NULL, 0, 1, enomem_done, &status[i]);
1240 		CU_ASSERT(rc == 0);
1241 	}
1242 	CU_ASSERT(TAILQ_EMPTY(&bdev_ch->shared_resource->nomem_io));
1243 
1244 	/* Issue one more I/O to fill the nomem_io list. */
1245 	status[AVAIL] = SPDK_BDEV_IO_STATUS_PENDING;
1246 	rc = spdk_bdev_read_blocks(g_desc, io_ch, NULL, 0, 1, enomem_done, &status[AVAIL]);
1247 	CU_ASSERT(rc == 0);
1248 	SPDK_CU_ASSERT_FATAL(!TAILQ_EMPTY(&bdev_ch->shared_resource->nomem_io));
1249 
1250 	/*
1251 	 * Now submit I/O through the second bdev. This should go through and complete
1252 	 * successfully because we're using a different io_device underneath.
1253 	 */
1254 	status[AVAIL] = SPDK_BDEV_IO_STATUS_PENDING;
1255 	rc = spdk_bdev_read_blocks(second_desc, second_ch, NULL, 0, 1, enomem_done, &status[AVAIL]);
1256 	CU_ASSERT(rc == 0);
1257 	SPDK_CU_ASSERT_FATAL(TAILQ_EMPTY(&second_bdev_ch->shared_resource->nomem_io));
1258 	stub_complete_io(second_bdev->io_target, 1);
1259 
1260 	/* Cleanup; Complete outstanding I/O. */
1261 	stub_complete_io(g_bdev.io_target, AVAIL);
1262 	SPDK_CU_ASSERT_FATAL(TAILQ_EMPTY(&bdev_ch->shared_resource->nomem_io));
1263 	/* Complete the ENOMEM I/O */
1264 	stub_complete_io(g_bdev.io_target, 1);
1265 	CU_ASSERT(bdev_ch->shared_resource->io_outstanding == 0);
1266 
1267 	SPDK_CU_ASSERT_FATAL(TAILQ_EMPTY(&bdev_ch->shared_resource->nomem_io));
1268 	CU_ASSERT(bdev_ch->shared_resource->io_outstanding == 0);
1269 	spdk_put_io_channel(io_ch);
1270 	spdk_put_io_channel(second_ch);
1271 	spdk_bdev_close(second_desc);
1272 	unregister_bdev(second_bdev);
1273 	spdk_io_device_unregister(&new_io_device, NULL);
1274 	poll_threads();
1275 	free(second_bdev);
1276 	teardown_test();
1277 }
1278 
1279 static void
1280 qos_dynamic_enable_done(void *cb_arg, int status)
1281 {
1282 	int *rc = cb_arg;
1283 	*rc = status;
1284 }
1285 
1286 static void
1287 qos_dynamic_enable(void)
1288 {
1289 	struct spdk_io_channel *io_ch[2];
1290 	struct spdk_bdev_channel *bdev_ch[2];
1291 	struct spdk_bdev *bdev;
1292 	enum spdk_bdev_io_status bdev_io_status[2];
1293 	uint64_t limits[SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES] = {};
1294 	int status, second_status, rc, i;
1295 
1296 	setup_test();
1297 	MOCK_SET(spdk_get_ticks, 0);
1298 
1299 	for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) {
1300 		limits[i] = UINT64_MAX;
1301 	}
1302 
1303 	bdev = &g_bdev.bdev;
1304 
1305 	g_get_io_channel = true;
1306 
1307 	/* Create channels */
1308 	set_thread(0);
1309 	io_ch[0] = spdk_bdev_get_io_channel(g_desc);
1310 	bdev_ch[0] = spdk_io_channel_get_ctx(io_ch[0]);
1311 	CU_ASSERT(bdev_ch[0]->flags == 0);
1312 
1313 	set_thread(1);
1314 	io_ch[1] = spdk_bdev_get_io_channel(g_desc);
1315 	bdev_ch[1] = spdk_io_channel_get_ctx(io_ch[1]);
1316 	CU_ASSERT(bdev_ch[1]->flags == 0);
1317 
1318 	set_thread(0);
1319 
1320 	/*
1321 	 * Enable QoS: Read/Write IOPS, Read/Write byte,
1322 	 * Read only byte and Write only byte per second
1323 	 * rate limits.
1324 	 * More than 10 I/Os allowed per timeslice.
1325 	 */
1326 	status = -1;
1327 	limits[SPDK_BDEV_QOS_RW_IOPS_RATE_LIMIT] = 10000;
1328 	limits[SPDK_BDEV_QOS_RW_BPS_RATE_LIMIT] = 100;
1329 	limits[SPDK_BDEV_QOS_R_BPS_RATE_LIMIT] = 100;
1330 	limits[SPDK_BDEV_QOS_W_BPS_RATE_LIMIT] = 10;
1331 	spdk_bdev_set_qos_rate_limits(bdev, limits, qos_dynamic_enable_done, &status);
1332 	poll_threads();
1333 	CU_ASSERT(status == 0);
1334 	CU_ASSERT((bdev_ch[0]->flags & BDEV_CH_QOS_ENABLED) != 0);
1335 	CU_ASSERT((bdev_ch[1]->flags & BDEV_CH_QOS_ENABLED) != 0);
1336 
1337 	/*
1338 	 * Submit and complete 10 I/O to fill the QoS allotment for this timeslice.
1339 	 * Additional I/O will then be queued.
1340 	 */
1341 	set_thread(0);
1342 	for (i = 0; i < 10; i++) {
1343 		bdev_io_status[0] = SPDK_BDEV_IO_STATUS_PENDING;
1344 		rc = spdk_bdev_read_blocks(g_desc, io_ch[0], NULL, 0, 1, io_during_io_done, &bdev_io_status[0]);
1345 		CU_ASSERT(rc == 0);
1346 		CU_ASSERT(bdev_io_status[0] == SPDK_BDEV_IO_STATUS_PENDING);
1347 		poll_thread(0);
1348 		stub_complete_io(g_bdev.io_target, 0);
1349 		CU_ASSERT(bdev_io_status[0] == SPDK_BDEV_IO_STATUS_SUCCESS);
1350 	}
1351 
1352 	/*
1353 	 * Send two more I/O.  These I/O will be queued since the current timeslice allotment has been
1354 	 * filled already.  We want to test that when QoS is disabled that these two I/O:
1355 	 *  1) are not aborted
1356 	 *  2) are sent back to their original thread for resubmission
1357 	 */
1358 	bdev_io_status[0] = SPDK_BDEV_IO_STATUS_PENDING;
1359 	rc = spdk_bdev_read_blocks(g_desc, io_ch[0], NULL, 0, 1, io_during_io_done, &bdev_io_status[0]);
1360 	CU_ASSERT(rc == 0);
1361 	CU_ASSERT(bdev_io_status[0] == SPDK_BDEV_IO_STATUS_PENDING);
1362 	set_thread(1);
1363 	bdev_io_status[1] = SPDK_BDEV_IO_STATUS_PENDING;
1364 	rc = spdk_bdev_read_blocks(g_desc, io_ch[1], NULL, 0, 1, io_during_io_done, &bdev_io_status[1]);
1365 	CU_ASSERT(rc == 0);
1366 	CU_ASSERT(bdev_io_status[1] == SPDK_BDEV_IO_STATUS_PENDING);
1367 	poll_threads();
1368 
1369 	/*
1370 	 * Disable QoS: Read/Write IOPS, Read/Write byte,
1371 	 * Read only byte rate limits
1372 	 */
1373 	status = -1;
1374 	limits[SPDK_BDEV_QOS_RW_IOPS_RATE_LIMIT] = 0;
1375 	limits[SPDK_BDEV_QOS_RW_BPS_RATE_LIMIT] = 0;
1376 	limits[SPDK_BDEV_QOS_R_BPS_RATE_LIMIT] = 0;
1377 	spdk_bdev_set_qos_rate_limits(bdev, limits, qos_dynamic_enable_done, &status);
1378 	poll_threads();
1379 	CU_ASSERT(status == 0);
1380 	CU_ASSERT((bdev_ch[0]->flags & BDEV_CH_QOS_ENABLED) != 0);
1381 	CU_ASSERT((bdev_ch[1]->flags & BDEV_CH_QOS_ENABLED) != 0);
1382 
1383 	/* Disable QoS: Write only Byte per second rate limit */
1384 	status = -1;
1385 	limits[SPDK_BDEV_QOS_W_BPS_RATE_LIMIT] = 0;
1386 	spdk_bdev_set_qos_rate_limits(bdev, limits, qos_dynamic_enable_done, &status);
1387 	poll_threads();
1388 	CU_ASSERT(status == 0);
1389 	CU_ASSERT((bdev_ch[0]->flags & BDEV_CH_QOS_ENABLED) == 0);
1390 	CU_ASSERT((bdev_ch[1]->flags & BDEV_CH_QOS_ENABLED) == 0);
1391 
1392 	/*
1393 	 * All I/O should have been resubmitted back on their original thread.  Complete
1394 	 *  all I/O on thread 0, and ensure that only the thread 0 I/O was completed.
1395 	 */
1396 	set_thread(0);
1397 	stub_complete_io(g_bdev.io_target, 0);
1398 	poll_threads();
1399 	CU_ASSERT(bdev_io_status[0] == SPDK_BDEV_IO_STATUS_SUCCESS);
1400 	CU_ASSERT(bdev_io_status[1] == SPDK_BDEV_IO_STATUS_PENDING);
1401 
1402 	/* Now complete all I/O on thread 1 and ensure the thread 1 I/O was completed. */
1403 	set_thread(1);
1404 	stub_complete_io(g_bdev.io_target, 0);
1405 	poll_threads();
1406 	CU_ASSERT(bdev_io_status[1] == SPDK_BDEV_IO_STATUS_SUCCESS);
1407 
1408 	/* Disable QoS again */
1409 	status = -1;
1410 	limits[SPDK_BDEV_QOS_RW_IOPS_RATE_LIMIT] = 0;
1411 	spdk_bdev_set_qos_rate_limits(bdev, limits, qos_dynamic_enable_done, &status);
1412 	poll_threads();
1413 	CU_ASSERT(status == 0); /* This should succeed */
1414 	CU_ASSERT((bdev_ch[0]->flags & BDEV_CH_QOS_ENABLED) == 0);
1415 	CU_ASSERT((bdev_ch[1]->flags & BDEV_CH_QOS_ENABLED) == 0);
1416 
1417 	/* Enable QoS on thread 0 */
1418 	status = -1;
1419 	limits[SPDK_BDEV_QOS_RW_IOPS_RATE_LIMIT] = 10000;
1420 	spdk_bdev_set_qos_rate_limits(bdev, limits, qos_dynamic_enable_done, &status);
1421 	poll_threads();
1422 	CU_ASSERT(status == 0);
1423 	CU_ASSERT((bdev_ch[0]->flags & BDEV_CH_QOS_ENABLED) != 0);
1424 	CU_ASSERT((bdev_ch[1]->flags & BDEV_CH_QOS_ENABLED) != 0);
1425 
1426 	/* Disable QoS on thread 1 */
1427 	set_thread(1);
1428 	status = -1;
1429 	limits[SPDK_BDEV_QOS_RW_IOPS_RATE_LIMIT] = 0;
1430 	spdk_bdev_set_qos_rate_limits(bdev, limits, qos_dynamic_enable_done, &status);
1431 	/* Don't poll yet. This should leave the channels with QoS enabled */
1432 	CU_ASSERT(status == -1);
1433 	CU_ASSERT((bdev_ch[0]->flags & BDEV_CH_QOS_ENABLED) != 0);
1434 	CU_ASSERT((bdev_ch[1]->flags & BDEV_CH_QOS_ENABLED) != 0);
1435 
1436 	/* Enable QoS. This should immediately fail because the previous disable QoS hasn't completed. */
1437 	second_status = 0;
1438 	limits[SPDK_BDEV_QOS_RW_BPS_RATE_LIMIT] = 10;
1439 	spdk_bdev_set_qos_rate_limits(bdev, limits, qos_dynamic_enable_done, &second_status);
1440 	poll_threads();
1441 	CU_ASSERT(status == 0); /* The disable should succeed */
1442 	CU_ASSERT(second_status < 0); /* The enable should fail */
1443 	CU_ASSERT((bdev_ch[0]->flags & BDEV_CH_QOS_ENABLED) == 0);
1444 	CU_ASSERT((bdev_ch[1]->flags & BDEV_CH_QOS_ENABLED) == 0);
1445 
1446 	/* Enable QoS on thread 1. This should succeed now that the disable has completed. */
1447 	status = -1;
1448 	limits[SPDK_BDEV_QOS_RW_IOPS_RATE_LIMIT] = 10000;
1449 	spdk_bdev_set_qos_rate_limits(bdev, limits, qos_dynamic_enable_done, &status);
1450 	poll_threads();
1451 	CU_ASSERT(status == 0);
1452 	CU_ASSERT((bdev_ch[0]->flags & BDEV_CH_QOS_ENABLED) != 0);
1453 	CU_ASSERT((bdev_ch[1]->flags & BDEV_CH_QOS_ENABLED) != 0);
1454 
1455 	/* Tear down the channels */
1456 	set_thread(0);
1457 	spdk_put_io_channel(io_ch[0]);
1458 	set_thread(1);
1459 	spdk_put_io_channel(io_ch[1]);
1460 	poll_threads();
1461 
1462 	set_thread(0);
1463 	teardown_test();
1464 }
1465 
1466 static void
1467 histogram_status_cb(void *cb_arg, int status)
1468 {
1469 	g_status = status;
1470 }
1471 
1472 static void
1473 histogram_data_cb(void *cb_arg, int status, struct spdk_histogram_data *histogram)
1474 {
1475 	g_status = status;
1476 	g_histogram = histogram;
1477 }
1478 
1479 static void
1480 histogram_io_count(void *ctx, uint64_t start, uint64_t end, uint64_t count,
1481 		   uint64_t total, uint64_t so_far)
1482 {
1483 	g_count += count;
1484 }
1485 
1486 static void
1487 bdev_histograms_mt(void)
1488 {
1489 	struct spdk_io_channel *ch[2];
1490 	struct spdk_histogram_data *histogram;
1491 	uint8_t buf[4096];
1492 	int status = false;
1493 	int rc;
1494 
1495 
1496 	setup_test();
1497 
1498 	set_thread(0);
1499 	ch[0] = spdk_bdev_get_io_channel(g_desc);
1500 	CU_ASSERT(ch[0] != NULL);
1501 
1502 	set_thread(1);
1503 	ch[1] = spdk_bdev_get_io_channel(g_desc);
1504 	CU_ASSERT(ch[1] != NULL);
1505 
1506 
1507 	/* Enable histogram */
1508 	spdk_bdev_histogram_enable(&g_bdev.bdev, histogram_status_cb, NULL, true);
1509 	poll_threads();
1510 	CU_ASSERT(g_status == 0);
1511 	CU_ASSERT(g_bdev.bdev.internal.histogram_enabled == true);
1512 
1513 	/* Allocate histogram */
1514 	histogram = spdk_histogram_data_alloc();
1515 
1516 	/* Check if histogram is zeroed */
1517 	spdk_bdev_histogram_get(&g_bdev.bdev, histogram, histogram_data_cb, NULL);
1518 	poll_threads();
1519 	CU_ASSERT(g_status == 0);
1520 	SPDK_CU_ASSERT_FATAL(g_histogram != NULL);
1521 
1522 	g_count = 0;
1523 	spdk_histogram_data_iterate(g_histogram, histogram_io_count, NULL);
1524 
1525 	CU_ASSERT(g_count == 0);
1526 
1527 	set_thread(0);
1528 	rc = spdk_bdev_write_blocks(g_desc, ch[0], &buf, 0, 1, io_during_io_done, &status);
1529 	CU_ASSERT(rc == 0);
1530 
1531 	spdk_delay_us(10);
1532 	stub_complete_io(g_bdev.io_target, 1);
1533 	poll_threads();
1534 	CU_ASSERT(status == true);
1535 
1536 
1537 	set_thread(1);
1538 	rc = spdk_bdev_read_blocks(g_desc, ch[1], &buf, 0, 1, io_during_io_done, &status);
1539 	CU_ASSERT(rc == 0);
1540 
1541 	spdk_delay_us(10);
1542 	stub_complete_io(g_bdev.io_target, 1);
1543 	poll_threads();
1544 	CU_ASSERT(status == true);
1545 
1546 	set_thread(0);
1547 
1548 	/* Check if histogram gathered data from all I/O channels */
1549 	spdk_bdev_histogram_get(&g_bdev.bdev, histogram, histogram_data_cb, NULL);
1550 	poll_threads();
1551 	CU_ASSERT(g_status == 0);
1552 	CU_ASSERT(g_bdev.bdev.internal.histogram_enabled == true);
1553 	SPDK_CU_ASSERT_FATAL(g_histogram != NULL);
1554 
1555 	g_count = 0;
1556 	spdk_histogram_data_iterate(g_histogram, histogram_io_count, NULL);
1557 	CU_ASSERT(g_count == 2);
1558 
1559 	/* Disable histogram */
1560 	spdk_bdev_histogram_enable(&g_bdev.bdev, histogram_status_cb, NULL, false);
1561 	poll_threads();
1562 	CU_ASSERT(g_status == 0);
1563 	CU_ASSERT(g_bdev.bdev.internal.histogram_enabled == false);
1564 
1565 	spdk_histogram_data_free(histogram);
1566 
1567 	/* Tear down the channels */
1568 	set_thread(0);
1569 	spdk_put_io_channel(ch[0]);
1570 	set_thread(1);
1571 	spdk_put_io_channel(ch[1]);
1572 	poll_threads();
1573 	set_thread(0);
1574 	teardown_test();
1575 
1576 }
1577 
1578 struct timeout_io_cb_arg {
1579 	struct iovec iov;
1580 	uint8_t type;
1581 };
1582 
1583 static int
1584 bdev_channel_count_submitted_io(struct spdk_bdev_channel *ch)
1585 {
1586 	struct spdk_bdev_io *bdev_io;
1587 	int n = 0;
1588 
1589 	if (!ch) {
1590 		return -1;
1591 	}
1592 
1593 	TAILQ_FOREACH(bdev_io, &ch->io_submitted, internal.ch_link) {
1594 		n++;
1595 	}
1596 
1597 	return n;
1598 }
1599 
1600 static void
1601 bdev_channel_io_timeout_cb(void *cb_arg, struct spdk_bdev_io *bdev_io)
1602 {
1603 	struct timeout_io_cb_arg *ctx = cb_arg;
1604 
1605 	ctx->type = bdev_io->type;
1606 	ctx->iov.iov_base = bdev_io->iov.iov_base;
1607 	ctx->iov.iov_len = bdev_io->iov.iov_len;
1608 }
1609 
1610 static bool g_io_done;
1611 
1612 static void
1613 io_done(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg)
1614 {
1615 	g_io_done = true;
1616 	spdk_bdev_free_io(bdev_io);
1617 }
1618 
1619 static void
1620 bdev_set_io_timeout_mt(void)
1621 {
1622 	struct spdk_io_channel *ch[3];
1623 	struct spdk_bdev_channel *bdev_ch[3];
1624 	struct timeout_io_cb_arg cb_arg;
1625 
1626 	setup_test();
1627 
1628 	g_bdev.bdev.optimal_io_boundary = 16;
1629 	g_bdev.bdev.split_on_optimal_io_boundary = true;
1630 
1631 	set_thread(0);
1632 	ch[0] = spdk_bdev_get_io_channel(g_desc);
1633 	CU_ASSERT(ch[0] != NULL);
1634 
1635 	set_thread(1);
1636 	ch[1] = spdk_bdev_get_io_channel(g_desc);
1637 	CU_ASSERT(ch[1] != NULL);
1638 
1639 	set_thread(2);
1640 	ch[2] = spdk_bdev_get_io_channel(g_desc);
1641 	CU_ASSERT(ch[2] != NULL);
1642 
1643 	/* Multi-thread mode
1644 	 * 1, Check the poller was registered successfully
1645 	 * 2, Check the timed-out I/O and ensure it is the I/O that was submitted by the user
1646 	 * 3, Check that the link in the bdev_ch works right.
1647 	 * 4, Close the desc and put the io channel while the timeout poller is polling
1648 	 */
1649 
1650 	/* In the desc thread, set the timeout */
1651 	set_thread(0);
1652 	CU_ASSERT(spdk_bdev_set_timeout(g_desc, 5, bdev_channel_io_timeout_cb, &cb_arg) == 0);
1653 	CU_ASSERT(g_desc->io_timeout_poller != NULL);
1654 	CU_ASSERT(g_desc->cb_fn == bdev_channel_io_timeout_cb);
1655 	CU_ASSERT(g_desc->cb_arg == &cb_arg);
1656 
1657 	/* check the IO submitted list and timeout handler */
1658 	CU_ASSERT(spdk_bdev_read_blocks(g_desc, ch[0], (void *)0x2000, 0, 1, io_done, NULL) == 0);
1659 	bdev_ch[0] = spdk_io_channel_get_ctx(ch[0]);
1660 	CU_ASSERT(bdev_channel_count_submitted_io(bdev_ch[0]) == 1);
1661 
1662 	set_thread(1);
1663 	CU_ASSERT(spdk_bdev_write_blocks(g_desc, ch[1], (void *)0x1000, 0, 1, io_done, NULL) == 0);
1664 	bdev_ch[1] = spdk_io_channel_get_ctx(ch[1]);
1665 	CU_ASSERT(bdev_channel_count_submitted_io(bdev_ch[1]) == 1);
1666 
1667 	/* Now test that a single-vector command is split correctly.
1668 	 * Offset 14, length 8, payload 0xF000
1669 	 *  Child - Offset 14, length 2, payload 0xF000
1670 	 *  Child - Offset 16, length 6, payload 0xF000 + 2 * 512
1671 	 *
1672 	 * The parent IO and both split children are tracked on io_submitted, so 3 IOs are counted below.
1673 	 */
1674 	set_thread(2);
1675 	CU_ASSERT(spdk_bdev_read_blocks(g_desc, ch[2], (void *)0xF000, 14, 8, io_done, NULL) == 0);
1676 	bdev_ch[2] = spdk_io_channel_get_ctx(ch[2]);
1677 	CU_ASSERT(bdev_channel_count_submitted_io(bdev_ch[2]) == 3);
1678 
1679 	set_thread(0);
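	/* Advance the mocked time.  The 5 second timeout has not expired yet, so cb_arg must stay zeroed. */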
1680 	memset(&cb_arg, 0, sizeof(cb_arg));
1681 	spdk_delay_us(3 * spdk_get_ticks_hz());
1682 	poll_threads();
1683 	CU_ASSERT(cb_arg.type == 0);
1684 	CU_ASSERT(cb_arg.iov.iov_base == (void *)0x0);
1685 	CU_ASSERT(cb_arg.iov.iov_len == 0);
1686 
1687 	/* Now the elapsed time exceeds the timeout limit, so the IO submitted on thread 0 is reported as timed out. */
1688 	spdk_delay_us(3 * spdk_get_ticks_hz());
1689 	poll_thread(0);
1690 	CU_ASSERT(cb_arg.type == SPDK_BDEV_IO_TYPE_READ);
1691 	CU_ASSERT(cb_arg.iov.iov_base == (void *)0x2000);
1692 	CU_ASSERT(cb_arg.iov.iov_len == 1 * g_bdev.bdev.blocklen);
1693 	stub_complete_io(g_bdev.io_target, 1);
1694 	CU_ASSERT(bdev_channel_count_submitted_io(bdev_ch[0]) == 0);
1695 
1696 	memset(&cb_arg, 0, sizeof(cb_arg));
1697 	set_thread(1);
1698 	poll_thread(1);
1699 	CU_ASSERT(cb_arg.type == SPDK_BDEV_IO_TYPE_WRITE);
1700 	CU_ASSERT(cb_arg.iov.iov_base == (void *)0x1000);
1701 	CU_ASSERT(cb_arg.iov.iov_len == 1 * g_bdev.bdev.blocklen);
1702 	stub_complete_io(g_bdev.io_target, 1);
1703 	CU_ASSERT(bdev_channel_count_submitted_io(bdev_ch[1]) == 0);
1704 
1705 	memset(&cb_arg, 0, sizeof(cb_arg));
1706 	set_thread(2);
1707 	poll_thread(2);
1708 	CU_ASSERT(cb_arg.type == SPDK_BDEV_IO_TYPE_READ);
1709 	CU_ASSERT(cb_arg.iov.iov_base == (void *)0xF000);
1710 	CU_ASSERT(cb_arg.iov.iov_len == 8 * g_bdev.bdev.blocklen);
1711 	stub_complete_io(g_bdev.io_target, 1);
1712 	CU_ASSERT(bdev_channel_count_submitted_io(bdev_ch[2]) == 2);
1713 	stub_complete_io(g_bdev.io_target, 1);
1714 	CU_ASSERT(bdev_channel_count_submitted_io(bdev_ch[2]) == 0);
1715 
1716 	/* Poll thread 0 so poll_timeout_done() runs, completing the timeout poller; desc->refs drops back to 0. */
1717 	set_thread(0);
1718 	poll_thread(0);
1719 	CU_ASSERT(g_desc->refs == 0);
1720 	CU_ASSERT(spdk_bdev_read_blocks(g_desc, ch[0], (void *)0x1000, 0, 1, io_done, NULL) == 0);
1721 	set_thread(1);
1722 	CU_ASSERT(spdk_bdev_write_blocks(g_desc, ch[1], (void *)0x2000, 0, 2, io_done, NULL) == 0);
1723 	set_thread(2);
1724 	CU_ASSERT(spdk_bdev_read_blocks(g_desc, ch[2], (void *)0x3000, 0, 3, io_done, NULL) == 0);
1725 
1726 	/* Trigger the timeout poller to run again; desc->refs is incremented.
1727 	 * On thread 0 we destroy the io channel before the timeout poller runs,
1728 	 * so the timeout callback is not called on thread 0.
1729 	 */
1730 	spdk_delay_us(6 * spdk_get_ticks_hz());
1731 	memset(&cb_arg, 0, sizeof(cb_arg));
1732 	set_thread(0);
1733 	stub_complete_io(g_bdev.io_target, 1);
1734 	spdk_put_io_channel(ch[0]);
1735 	poll_thread(0);
1736 	CU_ASSERT(g_desc->refs == 1);
1737 	CU_ASSERT(cb_arg.type == 0);
1738 	CU_ASSERT(cb_arg.iov.iov_base == (void *)0x0);
1739 	CU_ASSERT(cb_arg.iov.iov_len == 0);
1740 
1741 	/* On thread 1 the timeout poller runs first and then we destroy the io channel,
1742 	 * so the timeout callback is called on thread 1.
1743 	 */
1744 	memset(&cb_arg, 0, sizeof(cb_arg));
1745 	set_thread(1);
1746 	poll_thread(1);
1747 	stub_complete_io(g_bdev.io_target, 1);
1748 	spdk_put_io_channel(ch[1]);
1749 	poll_thread(1);
1750 	CU_ASSERT(cb_arg.type == SPDK_BDEV_IO_TYPE_WRITE);
1751 	CU_ASSERT(cb_arg.iov.iov_base == (void *)0x2000);
1752 	CU_ASSERT(cb_arg.iov.iov_len == 2 * g_bdev.bdev.blocklen);
1753 
1754 	/* Close the desc.
1755 	 * This unregisters the timeout poller first,
1756 	 * then decrements desc->refs; refs is not zero yet, so the desc is not freed.
1757 	 */
1758 	set_thread(0);
1759 	spdk_bdev_close(g_desc);
1760 	CU_ASSERT(g_desc->refs == 1);
1761 	CU_ASSERT(g_desc->io_timeout_poller == NULL);
1762 
1763 	/* The timeout poller runs on thread 2 and then we destroy the io channel.
1764 	 * The desc is already closed, so the timeout poller exits immediately and
1765 	 * the timeout callback is not called on thread 2.
1766 	 */
1767 	memset(&cb_arg, 0, sizeof(cb_arg));
1768 	set_thread(2);
1769 	poll_thread(2);
1770 	stub_complete_io(g_bdev.io_target, 1);
1771 	spdk_put_io_channel(ch[2]);
1772 	poll_thread(2);
1773 	CU_ASSERT(cb_arg.type == 0);
1774 	CU_ASSERT(cb_arg.iov.iov_base == (void *)0x0);
1775 	CU_ASSERT(cb_arg.iov.iov_len == 0);
1776 
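	/* g_desc was already closed above, so tear down manually here instead of calling teardown_test(). */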
1777 	set_thread(0);
1778 	poll_thread(0);
1779 	g_teardown_done = false;
1780 	unregister_bdev(&g_bdev);
1781 	spdk_io_device_unregister(&g_io_device, NULL);
1782 	spdk_bdev_finish(finish_cb, NULL);
1783 	poll_threads();
1784 	memset(&g_bdev, 0, sizeof(g_bdev));
1785 	CU_ASSERT(g_teardown_done == true);
1786 	g_teardown_done = false;
1787 	free_threads();
1788 }
1789 
1790 static bool g_io_done2;
1791 static bool g_lock_lba_range_done;
1792 static bool g_unlock_lba_range_done;
1793 
1794 static void
1795 io_done2(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg)
1796 {
1797 	g_io_done2 = true;
1798 	spdk_bdev_free_io(bdev_io);
1799 }
1800 
1801 static void
1802 lock_lba_range_done(void *ctx, int status)
1803 {
1804 	g_lock_lba_range_done = true;
1805 }
1806 
1807 static void
1808 unlock_lba_range_done(void *ctx, int status)
1809 {
1810 	g_unlock_lba_range_done = true;
1811 }
1812 
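/* Helper: return the stub io target's outstanding IO count on the current thread's channel. */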
1813 static uint32_t
1814 stub_channel_outstanding_cnt(void *io_target)
1815 {
1816 	struct spdk_io_channel *_ch = spdk_get_io_channel(io_target);
1817 	struct ut_bdev_channel *ch = spdk_io_channel_get_ctx(_ch);
1818 	uint32_t outstanding_cnt;
1819 
1820 	outstanding_cnt = ch->outstanding_cnt;
1821 
1822 	spdk_put_io_channel(_ch);
1823 	return outstanding_cnt;
1824 }
1825 
1826 static void
1827 lock_lba_range_then_submit_io(void)
1828 {
1829 	struct spdk_bdev_desc *desc = NULL;
1830 	void *io_target;
1831 	struct spdk_io_channel *io_ch[3];
1832 	struct spdk_bdev_channel *bdev_ch[3];
1833 	struct lba_range *range;
1834 	char buf[4096];
1835 	int ctx0, ctx1, ctx2;
1836 	int rc;
1837 
1838 	setup_test();
1839 
1840 	io_target = g_bdev.io_target;
1841 	desc = g_desc;
1842 
1843 	set_thread(0);
1844 	io_ch[0] = spdk_bdev_get_io_channel(desc);
1845 	bdev_ch[0] = spdk_io_channel_get_ctx(io_ch[0]);
1846 	CU_ASSERT(io_ch[0] != NULL);
1847 
1848 	set_thread(1);
1849 	io_ch[1] = spdk_bdev_get_io_channel(desc);
1850 	bdev_ch[1] = spdk_io_channel_get_ctx(io_ch[1]);
1851 	CU_ASSERT(io_ch[1] != NULL);
1852 
1853 	set_thread(0);
1854 	g_lock_lba_range_done = false;
1855 	rc = bdev_lock_lba_range(desc, io_ch[0], 20, 10, lock_lba_range_done, &ctx0);
1856 	CU_ASSERT(rc == 0);
1857 	poll_threads();
1858 
1859 	/* The lock should immediately become valid, since there are no outstanding
1860 	 * write I/Os.
1861 	 */
1862 	CU_ASSERT(g_lock_lba_range_done == true);
1863 	range = TAILQ_FIRST(&bdev_ch[0]->locked_ranges);
1864 	SPDK_CU_ASSERT_FATAL(range != NULL);
1865 	CU_ASSERT(range->offset == 20);
1866 	CU_ASSERT(range->length == 10);
1867 	CU_ASSERT(range->owner_ch == bdev_ch[0]);
1868 
1869 	g_io_done = false;
1870 	CU_ASSERT(TAILQ_EMPTY(&bdev_ch[0]->io_locked));
1871 	rc = spdk_bdev_read_blocks(desc, io_ch[0], buf, 20, 1, io_done, &ctx0);
1872 	CU_ASSERT(rc == 0);
1873 	CU_ASSERT(stub_channel_outstanding_cnt(io_target) == 1);
1874 
1875 	stub_complete_io(io_target, 1);
1876 	poll_threads();
1877 	CU_ASSERT(g_io_done == true);
1878 	CU_ASSERT(TAILQ_EMPTY(&bdev_ch[0]->io_locked));
1879 
1880 	/* Try a write I/O.  This should actually be allowed to execute, since the channel
1881 	 * holding the lock is submitting the write I/O.
1882 	 */
1883 	g_io_done = false;
1884 	CU_ASSERT(TAILQ_EMPTY(&bdev_ch[0]->io_locked));
1885 	rc = spdk_bdev_write_blocks(desc, io_ch[0], buf, 20, 1, io_done, &ctx0);
1886 	CU_ASSERT(rc == 0);
1887 	CU_ASSERT(stub_channel_outstanding_cnt(io_target) == 1);
1888 
1889 	stub_complete_io(io_target, 1);
1890 	poll_threads();
1891 	CU_ASSERT(g_io_done == true);
1892 	CU_ASSERT(TAILQ_EMPTY(&bdev_ch[0]->io_locked));
1893 
1894 	/* Try a write I/O.  This should get queued in the io_locked tailq. */
1895 	set_thread(1);
1896 	g_io_done = false;
1897 	CU_ASSERT(TAILQ_EMPTY(&bdev_ch[1]->io_locked));
1898 	rc = spdk_bdev_write_blocks(desc, io_ch[1], buf, 20, 1, io_done, &ctx1);
1899 	CU_ASSERT(rc == 0);
1900 	poll_threads();
1901 	CU_ASSERT(stub_channel_outstanding_cnt(io_target) == 0);
1902 	CU_ASSERT(!TAILQ_EMPTY(&bdev_ch[1]->io_locked));
1903 	CU_ASSERT(g_io_done == false);
1904 
1905 	/* Try to unlock the lba range using thread 1's io_ch.  This should fail, since only the channel that owns the lock (bdev_ch[0]) may unlock it. */
1906 	rc = bdev_unlock_lba_range(desc, io_ch[1], 20, 10, unlock_lba_range_done, &ctx1);
1907 	CU_ASSERT(rc == -EINVAL);
1908 
1909 	/* Now create a new channel and submit a write I/O with it.  This should also be queued.
1910 	 * The new channel should inherit the active locks from the bdev's internal list.
1911 	 */
1912 	set_thread(2);
1913 	io_ch[2] = spdk_bdev_get_io_channel(desc);
1914 	bdev_ch[2] = spdk_io_channel_get_ctx(io_ch[2]);
1915 	CU_ASSERT(io_ch[2] != NULL);
1916 
1917 	g_io_done2 = false;
1918 	CU_ASSERT(TAILQ_EMPTY(&bdev_ch[2]->io_locked));
1919 	rc = spdk_bdev_write_blocks(desc, io_ch[2], buf, 22, 2, io_done2, &ctx2);
1920 	CU_ASSERT(rc == 0);
1921 	poll_threads();
1922 	CU_ASSERT(stub_channel_outstanding_cnt(io_target) == 0);
1923 	CU_ASSERT(!TAILQ_EMPTY(&bdev_ch[2]->io_locked));
1924 	CU_ASSERT(g_io_done2 == false);
1925 
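	/* Unlock the range from the owning channel; the queued writes on the other channels should then be released. */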
1926 	set_thread(0);
1927 	rc = bdev_unlock_lba_range(desc, io_ch[0], 20, 10, unlock_lba_range_done, &ctx0);
1928 	CU_ASSERT(rc == 0);
1929 	poll_threads();
1930 	CU_ASSERT(TAILQ_EMPTY(&bdev_ch[0]->locked_ranges));
1931 
1932 	/* The LBA range is unlocked, so the write IOs should now have started execution. */
1933 	CU_ASSERT(TAILQ_EMPTY(&bdev_ch[1]->io_locked));
1934 	CU_ASSERT(TAILQ_EMPTY(&bdev_ch[2]->io_locked));
1935 
1936 	set_thread(1);
1937 	CU_ASSERT(stub_channel_outstanding_cnt(io_target) == 1);
1938 	stub_complete_io(io_target, 1);
1939 	set_thread(2);
1940 	CU_ASSERT(stub_channel_outstanding_cnt(io_target) == 1);
1941 	stub_complete_io(io_target, 1);
1942 
1943 	poll_threads();
1944 	CU_ASSERT(g_io_done == true);
1945 	CU_ASSERT(g_io_done2 == true);
1946 
1947 	/* Tear down the channels */
1948 	set_thread(0);
1949 	spdk_put_io_channel(io_ch[0]);
1950 	set_thread(1);
1951 	spdk_put_io_channel(io_ch[1]);
1952 	set_thread(2);
1953 	spdk_put_io_channel(io_ch[2]);
1954 	poll_threads();
1955 	set_thread(0);
1956 	teardown_test();
1957 }
1958 
1959 int
1960 main(int argc, char **argv)
1961 {
1962 	CU_pSuite	suite = NULL;
1963 	unsigned int	num_failures;
1964 
1965 	CU_set_error_action(CUEA_ABORT);
1966 	CU_initialize_registry();
1967 
1968 	suite = CU_add_suite("bdev", NULL, NULL);
1969 
1970 	CU_ADD_TEST(suite, basic);
1971 	CU_ADD_TEST(suite, unregister_and_close);
1972 	CU_ADD_TEST(suite, basic_qos);
1973 	CU_ADD_TEST(suite, put_channel_during_reset);
1974 	CU_ADD_TEST(suite, aborted_reset);
1975 	CU_ADD_TEST(suite, io_during_reset);
1976 	CU_ADD_TEST(suite, io_during_qos_queue);
1977 	CU_ADD_TEST(suite, io_during_qos_reset);
1978 	CU_ADD_TEST(suite, enomem);
1979 	CU_ADD_TEST(suite, enomem_multi_bdev);
1980 	CU_ADD_TEST(suite, enomem_multi_io_target);
1981 	CU_ADD_TEST(suite, qos_dynamic_enable);
1982 	CU_ADD_TEST(suite, bdev_histograms_mt);
1983 	CU_ADD_TEST(suite, bdev_set_io_timeout_mt);
1984 	CU_ADD_TEST(suite, lock_lba_range_then_submit_io);
1985 
1986 	CU_basic_set_mode(CU_BRM_VERBOSE);
1987 	CU_basic_run_tests();
1988 	num_failures = CU_get_number_of_failures();
1989 	CU_cleanup_registry();
1990 	return num_failures;
1991 }
1992