xref: /spdk/test/unit/lib/bdev/mt/bdev.c/bdev_ut.c (revision 2f5c602574a98ede645991abe279a96e19c50196)
1 /*-
2  *   BSD LICENSE
3  *
4  *   Copyright (c) Intel Corporation.
5  *   All rights reserved.
6  *
7  *   Redistribution and use in source and binary forms, with or without
8  *   modification, are permitted provided that the following conditions
9  *   are met:
10  *
11  *     * Redistributions of source code must retain the above copyright
12  *       notice, this list of conditions and the following disclaimer.
13  *     * Redistributions in binary form must reproduce the above copyright
14  *       notice, this list of conditions and the following disclaimer in
15  *       the documentation and/or other materials provided with the
16  *       distribution.
17  *     * Neither the name of Intel Corporation nor the names of its
18  *       contributors may be used to endorse or promote products derived
19  *       from this software without specific prior written permission.
20  *
21  *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22  *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23  *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
24  *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
25  *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
26  *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
27  *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
28  *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
29  *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
30  *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
31  *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
32  */
33 
34 #include "spdk_cunit.h"
35 
36 #include "common/lib/ut_multithread.c"
37 #include "unit/lib/json_mock.c"
38 
39 #include "spdk/config.h"
40 /* HACK: disable VTune integration so the unit test doesn't need VTune headers and libs to build */
41 #undef SPDK_CONFIG_VTUNE
42 
43 #include "bdev/bdev.c"
44 
45 #define BDEV_UT_NUM_THREADS 3
46 
47 DEFINE_STUB(spdk_notify_send, uint64_t, (const char *type, const char *ctx), 0);
48 DEFINE_STUB(spdk_notify_type_register, struct spdk_notify_type *, (const char *type), NULL);
49 DEFINE_STUB_V(spdk_scsi_nvme_translate, (const struct spdk_bdev_io *bdev_io, int *sc, int *sk,
50 		int *asc, int *ascq));
51 
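/*
 * Stub bdev and per-channel context used by the test bdev module below.
 * Each channel keeps the I/O it is holding on outstanding_io so the tests
 * can complete or abort it explicitly (see stub_complete_io()).
 */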
52 struct ut_bdev {
53 	struct spdk_bdev	bdev;
54 	void			*io_target;
55 };
56 
57 struct ut_bdev_channel {
58 	TAILQ_HEAD(, spdk_bdev_io)	outstanding_io;
59 	uint32_t			outstanding_cnt;
60 	uint32_t			avail_cnt;
61 };
62 
63 int g_io_device;
64 struct ut_bdev g_bdev;
65 struct spdk_bdev_desc *g_desc;
66 bool g_teardown_done = false;
67 bool g_get_io_channel = true;
68 bool g_create_ch = true;
69 bool g_init_complete_called = false;
70 bool g_fini_start_called = true;
71 int g_status = 0;
72 int g_count = 0;
73 struct spdk_histogram_data *g_histogram = NULL;
74 
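/* I/O channel create callback for the stub io_device; fails when g_create_ch is false. */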
75 static int
76 stub_create_ch(void *io_device, void *ctx_buf)
77 {
78 	struct ut_bdev_channel *ch = ctx_buf;
79 
80 	if (g_create_ch == false) {
81 		return -1;
82 	}
83 
84 	TAILQ_INIT(&ch->outstanding_io);
85 	ch->outstanding_cnt = 0;
86 	/*
87 	 * When avail gets to 0, the submit_request function will return ENOMEM.
88 	 *  Most tests do not want ENOMEM to occur, so by default set this to a
89 	 *  big value that won't get hit.  The ENOMEM tests can then override this
90 	 *  value to something much smaller to induce ENOMEM conditions.
91 	 */
92 	ch->avail_cnt = 2048;
93 	return 0;
94 }
95 
96 static void
97 stub_destroy_ch(void *io_device, void *ctx_buf)
98 {
99 }
100 
101 static struct spdk_io_channel *
102 stub_get_io_channel(void *ctx)
103 {
104 	struct ut_bdev *ut_bdev = ctx;
105 
106 	if (g_get_io_channel == true) {
107 		return spdk_get_io_channel(ut_bdev->io_target);
108 	} else {
109 		return NULL;
110 	}
111 }
112 
113 static int
114 stub_destruct(void *ctx)
115 {
116 	return 0;
117 }
118 
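/*
 * Stub submit_request handler: a reset aborts every outstanding I/O on the
 * channel, an abort completes the matching outstanding I/O, and any other
 * request is queued until stub_complete_io() runs, or completed with NOMEM
 * once avail_cnt is exhausted.
 */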
119 static void
120 stub_submit_request(struct spdk_io_channel *_ch, struct spdk_bdev_io *bdev_io)
121 {
122 	struct ut_bdev_channel *ch = spdk_io_channel_get_ctx(_ch);
123 	struct spdk_bdev_io *io;
124 
125 	if (bdev_io->type == SPDK_BDEV_IO_TYPE_RESET) {
126 		while (!TAILQ_EMPTY(&ch->outstanding_io)) {
127 			io = TAILQ_FIRST(&ch->outstanding_io);
128 			TAILQ_REMOVE(&ch->outstanding_io, io, module_link);
129 			ch->outstanding_cnt--;
130 			spdk_bdev_io_complete(io, SPDK_BDEV_IO_STATUS_ABORTED);
131 			ch->avail_cnt++;
132 		}
133 	} else if (bdev_io->type == SPDK_BDEV_IO_TYPE_ABORT) {
134 		TAILQ_FOREACH(io, &ch->outstanding_io, module_link) {
135 			if (io == bdev_io->u.abort.bio_to_abort) {
136 				TAILQ_REMOVE(&ch->outstanding_io, io, module_link);
137 				ch->outstanding_cnt--;
138 				spdk_bdev_io_complete(io, SPDK_BDEV_IO_STATUS_ABORTED);
139 				ch->avail_cnt++;
140 
141 				spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_SUCCESS);
142 				return;
143 			}
144 		}
145 
146 		spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
147 		return;
148 	}
149 
150 	if (ch->avail_cnt > 0) {
151 		TAILQ_INSERT_TAIL(&ch->outstanding_io, bdev_io, module_link);
152 		ch->outstanding_cnt++;
153 		ch->avail_cnt--;
154 	} else {
155 		spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_NOMEM);
156 	}
157 }
158 
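/* Complete up to num_to_complete outstanding I/O on io_target; 0 means complete them all. */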
159 static uint32_t
160 stub_complete_io(void *io_target, uint32_t num_to_complete)
161 {
162 	struct spdk_io_channel *_ch = spdk_get_io_channel(io_target);
163 	struct ut_bdev_channel *ch = spdk_io_channel_get_ctx(_ch);
164 	struct spdk_bdev_io *io;
165 	bool complete_all = (num_to_complete == 0);
166 	uint32_t num_completed = 0;
167 
168 	while (complete_all || num_completed < num_to_complete) {
169 		if (TAILQ_EMPTY(&ch->outstanding_io)) {
170 			break;
171 		}
172 		io = TAILQ_FIRST(&ch->outstanding_io);
173 		TAILQ_REMOVE(&ch->outstanding_io, io, module_link);
174 		ch->outstanding_cnt--;
175 		spdk_bdev_io_complete(io, SPDK_BDEV_IO_STATUS_SUCCESS);
176 		ch->avail_cnt++;
177 		num_completed++;
178 	}
179 
180 	spdk_put_io_channel(_ch);
181 	return num_completed;
182 }
183 
184 static bool
185 stub_io_type_supported(void *ctx, enum spdk_bdev_io_type type)
186 {
187 	return true;
188 }
189 
190 static struct spdk_bdev_fn_table fn_table = {
191 	.get_io_channel =	stub_get_io_channel,
192 	.destruct =		stub_destruct,
193 	.submit_request =	stub_submit_request,
194 	.io_type_supported =	stub_io_type_supported,
195 };
196 
197 struct spdk_bdev_module bdev_ut_if;
198 
199 static int
200 module_init(void)
201 {
202 	spdk_bdev_module_init_done(&bdev_ut_if);
203 	return 0;
204 }
205 
206 static void
207 module_fini(void)
208 {
209 }
210 
211 static void
212 init_complete(void)
213 {
214 	g_init_complete_called = true;
215 }
216 
217 static void
218 fini_start(void)
219 {
220 	g_fini_start_called = true;
221 }
222 
223 struct spdk_bdev_module bdev_ut_if = {
224 	.name = "bdev_ut",
225 	.module_init = module_init,
226 	.module_fini = module_fini,
227 	.async_init = true,
228 	.init_complete = init_complete,
229 	.fini_start = fini_start,
230 };
231 
232 SPDK_BDEV_MODULE_REGISTER(bdev_ut, &bdev_ut_if)
233 
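/* Register a stub bdev (4096-byte blocks, 1024 blocks) on top of the given io_target. */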
234 static void
235 register_bdev(struct ut_bdev *ut_bdev, char *name, void *io_target)
236 {
237 	memset(ut_bdev, 0, sizeof(*ut_bdev));
238 
239 	ut_bdev->io_target = io_target;
240 	ut_bdev->bdev.ctxt = ut_bdev;
241 	ut_bdev->bdev.name = name;
242 	ut_bdev->bdev.fn_table = &fn_table;
243 	ut_bdev->bdev.module = &bdev_ut_if;
244 	ut_bdev->bdev.blocklen = 4096;
245 	ut_bdev->bdev.blockcnt = 1024;
246 
247 	spdk_bdev_register(&ut_bdev->bdev);
248 }
249 
250 static void
251 unregister_bdev(struct ut_bdev *ut_bdev)
252 {
253 	/* Handle any deferred messages. */
254 	poll_threads();
255 	spdk_bdev_unregister(&ut_bdev->bdev, NULL, NULL);
256 }
257 
258 static void
259 bdev_init_cb(void *done, int rc)
260 {
261 	CU_ASSERT(rc == 0);
262 	*(bool *)done = true;
263 }
264 
265 static void
266 _bdev_event_cb(enum spdk_bdev_event_type type, struct spdk_bdev *bdev,
267 	       void *event_ctx)
268 {
269 	switch (type) {
270 	case SPDK_BDEV_EVENT_REMOVE:
271 		if (event_ctx != NULL) {
272 			*(bool *)event_ctx = true;
273 		}
274 		break;
275 	default:
276 		CU_ASSERT(false);
277 		break;
278 	}
279 }
280 
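/*
 * Common setup: allocate the test threads, initialize the bdev layer,
 * register the stub io_device and "ut_bdev", and open a descriptor into
 * g_desc.
 */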
281 static void
282 setup_test(void)
283 {
284 	bool done = false;
285 
286 	allocate_cores(BDEV_UT_NUM_THREADS);
287 	allocate_threads(BDEV_UT_NUM_THREADS);
288 	set_thread(0);
289 	spdk_bdev_initialize(bdev_init_cb, &done);
290 	spdk_io_device_register(&g_io_device, stub_create_ch, stub_destroy_ch,
291 				sizeof(struct ut_bdev_channel), NULL);
292 	register_bdev(&g_bdev, "ut_bdev", &g_io_device);
293 	spdk_bdev_open_ext("ut_bdev", true, _bdev_event_cb, NULL, &g_desc);
294 }
295 
296 static void
297 finish_cb(void *cb_arg)
298 {
299 	g_teardown_done = true;
300 }
301 
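/*
 * Common teardown: close g_desc, unregister the stub bdev and io_device,
 * finish the bdev layer and free the test threads.
 */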
302 static void
303 teardown_test(void)
304 {
305 	set_thread(0);
306 	g_teardown_done = false;
307 	spdk_bdev_close(g_desc);
308 	g_desc = NULL;
309 	unregister_bdev(&g_bdev);
310 	spdk_io_device_unregister(&g_io_device, NULL);
311 	spdk_bdev_finish(finish_cb, NULL);
312 	poll_threads();
313 	memset(&g_bdev, 0, sizeof(g_bdev));
314 	CU_ASSERT(g_teardown_done == true);
315 	g_teardown_done = false;
316 	free_threads();
317 	free_cores();
318 }
319 
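/* Count the bdev_io entries currently on the given tailq. */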
320 static uint32_t
321 bdev_io_tailq_cnt(bdev_io_tailq_t *tailq)
322 {
323 	struct spdk_bdev_io *io;
324 	uint32_t cnt = 0;
325 
326 	TAILQ_FOREACH(io, tailq, internal.link) {
327 		cnt++;
328 	}
329 
330 	return cnt;
331 }
332 
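/* Verify init/fini module callbacks and that get_io_channel failures are handled. */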
333 static void
334 basic(void)
335 {
336 	g_init_complete_called = false;
337 	setup_test();
338 	CU_ASSERT(g_init_complete_called == true);
339 
340 	set_thread(0);
341 
342 	g_get_io_channel = false;
343 	g_ut_threads[0].ch = spdk_bdev_get_io_channel(g_desc);
344 	CU_ASSERT(g_ut_threads[0].ch == NULL);
345 
346 	g_get_io_channel = true;
347 	g_create_ch = false;
348 	g_ut_threads[0].ch = spdk_bdev_get_io_channel(g_desc);
349 	CU_ASSERT(g_ut_threads[0].ch == NULL);
350 
351 	g_get_io_channel = true;
352 	g_create_ch = true;
353 	g_ut_threads[0].ch = spdk_bdev_get_io_channel(g_desc);
354 	CU_ASSERT(g_ut_threads[0].ch != NULL);
355 	spdk_put_io_channel(g_ut_threads[0].ch);
356 
357 	g_fini_start_called = false;
358 	teardown_test();
359 	CU_ASSERT(g_fini_start_called == true);
360 }
361 
362 static void
363 _bdev_unregistered(void *done, int rc)
364 {
365 	CU_ASSERT(rc == 0);
366 	*(bool *)done = true;
367 }
368 
369 static void
370 unregister_and_close(void)
371 {
372 	bool done, remove_notify;
373 	struct spdk_bdev_desc *desc = NULL;
374 
375 	setup_test();
376 	set_thread(0);
377 
378 	/* setup_test() automatically opens the bdev,
379 	 * but this test needs to do that in a different
380 	 * way. */
381 	spdk_bdev_close(g_desc);
382 	poll_threads();
383 
384 	/* Try hotremoving a bdev with descriptors which don't provide
385 	 * any context to the notification callback */
386 	spdk_bdev_open_ext("ut_bdev", true, _bdev_event_cb, NULL, &desc);
387 	SPDK_CU_ASSERT_FATAL(desc != NULL);
388 
389 	/* There is an open descriptor on the device. Unregister it,
390 	 * which can't proceed until the descriptor is closed. */
391 	done = false;
392 	spdk_bdev_unregister(&g_bdev.bdev, _bdev_unregistered, &done);
393 
394 	/* Poll the threads to allow all events to be processed */
395 	poll_threads();
396 
397 	/* Make sure the bdev was not unregistered. We still have a
398 	 * descriptor open */
399 	CU_ASSERT(done == false);
400 
401 	spdk_bdev_close(desc);
402 	poll_threads();
403 	desc = NULL;
404 
405 	/* The unregister should have completed */
406 	CU_ASSERT(done == true);
407 
408 
409 	/* Register the bdev again */
410 	register_bdev(&g_bdev, "ut_bdev", &g_io_device);
411 
412 	remove_notify = false;
413 	spdk_bdev_open_ext("ut_bdev", true, _bdev_event_cb, &remove_notify, &desc);
414 	SPDK_CU_ASSERT_FATAL(desc != NULL);
415 	CU_ASSERT(remove_notify == false);
416 
417 	/* There is an open descriptor on the device. Unregister it,
418 	 * which can't proceed until the descriptor is closed. */
419 	done = false;
420 	spdk_bdev_unregister(&g_bdev.bdev, _bdev_unregistered, &done);
421 	/* No polling has occurred, so neither of these should execute */
422 	CU_ASSERT(remove_notify == false);
423 	CU_ASSERT(done == false);
424 
425 	/* Prior to the unregister completing, close the descriptor */
426 	spdk_bdev_close(desc);
427 
428 	/* Poll the threads to allow all events to be processed */
429 	poll_threads();
430 
431 	/* Remove notify should not have been called because the
432 	 * descriptor is already closed. */
433 	CU_ASSERT(remove_notify == false);
434 
435 	/* The unregister should have completed */
436 	CU_ASSERT(done == true);
437 
438 	/* Restore the original g_bdev so that we can use teardown_test(). */
439 	register_bdev(&g_bdev, "ut_bdev", &g_io_device);
440 	spdk_bdev_open_ext("ut_bdev", true, _bdev_event_cb, NULL, &g_desc);
441 	teardown_test();
442 }
443 
444 static void
445 reset_done(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg)
446 {
447 	bool *done = cb_arg;
448 
449 	CU_ASSERT(success == true);
450 	*done = true;
451 	spdk_bdev_free_io(bdev_io);
452 }
453 
454 static void
455 put_channel_during_reset(void)
456 {
457 	struct spdk_io_channel *io_ch;
458 	bool done = false;
459 
460 	setup_test();
461 
462 	set_thread(0);
463 	io_ch = spdk_bdev_get_io_channel(g_desc);
464 	CU_ASSERT(io_ch != NULL);
465 
466 	/*
467 	 * Start a reset, but then put the I/O channel before
468 	 *  the deferred messages for the reset get a chance to
469 	 *  execute.
470 	 */
471 	spdk_bdev_reset(g_desc, io_ch, reset_done, &done);
472 	spdk_put_io_channel(io_ch);
473 	poll_threads();
474 	stub_complete_io(g_bdev.io_target, 0);
475 
476 	teardown_test();
477 }
478 
479 static void
480 aborted_reset_done(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg)
481 {
482 	enum spdk_bdev_io_status *status = cb_arg;
483 
484 	*status = success ? SPDK_BDEV_IO_STATUS_SUCCESS : SPDK_BDEV_IO_STATUS_FAILED;
485 	spdk_bdev_free_io(bdev_io);
486 }
487 
488 static void
489 aborted_reset(void)
490 {
491 	struct spdk_io_channel *io_ch[2];
492 	enum spdk_bdev_io_status status1 = SPDK_BDEV_IO_STATUS_PENDING,
493 				 status2 = SPDK_BDEV_IO_STATUS_PENDING;
494 
495 	setup_test();
496 
497 	set_thread(0);
498 	io_ch[0] = spdk_bdev_get_io_channel(g_desc);
499 	CU_ASSERT(io_ch[0] != NULL);
500 	spdk_bdev_reset(g_desc, io_ch[0], aborted_reset_done, &status1);
501 	poll_threads();
502 	CU_ASSERT(g_bdev.bdev.internal.reset_in_progress != NULL);
503 
504 	/*
505 	 * First reset has been submitted on ch0.  Now submit a second
506 	 *  reset on ch1 which will get queued since there is already a
507 	 *  reset in progress.
508 	 */
509 	set_thread(1);
510 	io_ch[1] = spdk_bdev_get_io_channel(g_desc);
511 	CU_ASSERT(io_ch[1] != NULL);
512 	spdk_bdev_reset(g_desc, io_ch[1], aborted_reset_done, &status2);
513 	poll_threads();
514 	CU_ASSERT(g_bdev.bdev.internal.reset_in_progress != NULL);
515 
516 	/*
517 	 * Now destroy ch1.  This will abort the queued reset.  Check that
518 	 *  the second reset was completed with failed status.  Also check
519 	 *  that bdev->internal.reset_in_progress != NULL, since the
520 	 *  original reset has not been completed yet.  This ensures that
521 	 *  the bdev code is correctly noticing that the failed reset is
522 	 *  *not* the one that had been submitted to the bdev module.
523 	 */
524 	set_thread(1);
525 	spdk_put_io_channel(io_ch[1]);
526 	poll_threads();
527 	CU_ASSERT(status2 == SPDK_BDEV_IO_STATUS_FAILED);
528 	CU_ASSERT(g_bdev.bdev.internal.reset_in_progress != NULL);
529 
530 	/*
531 	 * Now complete the first reset, verify that it completed with SUCCESS
532 	 *  status and that bdev->internal.reset_in_progress is also set back to NULL.
533 	 */
534 	set_thread(0);
535 	spdk_put_io_channel(io_ch[0]);
536 	stub_complete_io(g_bdev.io_target, 0);
537 	poll_threads();
538 	CU_ASSERT(status1 == SPDK_BDEV_IO_STATUS_SUCCESS);
539 	CU_ASSERT(g_bdev.bdev.internal.reset_in_progress == NULL);
540 
541 	teardown_test();
542 }
543 
544 static void
545 io_during_io_done(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg)
546 {
547 	enum spdk_bdev_io_status *status = cb_arg;
548 
549 	*status = bdev_io->internal.status;
550 	spdk_bdev_free_io(bdev_io);
551 }
552 
553 static void
554 io_during_reset(void)
555 {
556 	struct spdk_io_channel *io_ch[2];
557 	struct spdk_bdev_channel *bdev_ch[2];
558 	enum spdk_bdev_io_status status0, status1, status_reset;
559 	int rc;
560 
561 	setup_test();
562 
563 	/*
564 	 * First test normal case - submit an I/O on each of two channels (with no resets)
565 	 *  and verify they complete successfully.
566 	 */
567 	set_thread(0);
568 	io_ch[0] = spdk_bdev_get_io_channel(g_desc);
569 	bdev_ch[0] = spdk_io_channel_get_ctx(io_ch[0]);
570 	CU_ASSERT(bdev_ch[0]->flags == 0);
571 	status0 = SPDK_BDEV_IO_STATUS_PENDING;
572 	rc = spdk_bdev_read_blocks(g_desc, io_ch[0], NULL, 0, 1, io_during_io_done, &status0);
573 	CU_ASSERT(rc == 0);
574 
575 	set_thread(1);
576 	io_ch[1] = spdk_bdev_get_io_channel(g_desc);
577 	bdev_ch[1] = spdk_io_channel_get_ctx(io_ch[1]);
578 	CU_ASSERT(bdev_ch[1]->flags == 0);
579 	status1 = SPDK_BDEV_IO_STATUS_PENDING;
580 	rc = spdk_bdev_read_blocks(g_desc, io_ch[1], NULL, 0, 1, io_during_io_done, &status1);
581 	CU_ASSERT(rc == 0);
582 
583 	poll_threads();
584 	CU_ASSERT(status0 == SPDK_BDEV_IO_STATUS_PENDING);
585 	CU_ASSERT(status1 == SPDK_BDEV_IO_STATUS_PENDING);
586 
587 	set_thread(0);
588 	stub_complete_io(g_bdev.io_target, 0);
589 	CU_ASSERT(status0 == SPDK_BDEV_IO_STATUS_SUCCESS);
590 
591 	set_thread(1);
592 	stub_complete_io(g_bdev.io_target, 0);
593 	CU_ASSERT(status1 == SPDK_BDEV_IO_STATUS_SUCCESS);
594 
595 	/*
596 	 * Now submit a reset, and leave it pending while we submit I/O on two different
597 	 *  channels.  These I/O should be aborted by the bdev layer since the reset is in
598 	 *  progress.
599 	 */
600 	set_thread(0);
601 	status_reset = SPDK_BDEV_IO_STATUS_PENDING;
602 	rc = spdk_bdev_reset(g_desc, io_ch[0], io_during_io_done, &status_reset);
603 	CU_ASSERT(rc == 0);
604 
605 	CU_ASSERT(bdev_ch[0]->flags == 0);
606 	CU_ASSERT(bdev_ch[1]->flags == 0);
607 	poll_threads();
608 	CU_ASSERT(bdev_ch[0]->flags == BDEV_CH_RESET_IN_PROGRESS);
609 	CU_ASSERT(bdev_ch[1]->flags == BDEV_CH_RESET_IN_PROGRESS);
610 
611 	set_thread(0);
612 	status0 = SPDK_BDEV_IO_STATUS_PENDING;
613 	rc = spdk_bdev_read_blocks(g_desc, io_ch[0], NULL, 0, 1, io_during_io_done, &status0);
614 	CU_ASSERT(rc == 0);
615 
616 	set_thread(1);
617 	status1 = SPDK_BDEV_IO_STATUS_PENDING;
618 	rc = spdk_bdev_read_blocks(g_desc, io_ch[1], NULL, 0, 1, io_during_io_done, &status1);
619 	CU_ASSERT(rc == 0);
620 
621 	/*
622 	 * A reset is in progress so these read I/O should complete with aborted.  Note that we
623 	 *  need to poll_threads() since I/O completed inline have their completion deferred.
624 	 */
625 	poll_threads();
626 	CU_ASSERT(status_reset == SPDK_BDEV_IO_STATUS_PENDING);
627 	CU_ASSERT(status0 == SPDK_BDEV_IO_STATUS_ABORTED);
628 	CU_ASSERT(status1 == SPDK_BDEV_IO_STATUS_ABORTED);
629 
630 	/*
631 	 * Complete the reset
632 	 */
633 	set_thread(0);
634 	stub_complete_io(g_bdev.io_target, 0);
635 
636 	/*
637 	 * Only poll thread 0. We should not get a completion.
638 	 */
639 	poll_thread(0);
640 	CU_ASSERT(status_reset == SPDK_BDEV_IO_STATUS_PENDING);
641 
642 	/*
643 	 * Poll both thread 0 and 1 so the messages can propagate and we
644 	 * get a completion.
645 	 */
646 	poll_threads();
647 	CU_ASSERT(status_reset == SPDK_BDEV_IO_STATUS_SUCCESS);
648 
649 	spdk_put_io_channel(io_ch[0]);
650 	set_thread(1);
651 	spdk_put_io_channel(io_ch[1]);
652 	poll_threads();
653 
654 	teardown_test();
655 }
656 
657 static void
658 basic_qos(void)
659 {
660 	struct spdk_io_channel *io_ch[2];
661 	struct spdk_bdev_channel *bdev_ch[2];
662 	struct spdk_bdev *bdev;
663 	enum spdk_bdev_io_status status, abort_status;
664 	int rc;
665 
666 	setup_test();
667 
668 	/* Enable QoS */
669 	bdev = &g_bdev.bdev;
670 	bdev->internal.qos = calloc(1, sizeof(*bdev->internal.qos));
671 	SPDK_CU_ASSERT_FATAL(bdev->internal.qos != NULL);
672 	TAILQ_INIT(&bdev->internal.qos->queued);
673 	/*
674 	 * Enable read/write IOPS, read only byte per second and
675 	 * read/write byte per second rate limits.
676 	 * In this case, all rate limits will take equal effect.
677 	 */
678 	/* 2000 read/write I/O per second, or 2 per millisecond */
679 	bdev->internal.qos->rate_limits[SPDK_BDEV_QOS_RW_IOPS_RATE_LIMIT].limit = 2000;
680 	/* 8K read/write byte per millisecond with 4K block size */
681 	bdev->internal.qos->rate_limits[SPDK_BDEV_QOS_RW_BPS_RATE_LIMIT].limit = 8192000;
682 	/* 8K read only byte per millisecond with 4K block size */
683 	bdev->internal.qos->rate_limits[SPDK_BDEV_QOS_R_BPS_RATE_LIMIT].limit = 8192000;
684 
685 	g_get_io_channel = true;
686 
687 	set_thread(0);
688 	io_ch[0] = spdk_bdev_get_io_channel(g_desc);
689 	bdev_ch[0] = spdk_io_channel_get_ctx(io_ch[0]);
690 	CU_ASSERT(bdev_ch[0]->flags == BDEV_CH_QOS_ENABLED);
691 
692 	set_thread(1);
693 	io_ch[1] = spdk_bdev_get_io_channel(g_desc);
694 	bdev_ch[1] = spdk_io_channel_get_ctx(io_ch[1]);
695 	CU_ASSERT(bdev_ch[1]->flags == BDEV_CH_QOS_ENABLED);
696 
697 	/*
698 	 * Send an I/O on thread 0, which is where the QoS thread is running.
699 	 */
700 	set_thread(0);
701 	status = SPDK_BDEV_IO_STATUS_PENDING;
702 	rc = spdk_bdev_read_blocks(g_desc, io_ch[0], NULL, 0, 1, io_during_io_done, &status);
703 	CU_ASSERT(rc == 0);
704 	CU_ASSERT(status == SPDK_BDEV_IO_STATUS_PENDING);
705 	poll_threads();
706 	stub_complete_io(g_bdev.io_target, 0);
707 	poll_threads();
708 	CU_ASSERT(status == SPDK_BDEV_IO_STATUS_SUCCESS);
709 
710 	/* Send an I/O on thread 1. The QoS thread is not running here. */
711 	status = SPDK_BDEV_IO_STATUS_PENDING;
712 	set_thread(1);
713 	rc = spdk_bdev_read_blocks(g_desc, io_ch[1], NULL, 0, 1, io_during_io_done, &status);
714 	CU_ASSERT(rc == 0);
715 	CU_ASSERT(status == SPDK_BDEV_IO_STATUS_PENDING);
716 	poll_threads();
717 	/* Complete I/O on thread 1. This should not complete the I/O we submitted */
718 	stub_complete_io(g_bdev.io_target, 0);
719 	poll_threads();
720 	CU_ASSERT(status == SPDK_BDEV_IO_STATUS_PENDING);
721 	/* Now complete I/O on thread 0 */
722 	set_thread(0);
723 	poll_threads();
724 	stub_complete_io(g_bdev.io_target, 0);
725 	poll_threads();
726 	CU_ASSERT(status == SPDK_BDEV_IO_STATUS_SUCCESS);
727 
728 	/* Reset rate limit for the next test cases. */
729 	spdk_delay_us(SPDK_BDEV_QOS_TIMESLICE_IN_USEC);
730 	poll_threads();
731 
732 	/*
733 	 * Test abort request when QoS is enabled.
734 	 */
735 
736 	/* Send an I/O on thread 0, which is where the QoS thread is running. */
737 	set_thread(0);
738 	status = SPDK_BDEV_IO_STATUS_PENDING;
739 	rc = spdk_bdev_read_blocks(g_desc, io_ch[0], NULL, 0, 1, io_during_io_done, &status);
740 	CU_ASSERT(rc == 0);
741 	CU_ASSERT(status == SPDK_BDEV_IO_STATUS_PENDING);
742 	/* Send an abort to the I/O on the same thread. */
743 	abort_status = SPDK_BDEV_IO_STATUS_PENDING;
744 	rc = spdk_bdev_abort(g_desc, io_ch[0], &status, io_during_io_done, &abort_status);
745 	CU_ASSERT(rc == 0);
746 	CU_ASSERT(abort_status == SPDK_BDEV_IO_STATUS_PENDING);
747 	poll_threads();
748 	CU_ASSERT(abort_status == SPDK_BDEV_IO_STATUS_SUCCESS);
749 	CU_ASSERT(status == SPDK_BDEV_IO_STATUS_ABORTED);
750 
751 	/* Send an I/O on thread 1. The QoS thread is not running here. */
752 	status = SPDK_BDEV_IO_STATUS_PENDING;
753 	set_thread(1);
754 	rc = spdk_bdev_read_blocks(g_desc, io_ch[1], NULL, 0, 1, io_during_io_done, &status);
755 	CU_ASSERT(rc == 0);
756 	CU_ASSERT(status == SPDK_BDEV_IO_STATUS_PENDING);
757 	poll_threads();
758 	/* Send an abort to the I/O on the same thread. */
759 	abort_status = SPDK_BDEV_IO_STATUS_PENDING;
760 	rc = spdk_bdev_abort(g_desc, io_ch[1], &status, io_during_io_done, &abort_status);
761 	CU_ASSERT(rc == 0);
762 	CU_ASSERT(abort_status == SPDK_BDEV_IO_STATUS_PENDING);
763 	poll_threads();
764 	/* The I/O should complete as aborted and the abort as success on thread 1. */
765 	CU_ASSERT(abort_status == SPDK_BDEV_IO_STATUS_SUCCESS);
766 	CU_ASSERT(status == SPDK_BDEV_IO_STATUS_ABORTED);
767 
768 	set_thread(0);
769 
770 	/*
771 	 * Close the descriptor only, which should stop the qos channel since
772 	 * this is the last descriptor being closed.
773 	 */
774 	spdk_bdev_close(g_desc);
775 	poll_threads();
776 	CU_ASSERT(bdev->internal.qos->ch == NULL);
777 
778 	/*
779 	 * Open the bdev again, which should set up the qos channel since the
780 	 * I/O channels are still valid.
781 	 */
782 	spdk_bdev_open_ext("ut_bdev", true, _bdev_event_cb, NULL, &g_desc);
783 	poll_threads();
784 	CU_ASSERT(bdev->internal.qos->ch != NULL);
785 
786 	/* Tear down the channels */
787 	set_thread(0);
788 	spdk_put_io_channel(io_ch[0]);
789 	set_thread(1);
790 	spdk_put_io_channel(io_ch[1]);
791 	poll_threads();
792 	set_thread(0);
793 
794 	/* Close the descriptor, which should stop the qos channel */
795 	spdk_bdev_close(g_desc);
796 	poll_threads();
797 	CU_ASSERT(bdev->internal.qos->ch == NULL);
798 
799 	/* Open the bdev again; no qos channel is set up since there are no valid I/O channels. */
800 	spdk_bdev_open_ext("ut_bdev", true, _bdev_event_cb, NULL, &g_desc);
801 	poll_threads();
802 	CU_ASSERT(bdev->internal.qos->ch == NULL);
803 
804 	/* Create the channels in reverse order. */
805 	set_thread(1);
806 	io_ch[1] = spdk_bdev_get_io_channel(g_desc);
807 	bdev_ch[1] = spdk_io_channel_get_ctx(io_ch[1]);
808 	CU_ASSERT(bdev_ch[1]->flags == BDEV_CH_QOS_ENABLED);
809 
810 	set_thread(0);
811 	io_ch[0] = spdk_bdev_get_io_channel(g_desc);
812 	bdev_ch[0] = spdk_io_channel_get_ctx(io_ch[0]);
813 	CU_ASSERT(bdev_ch[0]->flags == BDEV_CH_QOS_ENABLED);
814 
815 	/* Confirm that the qos thread is now thread 1 */
816 	CU_ASSERT(bdev->internal.qos->ch == bdev_ch[1]);
817 
818 	/* Tear down the channels */
819 	set_thread(0);
820 	spdk_put_io_channel(io_ch[0]);
821 	set_thread(1);
822 	spdk_put_io_channel(io_ch[1]);
823 	poll_threads();
824 
825 	set_thread(0);
826 
827 	teardown_test();
828 }
829 
830 static void
831 io_during_qos_queue(void)
832 {
833 	struct spdk_io_channel *io_ch[2];
834 	struct spdk_bdev_channel *bdev_ch[2];
835 	struct spdk_bdev *bdev;
836 	enum spdk_bdev_io_status status0, status1, status2;
837 	int rc;
838 
839 	setup_test();
840 	MOCK_SET(spdk_get_ticks, 0);
841 
842 	/* Enable QoS */
843 	bdev = &g_bdev.bdev;
844 	bdev->internal.qos = calloc(1, sizeof(*bdev->internal.qos));
845 	SPDK_CU_ASSERT_FATAL(bdev->internal.qos != NULL);
846 	TAILQ_INIT(&bdev->internal.qos->queued);
847 	/*
848 	 * Enable read/write IOPS, read only byte per sec, write only
849 	 * byte per sec and read/write byte per sec rate limits.
850 	 * In this case, both read only and write only byte per sec
851 	 * rate limit will take effect.
852 	 */
853 	/* 4000 read/write I/O per second, or 4 per millisecond */
854 	bdev->internal.qos->rate_limits[SPDK_BDEV_QOS_RW_IOPS_RATE_LIMIT].limit = 4000;
855 	/* 8K byte per millisecond with 4K block size */
856 	bdev->internal.qos->rate_limits[SPDK_BDEV_QOS_RW_BPS_RATE_LIMIT].limit = 8192000;
857 	/* 4K byte per millisecond with 4K block size */
858 	bdev->internal.qos->rate_limits[SPDK_BDEV_QOS_R_BPS_RATE_LIMIT].limit = 4096000;
859 	/* 4K byte per millisecond with 4K block size */
860 	bdev->internal.qos->rate_limits[SPDK_BDEV_QOS_W_BPS_RATE_LIMIT].limit = 4096000;
861 
862 	g_get_io_channel = true;
863 
864 	/* Create channels */
865 	set_thread(0);
866 	io_ch[0] = spdk_bdev_get_io_channel(g_desc);
867 	bdev_ch[0] = spdk_io_channel_get_ctx(io_ch[0]);
868 	CU_ASSERT(bdev_ch[0]->flags == BDEV_CH_QOS_ENABLED);
869 
870 	set_thread(1);
871 	io_ch[1] = spdk_bdev_get_io_channel(g_desc);
872 	bdev_ch[1] = spdk_io_channel_get_ctx(io_ch[1]);
873 	CU_ASSERT(bdev_ch[1]->flags == BDEV_CH_QOS_ENABLED);
874 
875 	/* Send two read I/Os */
876 	status1 = SPDK_BDEV_IO_STATUS_PENDING;
877 	rc = spdk_bdev_read_blocks(g_desc, io_ch[1], NULL, 0, 1, io_during_io_done, &status1);
878 	CU_ASSERT(rc == 0);
879 	CU_ASSERT(status1 == SPDK_BDEV_IO_STATUS_PENDING);
880 	set_thread(0);
881 	status0 = SPDK_BDEV_IO_STATUS_PENDING;
882 	rc = spdk_bdev_read_blocks(g_desc, io_ch[0], NULL, 0, 1, io_during_io_done, &status0);
883 	CU_ASSERT(rc == 0);
884 	CU_ASSERT(status0 == SPDK_BDEV_IO_STATUS_PENDING);
885 	/* Send one write I/O */
886 	status2 = SPDK_BDEV_IO_STATUS_PENDING;
887 	rc = spdk_bdev_write_blocks(g_desc, io_ch[0], NULL, 0, 1, io_during_io_done, &status2);
888 	CU_ASSERT(rc == 0);
889 	CU_ASSERT(status2 == SPDK_BDEV_IO_STATUS_PENDING);
890 
891 	/* Complete any I/O that arrived at the disk */
892 	poll_threads();
893 	set_thread(1);
894 	stub_complete_io(g_bdev.io_target, 0);
895 	set_thread(0);
896 	stub_complete_io(g_bdev.io_target, 0);
897 	poll_threads();
898 
899 	/* Only one of the two read I/Os should complete. (logical XOR) */
900 	if (status0 == SPDK_BDEV_IO_STATUS_SUCCESS) {
901 		CU_ASSERT(status1 == SPDK_BDEV_IO_STATUS_PENDING);
902 	} else {
903 		CU_ASSERT(status1 == SPDK_BDEV_IO_STATUS_SUCCESS);
904 	}
905 	/* The write I/O should complete. */
906 	CU_ASSERT(status2 == SPDK_BDEV_IO_STATUS_SUCCESS);
907 
908 	/* Advance in time by a millisecond */
909 	spdk_delay_us(1000);
910 
911 	/* Complete more I/O */
912 	poll_threads();
913 	set_thread(1);
914 	stub_complete_io(g_bdev.io_target, 0);
915 	set_thread(0);
916 	stub_complete_io(g_bdev.io_target, 0);
917 	poll_threads();
918 
919 	/* Now the second read I/O should be done */
920 	CU_ASSERT(status0 == SPDK_BDEV_IO_STATUS_SUCCESS);
921 	CU_ASSERT(status1 == SPDK_BDEV_IO_STATUS_SUCCESS);
922 
923 	/* Tear down the channels */
924 	set_thread(1);
925 	spdk_put_io_channel(io_ch[1]);
926 	set_thread(0);
927 	spdk_put_io_channel(io_ch[0]);
928 	poll_threads();
929 
930 	teardown_test();
931 }
932 
933 static void
934 io_during_qos_reset(void)
935 {
936 	struct spdk_io_channel *io_ch[2];
937 	struct spdk_bdev_channel *bdev_ch[2];
938 	struct spdk_bdev *bdev;
939 	enum spdk_bdev_io_status status0, status1, reset_status;
940 	int rc;
941 
942 	setup_test();
943 	MOCK_SET(spdk_get_ticks, 0);
944 
945 	/* Enable QoS */
946 	bdev = &g_bdev.bdev;
947 	bdev->internal.qos = calloc(1, sizeof(*bdev->internal.qos));
948 	SPDK_CU_ASSERT_FATAL(bdev->internal.qos != NULL);
949 	TAILQ_INIT(&bdev->internal.qos->queued);
950 	/*
951 	 * Enable read/write IOPS, write only byte per sec and
952 	 * read/write byte per second rate limits.
953 	 * In this case, read/write byte per second rate limit will
954 	 * take effect first.
955 	 */
956 	/* 2000 read/write I/O per second, or 2 per millisecond */
957 	bdev->internal.qos->rate_limits[SPDK_BDEV_QOS_RW_IOPS_RATE_LIMIT].limit = 2000;
958 	/* 4K byte per millisecond with 4K block size */
959 	bdev->internal.qos->rate_limits[SPDK_BDEV_QOS_RW_BPS_RATE_LIMIT].limit = 4096000;
960 	/* 8K byte per millisecond with 4K block size */
961 	bdev->internal.qos->rate_limits[SPDK_BDEV_QOS_W_BPS_RATE_LIMIT].limit = 8192000;
962 
963 	g_get_io_channel = true;
964 
965 	/* Create channels */
966 	set_thread(0);
967 	io_ch[0] = spdk_bdev_get_io_channel(g_desc);
968 	bdev_ch[0] = spdk_io_channel_get_ctx(io_ch[0]);
969 	CU_ASSERT(bdev_ch[0]->flags == BDEV_CH_QOS_ENABLED);
970 
971 	set_thread(1);
972 	io_ch[1] = spdk_bdev_get_io_channel(g_desc);
973 	bdev_ch[1] = spdk_io_channel_get_ctx(io_ch[1]);
974 	CU_ASSERT(bdev_ch[1]->flags == BDEV_CH_QOS_ENABLED);
975 
976 	/* Send two I/O. One of these gets queued by QoS. The other is sitting at the disk. */
977 	status1 = SPDK_BDEV_IO_STATUS_PENDING;
978 	rc = spdk_bdev_write_blocks(g_desc, io_ch[1], NULL, 0, 1, io_during_io_done, &status1);
979 	CU_ASSERT(rc == 0);
980 	set_thread(0);
981 	status0 = SPDK_BDEV_IO_STATUS_PENDING;
982 	rc = spdk_bdev_write_blocks(g_desc, io_ch[0], NULL, 0, 1, io_during_io_done, &status0);
983 	CU_ASSERT(rc == 0);
984 
985 	poll_threads();
986 	CU_ASSERT(status1 == SPDK_BDEV_IO_STATUS_PENDING);
987 	CU_ASSERT(status0 == SPDK_BDEV_IO_STATUS_PENDING);
988 
989 	/* Reset the bdev. */
990 	reset_status = SPDK_BDEV_IO_STATUS_PENDING;
991 	rc = spdk_bdev_reset(g_desc, io_ch[0], io_during_io_done, &reset_status);
992 	CU_ASSERT(rc == 0);
993 
994 	/* Complete any I/O that arrived at the disk */
995 	poll_threads();
996 	set_thread(1);
997 	stub_complete_io(g_bdev.io_target, 0);
998 	set_thread(0);
999 	stub_complete_io(g_bdev.io_target, 0);
1000 	poll_threads();
1001 
1002 	CU_ASSERT(reset_status == SPDK_BDEV_IO_STATUS_SUCCESS);
1003 	CU_ASSERT(status0 == SPDK_BDEV_IO_STATUS_ABORTED);
1004 	CU_ASSERT(status1 == SPDK_BDEV_IO_STATUS_ABORTED);
1005 
1006 	/* Tear down the channels */
1007 	set_thread(1);
1008 	spdk_put_io_channel(io_ch[1]);
1009 	set_thread(0);
1010 	spdk_put_io_channel(io_ch[0]);
1011 	poll_threads();
1012 
1013 	teardown_test();
1014 }
1015 
1016 static void
1017 enomem_done(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg)
1018 {
1019 	enum spdk_bdev_io_status *status = cb_arg;
1020 
1021 	*status = success ? SPDK_BDEV_IO_STATUS_SUCCESS : SPDK_BDEV_IO_STATUS_FAILED;
1022 	spdk_bdev_free_io(bdev_io);
1023 }
1024 
1025 static void
1026 enomem(void)
1027 {
1028 	struct spdk_io_channel *io_ch;
1029 	struct spdk_bdev_channel *bdev_ch;
1030 	struct spdk_bdev_shared_resource *shared_resource;
1031 	struct ut_bdev_channel *ut_ch;
1032 	const uint32_t IO_ARRAY_SIZE = 64;
1033 	const uint32_t AVAIL = 20;
1034 	enum spdk_bdev_io_status status[IO_ARRAY_SIZE], status_reset;
1035 	uint32_t nomem_cnt, i;
1036 	struct spdk_bdev_io *first_io;
1037 	int rc;
1038 
1039 	setup_test();
1040 
1041 	set_thread(0);
1042 	io_ch = spdk_bdev_get_io_channel(g_desc);
1043 	bdev_ch = spdk_io_channel_get_ctx(io_ch);
1044 	shared_resource = bdev_ch->shared_resource;
1045 	ut_ch = spdk_io_channel_get_ctx(bdev_ch->channel);
1046 	ut_ch->avail_cnt = AVAIL;
1047 
1048 	/* First submit a number of IOs equal to what the channel can support. */
1049 	for (i = 0; i < AVAIL; i++) {
1050 		status[i] = SPDK_BDEV_IO_STATUS_PENDING;
1051 		rc = spdk_bdev_read_blocks(g_desc, io_ch, NULL, 0, 1, enomem_done, &status[i]);
1052 		CU_ASSERT(rc == 0);
1053 	}
1054 	CU_ASSERT(TAILQ_EMPTY(&shared_resource->nomem_io));
1055 
1056 	/*
1057 	 * Next, submit one additional I/O.  This one should fail with ENOMEM and then go onto
1058 	 *  the nomem_io list.
1059 	 */
1060 	status[AVAIL] = SPDK_BDEV_IO_STATUS_PENDING;
1061 	rc = spdk_bdev_read_blocks(g_desc, io_ch, NULL, 0, 1, enomem_done, &status[AVAIL]);
1062 	CU_ASSERT(rc == 0);
1063 	SPDK_CU_ASSERT_FATAL(!TAILQ_EMPTY(&shared_resource->nomem_io));
1064 	first_io = TAILQ_FIRST(&shared_resource->nomem_io);
1065 
1066 	/*
1067 	 * Now submit a bunch more I/O.  These should all fail with ENOMEM and get queued behind
1068 	 *  the first_io above.
1069 	 */
1070 	for (i = AVAIL + 1; i < IO_ARRAY_SIZE; i++) {
1071 		status[i] = SPDK_BDEV_IO_STATUS_PENDING;
1072 		rc = spdk_bdev_read_blocks(g_desc, io_ch, NULL, 0, 1, enomem_done, &status[i]);
1073 		CU_ASSERT(rc == 0);
1074 	}
1075 
1076 	/* Assert that first_io is still at the head of the list. */
1077 	CU_ASSERT(TAILQ_FIRST(&shared_resource->nomem_io) == first_io);
1078 	CU_ASSERT(bdev_io_tailq_cnt(&shared_resource->nomem_io) == (IO_ARRAY_SIZE - AVAIL));
1079 	nomem_cnt = bdev_io_tailq_cnt(&shared_resource->nomem_io);
1080 	CU_ASSERT(shared_resource->nomem_threshold == (AVAIL - NOMEM_THRESHOLD_COUNT));
1081 
1082 	/*
1083 	 * Complete 1 I/O only.  The key check here is bdev_io_tailq_cnt - this should not have
1084 	 *  changed since completing just 1 I/O should not trigger retrying the queued nomem_io
1085 	 *  list.
1086 	 */
1087 	stub_complete_io(g_bdev.io_target, 1);
1088 	CU_ASSERT(bdev_io_tailq_cnt(&shared_resource->nomem_io) == nomem_cnt);
1089 
1090 	/*
1091 	 * Complete enough I/O to hit the nomem_threshold.  This should trigger retrying nomem_io,
1092 	 *  and we should see I/O get resubmitted to the test bdev module.
1093 	 */
1094 	stub_complete_io(g_bdev.io_target, NOMEM_THRESHOLD_COUNT - 1);
1095 	CU_ASSERT(bdev_io_tailq_cnt(&shared_resource->nomem_io) < nomem_cnt);
1096 	nomem_cnt = bdev_io_tailq_cnt(&shared_resource->nomem_io);
1097 
1098 	/* Complete 1 I/O only.  This should not trigger retrying the queued nomem_io. */
1099 	stub_complete_io(g_bdev.io_target, 1);
1100 	CU_ASSERT(bdev_io_tailq_cnt(&shared_resource->nomem_io) == nomem_cnt);
1101 
1102 	/*
1103 	 * Send a reset and confirm that all I/O are completed, including the ones that
1104 	 *  were queued on the nomem_io list.
1105 	 */
1106 	status_reset = SPDK_BDEV_IO_STATUS_PENDING;
1107 	rc = spdk_bdev_reset(g_desc, io_ch, enomem_done, &status_reset);
1108 	poll_threads();
1109 	CU_ASSERT(rc == 0);
1110 	/* This will complete the reset. */
1111 	stub_complete_io(g_bdev.io_target, 0);
1112 
1113 	CU_ASSERT(bdev_io_tailq_cnt(&shared_resource->nomem_io) == 0);
1114 	CU_ASSERT(shared_resource->io_outstanding == 0);
1115 
1116 	spdk_put_io_channel(io_ch);
1117 	poll_threads();
1118 	teardown_test();
1119 }
1120 
1121 static void
1122 enomem_multi_bdev(void)
1123 {
1124 	struct spdk_io_channel *io_ch;
1125 	struct spdk_bdev_channel *bdev_ch;
1126 	struct spdk_bdev_shared_resource *shared_resource;
1127 	struct ut_bdev_channel *ut_ch;
1128 	const uint32_t IO_ARRAY_SIZE = 64;
1129 	const uint32_t AVAIL = 20;
1130 	enum spdk_bdev_io_status status[IO_ARRAY_SIZE];
1131 	uint32_t i;
1132 	struct ut_bdev *second_bdev;
1133 	struct spdk_bdev_desc *second_desc = NULL;
1134 	struct spdk_bdev_channel *second_bdev_ch;
1135 	struct spdk_io_channel *second_ch;
1136 	int rc;
1137 
1138 	setup_test();
1139 
1140 	/* Register second bdev with the same io_target  */
1141 	second_bdev = calloc(1, sizeof(*second_bdev));
1142 	SPDK_CU_ASSERT_FATAL(second_bdev != NULL);
1143 	register_bdev(second_bdev, "ut_bdev2", g_bdev.io_target);
1144 	spdk_bdev_open_ext("ut_bdev2", true, _bdev_event_cb, NULL, &second_desc);
1145 	SPDK_CU_ASSERT_FATAL(second_desc != NULL);
1146 
1147 	set_thread(0);
1148 	io_ch = spdk_bdev_get_io_channel(g_desc);
1149 	bdev_ch = spdk_io_channel_get_ctx(io_ch);
1150 	shared_resource = bdev_ch->shared_resource;
1151 	ut_ch = spdk_io_channel_get_ctx(bdev_ch->channel);
1152 	ut_ch->avail_cnt = AVAIL;
1153 
1154 	second_ch = spdk_bdev_get_io_channel(second_desc);
1155 	second_bdev_ch = spdk_io_channel_get_ctx(second_ch);
1156 	SPDK_CU_ASSERT_FATAL(shared_resource == second_bdev_ch->shared_resource);
1157 
1158 	/* Saturate io_target through bdev A. */
1159 	for (i = 0; i < AVAIL; i++) {
1160 		status[i] = SPDK_BDEV_IO_STATUS_PENDING;
1161 		rc = spdk_bdev_read_blocks(g_desc, io_ch, NULL, 0, 1, enomem_done, &status[i]);
1162 		CU_ASSERT(rc == 0);
1163 	}
1164 	CU_ASSERT(TAILQ_EMPTY(&shared_resource->nomem_io));
1165 
1166 	/*
1167 	 * Now submit I/O through the second bdev. This should fail with ENOMEM
1168 	 * and then go onto the nomem_io list.
1169 	 */
1170 	status[AVAIL] = SPDK_BDEV_IO_STATUS_PENDING;
1171 	rc = spdk_bdev_read_blocks(second_desc, second_ch, NULL, 0, 1, enomem_done, &status[AVAIL]);
1172 	CU_ASSERT(rc == 0);
1173 	SPDK_CU_ASSERT_FATAL(!TAILQ_EMPTY(&shared_resource->nomem_io));
1174 
1175 	/* Complete first bdev's I/O. This should retry sending second bdev's nomem_io */
1176 	stub_complete_io(g_bdev.io_target, AVAIL);
1177 
1178 	SPDK_CU_ASSERT_FATAL(TAILQ_EMPTY(&shared_resource->nomem_io));
1179 	CU_ASSERT(shared_resource->io_outstanding == 1);
1180 
1181 	/* Now complete our retried I/O  */
1182 	stub_complete_io(g_bdev.io_target, 1);
1183 	SPDK_CU_ASSERT_FATAL(shared_resource->io_outstanding == 0);
1184 
1185 	spdk_put_io_channel(io_ch);
1186 	spdk_put_io_channel(second_ch);
1187 	spdk_bdev_close(second_desc);
1188 	unregister_bdev(second_bdev);
1189 	poll_threads();
1190 	free(second_bdev);
1191 	teardown_test();
1192 }
1193 
1194 
1195 static void
1196 enomem_multi_io_target(void)
1197 {
1198 	struct spdk_io_channel *io_ch;
1199 	struct spdk_bdev_channel *bdev_ch;
1200 	struct ut_bdev_channel *ut_ch;
1201 	const uint32_t IO_ARRAY_SIZE = 64;
1202 	const uint32_t AVAIL = 20;
1203 	enum spdk_bdev_io_status status[IO_ARRAY_SIZE];
1204 	uint32_t i;
1205 	int new_io_device;
1206 	struct ut_bdev *second_bdev;
1207 	struct spdk_bdev_desc *second_desc = NULL;
1208 	struct spdk_bdev_channel *second_bdev_ch;
1209 	struct spdk_io_channel *second_ch;
1210 	int rc;
1211 
1212 	setup_test();
1213 
1214 	/* Create new io_target and a second bdev using it */
1215 	spdk_io_device_register(&new_io_device, stub_create_ch, stub_destroy_ch,
1216 				sizeof(struct ut_bdev_channel), NULL);
1217 	second_bdev = calloc(1, sizeof(*second_bdev));
1218 	SPDK_CU_ASSERT_FATAL(second_bdev != NULL);
1219 	register_bdev(second_bdev, "ut_bdev2", &new_io_device);
1220 	spdk_bdev_open_ext("ut_bdev2", true, _bdev_event_cb, NULL, &second_desc);
1221 	SPDK_CU_ASSERT_FATAL(second_desc != NULL);
1222 
1223 	set_thread(0);
1224 	io_ch = spdk_bdev_get_io_channel(g_desc);
1225 	bdev_ch = spdk_io_channel_get_ctx(io_ch);
1226 	ut_ch = spdk_io_channel_get_ctx(bdev_ch->channel);
1227 	ut_ch->avail_cnt = AVAIL;
1228 
1229 	/* Different io_target should imply a different shared_resource */
1230 	second_ch = spdk_bdev_get_io_channel(second_desc);
1231 	second_bdev_ch = spdk_io_channel_get_ctx(second_ch);
1232 	SPDK_CU_ASSERT_FATAL(bdev_ch->shared_resource != second_bdev_ch->shared_resource);
1233 
1234 	/* Saturate io_target through bdev A. */
1235 	for (i = 0; i < AVAIL; i++) {
1236 		status[i] = SPDK_BDEV_IO_STATUS_PENDING;
1237 		rc = spdk_bdev_read_blocks(g_desc, io_ch, NULL, 0, 1, enomem_done, &status[i]);
1238 		CU_ASSERT(rc == 0);
1239 	}
1240 	CU_ASSERT(TAILQ_EMPTY(&bdev_ch->shared_resource->nomem_io));
1241 
1242 	/* Issue one more I/O to fill the nomem_io list. */
1243 	status[AVAIL] = SPDK_BDEV_IO_STATUS_PENDING;
1244 	rc = spdk_bdev_read_blocks(g_desc, io_ch, NULL, 0, 1, enomem_done, &status[AVAIL]);
1245 	CU_ASSERT(rc == 0);
1246 	SPDK_CU_ASSERT_FATAL(!TAILQ_EMPTY(&bdev_ch->shared_resource->nomem_io));
1247 
1248 	/*
1249 	 * Now submit I/O through the second bdev. This should go through and complete
1250 	 * successfully because we're using a different io_device underneath.
1251 	 */
1252 	status[AVAIL] = SPDK_BDEV_IO_STATUS_PENDING;
1253 	rc = spdk_bdev_read_blocks(second_desc, second_ch, NULL, 0, 1, enomem_done, &status[AVAIL]);
1254 	CU_ASSERT(rc == 0);
1255 	SPDK_CU_ASSERT_FATAL(TAILQ_EMPTY(&second_bdev_ch->shared_resource->nomem_io));
1256 	stub_complete_io(second_bdev->io_target, 1);
1257 
1258 	/* Cleanup; Complete outstanding I/O. */
1259 	stub_complete_io(g_bdev.io_target, AVAIL);
1260 	SPDK_CU_ASSERT_FATAL(TAILQ_EMPTY(&bdev_ch->shared_resource->nomem_io));
1261 	/* Complete the ENOMEM I/O */
1262 	stub_complete_io(g_bdev.io_target, 1);
1263 	CU_ASSERT(bdev_ch->shared_resource->io_outstanding == 0);
1264 
1265 	SPDK_CU_ASSERT_FATAL(TAILQ_EMPTY(&bdev_ch->shared_resource->nomem_io));
1266 	CU_ASSERT(bdev_ch->shared_resource->io_outstanding == 0);
1267 	spdk_put_io_channel(io_ch);
1268 	spdk_put_io_channel(second_ch);
1269 	spdk_bdev_close(second_desc);
1270 	unregister_bdev(second_bdev);
1271 	spdk_io_device_unregister(&new_io_device, NULL);
1272 	poll_threads();
1273 	free(second_bdev);
1274 	teardown_test();
1275 }
1276 
1277 static void
1278 qos_dynamic_enable_done(void *cb_arg, int status)
1279 {
1280 	int *rc = cb_arg;
1281 	*rc = status;
1282 }
1283 
1284 static void
1285 qos_dynamic_enable(void)
1286 {
1287 	struct spdk_io_channel *io_ch[2];
1288 	struct spdk_bdev_channel *bdev_ch[2];
1289 	struct spdk_bdev *bdev;
1290 	enum spdk_bdev_io_status bdev_io_status[2];
1291 	uint64_t limits[SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES] = {};
1292 	int status, second_status, rc, i;
1293 
1294 	setup_test();
1295 	MOCK_SET(spdk_get_ticks, 0);
1296 
1297 	for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) {
1298 		limits[i] = UINT64_MAX;
1299 	}
1300 
1301 	bdev = &g_bdev.bdev;
1302 
1303 	g_get_io_channel = true;
1304 
1305 	/* Create channels */
1306 	set_thread(0);
1307 	io_ch[0] = spdk_bdev_get_io_channel(g_desc);
1308 	bdev_ch[0] = spdk_io_channel_get_ctx(io_ch[0]);
1309 	CU_ASSERT(bdev_ch[0]->flags == 0);
1310 
1311 	set_thread(1);
1312 	io_ch[1] = spdk_bdev_get_io_channel(g_desc);
1313 	bdev_ch[1] = spdk_io_channel_get_ctx(io_ch[1]);
1314 	CU_ASSERT(bdev_ch[1]->flags == 0);
1315 
1316 	set_thread(0);
1317 
1318 	/*
1319 	 * Enable QoS: Read/Write IOPS, Read/Write byte,
1320 	 * Read only byte and Write only byte per second
1321 	 * rate limits.
1322 	 * More than 10 I/Os allowed per timeslice.
1323 	 */
1324 	status = -1;
1325 	limits[SPDK_BDEV_QOS_RW_IOPS_RATE_LIMIT] = 10000;
1326 	limits[SPDK_BDEV_QOS_RW_BPS_RATE_LIMIT] = 100;
1327 	limits[SPDK_BDEV_QOS_R_BPS_RATE_LIMIT] = 100;
1328 	limits[SPDK_BDEV_QOS_W_BPS_RATE_LIMIT] = 10;
1329 	spdk_bdev_set_qos_rate_limits(bdev, limits, qos_dynamic_enable_done, &status);
1330 	poll_threads();
1331 	CU_ASSERT(status == 0);
1332 	CU_ASSERT((bdev_ch[0]->flags & BDEV_CH_QOS_ENABLED) != 0);
1333 	CU_ASSERT((bdev_ch[1]->flags & BDEV_CH_QOS_ENABLED) != 0);
1334 
1335 	/*
1336 	 * Submit and complete 10 I/O to fill the QoS allotment for this timeslice.
1337 	 * Additional I/O will then be queued.
1338 	 */
1339 	set_thread(0);
1340 	for (i = 0; i < 10; i++) {
1341 		bdev_io_status[0] = SPDK_BDEV_IO_STATUS_PENDING;
1342 		rc = spdk_bdev_read_blocks(g_desc, io_ch[0], NULL, 0, 1, io_during_io_done, &bdev_io_status[0]);
1343 		CU_ASSERT(rc == 0);
1344 		CU_ASSERT(bdev_io_status[0] == SPDK_BDEV_IO_STATUS_PENDING);
1345 		poll_thread(0);
1346 		stub_complete_io(g_bdev.io_target, 0);
1347 		CU_ASSERT(bdev_io_status[0] == SPDK_BDEV_IO_STATUS_SUCCESS);
1348 	}
1349 
1350 	/*
1351 	 * Send two more I/O.  These I/O will be queued since the current timeslice allotment has been
1352 	 * filled already.  We want to test that when QoS is disabled that these two I/O:
1353 	 *  1) are not aborted
1354 	 *  2) are sent back to their original thread for resubmission
1355 	 */
1356 	bdev_io_status[0] = SPDK_BDEV_IO_STATUS_PENDING;
1357 	rc = spdk_bdev_read_blocks(g_desc, io_ch[0], NULL, 0, 1, io_during_io_done, &bdev_io_status[0]);
1358 	CU_ASSERT(rc == 0);
1359 	CU_ASSERT(bdev_io_status[0] == SPDK_BDEV_IO_STATUS_PENDING);
1360 	set_thread(1);
1361 	bdev_io_status[1] = SPDK_BDEV_IO_STATUS_PENDING;
1362 	rc = spdk_bdev_read_blocks(g_desc, io_ch[1], NULL, 0, 1, io_during_io_done, &bdev_io_status[1]);
1363 	CU_ASSERT(rc == 0);
1364 	CU_ASSERT(bdev_io_status[1] == SPDK_BDEV_IO_STATUS_PENDING);
1365 	poll_threads();
1366 
1367 	/*
1368 	 * Disable QoS: Read/Write IOPS, Read/Write byte,
1369 	 * Read only byte rate limits
1370 	 */
1371 	status = -1;
1372 	limits[SPDK_BDEV_QOS_RW_IOPS_RATE_LIMIT] = 0;
1373 	limits[SPDK_BDEV_QOS_RW_BPS_RATE_LIMIT] = 0;
1374 	limits[SPDK_BDEV_QOS_R_BPS_RATE_LIMIT] = 0;
1375 	spdk_bdev_set_qos_rate_limits(bdev, limits, qos_dynamic_enable_done, &status);
1376 	poll_threads();
1377 	CU_ASSERT(status == 0);
1378 	CU_ASSERT((bdev_ch[0]->flags & BDEV_CH_QOS_ENABLED) != 0);
1379 	CU_ASSERT((bdev_ch[1]->flags & BDEV_CH_QOS_ENABLED) != 0);
1380 
1381 	/* Disable QoS: Write only Byte per second rate limit */
1382 	status = -1;
1383 	limits[SPDK_BDEV_QOS_W_BPS_RATE_LIMIT] = 0;
1384 	spdk_bdev_set_qos_rate_limits(bdev, limits, qos_dynamic_enable_done, &status);
1385 	poll_threads();
1386 	CU_ASSERT(status == 0);
1387 	CU_ASSERT((bdev_ch[0]->flags & BDEV_CH_QOS_ENABLED) == 0);
1388 	CU_ASSERT((bdev_ch[1]->flags & BDEV_CH_QOS_ENABLED) == 0);
1389 
1390 	/*
1391 	 * All I/O should have been resubmitted back on their original thread.  Complete
1392 	 *  all I/O on thread 0, and ensure that only the thread 0 I/O was completed.
1393 	 */
1394 	set_thread(0);
1395 	stub_complete_io(g_bdev.io_target, 0);
1396 	poll_threads();
1397 	CU_ASSERT(bdev_io_status[0] == SPDK_BDEV_IO_STATUS_SUCCESS);
1398 	CU_ASSERT(bdev_io_status[1] == SPDK_BDEV_IO_STATUS_PENDING);
1399 
1400 	/* Now complete all I/O on thread 1 and ensure the thread 1 I/O was completed. */
1401 	set_thread(1);
1402 	stub_complete_io(g_bdev.io_target, 0);
1403 	poll_threads();
1404 	CU_ASSERT(bdev_io_status[1] == SPDK_BDEV_IO_STATUS_SUCCESS);
1405 
1406 	/* Disable QoS again */
1407 	status = -1;
1408 	limits[SPDK_BDEV_QOS_RW_IOPS_RATE_LIMIT] = 0;
1409 	spdk_bdev_set_qos_rate_limits(bdev, limits, qos_dynamic_enable_done, &status);
1410 	poll_threads();
1411 	CU_ASSERT(status == 0); /* This should succeed */
1412 	CU_ASSERT((bdev_ch[0]->flags & BDEV_CH_QOS_ENABLED) == 0);
1413 	CU_ASSERT((bdev_ch[1]->flags & BDEV_CH_QOS_ENABLED) == 0);
1414 
1415 	/* Enable QoS on thread 0 */
1416 	status = -1;
1417 	limits[SPDK_BDEV_QOS_RW_IOPS_RATE_LIMIT] = 10000;
1418 	spdk_bdev_set_qos_rate_limits(bdev, limits, qos_dynamic_enable_done, &status);
1419 	poll_threads();
1420 	CU_ASSERT(status == 0);
1421 	CU_ASSERT((bdev_ch[0]->flags & BDEV_CH_QOS_ENABLED) != 0);
1422 	CU_ASSERT((bdev_ch[1]->flags & BDEV_CH_QOS_ENABLED) != 0);
1423 
1424 	/* Disable QoS on thread 1 */
1425 	set_thread(1);
1426 	status = -1;
1427 	limits[SPDK_BDEV_QOS_RW_IOPS_RATE_LIMIT] = 0;
1428 	spdk_bdev_set_qos_rate_limits(bdev, limits, qos_dynamic_enable_done, &status);
1429 	/* Don't poll yet. This should leave the channels with QoS enabled */
1430 	CU_ASSERT(status == -1);
1431 	CU_ASSERT((bdev_ch[0]->flags & BDEV_CH_QOS_ENABLED) != 0);
1432 	CU_ASSERT((bdev_ch[1]->flags & BDEV_CH_QOS_ENABLED) != 0);
1433 
1434 	/* Enable QoS. This should immediately fail because the previous disable QoS hasn't completed. */
1435 	second_status = 0;
1436 	limits[SPDK_BDEV_QOS_RW_BPS_RATE_LIMIT] = 10;
1437 	spdk_bdev_set_qos_rate_limits(bdev, limits, qos_dynamic_enable_done, &second_status);
1438 	poll_threads();
1439 	CU_ASSERT(status == 0); /* The disable should succeed */
1440 	CU_ASSERT(second_status < 0); /* The enable should fail */
1441 	CU_ASSERT((bdev_ch[0]->flags & BDEV_CH_QOS_ENABLED) == 0);
1442 	CU_ASSERT((bdev_ch[1]->flags & BDEV_CH_QOS_ENABLED) == 0);
1443 
1444 	/* Enable QoS on thread 1. This should succeed now that the disable has completed. */
1445 	status = -1;
1446 	limits[SPDK_BDEV_QOS_RW_IOPS_RATE_LIMIT] = 10000;
1447 	spdk_bdev_set_qos_rate_limits(bdev, limits, qos_dynamic_enable_done, &status);
1448 	poll_threads();
1449 	CU_ASSERT(status == 0);
1450 	CU_ASSERT((bdev_ch[0]->flags & BDEV_CH_QOS_ENABLED) != 0);
1451 	CU_ASSERT((bdev_ch[1]->flags & BDEV_CH_QOS_ENABLED) != 0);
1452 
1453 	/* Tear down the channels */
1454 	set_thread(0);
1455 	spdk_put_io_channel(io_ch[0]);
1456 	set_thread(1);
1457 	spdk_put_io_channel(io_ch[1]);
1458 	poll_threads();
1459 
1460 	set_thread(0);
1461 	teardown_test();
1462 }
1463 
1464 static void
1465 histogram_status_cb(void *cb_arg, int status)
1466 {
1467 	g_status = status;
1468 }
1469 
1470 static void
1471 histogram_data_cb(void *cb_arg, int status, struct spdk_histogram_data *histogram)
1472 {
1473 	g_status = status;
1474 	g_histogram = histogram;
1475 }
1476 
1477 static void
1478 histogram_io_count(void *ctx, uint64_t start, uint64_t end, uint64_t count,
1479 		   uint64_t total, uint64_t so_far)
1480 {
1481 	g_count += count;
1482 }
1483 
1484 static void
1485 bdev_histograms_mt(void)
1486 {
1487 	struct spdk_io_channel *ch[2];
1488 	struct spdk_histogram_data *histogram;
1489 	uint8_t buf[4096];
1490 	int status = false;
1491 	int rc;
1492 
1493 
1494 	setup_test();
1495 
1496 	set_thread(0);
1497 	ch[0] = spdk_bdev_get_io_channel(g_desc);
1498 	CU_ASSERT(ch[0] != NULL);
1499 
1500 	set_thread(1);
1501 	ch[1] = spdk_bdev_get_io_channel(g_desc);
1502 	CU_ASSERT(ch[1] != NULL);
1503 
1504 
1505 	/* Enable histogram */
1506 	spdk_bdev_histogram_enable(&g_bdev.bdev, histogram_status_cb, NULL, true);
1507 	poll_threads();
1508 	CU_ASSERT(g_status == 0);
1509 	CU_ASSERT(g_bdev.bdev.internal.histogram_enabled == true);
1510 
1511 	/* Allocate histogram */
1512 	histogram = spdk_histogram_data_alloc();
1513 
1514 	/* Check if histogram is zeroed */
1515 	spdk_bdev_histogram_get(&g_bdev.bdev, histogram, histogram_data_cb, NULL);
1516 	poll_threads();
1517 	CU_ASSERT(g_status == 0);
1518 	SPDK_CU_ASSERT_FATAL(g_histogram != NULL);
1519 
1520 	g_count = 0;
1521 	spdk_histogram_data_iterate(g_histogram, histogram_io_count, NULL);
1522 
1523 	CU_ASSERT(g_count == 0);
1524 
1525 	set_thread(0);
1526 	rc = spdk_bdev_write_blocks(g_desc, ch[0], &buf, 0, 1, io_during_io_done, &status);
1527 	CU_ASSERT(rc == 0);
1528 
1529 	spdk_delay_us(10);
1530 	stub_complete_io(g_bdev.io_target, 1);
1531 	poll_threads();
1532 	CU_ASSERT(status == true);
1533 
1534 
1535 	set_thread(1);
1536 	rc = spdk_bdev_read_blocks(g_desc, ch[1], &buf, 0, 1, io_during_io_done, &status);
1537 	CU_ASSERT(rc == 0);
1538 
1539 	spdk_delay_us(10);
1540 	stub_complete_io(g_bdev.io_target, 1);
1541 	poll_threads();
1542 	CU_ASSERT(status == true);
1543 
1544 	set_thread(0);
1545 
1546 	/* Check if histogram gathered data from all I/O channels */
1547 	spdk_bdev_histogram_get(&g_bdev.bdev, histogram, histogram_data_cb, NULL);
1548 	poll_threads();
1549 	CU_ASSERT(g_status == 0);
1550 	CU_ASSERT(g_bdev.bdev.internal.histogram_enabled == true);
1551 	SPDK_CU_ASSERT_FATAL(g_histogram != NULL);
1552 
1553 	g_count = 0;
1554 	spdk_histogram_data_iterate(g_histogram, histogram_io_count, NULL);
1555 	CU_ASSERT(g_count == 2);
1556 
1557 	/* Disable histogram */
1558 	spdk_bdev_histogram_enable(&g_bdev.bdev, histogram_status_cb, NULL, false);
1559 	poll_threads();
1560 	CU_ASSERT(g_status == 0);
1561 	CU_ASSERT(g_bdev.bdev.internal.histogram_enabled == false);
1562 
1563 	spdk_histogram_data_free(histogram);
1564 
1565 	/* Tear down the channels */
1566 	set_thread(0);
1567 	spdk_put_io_channel(ch[0]);
1568 	set_thread(1);
1569 	spdk_put_io_channel(ch[1]);
1570 	poll_threads();
1571 	set_thread(0);
1572 	teardown_test();
1573 
1574 }
1575 
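/* Captures the type and payload iovec of an I/O reported by the timeout callback. */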
1576 struct timeout_io_cb_arg {
1577 	struct iovec iov;
1578 	uint8_t type;
1579 };
1580 
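/* Count the I/O currently on the channel's io_submitted list. */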
1581 static int
1582 bdev_channel_count_submitted_io(struct spdk_bdev_channel *ch)
1583 {
1584 	struct spdk_bdev_io *bdev_io;
1585 	int n = 0;
1586 
1587 	if (!ch) {
1588 		return -1;
1589 	}
1590 
1591 	TAILQ_FOREACH(bdev_io, &ch->io_submitted, internal.ch_link) {
1592 		n++;
1593 	}
1594 
1595 	return n;
1596 }
1597 
1598 static void
1599 bdev_channel_io_timeout_cb(void *cb_arg, struct spdk_bdev_io *bdev_io)
1600 {
1601 	struct timeout_io_cb_arg *ctx = cb_arg;
1602 
1603 	ctx->type = bdev_io->type;
1604 	ctx->iov.iov_base = bdev_io->iov.iov_base;
1605 	ctx->iov.iov_len = bdev_io->iov.iov_len;
1606 }
1607 
1608 static bool g_io_done;
1609 
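/* Generic completion callback that only records that an I/O finished. */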
1610 static void
1611 io_done(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg)
1612 {
1613 	g_io_done = true;
1614 	spdk_bdev_free_io(bdev_io);
1615 }
1616 
1617 static void
1618 bdev_set_io_timeout_mt(void)
1619 {
1620 	struct spdk_io_channel *ch[3];
1621 	struct spdk_bdev_channel *bdev_ch[3];
1622 	struct timeout_io_cb_arg cb_arg;
1623 
1624 	setup_test();
1625 
1626 	g_bdev.bdev.optimal_io_boundary = 16;
1627 	g_bdev.bdev.split_on_optimal_io_boundary = true;
1628 
1629 	set_thread(0);
1630 	ch[0] = spdk_bdev_get_io_channel(g_desc);
1631 	CU_ASSERT(ch[0] != NULL);
1632 
1633 	set_thread(1);
1634 	ch[1] = spdk_bdev_get_io_channel(g_desc);
1635 	CU_ASSERT(ch[1] != NULL);
1636 
1637 	set_thread(2);
1638 	ch[2] = spdk_bdev_get_io_channel(g_desc);
1639 	CU_ASSERT(ch[2] != NULL);
1640 
1641 	/* Multi-thread mode
1642 	 * 1, Check the poller was registered successfully
1643 	 * 2, Check the timeout IO and ensure the IO was submitted by the user
1644 	 * 3, Check that the link in the bdev_ch works right.
1645 	 * 4, Close desc and put io channel while the timeout poller is polling
1646 	 */
1647 
1648 	/* In desc thread set the timeout */
1649 	set_thread(0);
1650 	CU_ASSERT(spdk_bdev_set_timeout(g_desc, 5, bdev_channel_io_timeout_cb, &cb_arg) == 0);
1651 	CU_ASSERT(g_desc->io_timeout_poller != NULL);
1652 	CU_ASSERT(g_desc->cb_fn == bdev_channel_io_timeout_cb);
1653 	CU_ASSERT(g_desc->cb_arg == &cb_arg);
1654 
1655 	/* check the IO submitted list and timeout handler */
1656 	CU_ASSERT(spdk_bdev_read_blocks(g_desc, ch[0], (void *)0x2000, 0, 1, io_done, NULL) == 0);
1657 	bdev_ch[0] = spdk_io_channel_get_ctx(ch[0]);
1658 	CU_ASSERT(bdev_channel_count_submitted_io(bdev_ch[0]) == 1);
1659 
1660 	set_thread(1);
1661 	CU_ASSERT(spdk_bdev_write_blocks(g_desc, ch[1], (void *)0x1000, 0, 1, io_done, NULL) == 0);
1662 	bdev_ch[1] = spdk_io_channel_get_ctx(ch[1]);
1663 	CU_ASSERT(bdev_channel_count_submitted_io(bdev_ch[1]) == 1);
1664 
1665 	/* Now test that a single-vector command is split correctly.
1666 	 * Offset 14, length 8, payload 0xF000
1667 	 *  Child - Offset 14, length 2, payload 0xF000
1668 	 *  Child - Offset 16, length 6, payload 0xF000 + 2 * 4096
1669 	 *
1670 	 * Set up the expected values before calling spdk_bdev_read_blocks
1671 	 */
1672 	set_thread(2);
1673 	CU_ASSERT(spdk_bdev_read_blocks(g_desc, ch[2], (void *)0xF000, 14, 8, io_done, NULL) == 0);
1674 	bdev_ch[2] = spdk_io_channel_get_ctx(ch[2]);
1675 	CU_ASSERT(bdev_channel_count_submitted_io(bdev_ch[2]) == 3);
1676 
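	/* Advance the fake clock by the equivalent of 3 seconds, which is still below the
	 *  5 second timeout, so nothing should be reported as timed out yet.
	 */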
1677 	set_thread(0);
1678 	memset(&cb_arg, 0, sizeof(cb_arg));
1679 	spdk_delay_us(3 * spdk_get_ticks_hz());
1680 	poll_threads();
1681 	CU_ASSERT(cb_arg.type == 0);
1682 	CU_ASSERT(cb_arg.iov.iov_base == (void *)0x0);
1683 	CU_ASSERT(cb_arg.iov.iov_len == 0);
1684 
1685 	/* Now advance time past the timeout limit */
1686 	spdk_delay_us(3 * spdk_get_ticks_hz());
1687 	poll_thread(0);
1688 	CU_ASSERT(cb_arg.type == SPDK_BDEV_IO_TYPE_READ);
1689 	CU_ASSERT(cb_arg.iov.iov_base == (void *)0x2000);
1690 	CU_ASSERT(cb_arg.iov.iov_len == 1 * g_bdev.bdev.blocklen);
1691 	stub_complete_io(g_bdev.io_target, 1);
1692 	CU_ASSERT(bdev_channel_count_submitted_io(bdev_ch[0]) == 0);
1693 
1694 	memset(&cb_arg, 0, sizeof(cb_arg));
1695 	set_thread(1);
1696 	poll_thread(1);
1697 	CU_ASSERT(cb_arg.type == SPDK_BDEV_IO_TYPE_WRITE);
1698 	CU_ASSERT(cb_arg.iov.iov_base == (void *)0x1000);
1699 	CU_ASSERT(cb_arg.iov.iov_len == 1 * g_bdev.bdev.blocklen);
1700 	stub_complete_io(g_bdev.io_target, 1);
1701 	CU_ASSERT(bdev_channel_count_submitted_io(bdev_ch[1]) == 0);
1702 
1703 	memset(&cb_arg, 0, sizeof(cb_arg));
1704 	set_thread(2);
1705 	poll_thread(2);
1706 	CU_ASSERT(cb_arg.type == SPDK_BDEV_IO_TYPE_READ);
1707 	CU_ASSERT(cb_arg.iov.iov_base == (void *)0xF000);
1708 	CU_ASSERT(cb_arg.iov.iov_len == 8 * g_bdev.bdev.blocklen);
1709 	stub_complete_io(g_bdev.io_target, 1);
1710 	CU_ASSERT(bdev_channel_count_submitted_io(bdev_ch[2]) == 2);
1711 	stub_complete_io(g_bdev.io_target, 1);
1712 	CU_ASSERT(bdev_channel_count_submitted_io(bdev_ch[2]) == 0);
1713 
1714 	/* Run poll_timeout_done(), i.e. let the timeout poller complete its pass */
1715 	set_thread(0);
1716 	poll_thread(0);
1717 	CU_ASSERT(g_desc->refs == 0);
1718 	CU_ASSERT(spdk_bdev_read_blocks(g_desc, ch[0], (void *)0x1000, 0, 1, io_done, NULL) == 0);
1719 	set_thread(1);
1720 	CU_ASSERT(spdk_bdev_write_blocks(g_desc, ch[1], (void *)0x2000, 0, 2, io_done, NULL) == 0);
1721 	set_thread(2);
1722 	CU_ASSERT(spdk_bdev_read_blocks(g_desc, ch[2], (void *)0x3000, 0, 3, io_done, NULL) == 0);
1723 
1724 	/* Trigger the timeout poller to run again; desc->refs is incremented while it runs.
1725 	 * On thread 0 we destroy the io channel before the timeout poller runs,
1726 	 *  so the timeout callback is not called on thread 0.
1727 	 */
1728 	spdk_delay_us(6 * spdk_get_ticks_hz());
1729 	memset(&cb_arg, 0, sizeof(cb_arg));
1730 	set_thread(0);
1731 	stub_complete_io(g_bdev.io_target, 1);
1732 	spdk_put_io_channel(ch[0]);
1733 	poll_thread(0);
1734 	CU_ASSERT(g_desc->refs == 1);
1735 	CU_ASSERT(cb_arg.type == 0);
1736 	CU_ASSERT(cb_arg.iov.iov_base == (void *)0x0);
1737 	CU_ASSERT(cb_arg.iov.iov_len == 0);
1738 
1739 	/* On thread 1 the timeout poller runs before we destroy the io channel,
1740 	 *  so the timeout callback is called on thread 1.
1741 	 */
1742 	memset(&cb_arg, 0, sizeof(cb_arg));
1743 	set_thread(1);
1744 	poll_thread(1);
1745 	stub_complete_io(g_bdev.io_target, 1);
1746 	spdk_put_io_channel(ch[1]);
1747 	poll_thread(1);
1748 	CU_ASSERT(cb_arg.type == SPDK_BDEV_IO_TYPE_WRITE);
1749 	CU_ASSERT(cb_arg.iov.iov_base == (void *)0x2000);
1750 	CU_ASSERT(cb_arg.iov.iov_len == 2 * g_bdev.bdev.blocklen);
1751 
1752 	/* Close the desc.
1753 	 * This unregisters the timeout poller first, then decrements desc->refs;
1754 	 *  refs is still nonzero here, so the desc is not freed yet.
1755 	 */
1756 	set_thread(0);
1757 	spdk_bdev_close(g_desc);
1758 	CU_ASSERT(g_desc->refs == 1);
1759 	CU_ASSERT(g_desc->io_timeout_poller == NULL);
1760 
1761 	/* The timeout poller runs on thread 2, then we destroy the io channel.
1762 	 * The desc is already closed, so the poller exits immediately and the
1763 	 *  timeout callback is not called on thread 2.
1764 	 */
1765 	memset(&cb_arg, 0, sizeof(cb_arg));
1766 	set_thread(2);
1767 	poll_thread(2);
1768 	stub_complete_io(g_bdev.io_target, 1);
1769 	spdk_put_io_channel(ch[2]);
1770 	poll_thread(2);
1771 	CU_ASSERT(cb_arg.type == 0);
1772 	CU_ASSERT(cb_arg.iov.iov_base == (void *)0x0);
1773 	CU_ASSERT(cb_arg.iov.iov_len == 0);
1774 
1775 	set_thread(0);
1776 	poll_thread(0);
1777 	g_teardown_done = false;
1778 	unregister_bdev(&g_bdev);
1779 	spdk_io_device_unregister(&g_io_device, NULL);
1780 	spdk_bdev_finish(finish_cb, NULL);
1781 	poll_threads();
1782 	memset(&g_bdev, 0, sizeof(g_bdev));
1783 	CU_ASSERT(g_teardown_done == true);
1784 	g_teardown_done = false;
1785 	free_threads();
1786 	free_cores();
1787 }
1788 
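/* Flags set by the completion and lock/unlock callbacks used by lock_lba_range_then_submit_io(). */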
1789 static bool g_io_done2;
1790 static bool g_lock_lba_range_done;
1791 static bool g_unlock_lba_range_done;
1792 
1793 static void
1794 io_done2(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg)
1795 {
1796 	g_io_done2 = true;
1797 	spdk_bdev_free_io(bdev_io);
1798 }
1799 
1800 static void
1801 lock_lba_range_done(void *ctx, int status)
1802 {
1803 	g_lock_lba_range_done = true;
1804 }
1805 
1806 static void
1807 unlock_lba_range_done(void *ctx, int status)
1808 {
1809 	g_unlock_lba_range_done = true;
1810 }
1811 
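/* Return the number of I/Os currently outstanding on the stub io_target's channel for
 * the calling thread.
 */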
1812 static uint32_t
1813 stub_channel_outstanding_cnt(void *io_target)
1814 {
1815 	struct spdk_io_channel *_ch = spdk_get_io_channel(io_target);
1816 	struct ut_bdev_channel *ch = spdk_io_channel_get_ctx(_ch);
1817 	uint32_t outstanding_cnt;
1818 
1819 	outstanding_cnt = ch->outstanding_cnt;
1820 
1821 	spdk_put_io_channel(_ch);
1822 	return outstanding_cnt;
1823 }
1824 
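/* Lock an LBA range on one channel, then verify that I/O from the owning channel still
 * executes while writes from other channels are queued on io_locked until the range is
 * unlocked.
 */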
1825 static void
1826 lock_lba_range_then_submit_io(void)
1827 {
1828 	struct spdk_bdev_desc *desc = NULL;
1829 	void *io_target;
1830 	struct spdk_io_channel *io_ch[3];
1831 	struct spdk_bdev_channel *bdev_ch[3];
1832 	struct lba_range *range;
1833 	char buf[4096];
1834 	int ctx0, ctx1, ctx2;
1835 	int rc;
1836 
1837 	setup_test();
1838 
1839 	io_target = g_bdev.io_target;
1840 	desc = g_desc;
1841 
1842 	set_thread(0);
1843 	io_ch[0] = spdk_bdev_get_io_channel(desc);
1844 	bdev_ch[0] = spdk_io_channel_get_ctx(io_ch[0]);
1845 	CU_ASSERT(io_ch[0] != NULL);
1846 
1847 	set_thread(1);
1848 	io_ch[1] = spdk_bdev_get_io_channel(desc);
1849 	bdev_ch[1] = spdk_io_channel_get_ctx(io_ch[1]);
1850 	CU_ASSERT(io_ch[1] != NULL);
1851 
1852 	set_thread(0);
1853 	g_lock_lba_range_done = false;
1854 	rc = bdev_lock_lba_range(desc, io_ch[0], 20, 10, lock_lba_range_done, &ctx0);
1855 	CU_ASSERT(rc == 0);
1856 	poll_threads();
1857 
1858 	/* The lock should immediately become valid, since there are no outstanding
1859 	 * write I/O.
1860 	 */
1861 	CU_ASSERT(g_lock_lba_range_done == true);
1862 	range = TAILQ_FIRST(&bdev_ch[0]->locked_ranges);
1863 	SPDK_CU_ASSERT_FATAL(range != NULL);
1864 	CU_ASSERT(range->offset == 20);
1865 	CU_ASSERT(range->length == 10);
1866 	CU_ASSERT(range->owner_ch == bdev_ch[0]);
1867 
1868 	g_io_done = false;
1869 	CU_ASSERT(TAILQ_EMPTY(&bdev_ch[0]->io_locked));
1870 	rc = spdk_bdev_read_blocks(desc, io_ch[0], buf, 20, 1, io_done, &ctx0);
1871 	CU_ASSERT(rc == 0);
1872 	CU_ASSERT(stub_channel_outstanding_cnt(io_target) == 1);
1873 
1874 	stub_complete_io(io_target, 1);
1875 	poll_threads();
1876 	CU_ASSERT(g_io_done == true);
1877 	CU_ASSERT(TAILQ_EMPTY(&bdev_ch[0]->io_locked));
1878 
1879 	/* Try a write I/O.  This should actually be allowed to execute, since the channel
1880 	 * holding the lock is submitting the write I/O.
1881 	 */
1882 	g_io_done = false;
1883 	CU_ASSERT(TAILQ_EMPTY(&bdev_ch[0]->io_locked));
1884 	rc = spdk_bdev_write_blocks(desc, io_ch[0], buf, 20, 1, io_done, &ctx0);
1885 	CU_ASSERT(rc == 0);
1886 	CU_ASSERT(stub_channel_outstanding_cnt(io_target) == 1);
1887 
1888 	stub_complete_io(io_target, 1);
1889 	poll_threads();
1890 	CU_ASSERT(g_io_done == true);
1891 	CU_ASSERT(TAILQ_EMPTY(&bdev_ch[0]->io_locked));
1892 
1893 	/* Try a write I/O.  This should get queued in the io_locked tailq. */
1894 	set_thread(1);
1895 	g_io_done = false;
1896 	CU_ASSERT(TAILQ_EMPTY(&bdev_ch[1]->io_locked));
1897 	rc = spdk_bdev_write_blocks(desc, io_ch[1], buf, 20, 1, io_done, &ctx1);
1898 	CU_ASSERT(rc == 0);
1899 	poll_threads();
1900 	CU_ASSERT(stub_channel_outstanding_cnt(io_target) == 0);
1901 	CU_ASSERT(!TAILQ_EMPTY(&bdev_ch[1]->io_locked));
1902 	CU_ASSERT(g_io_done == false);
1903 
1904 	/* Try to unlock the lba range using thread 1's io_ch.  This should fail. */
1905 	rc = bdev_unlock_lba_range(desc, io_ch[1], 20, 10, unlock_lba_range_done, &ctx1);
1906 	CU_ASSERT(rc == -EINVAL);
1907 
1908 	/* Now create a new channel and submit a write I/O with it.  This should also be queued.
1909 	 * The new channel should inherit the active locks from the bdev's internal list.
1910 	 */
1911 	set_thread(2);
1912 	io_ch[2] = spdk_bdev_get_io_channel(desc);
1913 	bdev_ch[2] = spdk_io_channel_get_ctx(io_ch[2]);
1914 	CU_ASSERT(io_ch[2] != NULL);
1915 
1916 	g_io_done2 = false;
1917 	CU_ASSERT(TAILQ_EMPTY(&bdev_ch[2]->io_locked));
1918 	rc = spdk_bdev_write_blocks(desc, io_ch[2], buf, 22, 2, io_done2, &ctx2);
1919 	CU_ASSERT(rc == 0);
1920 	poll_threads();
1921 	CU_ASSERT(stub_channel_outstanding_cnt(io_target) == 0);
1922 	CU_ASSERT(!TAILQ_EMPTY(&bdev_ch[2]->io_locked));
1923 	CU_ASSERT(g_io_done2 == false);
1924 
1925 	set_thread(0);
1926 	rc = bdev_unlock_lba_range(desc, io_ch[0], 20, 10, unlock_lba_range_done, &ctx0);
1927 	CU_ASSERT(rc == 0);
1928 	poll_threads();
1929 	CU_ASSERT(TAILQ_EMPTY(&bdev_ch[0]->locked_ranges));
1930 
1931 	/* The LBA range is unlocked, so the write I/Os should now have started execution. */
1932 	CU_ASSERT(TAILQ_EMPTY(&bdev_ch[1]->io_locked));
1933 	CU_ASSERT(TAILQ_EMPTY(&bdev_ch[2]->io_locked));
1934 
1935 	set_thread(1);
1936 	CU_ASSERT(stub_channel_outstanding_cnt(io_target) == 1);
1937 	stub_complete_io(io_target, 1);
1938 	set_thread(2);
1939 	CU_ASSERT(stub_channel_outstanding_cnt(io_target) == 1);
1940 	stub_complete_io(io_target, 1);
1941 
1942 	poll_threads();
1943 	CU_ASSERT(g_io_done == true);
1944 	CU_ASSERT(g_io_done2 == true);
1945 
1946 	/* Tear down the channels */
1947 	set_thread(0);
1948 	spdk_put_io_channel(io_ch[0]);
1949 	set_thread(1);
1950 	spdk_put_io_channel(io_ch[1]);
1951 	set_thread(2);
1952 	spdk_put_io_channel(io_ch[2]);
1953 	poll_threads();
1954 	set_thread(0);
1955 	teardown_test();
1956 }
1957 
1958 int
1959 main(int argc, char **argv)
1960 {
1961 	CU_pSuite	suite = NULL;
1962 	unsigned int	num_failures;
1963 
1964 	CU_set_error_action(CUEA_ABORT);
1965 	CU_initialize_registry();
1966 
1967 	suite = CU_add_suite("bdev", NULL, NULL);
1968 
1969 	CU_ADD_TEST(suite, basic);
1970 	CU_ADD_TEST(suite, unregister_and_close);
1971 	CU_ADD_TEST(suite, basic_qos);
1972 	CU_ADD_TEST(suite, put_channel_during_reset);
1973 	CU_ADD_TEST(suite, aborted_reset);
1974 	CU_ADD_TEST(suite, io_during_reset);
1975 	CU_ADD_TEST(suite, io_during_qos_queue);
1976 	CU_ADD_TEST(suite, io_during_qos_reset);
1977 	CU_ADD_TEST(suite, enomem);
1978 	CU_ADD_TEST(suite, enomem_multi_bdev);
1979 	CU_ADD_TEST(suite, enomem_multi_io_target);
1980 	CU_ADD_TEST(suite, qos_dynamic_enable);
1981 	CU_ADD_TEST(suite, bdev_histograms_mt);
1982 	CU_ADD_TEST(suite, bdev_set_io_timeout_mt);
1983 	CU_ADD_TEST(suite, lock_lba_range_then_submit_io);
1984 
1985 	CU_basic_set_mode(CU_BRM_VERBOSE);
1986 	CU_basic_run_tests();
1987 	num_failures = CU_get_number_of_failures();
1988 	CU_cleanup_registry();
1989 	return num_failures;
1990 }
1991