xref: /spdk/test/unit/lib/bdev/mt/bdev.c/bdev_ut.c (revision b30d57cdad6d2bc75cc1e4e2ebbcebcb0d98dcfa)
1 /*-
2  *   BSD LICENSE
3  *
4  *   Copyright (c) Intel Corporation.
5  *   All rights reserved.
6  *
7  *   Redistribution and use in source and binary forms, with or without
8  *   modification, are permitted provided that the following conditions
9  *   are met:
10  *
11  *     * Redistributions of source code must retain the above copyright
12  *       notice, this list of conditions and the following disclaimer.
13  *     * Redistributions in binary form must reproduce the above copyright
14  *       notice, this list of conditions and the following disclaimer in
15  *       the documentation and/or other materials provided with the
16  *       distribution.
17  *     * Neither the name of Intel Corporation nor the names of its
18  *       contributors may be used to endorse or promote products derived
19  *       from this software without specific prior written permission.
20  *
21  *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22  *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23  *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
24  *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
25  *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
26  *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
27  *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
28  *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
29  *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
30  *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
31  *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
32  */
33 
34 #include "spdk_cunit.h"
35 
36 #include "common/lib/ut_multithread.c"
37 #include "unit/lib/json_mock.c"
38 
39 #include "spdk/config.h"
40 /* HACK: disable VTune integration so the unit test doesn't need VTune headers and libs to build */
41 #undef SPDK_CONFIG_VTUNE
42 
43 #include "bdev/bdev.c"
44 
45 #define BDEV_UT_NUM_THREADS 3
46 
47 struct spdk_trace_histories *g_trace_histories;
48 DEFINE_STUB_V(spdk_trace_add_register_fn, (struct spdk_trace_register_fn *reg_fn));
49 DEFINE_STUB_V(spdk_trace_register_owner, (uint8_t type, char id_prefix));
50 DEFINE_STUB_V(spdk_trace_register_object, (uint8_t type, char id_prefix));
51 DEFINE_STUB_V(spdk_trace_register_description, (const char *name,
52 		uint16_t tpoint_id, uint8_t owner_type,
53 		uint8_t object_type, uint8_t new_object,
54 		uint8_t arg1_type, const char *arg1_name));
55 DEFINE_STUB_V(_spdk_trace_record, (uint64_t tsc, uint16_t tpoint_id, uint16_t poller_id,
56 				   uint32_t size, uint64_t object_id, uint64_t arg1));
57 DEFINE_STUB(spdk_notify_send, uint64_t, (const char *type, const char *ctx), 0);
58 DEFINE_STUB(spdk_notify_type_register, struct spdk_notify_type *, (const char *type), NULL);
59 DEFINE_STUB_V(spdk_scsi_nvme_translate, (const struct spdk_bdev_io *bdev_io, int *sc, int *sk,
60 		int *asc, int *ascq));
61 
62 struct ut_bdev {
63 	struct spdk_bdev	bdev;
64 	void			*io_target;
65 };
66 
67 struct ut_bdev_channel {
68 	TAILQ_HEAD(, spdk_bdev_io)	outstanding_io;
69 	uint32_t			outstanding_cnt;
70 	uint32_t			avail_cnt;
71 };
72 
73 int g_io_device;
74 struct ut_bdev g_bdev;
75 struct spdk_bdev_desc *g_desc;
76 bool g_teardown_done = false;
77 bool g_get_io_channel = true;
78 bool g_create_ch = true;
79 bool g_init_complete_called = false;
80 bool g_fini_start_called = true;
81 int g_status = 0;
82 int g_count = 0;
83 struct spdk_histogram_data *g_histogram = NULL;
84 
85 static int
86 stub_create_ch(void *io_device, void *ctx_buf)
87 {
88 	struct ut_bdev_channel *ch = ctx_buf;
89 
90 	if (g_create_ch == false) {
91 		return -1;
92 	}
93 
94 	TAILQ_INIT(&ch->outstanding_io);
95 	ch->outstanding_cnt = 0;
96 	/*
97 	 * When avail_cnt reaches 0, the submit_request function will complete new I/O with NOMEM.
98 	 *  Most tests do not want ENOMEM to occur, so by default set this to a
99 	 *  big value that won't get hit.  The ENOMEM tests can then override this
100 	 *  value to something much smaller to induce ENOMEM conditions.
101 	 */
102 	ch->avail_cnt = 2048;
103 	return 0;
104 }
105 
106 static void
107 stub_destroy_ch(void *io_device, void *ctx_buf)
108 {
109 }
110 
111 static struct spdk_io_channel *
112 stub_get_io_channel(void *ctx)
113 {
114 	struct ut_bdev *ut_bdev = ctx;
115 
116 	if (g_get_io_channel == true) {
117 		return spdk_get_io_channel(ut_bdev->io_target);
118 	} else {
119 		return NULL;
120 	}
121 }
122 
123 static int
124 stub_destruct(void *ctx)
125 {
126 	return 0;
127 }
128 
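/*
 * Stub I/O path used by every test below: stub_submit_request() queues each I/O on the
 * per-channel outstanding_io list while capacity (avail_cnt) remains, completes new I/O
 * with NOMEM status once capacity is exhausted, aborts all outstanding I/O on RESET, and
 * completes a matching outstanding I/O on ABORT.  Tests later drain the queue with
 * stub_complete_io().
 */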
129 static void
130 stub_submit_request(struct spdk_io_channel *_ch, struct spdk_bdev_io *bdev_io)
131 {
132 	struct ut_bdev_channel *ch = spdk_io_channel_get_ctx(_ch);
133 	struct spdk_bdev_io *io;
134 
135 	if (bdev_io->type == SPDK_BDEV_IO_TYPE_RESET) {
136 		while (!TAILQ_EMPTY(&ch->outstanding_io)) {
137 			io = TAILQ_FIRST(&ch->outstanding_io);
138 			TAILQ_REMOVE(&ch->outstanding_io, io, module_link);
139 			ch->outstanding_cnt--;
140 			spdk_bdev_io_complete(io, SPDK_BDEV_IO_STATUS_ABORTED);
141 			ch->avail_cnt++;
142 		}
143 	} else if (bdev_io->type == SPDK_BDEV_IO_TYPE_ABORT) {
144 		TAILQ_FOREACH(io, &ch->outstanding_io, module_link) {
145 			if (io == bdev_io->u.abort.bio_to_abort) {
146 				TAILQ_REMOVE(&ch->outstanding_io, io, module_link);
147 				ch->outstanding_cnt--;
148 				spdk_bdev_io_complete(io, SPDK_BDEV_IO_STATUS_ABORTED);
149 				ch->avail_cnt++;
150 
151 				spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_SUCCESS);
152 				return;
153 			}
154 		}
155 
156 		spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
157 		return;
158 	}
159 
160 	if (ch->avail_cnt > 0) {
161 		TAILQ_INSERT_TAIL(&ch->outstanding_io, bdev_io, module_link);
162 		ch->outstanding_cnt++;
163 		ch->avail_cnt--;
164 	} else {
165 		spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_NOMEM);
166 	}
167 }
168 
169 static uint32_t
170 stub_complete_io(void *io_target, uint32_t num_to_complete)
171 {
172 	struct spdk_io_channel *_ch = spdk_get_io_channel(io_target);
173 	struct ut_bdev_channel *ch = spdk_io_channel_get_ctx(_ch);
174 	struct spdk_bdev_io *io;
175 	bool complete_all = (num_to_complete == 0);
176 	uint32_t num_completed = 0;
177 
178 	while (complete_all || num_completed < num_to_complete) {
179 		if (TAILQ_EMPTY(&ch->outstanding_io)) {
180 			break;
181 		}
182 		io = TAILQ_FIRST(&ch->outstanding_io);
183 		TAILQ_REMOVE(&ch->outstanding_io, io, module_link);
184 		ch->outstanding_cnt--;
185 		spdk_bdev_io_complete(io, SPDK_BDEV_IO_STATUS_SUCCESS);
186 		ch->avail_cnt++;
187 		num_completed++;
188 	}
189 
190 	spdk_put_io_channel(_ch);
191 	return num_completed;
192 }
193 
194 static bool
195 stub_io_type_supported(void *ctx, enum spdk_bdev_io_type type)
196 {
197 	return true;
198 }
199 
200 static struct spdk_bdev_fn_table fn_table = {
201 	.get_io_channel =	stub_get_io_channel,
202 	.destruct =		stub_destruct,
203 	.submit_request =	stub_submit_request,
204 	.io_type_supported =	stub_io_type_supported,
205 };
206 
207 struct spdk_bdev_module bdev_ut_if;
208 
209 static int
210 module_init(void)
211 {
212 	spdk_bdev_module_init_done(&bdev_ut_if);
213 	return 0;
214 }
215 
216 static void
217 module_fini(void)
218 {
219 }
220 
221 static void
222 init_complete(void)
223 {
224 	g_init_complete_called = true;
225 }
226 
227 static void
228 fini_start(void)
229 {
230 	g_fini_start_called = true;
231 }
232 
233 struct spdk_bdev_module bdev_ut_if = {
234 	.name = "bdev_ut",
235 	.module_init = module_init,
236 	.module_fini = module_fini,
237 	.async_init = true,
238 	.init_complete = init_complete,
239 	.fini_start = fini_start,
240 };
241 
242 SPDK_BDEV_MODULE_REGISTER(bdev_ut, &bdev_ut_if)
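/*
 * bdev_ut_if is an async-init module: module_init() immediately signals completion via
 * spdk_bdev_module_init_done(), while init_complete() and fini_start() only record that
 * they ran so basic() can verify those callbacks were invoked.
 */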
243 
244 static void
245 register_bdev(struct ut_bdev *ut_bdev, char *name, void *io_target)
246 {
247 	memset(ut_bdev, 0, sizeof(*ut_bdev));
248 
249 	ut_bdev->io_target = io_target;
250 	ut_bdev->bdev.ctxt = ut_bdev;
251 	ut_bdev->bdev.name = name;
252 	ut_bdev->bdev.fn_table = &fn_table;
253 	ut_bdev->bdev.module = &bdev_ut_if;
254 	ut_bdev->bdev.blocklen = 4096;
255 	ut_bdev->bdev.blockcnt = 1024;
256 
257 	spdk_bdev_register(&ut_bdev->bdev);
258 }
259 
260 static void
261 unregister_bdev(struct ut_bdev *ut_bdev)
262 {
263 	/* Handle any deferred messages. */
264 	poll_threads();
265 	spdk_bdev_unregister(&ut_bdev->bdev, NULL, NULL);
266 }
267 
268 static void
269 bdev_init_cb(void *done, int rc)
270 {
271 	CU_ASSERT(rc == 0);
272 	*(bool *)done = true;
273 }
274 
275 static void
276 _bdev_event_cb(enum spdk_bdev_event_type type, struct spdk_bdev *bdev,
277 	       void *event_ctx)
278 {
279 	switch (type) {
280 	case SPDK_BDEV_EVENT_REMOVE:
281 		if (event_ctx != NULL) {
282 			*(bool *)event_ctx = true;
283 		}
284 		break;
285 	default:
286 		CU_ASSERT(false);
287 		break;
288 	}
289 }
290 
291 static void
292 setup_test(void)
293 {
294 	bool done = false;
295 
296 	allocate_cores(BDEV_UT_NUM_THREADS);
297 	allocate_threads(BDEV_UT_NUM_THREADS);
298 	set_thread(0);
299 	spdk_bdev_initialize(bdev_init_cb, &done);
300 	spdk_io_device_register(&g_io_device, stub_create_ch, stub_destroy_ch,
301 				sizeof(struct ut_bdev_channel), NULL);
302 	register_bdev(&g_bdev, "ut_bdev", &g_io_device);
303 	spdk_bdev_open_ext("ut_bdev", true, _bdev_event_cb, NULL, &g_desc);
304 }
305 
306 static void
307 finish_cb(void *cb_arg)
308 {
309 	g_teardown_done = true;
310 }
311 
312 static void
313 teardown_test(void)
314 {
315 	set_thread(0);
316 	g_teardown_done = false;
317 	spdk_bdev_close(g_desc);
318 	g_desc = NULL;
319 	unregister_bdev(&g_bdev);
320 	spdk_io_device_unregister(&g_io_device, NULL);
321 	spdk_bdev_finish(finish_cb, NULL);
322 	poll_threads();
323 	memset(&g_bdev, 0, sizeof(g_bdev));
324 	CU_ASSERT(g_teardown_done == true);
325 	g_teardown_done = false;
326 	free_threads();
327 	free_cores();
328 }
329 
330 static uint32_t
331 bdev_io_tailq_cnt(bdev_io_tailq_t *tailq)
332 {
333 	struct spdk_bdev_io *io;
334 	uint32_t cnt = 0;
335 
336 	TAILQ_FOREACH(io, tailq, internal.link) {
337 		cnt++;
338 	}
339 
340 	return cnt;
341 }
342 
343 static void
344 basic(void)
345 {
346 	g_init_complete_called = false;
347 	setup_test();
348 	CU_ASSERT(g_init_complete_called == true);
349 
350 	set_thread(0);
351 
352 	g_get_io_channel = false;
353 	g_ut_threads[0].ch = spdk_bdev_get_io_channel(g_desc);
354 	CU_ASSERT(g_ut_threads[0].ch == NULL);
355 
356 	g_get_io_channel = true;
357 	g_create_ch = false;
358 	g_ut_threads[0].ch = spdk_bdev_get_io_channel(g_desc);
359 	CU_ASSERT(g_ut_threads[0].ch == NULL);
360 
361 	g_get_io_channel = true;
362 	g_create_ch = true;
363 	g_ut_threads[0].ch = spdk_bdev_get_io_channel(g_desc);
364 	CU_ASSERT(g_ut_threads[0].ch != NULL);
365 	spdk_put_io_channel(g_ut_threads[0].ch);
366 
367 	g_fini_start_called = false;
368 	teardown_test();
369 	CU_ASSERT(g_fini_start_called == true);
370 }
371 
372 static void
373 _bdev_unregistered(void *done, int rc)
374 {
375 	CU_ASSERT(rc == 0);
376 	*(bool *)done = true;
377 }
378 
379 static void
380 unregister_and_close(void)
381 {
382 	bool done, remove_notify;
383 	struct spdk_bdev_desc *desc = NULL;
384 
385 	setup_test();
386 	set_thread(0);
387 
388 	/* setup_test() automatically opens the bdev,
389 	 * but this test needs to do that in a different
390 	 * way. */
391 	spdk_bdev_close(g_desc);
392 	poll_threads();
393 
394 	/* Try hotremoving a bdev with descriptors which don't provide
395 	 * any context to the notification callback */
396 	spdk_bdev_open_ext("ut_bdev", true, _bdev_event_cb, NULL, &desc);
397 	SPDK_CU_ASSERT_FATAL(desc != NULL);
398 
399 	/* There is an open descriptor on the device. Unregister it,
400 	 * which can't proceed until the descriptor is closed. */
401 	done = false;
402 	spdk_bdev_unregister(&g_bdev.bdev, _bdev_unregistered, &done);
403 
404 	/* Poll the threads to allow all events to be processed */
405 	poll_threads();
406 
407 	/* Make sure the bdev was not unregistered. We still have a
408 	 * descriptor open */
409 	CU_ASSERT(done == false);
410 
411 	spdk_bdev_close(desc);
412 	poll_threads();
413 	desc = NULL;
414 
415 	/* The unregister should have completed */
416 	CU_ASSERT(done == true);
417 
418 
419 	/* Register the bdev again */
420 	register_bdev(&g_bdev, "ut_bdev", &g_io_device);
421 
422 	remove_notify = false;
423 	spdk_bdev_open_ext("ut_bdev", true, _bdev_event_cb, &remove_notify, &desc);
424 	SPDK_CU_ASSERT_FATAL(desc != NULL);
425 	CU_ASSERT(remove_notify == false);
426 
427 	/* There is an open descriptor on the device. Unregister it,
428 	 * which can't proceed until the descriptor is closed. */
429 	done = false;
430 	spdk_bdev_unregister(&g_bdev.bdev, _bdev_unregistered, &done);
431 	/* No polling has occurred, so neither of these should execute */
432 	CU_ASSERT(remove_notify == false);
433 	CU_ASSERT(done == false);
434 
435 	/* Prior to the unregister completing, close the descriptor */
436 	spdk_bdev_close(desc);
437 
438 	/* Poll the threads to allow all events to be processed */
439 	poll_threads();
440 
441 	/* Remove notify should not have been called because the
442 	 * descriptor is already closed. */
443 	CU_ASSERT(remove_notify == false);
444 
445 	/* The unregister should have completed */
446 	CU_ASSERT(done == true);
447 
448 	/* Restore the original g_bdev so that we can use teardown_test(). */
449 	register_bdev(&g_bdev, "ut_bdev", &g_io_device);
450 	spdk_bdev_open_ext("ut_bdev", true, _bdev_event_cb, NULL, &g_desc);
451 	teardown_test();
452 }
453 
454 static void
455 reset_done(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg)
456 {
457 	bool *done = cb_arg;
458 
459 	CU_ASSERT(success == true);
460 	*done = true;
461 	spdk_bdev_free_io(bdev_io);
462 }
463 
464 static void
465 put_channel_during_reset(void)
466 {
467 	struct spdk_io_channel *io_ch;
468 	bool done = false;
469 
470 	setup_test();
471 
472 	set_thread(0);
473 	io_ch = spdk_bdev_get_io_channel(g_desc);
474 	CU_ASSERT(io_ch != NULL);
475 
476 	/*
477 	 * Start a reset, but then put the I/O channel before
478 	 *  the deferred messages for the reset get a chance to
479 	 *  execute.
480 	 */
481 	spdk_bdev_reset(g_desc, io_ch, reset_done, &done);
482 	spdk_put_io_channel(io_ch);
483 	poll_threads();
484 	stub_complete_io(g_bdev.io_target, 0);
485 
486 	teardown_test();
487 }
488 
489 static void
490 aborted_reset_done(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg)
491 {
492 	enum spdk_bdev_io_status *status = cb_arg;
493 
494 	*status = success ? SPDK_BDEV_IO_STATUS_SUCCESS : SPDK_BDEV_IO_STATUS_FAILED;
495 	spdk_bdev_free_io(bdev_io);
496 }
497 
498 static void
499 aborted_reset(void)
500 {
501 	struct spdk_io_channel *io_ch[2];
502 	enum spdk_bdev_io_status status1 = SPDK_BDEV_IO_STATUS_PENDING,
503 				 status2 = SPDK_BDEV_IO_STATUS_PENDING;
504 
505 	setup_test();
506 
507 	set_thread(0);
508 	io_ch[0] = spdk_bdev_get_io_channel(g_desc);
509 	CU_ASSERT(io_ch[0] != NULL);
510 	spdk_bdev_reset(g_desc, io_ch[0], aborted_reset_done, &status1);
511 	poll_threads();
512 	CU_ASSERT(g_bdev.bdev.internal.reset_in_progress != NULL);
513 
514 	/*
515 	 * First reset has been submitted on ch0.  Now submit a second
516 	 *  reset on ch1 which will get queued since there is already a
517 	 *  reset in progress.
518 	 */
519 	set_thread(1);
520 	io_ch[1] = spdk_bdev_get_io_channel(g_desc);
521 	CU_ASSERT(io_ch[1] != NULL);
522 	spdk_bdev_reset(g_desc, io_ch[1], aborted_reset_done, &status2);
523 	poll_threads();
524 	CU_ASSERT(g_bdev.bdev.internal.reset_in_progress != NULL);
525 
526 	/*
527 	 * Now destroy ch1.  This will abort the queued reset.  Check that
528 	 *  the second reset was completed with failed status.  Also check
529 	 *  that bdev->internal.reset_in_progress != NULL, since the
530 	 *  original reset has not been completed yet.  This ensures that
531 	 *  the bdev code is correctly noticing that the failed reset is
532 	 *  *not* the one that had been submitted to the bdev module.
533 	 */
534 	set_thread(1);
535 	spdk_put_io_channel(io_ch[1]);
536 	poll_threads();
537 	CU_ASSERT(status2 == SPDK_BDEV_IO_STATUS_FAILED);
538 	CU_ASSERT(g_bdev.bdev.internal.reset_in_progress != NULL);
539 
540 	/*
541 	 * Now complete the first reset, verify that it completed with SUCCESS
542 	 *  status and that bdev->internal.reset_in_progress is also set back to NULL.
543 	 */
544 	set_thread(0);
545 	spdk_put_io_channel(io_ch[0]);
546 	stub_complete_io(g_bdev.io_target, 0);
547 	poll_threads();
548 	CU_ASSERT(status1 == SPDK_BDEV_IO_STATUS_SUCCESS);
549 	CU_ASSERT(g_bdev.bdev.internal.reset_in_progress == NULL);
550 
551 	teardown_test();
552 }
553 
554 static void
555 io_during_io_done(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg)
556 {
557 	enum spdk_bdev_io_status *status = cb_arg;
558 
559 	*status = bdev_io->internal.status;
560 	spdk_bdev_free_io(bdev_io);
561 }
562 
563 static void
564 io_during_reset(void)
565 {
566 	struct spdk_io_channel *io_ch[2];
567 	struct spdk_bdev_channel *bdev_ch[2];
568 	enum spdk_bdev_io_status status0, status1, status_reset;
569 	int rc;
570 
571 	setup_test();
572 
573 	/*
574 	 * First test normal case - submit an I/O on each of two channels (with no resets)
575 	 *  and verify they complete successfully.
576 	 */
577 	set_thread(0);
578 	io_ch[0] = spdk_bdev_get_io_channel(g_desc);
579 	bdev_ch[0] = spdk_io_channel_get_ctx(io_ch[0]);
580 	CU_ASSERT(bdev_ch[0]->flags == 0);
581 	status0 = SPDK_BDEV_IO_STATUS_PENDING;
582 	rc = spdk_bdev_read_blocks(g_desc, io_ch[0], NULL, 0, 1, io_during_io_done, &status0);
583 	CU_ASSERT(rc == 0);
584 
585 	set_thread(1);
586 	io_ch[1] = spdk_bdev_get_io_channel(g_desc);
587 	bdev_ch[1] = spdk_io_channel_get_ctx(io_ch[1]);
588 	CU_ASSERT(bdev_ch[1]->flags == 0);
589 	status1 = SPDK_BDEV_IO_STATUS_PENDING;
590 	rc = spdk_bdev_read_blocks(g_desc, io_ch[1], NULL, 0, 1, io_during_io_done, &status1);
591 	CU_ASSERT(rc == 0);
592 
593 	poll_threads();
594 	CU_ASSERT(status0 == SPDK_BDEV_IO_STATUS_PENDING);
595 	CU_ASSERT(status1 == SPDK_BDEV_IO_STATUS_PENDING);
596 
597 	set_thread(0);
598 	stub_complete_io(g_bdev.io_target, 0);
599 	CU_ASSERT(status0 == SPDK_BDEV_IO_STATUS_SUCCESS);
600 
601 	set_thread(1);
602 	stub_complete_io(g_bdev.io_target, 0);
603 	CU_ASSERT(status1 == SPDK_BDEV_IO_STATUS_SUCCESS);
604 
605 	/*
606 	 * Now submit a reset, and leave it pending while we submit I/O on two different
607 	 *  channels.  These I/O should be failed by the bdev layer since the reset is in
608 	 *  progress.
609 	 */
610 	set_thread(0);
611 	status_reset = SPDK_BDEV_IO_STATUS_PENDING;
612 	rc = spdk_bdev_reset(g_desc, io_ch[0], io_during_io_done, &status_reset);
613 	CU_ASSERT(rc == 0);
614 
615 	CU_ASSERT(bdev_ch[0]->flags == 0);
616 	CU_ASSERT(bdev_ch[1]->flags == 0);
617 	poll_threads();
618 	CU_ASSERT(bdev_ch[0]->flags == BDEV_CH_RESET_IN_PROGRESS);
619 	CU_ASSERT(bdev_ch[1]->flags == BDEV_CH_RESET_IN_PROGRESS);
620 
621 	set_thread(0);
622 	status0 = SPDK_BDEV_IO_STATUS_PENDING;
623 	rc = spdk_bdev_read_blocks(g_desc, io_ch[0], NULL, 0, 1, io_during_io_done, &status0);
624 	CU_ASSERT(rc == 0);
625 
626 	set_thread(1);
627 	status1 = SPDK_BDEV_IO_STATUS_PENDING;
628 	rc = spdk_bdev_read_blocks(g_desc, io_ch[1], NULL, 0, 1, io_during_io_done, &status1);
629 	CU_ASSERT(rc == 0);
630 
631 	/*
632 	 * A reset is in progress, so these read I/O should complete with ABORTED status.  Note that we
633 	 *  need to poll_threads() since I/O completed inline have their completion deferred.
634 	 */
635 	poll_threads();
636 	CU_ASSERT(status_reset == SPDK_BDEV_IO_STATUS_PENDING);
637 	CU_ASSERT(status0 == SPDK_BDEV_IO_STATUS_ABORTED);
638 	CU_ASSERT(status1 == SPDK_BDEV_IO_STATUS_ABORTED);
639 
640 	/*
641 	 * Complete the reset
642 	 */
643 	set_thread(0);
644 	stub_complete_io(g_bdev.io_target, 0);
645 
646 	/*
647 	 * Only poll thread 0. We should not get a completion.
648 	 */
649 	poll_thread(0);
650 	CU_ASSERT(status_reset == SPDK_BDEV_IO_STATUS_PENDING);
651 
652 	/*
653 	 * Poll both thread 0 and 1 so the messages can propagate and we
654 	 * get a completion.
655 	 */
656 	poll_threads();
657 	CU_ASSERT(status_reset == SPDK_BDEV_IO_STATUS_SUCCESS);
658 
659 	spdk_put_io_channel(io_ch[0]);
660 	set_thread(1);
661 	spdk_put_io_channel(io_ch[1]);
662 	poll_threads();
663 
664 	teardown_test();
665 }
666 
667 static void
668 basic_qos(void)
669 {
670 	struct spdk_io_channel *io_ch[2];
671 	struct spdk_bdev_channel *bdev_ch[2];
672 	struct spdk_bdev *bdev;
673 	enum spdk_bdev_io_status status, abort_status;
674 	int rc;
675 
676 	setup_test();
677 
678 	/* Enable QoS */
679 	bdev = &g_bdev.bdev;
680 	bdev->internal.qos = calloc(1, sizeof(*bdev->internal.qos));
681 	SPDK_CU_ASSERT_FATAL(bdev->internal.qos != NULL);
682 	TAILQ_INIT(&bdev->internal.qos->queued);
683 	/*
684 	 * Enable read/write IOPS, read-only bytes-per-second, and
685 	 * read/write bytes-per-second rate limits.
686 	 * In this case, all rate limits will take equal effect.
687 	 */
688 	/* 2000 read/write I/O per second, or 2 per millisecond */
689 	bdev->internal.qos->rate_limits[SPDK_BDEV_QOS_RW_IOPS_RATE_LIMIT].limit = 2000;
690 	/* 8K read/write byte per millisecond with 4K block size */
691 	bdev->internal.qos->rate_limits[SPDK_BDEV_QOS_RW_BPS_RATE_LIMIT].limit = 8192000;
692 	/* 8K read only byte per millisecond with 4K block size */
693 	bdev->internal.qos->rate_limits[SPDK_BDEV_QOS_R_BPS_RATE_LIMIT].limit = 8192000;
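	/*
	 * Worked out per millisecond: 2000 IOPS is 2 I/O, and 8192000 bytes/sec is 8192 bytes,
	 * i.e. two 4K blocks.  Every limit therefore allows exactly 2 I/O per millisecond,
	 * which is why they all take equal effect.
	 */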
694 
695 	g_get_io_channel = true;
696 
697 	set_thread(0);
698 	io_ch[0] = spdk_bdev_get_io_channel(g_desc);
699 	bdev_ch[0] = spdk_io_channel_get_ctx(io_ch[0]);
700 	CU_ASSERT(bdev_ch[0]->flags == BDEV_CH_QOS_ENABLED);
701 
702 	set_thread(1);
703 	io_ch[1] = spdk_bdev_get_io_channel(g_desc);
704 	bdev_ch[1] = spdk_io_channel_get_ctx(io_ch[1]);
705 	CU_ASSERT(bdev_ch[1]->flags == BDEV_CH_QOS_ENABLED);
706 
707 	/*
708 	 * Send an I/O on thread 0, which is where the QoS thread is running.
709 	 */
710 	set_thread(0);
711 	status = SPDK_BDEV_IO_STATUS_PENDING;
712 	rc = spdk_bdev_read_blocks(g_desc, io_ch[0], NULL, 0, 1, io_during_io_done, &status);
713 	CU_ASSERT(rc == 0);
714 	CU_ASSERT(status == SPDK_BDEV_IO_STATUS_PENDING);
715 	poll_threads();
716 	stub_complete_io(g_bdev.io_target, 0);
717 	poll_threads();
718 	CU_ASSERT(status == SPDK_BDEV_IO_STATUS_SUCCESS);
719 
720 	/* Send an I/O on thread 1. The QoS thread is not running here. */
721 	status = SPDK_BDEV_IO_STATUS_PENDING;
722 	set_thread(1);
723 	rc = spdk_bdev_read_blocks(g_desc, io_ch[1], NULL, 0, 1, io_during_io_done, &status);
724 	CU_ASSERT(rc == 0);
725 	CU_ASSERT(status == SPDK_BDEV_IO_STATUS_PENDING);
726 	poll_threads();
727 	/* Complete I/O on thread 1. This should not complete the I/O we submitted */
728 	stub_complete_io(g_bdev.io_target, 0);
729 	poll_threads();
730 	CU_ASSERT(status == SPDK_BDEV_IO_STATUS_PENDING);
731 	/* Now complete I/O on thread 0 */
732 	set_thread(0);
733 	poll_threads();
734 	stub_complete_io(g_bdev.io_target, 0);
735 	poll_threads();
736 	CU_ASSERT(status == SPDK_BDEV_IO_STATUS_SUCCESS);
737 
738 	/* Advance one QoS timeslice so the rate limit accounting resets for the next test cases. */
739 	spdk_delay_us(SPDK_BDEV_QOS_TIMESLICE_IN_USEC);
740 	poll_threads();
741 
742 	/*
743 	 * Test abort request when QoS is enabled.
744 	 */
745 
746 	/* Send an I/O on thread 0, which is where the QoS thread is running. */
747 	set_thread(0);
748 	status = SPDK_BDEV_IO_STATUS_PENDING;
749 	rc = spdk_bdev_read_blocks(g_desc, io_ch[0], NULL, 0, 1, io_during_io_done, &status);
750 	CU_ASSERT(rc == 0);
751 	CU_ASSERT(status == SPDK_BDEV_IO_STATUS_PENDING);
752 	/* Send an abort to the I/O on the same thread. */
753 	abort_status = SPDK_BDEV_IO_STATUS_PENDING;
754 	rc = spdk_bdev_abort(g_desc, io_ch[0], &status, io_during_io_done, &abort_status);
755 	CU_ASSERT(rc == 0);
756 	CU_ASSERT(abort_status == SPDK_BDEV_IO_STATUS_PENDING);
757 	poll_threads();
758 	CU_ASSERT(abort_status == SPDK_BDEV_IO_STATUS_SUCCESS);
759 	CU_ASSERT(status == SPDK_BDEV_IO_STATUS_ABORTED);
760 
761 	/* Send an I/O on thread 1. The QoS thread is not running here. */
762 	status = SPDK_BDEV_IO_STATUS_PENDING;
763 	set_thread(1);
764 	rc = spdk_bdev_read_blocks(g_desc, io_ch[1], NULL, 0, 1, io_during_io_done, &status);
765 	CU_ASSERT(rc == 0);
766 	CU_ASSERT(status == SPDK_BDEV_IO_STATUS_PENDING);
767 	poll_threads();
768 	/* Send an abort to the I/O on the same thread. */
769 	abort_status = SPDK_BDEV_IO_STATUS_PENDING;
770 	rc = spdk_bdev_abort(g_desc, io_ch[1], &status, io_during_io_done, &abort_status);
771 	CU_ASSERT(rc == 0);
772 	CU_ASSERT(abort_status == SPDK_BDEV_IO_STATUS_PENDING);
773 	poll_threads();
774 	/* The I/O is completed as aborted and the abort itself succeeds on thread 1. */
775 	CU_ASSERT(abort_status == SPDK_BDEV_IO_STATUS_SUCCESS);
776 	CU_ASSERT(status == SPDK_BDEV_IO_STATUS_ABORTED);
777 
778 	set_thread(0);
779 
780 	/*
781 	 * Close the descriptor only, which should stop the qos channel since
782 	 * the last descriptor is being removed.
783 	 */
784 	spdk_bdev_close(g_desc);
785 	poll_threads();
786 	CU_ASSERT(bdev->internal.qos->ch == NULL);
787 
788 	/*
789 	 * Open the bdev again, which should set up the qos channel again since the
790 	 * I/O channels are still valid.
791 	 */
792 	spdk_bdev_open_ext("ut_bdev", true, _bdev_event_cb, NULL, &g_desc);
793 	poll_threads();
794 	CU_ASSERT(bdev->internal.qos->ch != NULL);
795 
796 	/* Tear down the channels */
797 	set_thread(0);
798 	spdk_put_io_channel(io_ch[0]);
799 	set_thread(1);
800 	spdk_put_io_channel(io_ch[1]);
801 	poll_threads();
802 	set_thread(0);
803 
804 	/* Close the descriptor, which should stop the qos channel */
805 	spdk_bdev_close(g_desc);
806 	poll_threads();
807 	CU_ASSERT(bdev->internal.qos->ch == NULL);
808 
809 	/* Open the bdev again; no qos channel is set up since there are no valid I/O channels. */
810 	spdk_bdev_open_ext("ut_bdev", true, _bdev_event_cb, NULL, &g_desc);
811 	poll_threads();
812 	CU_ASSERT(bdev->internal.qos->ch == NULL);
813 
814 	/* Create the channels in reverse order. */
815 	set_thread(1);
816 	io_ch[1] = spdk_bdev_get_io_channel(g_desc);
817 	bdev_ch[1] = spdk_io_channel_get_ctx(io_ch[1]);
818 	CU_ASSERT(bdev_ch[1]->flags == BDEV_CH_QOS_ENABLED);
819 
820 	set_thread(0);
821 	io_ch[0] = spdk_bdev_get_io_channel(g_desc);
822 	bdev_ch[0] = spdk_io_channel_get_ctx(io_ch[0]);
823 	CU_ASSERT(bdev_ch[0]->flags == BDEV_CH_QOS_ENABLED);
824 
825 	/* Confirm that the qos thread is now thread 1 */
826 	CU_ASSERT(bdev->internal.qos->ch == bdev_ch[1]);
827 
828 	/* Tear down the channels */
829 	set_thread(0);
830 	spdk_put_io_channel(io_ch[0]);
831 	set_thread(1);
832 	spdk_put_io_channel(io_ch[1]);
833 	poll_threads();
834 
835 	set_thread(0);
836 
837 	teardown_test();
838 }
839 
840 static void
841 io_during_qos_queue(void)
842 {
843 	struct spdk_io_channel *io_ch[2];
844 	struct spdk_bdev_channel *bdev_ch[2];
845 	struct spdk_bdev *bdev;
846 	enum spdk_bdev_io_status status0, status1, status2;
847 	int rc;
848 
849 	setup_test();
850 	MOCK_SET(spdk_get_ticks, 0);
851 
852 	/* Enable QoS */
853 	bdev = &g_bdev.bdev;
854 	bdev->internal.qos = calloc(1, sizeof(*bdev->internal.qos));
855 	SPDK_CU_ASSERT_FATAL(bdev->internal.qos != NULL);
856 	TAILQ_INIT(&bdev->internal.qos->queued);
857 	/*
858 	 * Enable read/write IOPS, read-only bytes-per-second, write-only
859 	 * bytes-per-second and read/write bytes-per-second rate limits.
860 	 * In this case, both the read-only and write-only bytes-per-second
861 	 * rate limits will take effect.
862 	 */
863 	/* 4000 read/write I/O per second, or 4 per millisecond */
864 	bdev->internal.qos->rate_limits[SPDK_BDEV_QOS_RW_IOPS_RATE_LIMIT].limit = 4000;
865 	/* 8K byte per millisecond with 4K block size */
866 	bdev->internal.qos->rate_limits[SPDK_BDEV_QOS_RW_BPS_RATE_LIMIT].limit = 8192000;
867 	/* 4K byte per millisecond with 4K block size */
868 	bdev->internal.qos->rate_limits[SPDK_BDEV_QOS_R_BPS_RATE_LIMIT].limit = 4096000;
869 	/* 4K byte per millisecond with 4K block size */
870 	bdev->internal.qos->rate_limits[SPDK_BDEV_QOS_W_BPS_RATE_LIMIT].limit = 4096000;
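	/*
	 * Per millisecond this allows 4 I/O by IOPS, 2 blocks by the read/write byte limit,
	 * and 1 block each by the read-only and write-only byte limits.  The read-only and
	 * write-only limits are therefore the strictest: one read and one write per
	 * millisecond, which is what the assertions below rely on.
	 */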
871 
872 	g_get_io_channel = true;
873 
874 	/* Create channels */
875 	set_thread(0);
876 	io_ch[0] = spdk_bdev_get_io_channel(g_desc);
877 	bdev_ch[0] = spdk_io_channel_get_ctx(io_ch[0]);
878 	CU_ASSERT(bdev_ch[0]->flags == BDEV_CH_QOS_ENABLED);
879 
880 	set_thread(1);
881 	io_ch[1] = spdk_bdev_get_io_channel(g_desc);
882 	bdev_ch[1] = spdk_io_channel_get_ctx(io_ch[1]);
883 	CU_ASSERT(bdev_ch[1]->flags == BDEV_CH_QOS_ENABLED);
884 
885 	/* Send two read I/Os */
886 	status1 = SPDK_BDEV_IO_STATUS_PENDING;
887 	rc = spdk_bdev_read_blocks(g_desc, io_ch[1], NULL, 0, 1, io_during_io_done, &status1);
888 	CU_ASSERT(rc == 0);
889 	CU_ASSERT(status1 == SPDK_BDEV_IO_STATUS_PENDING);
890 	set_thread(0);
891 	status0 = SPDK_BDEV_IO_STATUS_PENDING;
892 	rc = spdk_bdev_read_blocks(g_desc, io_ch[0], NULL, 0, 1, io_during_io_done, &status0);
893 	CU_ASSERT(rc == 0);
894 	CU_ASSERT(status0 == SPDK_BDEV_IO_STATUS_PENDING);
895 	/* Send one write I/O */
896 	status2 = SPDK_BDEV_IO_STATUS_PENDING;
897 	rc = spdk_bdev_write_blocks(g_desc, io_ch[0], NULL, 0, 1, io_during_io_done, &status2);
898 	CU_ASSERT(rc == 0);
899 	CU_ASSERT(status2 == SPDK_BDEV_IO_STATUS_PENDING);
900 
901 	/* Complete any I/O that arrived at the disk */
902 	poll_threads();
903 	set_thread(1);
904 	stub_complete_io(g_bdev.io_target, 0);
905 	set_thread(0);
906 	stub_complete_io(g_bdev.io_target, 0);
907 	poll_threads();
908 
909 	/* Only one of the two read I/Os should complete. (logical XOR) */
910 	if (status0 == SPDK_BDEV_IO_STATUS_SUCCESS) {
911 		CU_ASSERT(status1 == SPDK_BDEV_IO_STATUS_PENDING);
912 	} else {
913 		CU_ASSERT(status1 == SPDK_BDEV_IO_STATUS_SUCCESS);
914 	}
915 	/* The write I/O should complete. */
916 	CU_ASSERT(status2 == SPDK_BDEV_IO_STATUS_SUCCESS);
917 
918 	/* Advance in time by a millisecond */
919 	spdk_delay_us(1000);
920 
921 	/* Complete more I/O */
922 	poll_threads();
923 	set_thread(1);
924 	stub_complete_io(g_bdev.io_target, 0);
925 	set_thread(0);
926 	stub_complete_io(g_bdev.io_target, 0);
927 	poll_threads();
928 
929 	/* Now the second read I/O should be done */
930 	CU_ASSERT(status0 == SPDK_BDEV_IO_STATUS_SUCCESS);
931 	CU_ASSERT(status1 == SPDK_BDEV_IO_STATUS_SUCCESS);
932 
933 	/* Tear down the channels */
934 	set_thread(1);
935 	spdk_put_io_channel(io_ch[1]);
936 	set_thread(0);
937 	spdk_put_io_channel(io_ch[0]);
938 	poll_threads();
939 
940 	teardown_test();
941 }
942 
943 static void
944 io_during_qos_reset(void)
945 {
946 	struct spdk_io_channel *io_ch[2];
947 	struct spdk_bdev_channel *bdev_ch[2];
948 	struct spdk_bdev *bdev;
949 	enum spdk_bdev_io_status status0, status1, reset_status;
950 	int rc;
951 
952 	setup_test();
953 	MOCK_SET(spdk_get_ticks, 0);
954 
955 	/* Enable QoS */
956 	bdev = &g_bdev.bdev;
957 	bdev->internal.qos = calloc(1, sizeof(*bdev->internal.qos));
958 	SPDK_CU_ASSERT_FATAL(bdev->internal.qos != NULL);
959 	TAILQ_INIT(&bdev->internal.qos->queued);
960 	/*
961 	 * Enable read/write IOPS, write-only bytes-per-second and
962 	 * read/write bytes-per-second rate limits.
963 	 * In this case, the read/write bytes-per-second rate limit will
964 	 * take effect first.
965 	 */
966 	/* 2000 read/write I/O per second, or 2 per millisecond */
967 	bdev->internal.qos->rate_limits[SPDK_BDEV_QOS_RW_IOPS_RATE_LIMIT].limit = 2000;
968 	/* 4K byte per millisecond with 4K block size */
969 	bdev->internal.qos->rate_limits[SPDK_BDEV_QOS_RW_BPS_RATE_LIMIT].limit = 4096000;
970 	/* 8K byte per millisecond with 4K block size */
971 	bdev->internal.qos->rate_limits[SPDK_BDEV_QOS_W_BPS_RATE_LIMIT].limit = 8192000;
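	/*
	 * Per millisecond this allows 2 I/O by IOPS, 1 block (4K) by the read/write byte limit,
	 * and 2 blocks by the write-only byte limit, so the read/write byte limit is hit first.
	 */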
972 
973 	g_get_io_channel = true;
974 
975 	/* Create channels */
976 	set_thread(0);
977 	io_ch[0] = spdk_bdev_get_io_channel(g_desc);
978 	bdev_ch[0] = spdk_io_channel_get_ctx(io_ch[0]);
979 	CU_ASSERT(bdev_ch[0]->flags == BDEV_CH_QOS_ENABLED);
980 
981 	set_thread(1);
982 	io_ch[1] = spdk_bdev_get_io_channel(g_desc);
983 	bdev_ch[1] = spdk_io_channel_get_ctx(io_ch[1]);
984 	CU_ASSERT(bdev_ch[1]->flags == BDEV_CH_QOS_ENABLED);
985 
986 	/* Send two I/O. One of these gets queued by QoS. The other is sitting at the disk. */
987 	status1 = SPDK_BDEV_IO_STATUS_PENDING;
988 	rc = spdk_bdev_write_blocks(g_desc, io_ch[1], NULL, 0, 1, io_during_io_done, &status1);
989 	CU_ASSERT(rc == 0);
990 	set_thread(0);
991 	status0 = SPDK_BDEV_IO_STATUS_PENDING;
992 	rc = spdk_bdev_write_blocks(g_desc, io_ch[0], NULL, 0, 1, io_during_io_done, &status0);
993 	CU_ASSERT(rc == 0);
994 
995 	poll_threads();
996 	CU_ASSERT(status1 == SPDK_BDEV_IO_STATUS_PENDING);
997 	CU_ASSERT(status0 == SPDK_BDEV_IO_STATUS_PENDING);
998 
999 	/* Reset the bdev. */
1000 	reset_status = SPDK_BDEV_IO_STATUS_PENDING;
1001 	rc = spdk_bdev_reset(g_desc, io_ch[0], io_during_io_done, &reset_status);
1002 	CU_ASSERT(rc == 0);
1003 
1004 	/* Complete any I/O that arrived at the disk */
1005 	poll_threads();
1006 	set_thread(1);
1007 	stub_complete_io(g_bdev.io_target, 0);
1008 	set_thread(0);
1009 	stub_complete_io(g_bdev.io_target, 0);
1010 	poll_threads();
1011 
1012 	CU_ASSERT(reset_status == SPDK_BDEV_IO_STATUS_SUCCESS);
1013 	CU_ASSERT(status0 == SPDK_BDEV_IO_STATUS_ABORTED);
1014 	CU_ASSERT(status1 == SPDK_BDEV_IO_STATUS_ABORTED);
1015 
1016 	/* Tear down the channels */
1017 	set_thread(1);
1018 	spdk_put_io_channel(io_ch[1]);
1019 	set_thread(0);
1020 	spdk_put_io_channel(io_ch[0]);
1021 	poll_threads();
1022 
1023 	teardown_test();
1024 }
1025 
1026 static void
1027 enomem_done(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg)
1028 {
1029 	enum spdk_bdev_io_status *status = cb_arg;
1030 
1031 	*status = success ? SPDK_BDEV_IO_STATUS_SUCCESS : SPDK_BDEV_IO_STATUS_FAILED;
1032 	spdk_bdev_free_io(bdev_io);
1033 }
1034 
1035 static void
1036 enomem(void)
1037 {
1038 	struct spdk_io_channel *io_ch;
1039 	struct spdk_bdev_channel *bdev_ch;
1040 	struct spdk_bdev_shared_resource *shared_resource;
1041 	struct ut_bdev_channel *ut_ch;
1042 	const uint32_t IO_ARRAY_SIZE = 64;
1043 	const uint32_t AVAIL = 20;
1044 	enum spdk_bdev_io_status status[IO_ARRAY_SIZE], status_reset;
1045 	uint32_t nomem_cnt, i;
1046 	struct spdk_bdev_io *first_io;
1047 	int rc;
1048 
1049 	setup_test();
1050 
1051 	set_thread(0);
1052 	io_ch = spdk_bdev_get_io_channel(g_desc);
1053 	bdev_ch = spdk_io_channel_get_ctx(io_ch);
1054 	shared_resource = bdev_ch->shared_resource;
1055 	ut_ch = spdk_io_channel_get_ctx(bdev_ch->channel);
1056 	ut_ch->avail_cnt = AVAIL;
1057 
1058 	/* First submit a number of IOs equal to what the channel can support. */
1059 	for (i = 0; i < AVAIL; i++) {
1060 		status[i] = SPDK_BDEV_IO_STATUS_PENDING;
1061 		rc = spdk_bdev_read_blocks(g_desc, io_ch, NULL, 0, 1, enomem_done, &status[i]);
1062 		CU_ASSERT(rc == 0);
1063 	}
1064 	CU_ASSERT(TAILQ_EMPTY(&shared_resource->nomem_io));
1065 
1066 	/*
1067 	 * Next, submit one additional I/O.  This one should fail with ENOMEM and then go onto
1068 	 *  the nomem_io list.
1069 	 */
1070 	status[AVAIL] = SPDK_BDEV_IO_STATUS_PENDING;
1071 	rc = spdk_bdev_read_blocks(g_desc, io_ch, NULL, 0, 1, enomem_done, &status[AVAIL]);
1072 	CU_ASSERT(rc == 0);
1073 	SPDK_CU_ASSERT_FATAL(!TAILQ_EMPTY(&shared_resource->nomem_io));
1074 	first_io = TAILQ_FIRST(&shared_resource->nomem_io);
1075 
1076 	/*
1077 	 * Now submit a bunch more I/O.  These should all fail with ENOMEM and get queued behind
1078 	 *  the first_io above.
1079 	 */
1080 	for (i = AVAIL + 1; i < IO_ARRAY_SIZE; i++) {
1081 		status[i] = SPDK_BDEV_IO_STATUS_PENDING;
1082 		rc = spdk_bdev_read_blocks(g_desc, io_ch, NULL, 0, 1, enomem_done, &status[i]);
1083 		CU_ASSERT(rc == 0);
1084 	}
1085 
1086 	/* Assert that first_io is still at the head of the list. */
1087 	CU_ASSERT(TAILQ_FIRST(&shared_resource->nomem_io) == first_io);
1088 	CU_ASSERT(bdev_io_tailq_cnt(&shared_resource->nomem_io) == (IO_ARRAY_SIZE - AVAIL));
1089 	nomem_cnt = bdev_io_tailq_cnt(&shared_resource->nomem_io);
1090 	CU_ASSERT(shared_resource->nomem_threshold == (AVAIL - NOMEM_THRESHOLD_COUNT));
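	/*
	 * nomem_threshold is the outstanding I/O count at which the queued nomem_io will be
	 * retried: nothing is resubmitted until io_outstanding drops from AVAIL down to
	 * AVAIL - NOMEM_THRESHOLD_COUNT, which the next two checks exercise.
	 */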
1091 
1092 	/*
1093 	 * Complete 1 I/O only.  The key check here is bdev_io_tailq_cnt - this should not have
1094 	 *  changed since completing just 1 I/O should not trigger retrying the queued nomem_io
1095 	 *  list.
1096 	 */
1097 	stub_complete_io(g_bdev.io_target, 1);
1098 	CU_ASSERT(bdev_io_tailq_cnt(&shared_resource->nomem_io) == nomem_cnt);
1099 
1100 	/*
1101 	 * Complete enough I/O to hit the nomem_threshold.  This should trigger retrying nomem_io,
1102 	 *  and we should see I/O get resubmitted to the test bdev module.
1103 	 */
1104 	stub_complete_io(g_bdev.io_target, NOMEM_THRESHOLD_COUNT - 1);
1105 	CU_ASSERT(bdev_io_tailq_cnt(&shared_resource->nomem_io) < nomem_cnt);
1106 	nomem_cnt = bdev_io_tailq_cnt(&shared_resource->nomem_io);
1107 
1108 	/* Complete 1 I/O only.  This should not trigger retrying the queued nomem_io. */
1109 	stub_complete_io(g_bdev.io_target, 1);
1110 	CU_ASSERT(bdev_io_tailq_cnt(&shared_resource->nomem_io) == nomem_cnt);
1111 
1112 	/*
1113 	 * Send a reset and confirm that all I/O are completed, including the ones that
1114 	 *  were queued on the nomem_io list.
1115 	 */
1116 	status_reset = SPDK_BDEV_IO_STATUS_PENDING;
1117 	rc = spdk_bdev_reset(g_desc, io_ch, enomem_done, &status_reset);
1118 	poll_threads();
1119 	CU_ASSERT(rc == 0);
1120 	/* This will complete the reset. */
1121 	stub_complete_io(g_bdev.io_target, 0);
1122 
1123 	CU_ASSERT(bdev_io_tailq_cnt(&shared_resource->nomem_io) == 0);
1124 	CU_ASSERT(shared_resource->io_outstanding == 0);
1125 
1126 	spdk_put_io_channel(io_ch);
1127 	poll_threads();
1128 	teardown_test();
1129 }
1130 
1131 static void
1132 enomem_multi_bdev(void)
1133 {
1134 	struct spdk_io_channel *io_ch;
1135 	struct spdk_bdev_channel *bdev_ch;
1136 	struct spdk_bdev_shared_resource *shared_resource;
1137 	struct ut_bdev_channel *ut_ch;
1138 	const uint32_t IO_ARRAY_SIZE = 64;
1139 	const uint32_t AVAIL = 20;
1140 	enum spdk_bdev_io_status status[IO_ARRAY_SIZE];
1141 	uint32_t i;
1142 	struct ut_bdev *second_bdev;
1143 	struct spdk_bdev_desc *second_desc = NULL;
1144 	struct spdk_bdev_channel *second_bdev_ch;
1145 	struct spdk_io_channel *second_ch;
1146 	int rc;
1147 
1148 	setup_test();
1149 
1150 	/* Register a second bdev with the same io_target */
1151 	second_bdev = calloc(1, sizeof(*second_bdev));
1152 	SPDK_CU_ASSERT_FATAL(second_bdev != NULL);
1153 	register_bdev(second_bdev, "ut_bdev2", g_bdev.io_target);
1154 	spdk_bdev_open_ext("ut_bdev2", true, _bdev_event_cb, NULL, &second_desc);
1155 	SPDK_CU_ASSERT_FATAL(second_desc != NULL);
1156 
1157 	set_thread(0);
1158 	io_ch = spdk_bdev_get_io_channel(g_desc);
1159 	bdev_ch = spdk_io_channel_get_ctx(io_ch);
1160 	shared_resource = bdev_ch->shared_resource;
1161 	ut_ch = spdk_io_channel_get_ctx(bdev_ch->channel);
1162 	ut_ch->avail_cnt = AVAIL;
1163 
1164 	second_ch = spdk_bdev_get_io_channel(second_desc);
1165 	second_bdev_ch = spdk_io_channel_get_ctx(second_ch);
1166 	SPDK_CU_ASSERT_FATAL(shared_resource == second_bdev_ch->shared_resource);
1167 
1168 	/* Saturate io_target through bdev A. */
1169 	for (i = 0; i < AVAIL; i++) {
1170 		status[i] = SPDK_BDEV_IO_STATUS_PENDING;
1171 		rc = spdk_bdev_read_blocks(g_desc, io_ch, NULL, 0, 1, enomem_done, &status[i]);
1172 		CU_ASSERT(rc == 0);
1173 	}
1174 	CU_ASSERT(TAILQ_EMPTY(&shared_resource->nomem_io));
1175 
1176 	/*
1177 	 * Now submit I/O through the second bdev. This should fail with ENOMEM
1178 	 * and then go onto the nomem_io list.
1179 	 */
1180 	status[AVAIL] = SPDK_BDEV_IO_STATUS_PENDING;
1181 	rc = spdk_bdev_read_blocks(second_desc, second_ch, NULL, 0, 1, enomem_done, &status[AVAIL]);
1182 	CU_ASSERT(rc == 0);
1183 	SPDK_CU_ASSERT_FATAL(!TAILQ_EMPTY(&shared_resource->nomem_io));
1184 
1185 	/* Complete first bdev's I/O. This should retry sending second bdev's nomem_io */
1186 	stub_complete_io(g_bdev.io_target, AVAIL);
1187 
1188 	SPDK_CU_ASSERT_FATAL(TAILQ_EMPTY(&shared_resource->nomem_io));
1189 	CU_ASSERT(shared_resource->io_outstanding == 1);
1190 
1191 	/* Now complete our retried I/O  */
1192 	stub_complete_io(g_bdev.io_target, 1);
1193 	SPDK_CU_ASSERT_FATAL(shared_resource->io_outstanding == 0);
1194 
1195 	spdk_put_io_channel(io_ch);
1196 	spdk_put_io_channel(second_ch);
1197 	spdk_bdev_close(second_desc);
1198 	unregister_bdev(second_bdev);
1199 	poll_threads();
1200 	free(second_bdev);
1201 	teardown_test();
1202 }
1203 
1204 
1205 static void
1206 enomem_multi_io_target(void)
1207 {
1208 	struct spdk_io_channel *io_ch;
1209 	struct spdk_bdev_channel *bdev_ch;
1210 	struct ut_bdev_channel *ut_ch;
1211 	const uint32_t IO_ARRAY_SIZE = 64;
1212 	const uint32_t AVAIL = 20;
1213 	enum spdk_bdev_io_status status[IO_ARRAY_SIZE];
1214 	uint32_t i;
1215 	int new_io_device;
1216 	struct ut_bdev *second_bdev;
1217 	struct spdk_bdev_desc *second_desc = NULL;
1218 	struct spdk_bdev_channel *second_bdev_ch;
1219 	struct spdk_io_channel *second_ch;
1220 	int rc;
1221 
1222 	setup_test();
1223 
1224 	/* Create new io_target and a second bdev using it */
1225 	spdk_io_device_register(&new_io_device, stub_create_ch, stub_destroy_ch,
1226 				sizeof(struct ut_bdev_channel), NULL);
1227 	second_bdev = calloc(1, sizeof(*second_bdev));
1228 	SPDK_CU_ASSERT_FATAL(second_bdev != NULL);
1229 	register_bdev(second_bdev, "ut_bdev2", &new_io_device);
1230 	spdk_bdev_open_ext("ut_bdev2", true, _bdev_event_cb, NULL, &second_desc);
1231 	SPDK_CU_ASSERT_FATAL(second_desc != NULL);
1232 
1233 	set_thread(0);
1234 	io_ch = spdk_bdev_get_io_channel(g_desc);
1235 	bdev_ch = spdk_io_channel_get_ctx(io_ch);
1236 	ut_ch = spdk_io_channel_get_ctx(bdev_ch->channel);
1237 	ut_ch->avail_cnt = AVAIL;
1238 
1239 	/* Different io_target should imply a different shared_resource */
1240 	second_ch = spdk_bdev_get_io_channel(second_desc);
1241 	second_bdev_ch = spdk_io_channel_get_ctx(second_ch);
1242 	SPDK_CU_ASSERT_FATAL(bdev_ch->shared_resource != second_bdev_ch->shared_resource);
1243 
1244 	/* Saturate io_target through bdev A. */
1245 	for (i = 0; i < AVAIL; i++) {
1246 		status[i] = SPDK_BDEV_IO_STATUS_PENDING;
1247 		rc = spdk_bdev_read_blocks(g_desc, io_ch, NULL, 0, 1, enomem_done, &status[i]);
1248 		CU_ASSERT(rc == 0);
1249 	}
1250 	CU_ASSERT(TAILQ_EMPTY(&bdev_ch->shared_resource->nomem_io));
1251 
1252 	/* Issue one more I/O to fill the nomem_io list. */
1253 	status[AVAIL] = SPDK_BDEV_IO_STATUS_PENDING;
1254 	rc = spdk_bdev_read_blocks(g_desc, io_ch, NULL, 0, 1, enomem_done, &status[AVAIL]);
1255 	CU_ASSERT(rc == 0);
1256 	SPDK_CU_ASSERT_FATAL(!TAILQ_EMPTY(&bdev_ch->shared_resource->nomem_io));
1257 
1258 	/*
1259 	 * Now submit I/O through the second bdev. This should go through and complete
1260 	 * successfully because we're using a different io_device underneath.
1261 	 */
1262 	status[AVAIL] = SPDK_BDEV_IO_STATUS_PENDING;
1263 	rc = spdk_bdev_read_blocks(second_desc, second_ch, NULL, 0, 1, enomem_done, &status[AVAIL]);
1264 	CU_ASSERT(rc == 0);
1265 	SPDK_CU_ASSERT_FATAL(TAILQ_EMPTY(&second_bdev_ch->shared_resource->nomem_io));
1266 	stub_complete_io(second_bdev->io_target, 1);
1267 
1268 	/* Cleanup; Complete outstanding I/O. */
1269 	stub_complete_io(g_bdev.io_target, AVAIL);
1270 	SPDK_CU_ASSERT_FATAL(TAILQ_EMPTY(&bdev_ch->shared_resource->nomem_io));
1271 	/* Complete the ENOMEM I/O */
1272 	stub_complete_io(g_bdev.io_target, 1);
1273 	CU_ASSERT(bdev_ch->shared_resource->io_outstanding == 0);
1274 
1275 	SPDK_CU_ASSERT_FATAL(TAILQ_EMPTY(&bdev_ch->shared_resource->nomem_io));
1276 	CU_ASSERT(bdev_ch->shared_resource->io_outstanding == 0);
1277 	spdk_put_io_channel(io_ch);
1278 	spdk_put_io_channel(second_ch);
1279 	spdk_bdev_close(second_desc);
1280 	unregister_bdev(second_bdev);
1281 	spdk_io_device_unregister(&new_io_device, NULL);
1282 	poll_threads();
1283 	free(second_bdev);
1284 	teardown_test();
1285 }
1286 
1287 static void
1288 qos_dynamic_enable_done(void *cb_arg, int status)
1289 {
1290 	int *rc = cb_arg;
1291 	*rc = status;
1292 }
1293 
1294 static void
1295 qos_dynamic_enable(void)
1296 {
1297 	struct spdk_io_channel *io_ch[2];
1298 	struct spdk_bdev_channel *bdev_ch[2];
1299 	struct spdk_bdev *bdev;
1300 	enum spdk_bdev_io_status bdev_io_status[2];
1301 	uint64_t limits[SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES] = {};
1302 	int status, second_status, rc, i;
1303 
1304 	setup_test();
1305 	MOCK_SET(spdk_get_ticks, 0);
1306 
1307 	for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) {
1308 		limits[i] = UINT64_MAX;
1309 	}
1310 
1311 	bdev = &g_bdev.bdev;
1312 
1313 	g_get_io_channel = true;
1314 
1315 	/* Create channels */
1316 	set_thread(0);
1317 	io_ch[0] = spdk_bdev_get_io_channel(g_desc);
1318 	bdev_ch[0] = spdk_io_channel_get_ctx(io_ch[0]);
1319 	CU_ASSERT(bdev_ch[0]->flags == 0);
1320 
1321 	set_thread(1);
1322 	io_ch[1] = spdk_bdev_get_io_channel(g_desc);
1323 	bdev_ch[1] = spdk_io_channel_get_ctx(io_ch[1]);
1324 	CU_ASSERT(bdev_ch[1]->flags == 0);
1325 
1326 	set_thread(0);
1327 
1328 	/*
1329 	 * Enable QoS: Read/Write IOPS, Read/Write byte,
1330 	 * Read only byte and Write only byte per second
1331 	 * rate limits.
1332 	 * More than 10 I/Os allowed per timeslice.
1333 	 */
1334 	status = -1;
1335 	limits[SPDK_BDEV_QOS_RW_IOPS_RATE_LIMIT] = 10000;
1336 	limits[SPDK_BDEV_QOS_RW_BPS_RATE_LIMIT] = 100;
1337 	limits[SPDK_BDEV_QOS_R_BPS_RATE_LIMIT] = 100;
1338 	limits[SPDK_BDEV_QOS_W_BPS_RATE_LIMIT] = 10;
1339 	spdk_bdev_set_qos_rate_limits(bdev, limits, qos_dynamic_enable_done, &status);
1340 	poll_threads();
1341 	CU_ASSERT(status == 0);
1342 	CU_ASSERT((bdev_ch[0]->flags & BDEV_CH_QOS_ENABLED) != 0);
1343 	CU_ASSERT((bdev_ch[1]->flags & BDEV_CH_QOS_ENABLED) != 0);
1344 
1345 	/*
1346 	 * Submit and complete 10 I/O to fill the QoS allotment for this timeslice.
1347 	 * Additional I/O will then be queued.
1348 	 */
1349 	set_thread(0);
1350 	for (i = 0; i < 10; i++) {
1351 		bdev_io_status[0] = SPDK_BDEV_IO_STATUS_PENDING;
1352 		rc = spdk_bdev_read_blocks(g_desc, io_ch[0], NULL, 0, 1, io_during_io_done, &bdev_io_status[0]);
1353 		CU_ASSERT(rc == 0);
1354 		CU_ASSERT(bdev_io_status[0] == SPDK_BDEV_IO_STATUS_PENDING);
1355 		poll_thread(0);
1356 		stub_complete_io(g_bdev.io_target, 0);
1357 		CU_ASSERT(bdev_io_status[0] == SPDK_BDEV_IO_STATUS_SUCCESS);
1358 	}
1359 
1360 	/*
1361 	 * Send two more I/O.  These I/O will be queued since the current timeslice allotment has been
1362 	 *  filled already.  We want to test that when QoS is disabled, these two I/O:
1363 	 *  1) are not aborted
1364 	 *  2) are sent back to their original thread for resubmission
1365 	 */
1366 	bdev_io_status[0] = SPDK_BDEV_IO_STATUS_PENDING;
1367 	rc = spdk_bdev_read_blocks(g_desc, io_ch[0], NULL, 0, 1, io_during_io_done, &bdev_io_status[0]);
1368 	CU_ASSERT(rc == 0);
1369 	CU_ASSERT(bdev_io_status[0] == SPDK_BDEV_IO_STATUS_PENDING);
1370 	set_thread(1);
1371 	bdev_io_status[1] = SPDK_BDEV_IO_STATUS_PENDING;
1372 	rc = spdk_bdev_read_blocks(g_desc, io_ch[1], NULL, 0, 1, io_during_io_done, &bdev_io_status[1]);
1373 	CU_ASSERT(rc == 0);
1374 	CU_ASSERT(bdev_io_status[1] == SPDK_BDEV_IO_STATUS_PENDING);
1375 	poll_threads();
1376 
1377 	/*
1378 	 * Disable QoS: Read/Write IOPS, Read/Write byte,
1379 	 * Read only byte rate limits
1380 	 */
1381 	status = -1;
1382 	limits[SPDK_BDEV_QOS_RW_IOPS_RATE_LIMIT] = 0;
1383 	limits[SPDK_BDEV_QOS_RW_BPS_RATE_LIMIT] = 0;
1384 	limits[SPDK_BDEV_QOS_R_BPS_RATE_LIMIT] = 0;
1385 	spdk_bdev_set_qos_rate_limits(bdev, limits, qos_dynamic_enable_done, &status);
1386 	poll_threads();
1387 	CU_ASSERT(status == 0);
1388 	CU_ASSERT((bdev_ch[0]->flags & BDEV_CH_QOS_ENABLED) != 0);
1389 	CU_ASSERT((bdev_ch[1]->flags & BDEV_CH_QOS_ENABLED) != 0);
1390 
1391 	/* Disable QoS: Write only Byte per second rate limit */
1392 	status = -1;
1393 	limits[SPDK_BDEV_QOS_W_BPS_RATE_LIMIT] = 0;
1394 	spdk_bdev_set_qos_rate_limits(bdev, limits, qos_dynamic_enable_done, &status);
1395 	poll_threads();
1396 	CU_ASSERT(status == 0);
1397 	CU_ASSERT((bdev_ch[0]->flags & BDEV_CH_QOS_ENABLED) == 0);
1398 	CU_ASSERT((bdev_ch[1]->flags & BDEV_CH_QOS_ENABLED) == 0);
1399 
1400 	/*
1401 	 * All I/O should have been resubmitted back on their original thread.  Complete
1402 	 *  all I/O on thread 0, and ensure that only the thread 0 I/O was completed.
1403 	 */
1404 	set_thread(0);
1405 	stub_complete_io(g_bdev.io_target, 0);
1406 	poll_threads();
1407 	CU_ASSERT(bdev_io_status[0] == SPDK_BDEV_IO_STATUS_SUCCESS);
1408 	CU_ASSERT(bdev_io_status[1] == SPDK_BDEV_IO_STATUS_PENDING);
1409 
1410 	/* Now complete all I/O on thread 1 and ensure the thread 1 I/O was completed. */
1411 	set_thread(1);
1412 	stub_complete_io(g_bdev.io_target, 0);
1413 	poll_threads();
1414 	CU_ASSERT(bdev_io_status[1] == SPDK_BDEV_IO_STATUS_SUCCESS);
1415 
1416 	/* Disable QoS again */
1417 	status = -1;
1418 	limits[SPDK_BDEV_QOS_RW_IOPS_RATE_LIMIT] = 0;
1419 	spdk_bdev_set_qos_rate_limits(bdev, limits, qos_dynamic_enable_done, &status);
1420 	poll_threads();
1421 	CU_ASSERT(status == 0); /* This should succeed */
1422 	CU_ASSERT((bdev_ch[0]->flags & BDEV_CH_QOS_ENABLED) == 0);
1423 	CU_ASSERT((bdev_ch[1]->flags & BDEV_CH_QOS_ENABLED) == 0);
1424 
1425 	/* Enable QoS on thread 0 */
1426 	status = -1;
1427 	limits[SPDK_BDEV_QOS_RW_IOPS_RATE_LIMIT] = 10000;
1428 	spdk_bdev_set_qos_rate_limits(bdev, limits, qos_dynamic_enable_done, &status);
1429 	poll_threads();
1430 	CU_ASSERT(status == 0);
1431 	CU_ASSERT((bdev_ch[0]->flags & BDEV_CH_QOS_ENABLED) != 0);
1432 	CU_ASSERT((bdev_ch[1]->flags & BDEV_CH_QOS_ENABLED) != 0);
1433 
1434 	/* Disable QoS on thread 1 */
1435 	set_thread(1);
1436 	status = -1;
1437 	limits[SPDK_BDEV_QOS_RW_IOPS_RATE_LIMIT] = 0;
1438 	spdk_bdev_set_qos_rate_limits(bdev, limits, qos_dynamic_enable_done, &status);
1439 	/* Don't poll yet. This should leave the channels with QoS enabled */
1440 	CU_ASSERT(status == -1);
1441 	CU_ASSERT((bdev_ch[0]->flags & BDEV_CH_QOS_ENABLED) != 0);
1442 	CU_ASSERT((bdev_ch[1]->flags & BDEV_CH_QOS_ENABLED) != 0);
1443 
1444 	/* Enable QoS. This should immediately fail because the previous QoS disable hasn't completed. */
1445 	second_status = 0;
1446 	limits[SPDK_BDEV_QOS_RW_BPS_RATE_LIMIT] = 10;
1447 	spdk_bdev_set_qos_rate_limits(bdev, limits, qos_dynamic_enable_done, &second_status);
1448 	poll_threads();
1449 	CU_ASSERT(status == 0); /* The disable should succeed */
1450 	CU_ASSERT(second_status < 0); /* The enable should fail */
1451 	CU_ASSERT((bdev_ch[0]->flags & BDEV_CH_QOS_ENABLED) == 0);
1452 	CU_ASSERT((bdev_ch[1]->flags & BDEV_CH_QOS_ENABLED) == 0);
1453 
1454 	/* Enable QoS on thread 1. This should succeed now that the disable has completed. */
1455 	status = -1;
1456 	limits[SPDK_BDEV_QOS_RW_IOPS_RATE_LIMIT] = 10000;
1457 	spdk_bdev_set_qos_rate_limits(bdev, limits, qos_dynamic_enable_done, &status);
1458 	poll_threads();
1459 	CU_ASSERT(status == 0);
1460 	CU_ASSERT((bdev_ch[0]->flags & BDEV_CH_QOS_ENABLED) != 0);
1461 	CU_ASSERT((bdev_ch[1]->flags & BDEV_CH_QOS_ENABLED) != 0);
1462 
1463 	/* Tear down the channels */
1464 	set_thread(0);
1465 	spdk_put_io_channel(io_ch[0]);
1466 	set_thread(1);
1467 	spdk_put_io_channel(io_ch[1]);
1468 	poll_threads();
1469 
1470 	set_thread(0);
1471 	teardown_test();
1472 }
1473 
1474 static void
1475 histogram_status_cb(void *cb_arg, int status)
1476 {
1477 	g_status = status;
1478 }
1479 
1480 static void
1481 histogram_data_cb(void *cb_arg, int status, struct spdk_histogram_data *histogram)
1482 {
1483 	g_status = status;
1484 	g_histogram = histogram;
1485 }
1486 
1487 static void
1488 histogram_io_count(void *ctx, uint64_t start, uint64_t end, uint64_t count,
1489 		   uint64_t total, uint64_t so_far)
1490 {
1491 	g_count += count;
1492 }
1493 
1494 static void
1495 bdev_histograms_mt(void)
1496 {
1497 	struct spdk_io_channel *ch[2];
1498 	struct spdk_histogram_data *histogram;
1499 	uint8_t buf[4096];
1500 	int status = false;
1501 	int rc;
1502 
1503 
1504 	setup_test();
1505 
1506 	set_thread(0);
1507 	ch[0] = spdk_bdev_get_io_channel(g_desc);
1508 	CU_ASSERT(ch[0] != NULL);
1509 
1510 	set_thread(1);
1511 	ch[1] = spdk_bdev_get_io_channel(g_desc);
1512 	CU_ASSERT(ch[1] != NULL);
1513 
1514 
1515 	/* Enable histogram */
1516 	spdk_bdev_histogram_enable(&g_bdev.bdev, histogram_status_cb, NULL, true);
1517 	poll_threads();
1518 	CU_ASSERT(g_status == 0);
1519 	CU_ASSERT(g_bdev.bdev.internal.histogram_enabled == true);
1520 
1521 	/* Allocate histogram */
1522 	histogram = spdk_histogram_data_alloc();
1523 
1524 	/* Check if histogram is zeroed */
1525 	spdk_bdev_histogram_get(&g_bdev.bdev, histogram, histogram_data_cb, NULL);
1526 	poll_threads();
1527 	CU_ASSERT(g_status == 0);
1528 	SPDK_CU_ASSERT_FATAL(g_histogram != NULL);
1529 
1530 	g_count = 0;
1531 	spdk_histogram_data_iterate(g_histogram, histogram_io_count, NULL);
1532 
1533 	CU_ASSERT(g_count == 0);
1534 
1535 	set_thread(0);
1536 	rc = spdk_bdev_write_blocks(g_desc, ch[0], &buf, 0, 1, io_during_io_done, &status);
1537 	CU_ASSERT(rc == 0);
1538 
1539 	spdk_delay_us(10);
1540 	stub_complete_io(g_bdev.io_target, 1);
1541 	poll_threads();
1542 	CU_ASSERT(status == true);
1543 
1544 
1545 	set_thread(1);
1546 	rc = spdk_bdev_read_blocks(g_desc, ch[1], &buf, 0, 1, io_during_io_done, &status);
1547 	CU_ASSERT(rc == 0);
1548 
1549 	spdk_delay_us(10);
1550 	stub_complete_io(g_bdev.io_target, 1);
1551 	poll_threads();
1552 	CU_ASSERT(status == true);
1553 
1554 	set_thread(0);
1555 
1556 	/* Check if histogram gathered data from all I/O channels */
1557 	spdk_bdev_histogram_get(&g_bdev.bdev, histogram, histogram_data_cb, NULL);
1558 	poll_threads();
1559 	CU_ASSERT(g_status == 0);
1560 	CU_ASSERT(g_bdev.bdev.internal.histogram_enabled == true);
1561 	SPDK_CU_ASSERT_FATAL(g_histogram != NULL);
1562 
1563 	g_count = 0;
1564 	spdk_histogram_data_iterate(g_histogram, histogram_io_count, NULL);
1565 	CU_ASSERT(g_count == 2);
1566 
1567 	/* Disable histogram */
1568 	spdk_bdev_histogram_enable(&g_bdev.bdev, histogram_status_cb, NULL, false);
1569 	poll_threads();
1570 	CU_ASSERT(g_status == 0);
1571 	CU_ASSERT(g_bdev.bdev.internal.histogram_enabled == false);
1572 
1573 	spdk_histogram_data_free(histogram);
1574 
1575 	/* Tear down the channels */
1576 	set_thread(0);
1577 	spdk_put_io_channel(ch[0]);
1578 	set_thread(1);
1579 	spdk_put_io_channel(ch[1]);
1580 	poll_threads();
1581 	set_thread(0);
1582 	teardown_test();
1583 
1584 }
1585 
1586 struct timeout_io_cb_arg {
1587 	struct iovec iov;
1588 	uint8_t type;
1589 };
1590 
1591 static int
1592 bdev_channel_count_submitted_io(struct spdk_bdev_channel *ch)
1593 {
1594 	struct spdk_bdev_io *bdev_io;
1595 	int n = 0;
1596 
1597 	if (!ch) {
1598 		return -1;
1599 	}
1600 
1601 	TAILQ_FOREACH(bdev_io, &ch->io_submitted, internal.ch_link) {
1602 		n++;
1603 	}
1604 
1605 	return n;
1606 }
1607 
1608 static void
1609 bdev_channel_io_timeout_cb(void *cb_arg, struct spdk_bdev_io *bdev_io)
1610 {
1611 	struct timeout_io_cb_arg *ctx = cb_arg;
1612 
1613 	ctx->type = bdev_io->type;
1614 	ctx->iov.iov_base = bdev_io->iov.iov_base;
1615 	ctx->iov.iov_len = bdev_io->iov.iov_len;
1616 }
1617 
1618 static bool g_io_done;
1619 
1620 static void
1621 io_done(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg)
1622 {
1623 	g_io_done = true;
1624 	spdk_bdev_free_io(bdev_io);
1625 }
1626 
1627 static void
1628 bdev_set_io_timeout_mt(void)
1629 {
1630 	struct spdk_io_channel *ch[3];
1631 	struct spdk_bdev_channel *bdev_ch[3];
1632 	struct timeout_io_cb_arg cb_arg;
1633 
1634 	setup_test();
1635 
1636 	g_bdev.bdev.optimal_io_boundary = 16;
1637 	g_bdev.bdev.split_on_optimal_io_boundary = true;
1638 
1639 	set_thread(0);
1640 	ch[0] = spdk_bdev_get_io_channel(g_desc);
1641 	CU_ASSERT(ch[0] != NULL);
1642 
1643 	set_thread(1);
1644 	ch[1] = spdk_bdev_get_io_channel(g_desc);
1645 	CU_ASSERT(ch[1] != NULL);
1646 
1647 	set_thread(2);
1648 	ch[2] = spdk_bdev_get_io_channel(g_desc);
1649 	CU_ASSERT(ch[2] != NULL);
1650 
1651 	/* Multi-thread mode
1652 	 * 1, Check that the timeout poller was registered successfully
1653 	 * 2, Check the timed-out IO and ensure the IO was submitted by the user
1654 	 * 3, Check that the link in the bdev_ch works right.
1655 	 * 4, Close the desc and put the io channel while the timeout poller is polling
1656 	 */
1657 
1658 	/* In desc thread set the timeout */
1659 	set_thread(0);
1660 	CU_ASSERT(spdk_bdev_set_timeout(g_desc, 5, bdev_channel_io_timeout_cb, &cb_arg) == 0);
1661 	CU_ASSERT(g_desc->io_timeout_poller != NULL);
1662 	CU_ASSERT(g_desc->cb_fn == bdev_channel_io_timeout_cb);
1663 	CU_ASSERT(g_desc->cb_arg == &cb_arg);
1664 
1665 	/* check the IO submitted list and timeout handler */
1666 	CU_ASSERT(spdk_bdev_read_blocks(g_desc, ch[0], (void *)0x2000, 0, 1, io_done, NULL) == 0);
1667 	bdev_ch[0] = spdk_io_channel_get_ctx(ch[0]);
1668 	CU_ASSERT(bdev_channel_count_submitted_io(bdev_ch[0]) == 1);
1669 
1670 	set_thread(1);
1671 	CU_ASSERT(spdk_bdev_write_blocks(g_desc, ch[1], (void *)0x1000, 0, 1, io_done, NULL) == 0);
1672 	bdev_ch[1] = spdk_io_channel_get_ctx(ch[1]);
1673 	CU_ASSERT(bdev_channel_count_submitted_io(bdev_ch[1]) == 1);
1674 
	/* Now test that a single-vector command crossing the optimal I/O boundary
	 * is split correctly.
	 * Offset 14, length 8, payload 0xF000
	 *  Child - Offset 14, length 2, payload 0xF000
	 *  Child - Offset 16, length 6, payload 0xF000 + 2 * 512
	 *
	 * The parent I/O and both children are tracked on the channel's io_submitted
	 * list, so three submitted I/Os are counted below.
	 */
1682 	set_thread(2);
1683 	CU_ASSERT(spdk_bdev_read_blocks(g_desc, ch[2], (void *)0xF000, 14, 8, io_done, NULL) == 0);
1684 	bdev_ch[2] = spdk_io_channel_get_ctx(ch[2]);
1685 	CU_ASSERT(bdev_channel_count_submitted_io(bdev_ch[2]) == 3);
1686 
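	/* Advance the mock clock by 3 * ticks_hz microseconds (3 seconds with the
	 * mocked clock).  The 5-second timeout has not expired, so no I/O should be
	 * reported as timed out yet.
	 */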
1687 	set_thread(0);
1688 	memset(&cb_arg, 0, sizeof(cb_arg));
1689 	spdk_delay_us(3 * spdk_get_ticks_hz());
1690 	poll_threads();
1691 	CU_ASSERT(cb_arg.type == 0);
1692 	CU_ASSERT(cb_arg.iov.iov_base == (void *)0x0);
1693 	CU_ASSERT(cb_arg.iov.iov_len == 0);
1694 
	/* Now the accumulated delay exceeds the 5-second timeout limit. */
1696 	spdk_delay_us(3 * spdk_get_ticks_hz());
1697 	poll_thread(0);
1698 	CU_ASSERT(cb_arg.type == SPDK_BDEV_IO_TYPE_READ);
1699 	CU_ASSERT(cb_arg.iov.iov_base == (void *)0x2000);
1700 	CU_ASSERT(cb_arg.iov.iov_len == 1 * g_bdev.bdev.blocklen);
1701 	stub_complete_io(g_bdev.io_target, 1);
1702 	CU_ASSERT(bdev_channel_count_submitted_io(bdev_ch[0]) == 0);
1703 
1704 	memset(&cb_arg, 0, sizeof(cb_arg));
1705 	set_thread(1);
1706 	poll_thread(1);
1707 	CU_ASSERT(cb_arg.type == SPDK_BDEV_IO_TYPE_WRITE);
1708 	CU_ASSERT(cb_arg.iov.iov_base == (void *)0x1000);
1709 	CU_ASSERT(cb_arg.iov.iov_len == 1 * g_bdev.bdev.blocklen);
1710 	stub_complete_io(g_bdev.io_target, 1);
1711 	CU_ASSERT(bdev_channel_count_submitted_io(bdev_ch[1]) == 0);
1712 
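	/* Thread 2's I/O was split, but the timeout callback reports the parent I/O
	 * (offset 14, 8 blocks).  Both child I/Os must be completed in the stub
	 * before the parent leaves the io_submitted list.
	 */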
1713 	memset(&cb_arg, 0, sizeof(cb_arg));
1714 	set_thread(2);
1715 	poll_thread(2);
1716 	CU_ASSERT(cb_arg.type == SPDK_BDEV_IO_TYPE_READ);
1717 	CU_ASSERT(cb_arg.iov.iov_base == (void *)0xF000);
1718 	CU_ASSERT(cb_arg.iov.iov_len == 8 * g_bdev.bdev.blocklen);
1719 	stub_complete_io(g_bdev.io_target, 1);
1720 	CU_ASSERT(bdev_channel_count_submitted_io(bdev_ch[2]) == 2);
1721 	stub_complete_io(g_bdev.io_target, 1);
1722 	CU_ASSERT(bdev_channel_count_submitted_io(bdev_ch[2]) == 0);
1723 
	/* Run poll_timeout_done(), i.e. let the timeout poller finish its pass;
	 * desc->refs drops back to 0.
	 */
1725 	set_thread(0);
1726 	poll_thread(0);
1727 	CU_ASSERT(g_desc->refs == 0);
1728 	CU_ASSERT(spdk_bdev_read_blocks(g_desc, ch[0], (void *)0x1000, 0, 1, io_done, NULL) == 0);
1729 	set_thread(1);
1730 	CU_ASSERT(spdk_bdev_write_blocks(g_desc, ch[1], (void *)0x2000, 0, 2, io_done, NULL) == 0);
1731 	set_thread(2);
1732 	CU_ASSERT(spdk_bdev_read_blocks(g_desc, ch[2], (void *)0x3000, 0, 3, io_done, NULL) == 0);
1733 
	/* Trigger the timeout poller to run again; desc->refs is incremented while it runs.
	 * On thread 0 we destroy the io channel before the timeout poller runs,
	 * so the timeout callback is not called on thread 0.
	 */
1738 	spdk_delay_us(6 * spdk_get_ticks_hz());
1739 	memset(&cb_arg, 0, sizeof(cb_arg));
1740 	set_thread(0);
1741 	stub_complete_io(g_bdev.io_target, 1);
1742 	spdk_put_io_channel(ch[0]);
1743 	poll_thread(0);
	CU_ASSERT(g_desc->refs == 1);
1745 	CU_ASSERT(cb_arg.type == 0);
1746 	CU_ASSERT(cb_arg.iov.iov_base == (void *)0x0);
1747 	CU_ASSERT(cb_arg.iov.iov_len == 0);
1748 
	/* On thread 1 the timeout poller runs first, then we destroy the io channel,
	 * so the timeout callback is called on thread 1.
	 */
1752 	memset(&cb_arg, 0, sizeof(cb_arg));
1753 	set_thread(1);
1754 	poll_thread(1);
1755 	stub_complete_io(g_bdev.io_target, 1);
1756 	spdk_put_io_channel(ch[1]);
1757 	poll_thread(1);
1758 	CU_ASSERT(cb_arg.type == SPDK_BDEV_IO_TYPE_WRITE);
1759 	CU_ASSERT(cb_arg.iov.iov_base == (void *)0x2000);
1760 	CU_ASSERT(cb_arg.iov.iov_len == 2 * g_bdev.bdev.blocklen);
1761 
	/* Close the desc.
	 * This unregisters the timeout poller first and then decrements desc->refs,
	 * but refs is not yet zero, so the desc is not freed.
	 */
1766 	set_thread(0);
1767 	spdk_bdev_close(g_desc);
1768 	CU_ASSERT(g_desc->refs == 1);
1769 	CU_ASSERT(g_desc->io_timeout_poller == NULL);
1770 
	/* The timeout poller runs on thread 2, then we destroy the io channel.
	 * The desc is already closed, so the timeout poller exits immediately and
	 * the timeout callback is not called on thread 2.
	 */
1775 	memset(&cb_arg, 0, sizeof(cb_arg));
1776 	set_thread(2);
1777 	poll_thread(2);
1778 	stub_complete_io(g_bdev.io_target, 1);
1779 	spdk_put_io_channel(ch[2]);
1780 	poll_thread(2);
1781 	CU_ASSERT(cb_arg.type == 0);
1782 	CU_ASSERT(cb_arg.iov.iov_base == (void *)0x0);
1783 	CU_ASSERT(cb_arg.iov.iov_len == 0);
1784 
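	/* The desc was already closed above, so finish the teardown manually here
	 * instead of calling teardown_test().
	 */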
1785 	set_thread(0);
1786 	poll_thread(0);
1787 	g_teardown_done = false;
1788 	unregister_bdev(&g_bdev);
1789 	spdk_io_device_unregister(&g_io_device, NULL);
1790 	spdk_bdev_finish(finish_cb, NULL);
1791 	poll_threads();
1792 	memset(&g_bdev, 0, sizeof(g_bdev));
1793 	CU_ASSERT(g_teardown_done == true);
1794 	g_teardown_done = false;
1795 	free_threads();
1796 	free_cores();
1797 }
1798 
1799 static bool g_io_done2;
1800 static bool g_lock_lba_range_done;
1801 static bool g_unlock_lba_range_done;
1802 
1803 static void
1804 io_done2(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg)
1805 {
1806 	g_io_done2 = true;
1807 	spdk_bdev_free_io(bdev_io);
1808 }
1809 
1810 static void
1811 lock_lba_range_done(void *ctx, int status)
1812 {
1813 	g_lock_lba_range_done = true;
1814 }
1815 
1816 static void
1817 unlock_lba_range_done(void *ctx, int status)
1818 {
1819 	g_unlock_lba_range_done = true;
1820 }
1821 
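/* Return the number of I/Os currently outstanding in the stub bdev channel for
 * the calling thread.
 */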
1822 static uint32_t
1823 stub_channel_outstanding_cnt(void *io_target)
1824 {
1825 	struct spdk_io_channel *_ch = spdk_get_io_channel(io_target);
1826 	struct ut_bdev_channel *ch = spdk_io_channel_get_ctx(_ch);
1827 	uint32_t outstanding_cnt;
1828 
1829 	outstanding_cnt = ch->outstanding_cnt;
1830 
1831 	spdk_put_io_channel(_ch);
1832 	return outstanding_cnt;
1833 }
1834 
1835 static void
1836 lock_lba_range_then_submit_io(void)
1837 {
1838 	struct spdk_bdev_desc *desc = NULL;
1839 	void *io_target;
1840 	struct spdk_io_channel *io_ch[3];
1841 	struct spdk_bdev_channel *bdev_ch[3];
1842 	struct lba_range *range;
1843 	char buf[4096];
1844 	int ctx0, ctx1, ctx2;
1845 	int rc;
1846 
1847 	setup_test();
1848 
1849 	io_target = g_bdev.io_target;
1850 	desc = g_desc;
1851 
1852 	set_thread(0);
1853 	io_ch[0] = spdk_bdev_get_io_channel(desc);
1854 	bdev_ch[0] = spdk_io_channel_get_ctx(io_ch[0]);
1855 	CU_ASSERT(io_ch[0] != NULL);
1856 
1857 	set_thread(1);
1858 	io_ch[1] = spdk_bdev_get_io_channel(desc);
1859 	bdev_ch[1] = spdk_io_channel_get_ctx(io_ch[1]);
1860 	CU_ASSERT(io_ch[1] != NULL);
1861 
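	/* Lock LBA range [20, 30) using thread 0's channel. */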
1862 	set_thread(0);
1863 	g_lock_lba_range_done = false;
1864 	rc = bdev_lock_lba_range(desc, io_ch[0], 20, 10, lock_lba_range_done, &ctx0);
1865 	CU_ASSERT(rc == 0);
1866 	poll_threads();
1867 
1868 	/* The lock should immediately become valid, since there are no outstanding
1869 	 * write I/O.
1870 	 */
1871 	CU_ASSERT(g_lock_lba_range_done == true);
1872 	range = TAILQ_FIRST(&bdev_ch[0]->locked_ranges);
1873 	SPDK_CU_ASSERT_FATAL(range != NULL);
1874 	CU_ASSERT(range->offset == 20);
1875 	CU_ASSERT(range->length == 10);
1876 	CU_ASSERT(range->owner_ch == bdev_ch[0]);
1877 
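	/* A read I/O to the locked range submitted on the channel holding the lock
	 * executes immediately.
	 */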
1878 	g_io_done = false;
1879 	CU_ASSERT(TAILQ_EMPTY(&bdev_ch[0]->io_locked));
1880 	rc = spdk_bdev_read_blocks(desc, io_ch[0], buf, 20, 1, io_done, &ctx0);
1881 	CU_ASSERT(rc == 0);
1882 	CU_ASSERT(stub_channel_outstanding_cnt(io_target) == 1);
1883 
1884 	stub_complete_io(io_target, 1);
1885 	poll_threads();
1886 	CU_ASSERT(g_io_done == true);
1887 	CU_ASSERT(TAILQ_EMPTY(&bdev_ch[0]->io_locked));
1888 
1889 	/* Try a write I/O.  This should actually be allowed to execute, since the channel
1890 	 * holding the lock is submitting the write I/O.
1891 	 */
1892 	g_io_done = false;
1893 	CU_ASSERT(TAILQ_EMPTY(&bdev_ch[0]->io_locked));
1894 	rc = spdk_bdev_write_blocks(desc, io_ch[0], buf, 20, 1, io_done, &ctx0);
1895 	CU_ASSERT(rc == 0);
1896 	CU_ASSERT(stub_channel_outstanding_cnt(io_target) == 1);
1897 
1898 	stub_complete_io(io_target, 1);
1899 	poll_threads();
1900 	CU_ASSERT(g_io_done == true);
1901 	CU_ASSERT(TAILQ_EMPTY(&bdev_ch[0]->io_locked));
1902 
	/* Try a write I/O from thread 1's channel, which does not hold the lock.
	 * This should get queued on the io_locked tailq.
	 */
1904 	set_thread(1);
1905 	g_io_done = false;
1906 	CU_ASSERT(TAILQ_EMPTY(&bdev_ch[1]->io_locked));
1907 	rc = spdk_bdev_write_blocks(desc, io_ch[1], buf, 20, 1, io_done, &ctx1);
1908 	CU_ASSERT(rc == 0);
1909 	poll_threads();
1910 	CU_ASSERT(stub_channel_outstanding_cnt(io_target) == 0);
1911 	CU_ASSERT(!TAILQ_EMPTY(&bdev_ch[1]->io_locked));
1912 	CU_ASSERT(g_io_done == false);
1913 
1914 	/* Try to unlock the lba range using thread 1's io_ch.  This should fail. */
1915 	rc = bdev_unlock_lba_range(desc, io_ch[1], 20, 10, unlock_lba_range_done, &ctx1);
1916 	CU_ASSERT(rc == -EINVAL);
1917 
1918 	/* Now create a new channel and submit a write I/O with it.  This should also be queued.
1919 	 * The new channel should inherit the active locks from the bdev's internal list.
1920 	 */
1921 	set_thread(2);
1922 	io_ch[2] = spdk_bdev_get_io_channel(desc);
1923 	bdev_ch[2] = spdk_io_channel_get_ctx(io_ch[2]);
1924 	CU_ASSERT(io_ch[2] != NULL);
1925 
1926 	g_io_done2 = false;
1927 	CU_ASSERT(TAILQ_EMPTY(&bdev_ch[2]->io_locked));
1928 	rc = spdk_bdev_write_blocks(desc, io_ch[2], buf, 22, 2, io_done2, &ctx2);
1929 	CU_ASSERT(rc == 0);
1930 	poll_threads();
1931 	CU_ASSERT(stub_channel_outstanding_cnt(io_target) == 0);
1932 	CU_ASSERT(!TAILQ_EMPTY(&bdev_ch[2]->io_locked));
1933 	CU_ASSERT(g_io_done2 == false);
1934 
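	/* Unlock the range from the owning channel; the queued write I/Os on the
	 * other channels can then proceed.
	 */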
1935 	set_thread(0);
1936 	rc = bdev_unlock_lba_range(desc, io_ch[0], 20, 10, unlock_lba_range_done, &ctx0);
1937 	CU_ASSERT(rc == 0);
1938 	poll_threads();
1939 	CU_ASSERT(TAILQ_EMPTY(&bdev_ch[0]->locked_ranges));
1940 
1941 	/* The LBA range is unlocked, so the write IOs should now have started execution. */
1942 	CU_ASSERT(TAILQ_EMPTY(&bdev_ch[1]->io_locked));
1943 	CU_ASSERT(TAILQ_EMPTY(&bdev_ch[2]->io_locked));
1944 
1945 	set_thread(1);
1946 	CU_ASSERT(stub_channel_outstanding_cnt(io_target) == 1);
1947 	stub_complete_io(io_target, 1);
1948 	set_thread(2);
1949 	CU_ASSERT(stub_channel_outstanding_cnt(io_target) == 1);
1950 	stub_complete_io(io_target, 1);
1951 
1952 	poll_threads();
1953 	CU_ASSERT(g_io_done == true);
1954 	CU_ASSERT(g_io_done2 == true);
1955 
1956 	/* Tear down the channels */
1957 	set_thread(0);
1958 	spdk_put_io_channel(io_ch[0]);
1959 	set_thread(1);
1960 	spdk_put_io_channel(io_ch[1]);
1961 	set_thread(2);
1962 	spdk_put_io_channel(io_ch[2]);
1963 	poll_threads();
1964 	set_thread(0);
1965 	teardown_test();
1966 }
1967 
1968 int
1969 main(int argc, char **argv)
1970 {
1971 	CU_pSuite	suite = NULL;
1972 	unsigned int	num_failures;
1973 
1974 	CU_set_error_action(CUEA_ABORT);
1975 	CU_initialize_registry();
1976 
1977 	suite = CU_add_suite("bdev", NULL, NULL);
1978 
1979 	CU_ADD_TEST(suite, basic);
1980 	CU_ADD_TEST(suite, unregister_and_close);
1981 	CU_ADD_TEST(suite, basic_qos);
1982 	CU_ADD_TEST(suite, put_channel_during_reset);
1983 	CU_ADD_TEST(suite, aborted_reset);
1984 	CU_ADD_TEST(suite, io_during_reset);
1985 	CU_ADD_TEST(suite, io_during_qos_queue);
1986 	CU_ADD_TEST(suite, io_during_qos_reset);
1987 	CU_ADD_TEST(suite, enomem);
1988 	CU_ADD_TEST(suite, enomem_multi_bdev);
1989 	CU_ADD_TEST(suite, enomem_multi_io_target);
1990 	CU_ADD_TEST(suite, qos_dynamic_enable);
1991 	CU_ADD_TEST(suite, bdev_histograms_mt);
1992 	CU_ADD_TEST(suite, bdev_set_io_timeout_mt);
1993 	CU_ADD_TEST(suite, lock_lba_range_then_submit_io);
1994 
1995 	CU_basic_set_mode(CU_BRM_VERBOSE);
1996 	CU_basic_run_tests();
1997 	num_failures = CU_get_number_of_failures();
1998 	CU_cleanup_registry();
1999 	return num_failures;
2000 }
2001