1 /*   SPDX-License-Identifier: BSD-3-Clause
2  *   Copyright (C) 2017 Intel Corporation.
3  *   All rights reserved.
4  *   Copyright (c) 2022 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
5  */
6 
7 #include "spdk_cunit.h"
8 
9 #include "common/lib/ut_multithread.c"
10 #include "unit/lib/json_mock.c"
11 
12 #include "spdk/config.h"
13 /* HACK: disable VTune integration so the unit test doesn't need VTune headers and libs to build */
14 #undef SPDK_CONFIG_VTUNE
15 
16 #include "bdev/bdev.c"
17 
18 #define BDEV_UT_NUM_THREADS 3
19 
20 DEFINE_STUB(spdk_notify_send, uint64_t, (const char *type, const char *ctx), 0);
21 DEFINE_STUB(spdk_notify_type_register, struct spdk_notify_type *, (const char *type), NULL);
22 DEFINE_STUB_V(spdk_scsi_nvme_translate, (const struct spdk_bdev_io *bdev_io, int *sc, int *sk,
23 		int *asc, int *ascq));
24 DEFINE_STUB(spdk_memory_domain_get_dma_device_id, const char *, (struct spdk_memory_domain *domain),
25 	    "test_domain");
26 DEFINE_STUB(spdk_memory_domain_get_dma_device_type, enum spdk_dma_device_type,
27 	    (struct spdk_memory_domain *domain), 0);
28 
29 DEFINE_RETURN_MOCK(spdk_memory_domain_pull_data, int);
30 int
31 spdk_memory_domain_pull_data(struct spdk_memory_domain *src_domain, void *src_domain_ctx,
32 			     struct iovec *src_iov, uint32_t src_iov_cnt, struct iovec *dst_iov, uint32_t dst_iov_cnt,
33 			     spdk_memory_domain_data_cpl_cb cpl_cb, void *cpl_cb_arg)
34 {
35 	HANDLE_RETURN_MOCK(spdk_memory_domain_pull_data);
36 
37 	cpl_cb(cpl_cb_arg, 0);
38 	return 0;
39 }
40 
41 DEFINE_RETURN_MOCK(spdk_memory_domain_push_data, int);
42 int
43 spdk_memory_domain_push_data(struct spdk_memory_domain *dst_domain, void *dst_domain_ctx,
44 			     struct iovec *dst_iov, uint32_t dst_iovcnt, struct iovec *src_iov, uint32_t src_iovcnt,
45 			     spdk_memory_domain_data_cpl_cb cpl_cb, void *cpl_cb_arg)
46 {
47 	HANDLE_RETURN_MOCK(spdk_memory_domain_push_data);
48 
49 	cpl_cb(cpl_cb_arg, 0);
50 	return 0;
51 }
52 
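/*
 * Test bdev and per-channel context used by the stub bdev module below.
 * Each ut_bdev wraps an spdk_bdev plus the io_target used as its io_device;
 * each channel tracks the stub I/O that are currently outstanding on it.
 */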
53 struct ut_bdev {
54 	struct spdk_bdev	bdev;
55 	void			*io_target;
56 };
57 
58 struct ut_bdev_channel {
59 	TAILQ_HEAD(, spdk_bdev_io)	outstanding_io;
60 	uint32_t			outstanding_cnt;
61 	uint32_t			avail_cnt;
62 };
63 
64 int g_io_device;
65 struct ut_bdev g_bdev;
66 struct spdk_bdev_desc *g_desc;
67 bool g_teardown_done = false;
68 bool g_get_io_channel = true;
69 bool g_create_ch = true;
70 bool g_init_complete_called = false;
71 bool g_fini_start_called = true;
72 int g_status = 0;
73 int g_count = 0;
74 struct spdk_histogram_data *g_histogram = NULL;
75 
76 static int
77 stub_create_ch(void *io_device, void *ctx_buf)
78 {
79 	struct ut_bdev_channel *ch = ctx_buf;
80 
81 	if (g_create_ch == false) {
82 		return -1;
83 	}
84 
85 	TAILQ_INIT(&ch->outstanding_io);
86 	ch->outstanding_cnt = 0;
87 	/*
88 	 * When avail gets to 0, the submit_request function will return ENOMEM.
89 	 *  Most tests do not want ENOMEM to occur, so by default set this to a
90 	 *  big value that won't get hit.  The ENOMEM tests can then override this
91 	 *  value to something much smaller to induce ENOMEM conditions.
92 	 */
93 	ch->avail_cnt = 2048;
94 	return 0;
95 }
96 
97 static void
98 stub_destroy_ch(void *io_device, void *ctx_buf)
99 {
100 }
101 
102 static struct spdk_io_channel *
103 stub_get_io_channel(void *ctx)
104 {
105 	struct ut_bdev *ut_bdev = ctx;
106 
107 	if (g_get_io_channel == true) {
108 		return spdk_get_io_channel(ut_bdev->io_target);
109 	} else {
110 		return NULL;
111 	}
112 }
113 
114 static int
115 stub_destruct(void *ctx)
116 {
117 	return 0;
118 }
119 
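/*
 * Stub submit_request: RESET aborts everything outstanding on the channel,
 * ABORT completes the targeted I/O (or fails if it is not found), and all
 * other I/O stays queued until stub_complete_io() is called, or is completed
 * with NOMEM status once avail_cnt is exhausted.
 */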
120 static void
121 stub_submit_request(struct spdk_io_channel *_ch, struct spdk_bdev_io *bdev_io)
122 {
123 	struct ut_bdev_channel *ch = spdk_io_channel_get_ctx(_ch);
124 	struct spdk_bdev_io *io;
125 
126 	if (bdev_io->type == SPDK_BDEV_IO_TYPE_RESET) {
127 		while (!TAILQ_EMPTY(&ch->outstanding_io)) {
128 			io = TAILQ_FIRST(&ch->outstanding_io);
129 			TAILQ_REMOVE(&ch->outstanding_io, io, module_link);
130 			ch->outstanding_cnt--;
131 			spdk_bdev_io_complete(io, SPDK_BDEV_IO_STATUS_ABORTED);
132 			ch->avail_cnt++;
133 		}
134 	} else if (bdev_io->type == SPDK_BDEV_IO_TYPE_ABORT) {
135 		TAILQ_FOREACH(io, &ch->outstanding_io, module_link) {
136 			if (io == bdev_io->u.abort.bio_to_abort) {
137 				TAILQ_REMOVE(&ch->outstanding_io, io, module_link);
138 				ch->outstanding_cnt--;
139 				spdk_bdev_io_complete(io, SPDK_BDEV_IO_STATUS_ABORTED);
140 				ch->avail_cnt++;
141 
142 				spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_SUCCESS);
143 				return;
144 			}
145 		}
146 
147 		spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
148 		return;
149 	}
150 
151 	if (ch->avail_cnt > 0) {
152 		TAILQ_INSERT_TAIL(&ch->outstanding_io, bdev_io, module_link);
153 		ch->outstanding_cnt++;
154 		ch->avail_cnt--;
155 	} else {
156 		spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_NOMEM);
157 	}
158 }
159 
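/*
 * Complete up to num_to_complete outstanding I/O on the given io_target with
 * SUCCESS status (0 means complete everything) and return how many were
 * completed.
 */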
160 static uint32_t
161 stub_complete_io(void *io_target, uint32_t num_to_complete)
162 {
163 	struct spdk_io_channel *_ch = spdk_get_io_channel(io_target);
164 	struct ut_bdev_channel *ch = spdk_io_channel_get_ctx(_ch);
165 	struct spdk_bdev_io *io;
166 	bool complete_all = (num_to_complete == 0);
167 	uint32_t num_completed = 0;
168 
169 	while (complete_all || num_completed < num_to_complete) {
170 		if (TAILQ_EMPTY(&ch->outstanding_io)) {
171 			break;
172 		}
173 		io = TAILQ_FIRST(&ch->outstanding_io);
174 		TAILQ_REMOVE(&ch->outstanding_io, io, module_link);
175 		ch->outstanding_cnt--;
176 		spdk_bdev_io_complete(io, SPDK_BDEV_IO_STATUS_SUCCESS);
177 		ch->avail_cnt++;
178 		num_completed++;
179 	}
180 	spdk_put_io_channel(_ch);
181 	return num_completed;
182 }
183 
184 static bool
185 stub_io_type_supported(void *ctx, enum spdk_bdev_io_type type)
186 {
187 	return true;
188 }
189 
190 static struct spdk_bdev_fn_table fn_table = {
191 	.get_io_channel =	stub_get_io_channel,
192 	.destruct =		stub_destruct,
193 	.submit_request =	stub_submit_request,
194 	.io_type_supported =	stub_io_type_supported,
195 };
196 
197 struct spdk_bdev_module bdev_ut_if;
198 
199 static int
200 module_init(void)
201 {
202 	spdk_bdev_module_init_done(&bdev_ut_if);
203 	return 0;
204 }
205 
206 static void
207 module_fini(void)
208 {
209 }
210 
211 static void
212 init_complete(void)
213 {
214 	g_init_complete_called = true;
215 }
216 
217 static void
218 fini_start(void)
219 {
220 	g_fini_start_called = true;
221 }
222 
223 struct spdk_bdev_module bdev_ut_if = {
224 	.name = "bdev_ut",
225 	.module_init = module_init,
226 	.module_fini = module_fini,
227 	.async_init = true,
228 	.init_complete = init_complete,
229 	.fini_start = fini_start,
230 };
231 
232 SPDK_BDEV_MODULE_REGISTER(bdev_ut, &bdev_ut_if)
233 
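/* Register a 1024-block, 4096-byte-blocklen test bdev backed by the given io_target. */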
234 static void
235 register_bdev(struct ut_bdev *ut_bdev, char *name, void *io_target)
236 {
237 	memset(ut_bdev, 0, sizeof(*ut_bdev));
238 
239 	ut_bdev->io_target = io_target;
240 	ut_bdev->bdev.ctxt = ut_bdev;
241 	ut_bdev->bdev.name = name;
242 	ut_bdev->bdev.fn_table = &fn_table;
243 	ut_bdev->bdev.module = &bdev_ut_if;
244 	ut_bdev->bdev.blocklen = 4096;
245 	ut_bdev->bdev.blockcnt = 1024;
246 
247 	spdk_bdev_register(&ut_bdev->bdev);
248 }
249 
250 static void
251 unregister_bdev(struct ut_bdev *ut_bdev)
252 {
253 	/* Handle any deferred messages. */
254 	poll_threads();
255 	spdk_bdev_unregister(&ut_bdev->bdev, NULL, NULL);
256 	/* Handle the async bdev unregister. */
257 	poll_threads();
258 }
259 
260 static void
261 bdev_init_cb(void *done, int rc)
262 {
263 	CU_ASSERT(rc == 0);
264 	*(bool *)done = true;
265 }
266 
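/*
 * Shared open callback: on REMOVE it sets the bool pointed to by event_ctx,
 * on RESIZE it increments the int pointed to by event_ctx (when provided).
 */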
267 static void
268 _bdev_event_cb(enum spdk_bdev_event_type type, struct spdk_bdev *bdev,
269 	       void *event_ctx)
270 {
271 	switch (type) {
272 	case SPDK_BDEV_EVENT_REMOVE:
273 		if (event_ctx != NULL) {
274 			*(bool *)event_ctx = true;
275 		}
276 		break;
277 	case SPDK_BDEV_EVENT_RESIZE:
278 		if (event_ctx != NULL) {
279 			*(int *)event_ctx += 1;
280 		}
281 		break;
282 	default:
283 		CU_ASSERT(false);
284 		break;
285 	}
286 }
287 
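/*
 * Common test setup: create BDEV_UT_NUM_THREADS ut threads, initialize the
 * iobuf and bdev layers, register the stub io_device and "ut_bdev", and open
 * a descriptor for it in g_desc.
 */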
288 static void
289 setup_test(void)
290 {
291 	bool done = false;
292 	int rc;
293 
294 	allocate_cores(BDEV_UT_NUM_THREADS);
295 	allocate_threads(BDEV_UT_NUM_THREADS);
296 	set_thread(0);
297 
298 	rc = spdk_iobuf_initialize();
299 	CU_ASSERT(rc == 0);
300 	spdk_bdev_initialize(bdev_init_cb, &done);
301 	spdk_io_device_register(&g_io_device, stub_create_ch, stub_destroy_ch,
302 				sizeof(struct ut_bdev_channel), NULL);
303 	register_bdev(&g_bdev, "ut_bdev", &g_io_device);
304 	spdk_bdev_open_ext("ut_bdev", true, _bdev_event_cb, NULL, &g_desc);
305 }
306 
307 static void
308 finish_cb(void *cb_arg)
309 {
310 	g_teardown_done = true;
311 }
312 
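/*
 * Undo setup_test(): close g_desc, unregister the bdev and io_device, shut
 * down the bdev and iobuf layers, and free the ut threads and cores.
 */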
313 static void
314 teardown_test(void)
315 {
316 	set_thread(0);
317 	g_teardown_done = false;
318 	spdk_bdev_close(g_desc);
319 	g_desc = NULL;
320 	unregister_bdev(&g_bdev);
321 	spdk_io_device_unregister(&g_io_device, NULL);
322 	spdk_bdev_finish(finish_cb, NULL);
323 	spdk_iobuf_finish(finish_cb, NULL);
324 	poll_threads();
325 	memset(&g_bdev, 0, sizeof(g_bdev));
326 	CU_ASSERT(g_teardown_done == true);
327 	g_teardown_done = false;
328 	free_threads();
329 	free_cores();
330 }
331 
332 static uint32_t
333 bdev_io_tailq_cnt(bdev_io_tailq_t *tailq)
334 {
335 	struct spdk_bdev_io *io;
336 	uint32_t cnt = 0;
337 
338 	TAILQ_FOREACH(io, tailq, internal.link) {
339 		cnt++;
340 	}
341 
342 	return cnt;
343 }
344 
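/*
 * Verify the module init_complete/fini_start callbacks and the failure paths
 * of spdk_bdev_get_io_channel() when either the bdev or the io_device cannot
 * create a channel.
 */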
345 static void
346 basic(void)
347 {
348 	g_init_complete_called = false;
349 	setup_test();
350 	CU_ASSERT(g_init_complete_called == true);
351 
352 	set_thread(0);
353 
354 	g_get_io_channel = false;
355 	g_ut_threads[0].ch = spdk_bdev_get_io_channel(g_desc);
356 	CU_ASSERT(g_ut_threads[0].ch == NULL);
357 
358 	g_get_io_channel = true;
359 	g_create_ch = false;
360 	g_ut_threads[0].ch = spdk_bdev_get_io_channel(g_desc);
361 	CU_ASSERT(g_ut_threads[0].ch == NULL);
362 
363 	g_get_io_channel = true;
364 	g_create_ch = true;
365 	g_ut_threads[0].ch = spdk_bdev_get_io_channel(g_desc);
366 	CU_ASSERT(g_ut_threads[0].ch != NULL);
367 	spdk_put_io_channel(g_ut_threads[0].ch);
368 
369 	g_fini_start_called = false;
370 	teardown_test();
371 	CU_ASSERT(g_fini_start_called == true);
372 }
373 
374 static void
375 _bdev_unregistered(void *done, int rc)
376 {
377 	CU_ASSERT(rc == 0);
378 	*(bool *)done = true;
379 }
380 
381 static void
382 unregister_and_close(void)
383 {
384 	bool done, remove_notify;
385 	struct spdk_bdev_desc *desc = NULL;
386 
387 	setup_test();
388 	set_thread(0);
389 
390 	/* setup_test() automatically opens the bdev,
391 	 * but this test needs to do that in a different
392 	 * way. */
393 	spdk_bdev_close(g_desc);
394 	poll_threads();
395 
396 	/* Try hotremoving a bdev with descriptors which don't provide
397 	 * any context to the notification callback */
398 	spdk_bdev_open_ext("ut_bdev", true, _bdev_event_cb, NULL, &desc);
399 	SPDK_CU_ASSERT_FATAL(desc != NULL);
400 
401 	/* There is an open descriptor on the device. Unregister it,
402 	 * which can't proceed until the descriptor is closed. */
403 	done = false;
404 	spdk_bdev_unregister(&g_bdev.bdev, _bdev_unregistered, &done);
405 
406 	/* Poll the threads to allow all events to be processed */
407 	poll_threads();
408 
409 	/* Make sure the bdev was not unregistered. We still have a
410 	 * descriptor open */
411 	CU_ASSERT(done == false);
412 
413 	spdk_bdev_close(desc);
414 	poll_threads();
415 	desc = NULL;
416 
417 	/* The unregister should have completed */
418 	CU_ASSERT(done == true);
419 
420 
421 	/* Register the bdev again */
422 	register_bdev(&g_bdev, "ut_bdev", &g_io_device);
423 
424 	remove_notify = false;
425 	spdk_bdev_open_ext("ut_bdev", true, _bdev_event_cb, &remove_notify, &desc);
426 	SPDK_CU_ASSERT_FATAL(desc != NULL);
427 	CU_ASSERT(remove_notify == false);
428 
429 	/* There is an open descriptor on the device. Unregister it,
430 	 * which can't proceed until the descriptor is closed. */
431 	done = false;
432 	spdk_bdev_unregister(&g_bdev.bdev, _bdev_unregistered, &done);
433 	/* No polling has occurred, so neither of these should execute */
434 	CU_ASSERT(remove_notify == false);
435 	CU_ASSERT(done == false);
436 
437 	/* Prior to the unregister completing, close the descriptor */
438 	spdk_bdev_close(desc);
439 
440 	/* Poll the threads to allow all events to be processed */
441 	poll_threads();
442 
443 	/* Remove notify should not have been called because the
444 	 * descriptor is already closed. */
445 	CU_ASSERT(remove_notify == false);
446 
447 	/* The unregister should have completed */
448 	CU_ASSERT(done == true);
449 
450 	/* Restore the original g_bdev so that we can use teardown_test(). */
451 	register_bdev(&g_bdev, "ut_bdev", &g_io_device);
452 	spdk_bdev_open_ext("ut_bdev", true, _bdev_event_cb, NULL, &g_desc);
453 	teardown_test();
454 }
455 
456 static void
457 unregister_and_close_different_threads(void)
458 {
459 	bool done;
460 	struct spdk_bdev_desc *desc = NULL;
461 
462 	setup_test();
463 	set_thread(0);
464 
465 	/* setup_test() automatically opens the bdev,
466 	 * but this test needs to do that in a different
467 	 * way. */
468 	spdk_bdev_close(g_desc);
469 	poll_threads();
470 
471 	set_thread(1);
472 	spdk_bdev_open_ext("ut_bdev", true, _bdev_event_cb, NULL, &desc);
473 	SPDK_CU_ASSERT_FATAL(desc != NULL);
474 	done = false;
475 
476 	set_thread(0);
477 	spdk_bdev_unregister(&g_bdev.bdev, _bdev_unregistered, &done);
478 
479 	/* Poll the threads to allow all events to be processed */
480 	poll_threads();
481 
482 	/* Make sure the bdev was not unregistered. We still have a
483 	 * descriptor open */
484 	CU_ASSERT(done == false);
485 
486 	/* Close the descriptor on thread 1.  Poll the thread and confirm the
487 	 * unregister did not complete, since it was unregistered on thread 0.
488 	 */
489 	set_thread(1);
490 	spdk_bdev_close(desc);
491 	poll_thread(1);
492 	CU_ASSERT(done == false);
493 
494 	/* Now poll thread 0 and confirm the unregister completed. */
495 	set_thread(0);
496 	poll_thread(0);
497 	CU_ASSERT(done == true);
498 
499 	/* Restore the original g_bdev so that we can use teardown_test(). */
500 	register_bdev(&g_bdev, "ut_bdev", &g_io_device);
501 	spdk_bdev_open_ext("ut_bdev", true, _bdev_event_cb, NULL, &g_desc);
502 	teardown_test();
503 }
504 
505 static void
506 reset_done(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg)
507 {
508 	bool *done = cb_arg;
509 
510 	CU_ASSERT(success == true);
511 	*done = true;
512 	spdk_bdev_free_io(bdev_io);
513 }
514 
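/*
 * Put the I/O channel back immediately after submitting a reset and make sure
 * the deferred reset handling still completes cleanly.
 */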
515 static void
516 put_channel_during_reset(void)
517 {
518 	struct spdk_io_channel *io_ch;
519 	bool done = false;
520 
521 	setup_test();
522 
523 	set_thread(0);
524 	io_ch = spdk_bdev_get_io_channel(g_desc);
525 	CU_ASSERT(io_ch != NULL);
526 
527 	/*
528 	 * Start a reset, but then put the I/O channel before
529 	 *  the deferred messages for the reset get a chance to
530 	 *  execute.
531 	 */
532 	spdk_bdev_reset(g_desc, io_ch, reset_done, &done);
533 	spdk_put_io_channel(io_ch);
534 	poll_threads();
535 	stub_complete_io(g_bdev.io_target, 0);
536 
537 	teardown_test();
538 }
539 
540 static void
541 aborted_reset_done(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg)
542 {
543 	enum spdk_bdev_io_status *status = cb_arg;
544 
545 	*status = success ? SPDK_BDEV_IO_STATUS_SUCCESS : SPDK_BDEV_IO_STATUS_FAILED;
546 	spdk_bdev_free_io(bdev_io);
547 }
548 
549 static void io_done(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg);
550 
551 static void
552 aborted_reset(void)
553 {
554 	struct spdk_io_channel *io_ch[2];
555 	enum spdk_bdev_io_status status1 = SPDK_BDEV_IO_STATUS_PENDING,
556 				 status2 = SPDK_BDEV_IO_STATUS_PENDING;
557 
558 	setup_test();
559 
560 	set_thread(0);
561 	io_ch[0] = spdk_bdev_get_io_channel(g_desc);
562 	CU_ASSERT(io_ch[0] != NULL);
563 	spdk_bdev_reset(g_desc, io_ch[0], aborted_reset_done, &status1);
564 	poll_threads();
565 	CU_ASSERT(g_bdev.bdev.internal.reset_in_progress != NULL);
566 
567 	/*
568 	 * First reset has been submitted on ch0.  Now submit a second
569 	 *  reset on ch1 which will get queued since there is already a
570 	 *  reset in progress.
571 	 */
572 	set_thread(1);
573 	io_ch[1] = spdk_bdev_get_io_channel(g_desc);
574 	CU_ASSERT(io_ch[1] != NULL);
575 	spdk_bdev_reset(g_desc, io_ch[1], aborted_reset_done, &status2);
576 	poll_threads();
577 	CU_ASSERT(g_bdev.bdev.internal.reset_in_progress != NULL);
578 
579 	/*
580 	 * Now destroy ch1.  This will abort the queued reset.  Check that
581 	 *  the second reset was completed with failed status.  Also check
582 	 *  that bdev->internal.reset_in_progress != NULL, since the
583 	 *  original reset has not been completed yet.  This ensures that
584 	 *  the bdev code is correctly noticing that the failed reset is
585 	 *  *not* the one that had been submitted to the bdev module.
586 	 */
587 	set_thread(1);
588 	spdk_put_io_channel(io_ch[1]);
589 	poll_threads();
590 	CU_ASSERT(status2 == SPDK_BDEV_IO_STATUS_FAILED);
591 	CU_ASSERT(g_bdev.bdev.internal.reset_in_progress != NULL);
592 
593 	/*
594 	 * Now complete the first reset, verify that it completed with SUCCESS
595 	 *  status and that bdev->internal.reset_in_progress is also set back to NULL.
596 	 */
597 	set_thread(0);
598 	spdk_put_io_channel(io_ch[0]);
599 	stub_complete_io(g_bdev.io_target, 0);
600 	poll_threads();
601 	CU_ASSERT(status1 == SPDK_BDEV_IO_STATUS_SUCCESS);
602 	CU_ASSERT(g_bdev.bdev.internal.reset_in_progress == NULL);
603 
604 	teardown_test();
605 }
606 
607 static void
608 aborted_reset_no_outstanding_io(void)
609 {
610 	struct spdk_io_channel *io_ch[2];
611 	struct spdk_bdev_channel *bdev_ch[2];
612 	struct spdk_bdev *bdev[2];
613 	enum spdk_bdev_io_status status1 = SPDK_BDEV_IO_STATUS_PENDING,
614 				 status2 = SPDK_BDEV_IO_STATUS_PENDING;
615 
616 	setup_test();
617 
618 	/*
619 	 * This time we test the reset without any outstanding IO
620 	 * present on the bdev channel, so both resets should finish
621 	 * immediately.
622 	 */
623 
624 	set_thread(0);
625 	/* Set a nonzero reset_io_drain_timeout so that, with no outstanding IO,
626 	 * the reset can complete without being submitted to the bdev module. */
627 	io_ch[0] = spdk_bdev_get_io_channel(g_desc);
628 	bdev_ch[0] = spdk_io_channel_get_ctx(io_ch[0]);
629 	bdev[0] = bdev_ch[0]->bdev;
630 	bdev[0]->reset_io_drain_timeout = SPDK_BDEV_RESET_IO_DRAIN_RECOMMENDED_VALUE;
631 	CU_ASSERT(io_ch[0] != NULL);
632 	spdk_bdev_reset(g_desc, io_ch[0], aborted_reset_done, &status1);
633 	poll_threads();
634 	CU_ASSERT(g_bdev.bdev.internal.reset_in_progress == NULL);
635 	CU_ASSERT(status1 == SPDK_BDEV_IO_STATUS_SUCCESS);
636 	spdk_put_io_channel(io_ch[0]);
637 
638 	set_thread(1);
639 	/* Set a nonzero reset_io_drain_timeout so that, with no outstanding IO,
640 	 * the reset can complete without being submitted to the bdev module. */
641 	io_ch[1] = spdk_bdev_get_io_channel(g_desc);
642 	bdev_ch[1] = spdk_io_channel_get_ctx(io_ch[1]);
643 	bdev[1] = bdev_ch[1]->bdev;
644 	bdev[1]->reset_io_drain_timeout = SPDK_BDEV_RESET_IO_DRAIN_RECOMMENDED_VALUE;
645 	CU_ASSERT(io_ch[1] != NULL);
646 	spdk_bdev_reset(g_desc, io_ch[1], aborted_reset_done, &status2);
647 	poll_threads();
648 	CU_ASSERT(g_bdev.bdev.internal.reset_in_progress == NULL);
649 	CU_ASSERT(status2 == SPDK_BDEV_IO_STATUS_SUCCESS);
650 	spdk_put_io_channel(io_ch[1]);
651 
652 	stub_complete_io(g_bdev.io_target, 0);
653 	poll_threads();
654 
655 	teardown_test();
656 }
657 
658 
659 static void
660 io_during_io_done(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg)
661 {
662 	enum spdk_bdev_io_status *status = cb_arg;
663 
664 	*status = bdev_io->internal.status;
665 	spdk_bdev_free_io(bdev_io);
666 }
667 
668 static void
669 io_during_reset(void)
670 {
671 	struct spdk_io_channel *io_ch[2];
672 	struct spdk_bdev_channel *bdev_ch[2];
673 	enum spdk_bdev_io_status status0, status1, status_reset;
674 	int rc;
675 
676 	setup_test();
677 
678 	/*
679 	 * First test normal case - submit an I/O on each of two channels (with no resets)
680 	 *  and verify they complete successfully.
681 	 */
682 	set_thread(0);
683 	io_ch[0] = spdk_bdev_get_io_channel(g_desc);
684 	bdev_ch[0] = spdk_io_channel_get_ctx(io_ch[0]);
685 	CU_ASSERT(bdev_ch[0]->flags == 0);
686 	status0 = SPDK_BDEV_IO_STATUS_PENDING;
687 	rc = spdk_bdev_read_blocks(g_desc, io_ch[0], NULL, 0, 1, io_during_io_done, &status0);
688 	CU_ASSERT(rc == 0);
689 
690 	set_thread(1);
691 	io_ch[1] = spdk_bdev_get_io_channel(g_desc);
692 	bdev_ch[1] = spdk_io_channel_get_ctx(io_ch[1]);
693 	CU_ASSERT(bdev_ch[1]->flags == 0);
694 	status1 = SPDK_BDEV_IO_STATUS_PENDING;
695 	rc = spdk_bdev_read_blocks(g_desc, io_ch[1], NULL, 0, 1, io_during_io_done, &status1);
696 	CU_ASSERT(rc == 0);
697 
698 	poll_threads();
699 	CU_ASSERT(status0 == SPDK_BDEV_IO_STATUS_PENDING);
700 	CU_ASSERT(status1 == SPDK_BDEV_IO_STATUS_PENDING);
701 
702 	set_thread(0);
703 	stub_complete_io(g_bdev.io_target, 0);
704 	CU_ASSERT(status0 == SPDK_BDEV_IO_STATUS_SUCCESS);
705 
706 	set_thread(1);
707 	stub_complete_io(g_bdev.io_target, 0);
708 	CU_ASSERT(status1 == SPDK_BDEV_IO_STATUS_SUCCESS);
709 
710 	/*
711 	 * Now submit a reset, and leave it pending while we submit I/O on two different
712 	 *  channels.  These I/O should be failed by the bdev layer since the reset is in
713 	 *  progress.
714 	 */
715 	set_thread(0);
716 	status_reset = SPDK_BDEV_IO_STATUS_PENDING;
717 	rc = spdk_bdev_reset(g_desc, io_ch[0], io_during_io_done, &status_reset);
718 	CU_ASSERT(rc == 0);
719 
720 	CU_ASSERT(bdev_ch[0]->flags == 0);
721 	CU_ASSERT(bdev_ch[1]->flags == 0);
722 	poll_threads();
723 	CU_ASSERT(bdev_ch[0]->flags == BDEV_CH_RESET_IN_PROGRESS);
724 	CU_ASSERT(bdev_ch[1]->flags == BDEV_CH_RESET_IN_PROGRESS);
725 
726 	set_thread(0);
727 	status0 = SPDK_BDEV_IO_STATUS_PENDING;
728 	rc = spdk_bdev_read_blocks(g_desc, io_ch[0], NULL, 0, 1, io_during_io_done, &status0);
729 	CU_ASSERT(rc == 0);
730 
731 	set_thread(1);
732 	status1 = SPDK_BDEV_IO_STATUS_PENDING;
733 	rc = spdk_bdev_read_blocks(g_desc, io_ch[1], NULL, 0, 1, io_during_io_done, &status1);
734 	CU_ASSERT(rc == 0);
735 
736 	/*
737 	 * A reset is in progress so these read I/O should complete with aborted.  Note that we
738 	 *  need to poll_threads() since I/O completed inline have their completion deferred.
739 	 */
740 	poll_threads();
741 	CU_ASSERT(status_reset == SPDK_BDEV_IO_STATUS_PENDING);
742 	CU_ASSERT(status0 == SPDK_BDEV_IO_STATUS_ABORTED);
743 	CU_ASSERT(status1 == SPDK_BDEV_IO_STATUS_ABORTED);
744 
745 	/*
746 	 * Complete the reset
747 	 */
748 	set_thread(0);
749 	stub_complete_io(g_bdev.io_target, 0);
750 
751 	/*
752 	 * Only poll thread 0. We should not get a completion.
753 	 */
754 	poll_thread(0);
755 	CU_ASSERT(status_reset == SPDK_BDEV_IO_STATUS_PENDING);
756 
757 	/*
758 	 * Poll both thread 0 and 1 so the messages can propagate and we
759 	 * get a completion.
760 	 */
761 	poll_threads();
762 	CU_ASSERT(status_reset == SPDK_BDEV_IO_STATUS_SUCCESS);
763 
764 	spdk_put_io_channel(io_ch[0]);
765 	set_thread(1);
766 	spdk_put_io_channel(io_ch[1]);
767 	poll_threads();
768 
769 	teardown_test();
770 }
771 
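/* Count how many RESET I/O are currently queued in the stub module for the given io_target. */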
772 static uint32_t
773 count_queued_resets(void *io_target)
774 {
775 	struct spdk_io_channel *_ch = spdk_get_io_channel(io_target);
776 	struct ut_bdev_channel *ch = spdk_io_channel_get_ctx(_ch);
777 	struct spdk_bdev_io *io;
778 	uint32_t submitted_resets = 0;
779 
780 	TAILQ_FOREACH(io, &ch->outstanding_io, module_link) {
781 		if (io->type == SPDK_BDEV_IO_TYPE_RESET) {
782 			submitted_resets++;
783 		}
784 	}
785 
786 	spdk_put_io_channel(_ch);
787 
788 	return submitted_resets;
789 }
790 
791 static void
792 reset_completions(void)
793 {
794 	struct spdk_io_channel *io_ch;
795 	struct spdk_bdev_channel *bdev_ch;
796 	struct spdk_bdev *bdev;
797 	enum spdk_bdev_io_status status0, status_reset;
798 	int rc, iter;
799 
800 	setup_test();
801 
802 	/* This test covers four test cases:
803 	 * 1) reset_io_drain_timeout of a bdev is 0
804 	 * 2) No outstanding IO are present on any bdev channel
805 	 * 3) Outstanding IO finish during bdev reset
806 	 * 4) Outstanding IO do not finish before reset is done waiting
807 	 *    for them.
808 	 *
809 	 * Above conditions mainly affect the timing of bdev reset completion
810 	 * and whether a reset should be skipped via spdk_bdev_io_complete()
811 	 * or sent down to the underlying bdev module via bdev_io_submit_reset(). */
812 
813 	/* Test preparation */
814 	set_thread(0);
815 	io_ch = spdk_bdev_get_io_channel(g_desc);
816 	bdev_ch = spdk_io_channel_get_ctx(io_ch);
817 	CU_ASSERT(bdev_ch->flags == 0);
818 
819 
820 	/* Test case 1) reset_io_drain_timeout set to 0. Reset should be sent down immediately. */
821 	bdev = &g_bdev.bdev;
822 	bdev->reset_io_drain_timeout = 0;
823 
824 	status_reset = SPDK_BDEV_IO_STATUS_PENDING;
825 	rc = spdk_bdev_reset(g_desc, io_ch, io_during_io_done, &status_reset);
826 	CU_ASSERT(rc == 0);
827 	poll_threads();
828 	CU_ASSERT(count_queued_resets(g_bdev.io_target) == 1);
829 
830 	/* Call reset completion inside bdev module. */
831 	stub_complete_io(g_bdev.io_target, 0);
832 	poll_threads();
833 	CU_ASSERT(count_queued_resets(g_bdev.io_target) == 0);
834 	CU_ASSERT(status_reset == SPDK_BDEV_IO_STATUS_SUCCESS);
835 	CU_ASSERT(g_bdev.bdev.internal.reset_in_progress == NULL);
836 
837 
838 	/* Test case 2) no outstanding IO are present. Reset should perform one iteration over
839 	* channels and then be skipped. */
840 	bdev->reset_io_drain_timeout = SPDK_BDEV_RESET_IO_DRAIN_RECOMMENDED_VALUE;
841 	status_reset = SPDK_BDEV_IO_STATUS_PENDING;
842 
843 	rc = spdk_bdev_reset(g_desc, io_ch, io_during_io_done, &status_reset);
844 	CU_ASSERT(rc == 0);
845 	poll_threads();
846 	/* Reset was never submitted to the bdev module. */
847 	CU_ASSERT(count_queued_resets(g_bdev.io_target) == 0);
848 	CU_ASSERT(status_reset == SPDK_BDEV_IO_STATUS_SUCCESS);
849 	CU_ASSERT(g_bdev.bdev.internal.reset_in_progress == NULL);
850 
851 
852 	/* Test case 3) outstanding IO finish during the bdev reset procedure. The reset starts
853 	* a wait poller that checks for IO completions every second until reset_io_drain_timeout
854 	* is reached, but here the outstanding IO finish before that threshold. */
855 	status0 = SPDK_BDEV_IO_STATUS_PENDING;
856 	status_reset = SPDK_BDEV_IO_STATUS_PENDING;
857 	rc = spdk_bdev_read_blocks(g_desc, io_ch, NULL, 0, 1, io_during_io_done, &status0);
858 	CU_ASSERT(rc == 0);
859 
860 	rc = spdk_bdev_reset(g_desc, io_ch, io_during_io_done, &status_reset);
861 	CU_ASSERT(rc == 0);
862 	poll_threads();
863 	/* The reset just started and should not have been submitted yet. */
864 	CU_ASSERT(count_queued_resets(g_bdev.io_target) == 0);
865 
866 	poll_threads();
867 	CU_ASSERT(status_reset == SPDK_BDEV_IO_STATUS_PENDING);
868 	/* Let the poller wait for about half the time then complete outstanding IO. */
869 	for (iter = 0; iter < 2; iter++) {
870 		/* Reset is still processing and not submitted at this point. */
871 		CU_ASSERT(count_queued_resets(g_bdev.io_target) == 0);
872 		spdk_delay_us(1000 * 1000);
873 		poll_threads();
874 		poll_threads();
875 	}
876 	CU_ASSERT(status_reset == SPDK_BDEV_IO_STATUS_PENDING);
877 	stub_complete_io(g_bdev.io_target, 0);
878 	poll_threads();
879 	spdk_delay_us(BDEV_RESET_CHECK_OUTSTANDING_IO_PERIOD);
880 	poll_threads();
881 	poll_threads();
882 	CU_ASSERT(status_reset == SPDK_BDEV_IO_STATUS_SUCCESS);
883 	/* Sending reset to the bdev module has been skipped. */
884 	CU_ASSERT(count_queued_resets(g_bdev.io_target) == 0);
885 	CU_ASSERT(g_bdev.bdev.internal.reset_in_progress == NULL);
886 
887 
888 	/* Test case 4) outstanding IO are still present after reset_io_drain_timeout
889 	* seconds have passed. */
890 	status0 = SPDK_BDEV_IO_STATUS_PENDING;
891 	status_reset = SPDK_BDEV_IO_STATUS_PENDING;
892 	rc = spdk_bdev_read_blocks(g_desc, io_ch, NULL, 0, 1, io_during_io_done, &status0);
893 	CU_ASSERT(rc == 0);
894 
895 	rc = spdk_bdev_reset(g_desc, io_ch, io_during_io_done, &status_reset);
896 	CU_ASSERT(rc == 0);
897 	poll_threads();
898 	/* The reset just started and should not have been submitted yet. */
899 	CU_ASSERT(count_queued_resets(g_bdev.io_target) == 0);
900 
901 	poll_threads();
902 	CU_ASSERT(status_reset == SPDK_BDEV_IO_STATUS_PENDING);
903 	/* Let the poller wait for reset_io_drain_timeout seconds. */
904 	for (iter = 0; iter < bdev->reset_io_drain_timeout; iter++) {
905 		CU_ASSERT(count_queued_resets(g_bdev.io_target) == 0);
906 		spdk_delay_us(BDEV_RESET_CHECK_OUTSTANDING_IO_PERIOD);
907 		poll_threads();
908 		poll_threads();
909 	}
910 
911 	/* After timing out, the reset should have been sent to the module. */
912 	CU_ASSERT(count_queued_resets(g_bdev.io_target) == 1);
913 	/* Complete reset submitted to the module and the read IO. */
914 	stub_complete_io(g_bdev.io_target, 0);
915 	poll_threads();
916 	CU_ASSERT(status_reset == SPDK_BDEV_IO_STATUS_SUCCESS);
917 	CU_ASSERT(g_bdev.bdev.internal.reset_in_progress == NULL);
918 
919 
920 	/* Destroy the channel and end the test. */
921 	spdk_put_io_channel(io_ch);
922 	poll_threads();
923 
924 	teardown_test();
925 }
926 
927 
928 static void
929 basic_qos(void)
930 {
931 	struct spdk_io_channel *io_ch[2];
932 	struct spdk_bdev_channel *bdev_ch[2];
933 	struct spdk_bdev *bdev;
934 	enum spdk_bdev_io_status status, abort_status;
935 	int rc;
936 
937 	setup_test();
938 
939 	/* Enable QoS */
940 	bdev = &g_bdev.bdev;
941 	bdev->internal.qos = calloc(1, sizeof(*bdev->internal.qos));
942 	SPDK_CU_ASSERT_FATAL(bdev->internal.qos != NULL);
943 	TAILQ_INIT(&bdev->internal.qos->queued);
944 	/*
945 	 * Enable read/write IOPS, read only byte per second and
946 	 * read/write byte per second rate limits.
947 	 * In this case, all rate limits will take equal effect.
948 	 */
949 	/* 2000 read/write I/O per second, or 2 per millisecond */
950 	bdev->internal.qos->rate_limits[SPDK_BDEV_QOS_RW_IOPS_RATE_LIMIT].limit = 2000;
951 	/* 8K read/write byte per millisecond with 4K block size */
952 	bdev->internal.qos->rate_limits[SPDK_BDEV_QOS_RW_BPS_RATE_LIMIT].limit = 8192000;
953 	/* 8K read only byte per millisecond with 4K block size */
954 	bdev->internal.qos->rate_limits[SPDK_BDEV_QOS_R_BPS_RATE_LIMIT].limit = 8192000;
955 
956 	g_get_io_channel = true;
957 
958 	set_thread(0);
959 	io_ch[0] = spdk_bdev_get_io_channel(g_desc);
960 	bdev_ch[0] = spdk_io_channel_get_ctx(io_ch[0]);
961 	CU_ASSERT(bdev_ch[0]->flags == BDEV_CH_QOS_ENABLED);
962 
963 	set_thread(1);
964 	io_ch[1] = spdk_bdev_get_io_channel(g_desc);
965 	bdev_ch[1] = spdk_io_channel_get_ctx(io_ch[1]);
966 	CU_ASSERT(bdev_ch[1]->flags == BDEV_CH_QOS_ENABLED);
967 
968 	/*
969 	 * Send an I/O on thread 0, which is where the QoS thread is running.
970 	 */
971 	set_thread(0);
972 	status = SPDK_BDEV_IO_STATUS_PENDING;
973 	rc = spdk_bdev_read_blocks(g_desc, io_ch[0], NULL, 0, 1, io_during_io_done, &status);
974 	CU_ASSERT(rc == 0);
975 	CU_ASSERT(status == SPDK_BDEV_IO_STATUS_PENDING);
976 	poll_threads();
977 	stub_complete_io(g_bdev.io_target, 0);
978 	poll_threads();
979 	CU_ASSERT(status == SPDK_BDEV_IO_STATUS_SUCCESS);
980 
981 	/* Send an I/O on thread 1. The QoS thread is not running here. */
982 	status = SPDK_BDEV_IO_STATUS_PENDING;
983 	set_thread(1);
984 	rc = spdk_bdev_read_blocks(g_desc, io_ch[1], NULL, 0, 1, io_during_io_done, &status);
985 	CU_ASSERT(rc == 0);
986 	CU_ASSERT(status == SPDK_BDEV_IO_STATUS_PENDING);
987 	poll_threads();
988 	/* Complete I/O on thread 0. This should not complete the I/O we submitted. */
989 	set_thread(0);
990 	stub_complete_io(g_bdev.io_target, 0);
991 	poll_threads();
992 	CU_ASSERT(status == SPDK_BDEV_IO_STATUS_PENDING);
993 	/* Now complete I/O on original thread 1. */
994 	set_thread(1);
995 	poll_threads();
996 	stub_complete_io(g_bdev.io_target, 0);
997 	poll_threads();
998 	CU_ASSERT(status == SPDK_BDEV_IO_STATUS_SUCCESS);
999 
1000 	/* Reset rate limit for the next test cases. */
1001 	spdk_delay_us(SPDK_BDEV_QOS_TIMESLICE_IN_USEC);
1002 	poll_threads();
1003 
1004 	/*
1005 	 * Test abort request when QoS is enabled.
1006 	 */
1007 
1008 	/* Send an I/O on thread 0, which is where the QoS thread is running. */
1009 	set_thread(0);
1010 	status = SPDK_BDEV_IO_STATUS_PENDING;
1011 	rc = spdk_bdev_read_blocks(g_desc, io_ch[0], NULL, 0, 1, io_during_io_done, &status);
1012 	CU_ASSERT(rc == 0);
1013 	CU_ASSERT(status == SPDK_BDEV_IO_STATUS_PENDING);
1014 	/* Send an abort to the I/O on the same thread. */
1015 	abort_status = SPDK_BDEV_IO_STATUS_PENDING;
1016 	rc = spdk_bdev_abort(g_desc, io_ch[0], &status, io_during_io_done, &abort_status);
1017 	CU_ASSERT(rc == 0);
1018 	CU_ASSERT(abort_status == SPDK_BDEV_IO_STATUS_PENDING);
1019 	poll_threads();
1020 	CU_ASSERT(abort_status == SPDK_BDEV_IO_STATUS_SUCCESS);
1021 	CU_ASSERT(status == SPDK_BDEV_IO_STATUS_ABORTED);
1022 
1023 	/* Send an I/O on thread 1. The QoS thread is not running here. */
1024 	status = SPDK_BDEV_IO_STATUS_PENDING;
1025 	set_thread(1);
1026 	rc = spdk_bdev_read_blocks(g_desc, io_ch[1], NULL, 0, 1, io_during_io_done, &status);
1027 	CU_ASSERT(rc == 0);
1028 	CU_ASSERT(status == SPDK_BDEV_IO_STATUS_PENDING);
1029 	poll_threads();
1030 	/* Send an abort to the I/O on the same thread. */
1031 	abort_status = SPDK_BDEV_IO_STATUS_PENDING;
1032 	rc = spdk_bdev_abort(g_desc, io_ch[1], &status, io_during_io_done, &abort_status);
1033 	CU_ASSERT(rc == 0);
1034 	CU_ASSERT(abort_status == SPDK_BDEV_IO_STATUS_PENDING);
1035 	poll_threads();
1036 	/* The I/O should now have completed as aborted and the abort as success on thread 1. */
1037 	CU_ASSERT(abort_status == SPDK_BDEV_IO_STATUS_SUCCESS);
1038 	CU_ASSERT(status == SPDK_BDEV_IO_STATUS_ABORTED);
1039 
1040 	set_thread(0);
1041 
1042 	/*
1043 	 * Close the descriptor only, which should stop the qos channel since
1044 	 * it is the last descriptor being closed.
1045 	 */
1046 	spdk_bdev_close(g_desc);
1047 	poll_threads();
1048 	CU_ASSERT(bdev->internal.qos->ch == NULL);
1049 
1050 	/*
1051 	 * Open the bdev again, which should set up the qos channel again since
1052 	 * valid I/O channels still exist.
1053 	 */
1054 	spdk_bdev_open_ext("ut_bdev", true, _bdev_event_cb, NULL, &g_desc);
1055 	poll_threads();
1056 	CU_ASSERT(bdev->internal.qos->ch != NULL);
1057 
1058 	/* Tear down the channels */
1059 	set_thread(0);
1060 	spdk_put_io_channel(io_ch[0]);
1061 	set_thread(1);
1062 	spdk_put_io_channel(io_ch[1]);
1063 	poll_threads();
1064 	set_thread(0);
1065 
1066 	/* Close the descriptor, which should stop the qos channel */
1067 	spdk_bdev_close(g_desc);
1068 	poll_threads();
1069 	CU_ASSERT(bdev->internal.qos->ch == NULL);
1070 
1071 	/* Open the bdev again, no qos channel setup without valid channels. */
1072 	spdk_bdev_open_ext("ut_bdev", true, _bdev_event_cb, NULL, &g_desc);
1073 	poll_threads();
1074 	CU_ASSERT(bdev->internal.qos->ch == NULL);
1075 
1076 	/* Create the channels in reverse order. */
1077 	set_thread(1);
1078 	io_ch[1] = spdk_bdev_get_io_channel(g_desc);
1079 	bdev_ch[1] = spdk_io_channel_get_ctx(io_ch[1]);
1080 	CU_ASSERT(bdev_ch[1]->flags == BDEV_CH_QOS_ENABLED);
1081 
1082 	set_thread(0);
1083 	io_ch[0] = spdk_bdev_get_io_channel(g_desc);
1084 	bdev_ch[0] = spdk_io_channel_get_ctx(io_ch[0]);
1085 	CU_ASSERT(bdev_ch[0]->flags == BDEV_CH_QOS_ENABLED);
1086 
1087 	/* Confirm that the qos thread is now thread 1 */
1088 	CU_ASSERT(bdev->internal.qos->ch == bdev_ch[1]);
1089 
1090 	/* Tear down the channels */
1091 	set_thread(0);
1092 	spdk_put_io_channel(io_ch[0]);
1093 	set_thread(1);
1094 	spdk_put_io_channel(io_ch[1]);
1095 	poll_threads();
1096 
1097 	set_thread(0);
1098 
1099 	teardown_test();
1100 }
1101 
1102 static void
1103 io_during_qos_queue(void)
1104 {
1105 	struct spdk_io_channel *io_ch[2];
1106 	struct spdk_bdev_channel *bdev_ch[2];
1107 	struct spdk_bdev *bdev;
1108 	enum spdk_bdev_io_status status0, status1, status2;
1109 	int rc;
1110 
1111 	setup_test();
1112 	MOCK_SET(spdk_get_ticks, 0);
1113 
1114 	/* Enable QoS */
1115 	bdev = &g_bdev.bdev;
1116 	bdev->internal.qos = calloc(1, sizeof(*bdev->internal.qos));
1117 	SPDK_CU_ASSERT_FATAL(bdev->internal.qos != NULL);
1118 	TAILQ_INIT(&bdev->internal.qos->queued);
1119 	/*
1120 	 * Enable read/write IOPS, read only byte per sec, write only
1121 	 * byte per sec and read/write byte per sec rate limits.
1122 	 * In this case, both read only and write only byte per sec
1123 	 * rate limit will take effect.
1124 	 */
1125 	/* 4000 read/write I/O per second, or 4 per millisecond */
1126 	bdev->internal.qos->rate_limits[SPDK_BDEV_QOS_RW_IOPS_RATE_LIMIT].limit = 4000;
1127 	/* 8K byte per millisecond with 4K block size */
1128 	bdev->internal.qos->rate_limits[SPDK_BDEV_QOS_RW_BPS_RATE_LIMIT].limit = 8192000;
1129 	/* 4K byte per millisecond with 4K block size */
1130 	bdev->internal.qos->rate_limits[SPDK_BDEV_QOS_R_BPS_RATE_LIMIT].limit = 4096000;
1131 	/* 4K byte per millisecond with 4K block size */
1132 	bdev->internal.qos->rate_limits[SPDK_BDEV_QOS_W_BPS_RATE_LIMIT].limit = 4096000;
1133 
1134 	g_get_io_channel = true;
1135 
1136 	/* Create channels */
1137 	set_thread(0);
1138 	io_ch[0] = spdk_bdev_get_io_channel(g_desc);
1139 	bdev_ch[0] = spdk_io_channel_get_ctx(io_ch[0]);
1140 	CU_ASSERT(bdev_ch[0]->flags == BDEV_CH_QOS_ENABLED);
1141 
1142 	set_thread(1);
1143 	io_ch[1] = spdk_bdev_get_io_channel(g_desc);
1144 	bdev_ch[1] = spdk_io_channel_get_ctx(io_ch[1]);
1145 	CU_ASSERT(bdev_ch[1]->flags == BDEV_CH_QOS_ENABLED);
1146 
1147 	/* Send two read I/Os */
1148 	status1 = SPDK_BDEV_IO_STATUS_PENDING;
1149 	rc = spdk_bdev_read_blocks(g_desc, io_ch[1], NULL, 0, 1, io_during_io_done, &status1);
1150 	CU_ASSERT(rc == 0);
1151 	CU_ASSERT(status1 == SPDK_BDEV_IO_STATUS_PENDING);
1152 	set_thread(0);
1153 	status0 = SPDK_BDEV_IO_STATUS_PENDING;
1154 	rc = spdk_bdev_read_blocks(g_desc, io_ch[0], NULL, 0, 1, io_during_io_done, &status0);
1155 	CU_ASSERT(rc == 0);
1156 	CU_ASSERT(status0 == SPDK_BDEV_IO_STATUS_PENDING);
1157 	/* Send one write I/O */
1158 	status2 = SPDK_BDEV_IO_STATUS_PENDING;
1159 	rc = spdk_bdev_write_blocks(g_desc, io_ch[0], NULL, 0, 1, io_during_io_done, &status2);
1160 	CU_ASSERT(rc == 0);
1161 	CU_ASSERT(status2 == SPDK_BDEV_IO_STATUS_PENDING);
1162 
1163 	/* Complete any I/O that arrived at the disk */
1164 	poll_threads();
1165 	set_thread(1);
1166 	stub_complete_io(g_bdev.io_target, 0);
1167 	set_thread(0);
1168 	stub_complete_io(g_bdev.io_target, 0);
1169 	poll_threads();
1170 
1171 	/* Only one of the two read I/Os should complete. (logical XOR) */
1172 	if (status0 == SPDK_BDEV_IO_STATUS_SUCCESS) {
1173 		CU_ASSERT(status1 == SPDK_BDEV_IO_STATUS_PENDING);
1174 	} else {
1175 		CU_ASSERT(status1 == SPDK_BDEV_IO_STATUS_SUCCESS);
1176 	}
1177 	/* The write I/O should complete. */
1178 	CU_ASSERT(status2 == SPDK_BDEV_IO_STATUS_SUCCESS);
1179 
1180 	/* Advance in time by a millisecond */
1181 	spdk_delay_us(1000);
1182 
1183 	/* Complete more I/O */
1184 	poll_threads();
1185 	set_thread(1);
1186 	stub_complete_io(g_bdev.io_target, 0);
1187 	set_thread(0);
1188 	stub_complete_io(g_bdev.io_target, 0);
1189 	poll_threads();
1190 
1191 	/* Now the second read I/O should be done */
1192 	CU_ASSERT(status0 == SPDK_BDEV_IO_STATUS_SUCCESS);
1193 	CU_ASSERT(status1 == SPDK_BDEV_IO_STATUS_SUCCESS);
1194 
1195 	/* Tear down the channels */
1196 	set_thread(1);
1197 	spdk_put_io_channel(io_ch[1]);
1198 	set_thread(0);
1199 	spdk_put_io_channel(io_ch[0]);
1200 	poll_threads();
1201 
1202 	teardown_test();
1203 }
1204 
1205 static void
1206 io_during_qos_reset(void)
1207 {
1208 	struct spdk_io_channel *io_ch[2];
1209 	struct spdk_bdev_channel *bdev_ch[2];
1210 	struct spdk_bdev *bdev;
1211 	enum spdk_bdev_io_status status0, status1, reset_status;
1212 	int rc;
1213 
1214 	setup_test();
1215 	MOCK_SET(spdk_get_ticks, 0);
1216 
1217 	/* Enable QoS */
1218 	bdev = &g_bdev.bdev;
1219 	bdev->internal.qos = calloc(1, sizeof(*bdev->internal.qos));
1220 	SPDK_CU_ASSERT_FATAL(bdev->internal.qos != NULL);
1221 	TAILQ_INIT(&bdev->internal.qos->queued);
1222 	/*
1223 	 * Enable read/write IOPS, write only byte per sec and
1224 	 * read/write byte per second rate limits.
1225 	 * In this case, read/write byte per second rate limit will
1226 	 * take effect first.
1227 	 */
1228 	/* 2000 read/write I/O per second, or 2 per millisecond */
1229 	bdev->internal.qos->rate_limits[SPDK_BDEV_QOS_RW_IOPS_RATE_LIMIT].limit = 2000;
1230 	/* 4K byte per millisecond with 4K block size */
1231 	bdev->internal.qos->rate_limits[SPDK_BDEV_QOS_RW_BPS_RATE_LIMIT].limit = 4096000;
1232 	/* 8K byte per millisecond with 4K block size */
1233 	bdev->internal.qos->rate_limits[SPDK_BDEV_QOS_W_BPS_RATE_LIMIT].limit = 8192000;
1234 
1235 	g_get_io_channel = true;
1236 
1237 	/* Create channels */
1238 	set_thread(0);
1239 	io_ch[0] = spdk_bdev_get_io_channel(g_desc);
1240 	bdev_ch[0] = spdk_io_channel_get_ctx(io_ch[0]);
1241 	CU_ASSERT(bdev_ch[0]->flags == BDEV_CH_QOS_ENABLED);
1242 
1243 	set_thread(1);
1244 	io_ch[1] = spdk_bdev_get_io_channel(g_desc);
1245 	bdev_ch[1] = spdk_io_channel_get_ctx(io_ch[1]);
1246 	CU_ASSERT(bdev_ch[1]->flags == BDEV_CH_QOS_ENABLED);
1247 
1248 	/* Send two I/O. One of these gets queued by QoS. The other is sitting at the disk. */
1249 	status1 = SPDK_BDEV_IO_STATUS_PENDING;
1250 	rc = spdk_bdev_write_blocks(g_desc, io_ch[1], NULL, 0, 1, io_during_io_done, &status1);
1251 	CU_ASSERT(rc == 0);
1252 	set_thread(0);
1253 	status0 = SPDK_BDEV_IO_STATUS_PENDING;
1254 	rc = spdk_bdev_write_blocks(g_desc, io_ch[0], NULL, 0, 1, io_during_io_done, &status0);
1255 	CU_ASSERT(rc == 0);
1256 
1257 	poll_threads();
1258 	CU_ASSERT(status1 == SPDK_BDEV_IO_STATUS_PENDING);
1259 	CU_ASSERT(status0 == SPDK_BDEV_IO_STATUS_PENDING);
1260 
1261 	/* Reset the bdev. */
1262 	reset_status = SPDK_BDEV_IO_STATUS_PENDING;
1263 	rc = spdk_bdev_reset(g_desc, io_ch[0], io_during_io_done, &reset_status);
1264 	CU_ASSERT(rc == 0);
1265 
1266 	/* Complete any I/O that arrived at the disk */
1267 	poll_threads();
1268 	set_thread(1);
1269 	stub_complete_io(g_bdev.io_target, 0);
1270 	set_thread(0);
1271 	stub_complete_io(g_bdev.io_target, 0);
1272 	poll_threads();
1273 
1274 	CU_ASSERT(reset_status == SPDK_BDEV_IO_STATUS_SUCCESS);
1275 	CU_ASSERT(status0 == SPDK_BDEV_IO_STATUS_ABORTED);
1276 	CU_ASSERT(status1 == SPDK_BDEV_IO_STATUS_ABORTED);
1277 
1278 	/* Tear down the channels */
1279 	set_thread(1);
1280 	spdk_put_io_channel(io_ch[1]);
1281 	set_thread(0);
1282 	spdk_put_io_channel(io_ch[0]);
1283 	poll_threads();
1284 
1285 	teardown_test();
1286 }
1287 
1288 static void
1289 enomem_done(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg)
1290 {
1291 	enum spdk_bdev_io_status *status = cb_arg;
1292 
1293 	*status = success ? SPDK_BDEV_IO_STATUS_SUCCESS : SPDK_BDEV_IO_STATUS_FAILED;
1294 	spdk_bdev_free_io(bdev_io);
1295 }
1296 
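/*
 * Drive the shared_resource nomem_io path: saturate the stub channel, verify
 * that additional I/O are queued on nomem_io, and check that retries only
 * start once enough completions bring io_outstanding down to nomem_threshold.
 */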
1297 static void
1298 enomem(void)
1299 {
1300 	struct spdk_io_channel *io_ch;
1301 	struct spdk_bdev_channel *bdev_ch;
1302 	struct spdk_bdev_shared_resource *shared_resource;
1303 	struct ut_bdev_channel *ut_ch;
1304 	const uint32_t IO_ARRAY_SIZE = 64;
1305 	const uint32_t AVAIL = 20;
1306 	enum spdk_bdev_io_status status[IO_ARRAY_SIZE], status_reset;
1307 	uint32_t nomem_cnt, i;
1308 	struct spdk_bdev_io *first_io;
1309 	int rc;
1310 
1311 	setup_test();
1312 
1313 	set_thread(0);
1314 	io_ch = spdk_bdev_get_io_channel(g_desc);
1315 	bdev_ch = spdk_io_channel_get_ctx(io_ch);
1316 	shared_resource = bdev_ch->shared_resource;
1317 	ut_ch = spdk_io_channel_get_ctx(bdev_ch->channel);
1318 	ut_ch->avail_cnt = AVAIL;
1319 
1320 	/* First submit a number of IOs equal to what the channel can support. */
1321 	for (i = 0; i < AVAIL; i++) {
1322 		status[i] = SPDK_BDEV_IO_STATUS_PENDING;
1323 		rc = spdk_bdev_read_blocks(g_desc, io_ch, NULL, 0, 1, enomem_done, &status[i]);
1324 		CU_ASSERT(rc == 0);
1325 	}
1326 	CU_ASSERT(TAILQ_EMPTY(&shared_resource->nomem_io));
1327 
1328 	/*
1329 	 * Next, submit one additional I/O.  This one should fail with ENOMEM and then go onto
1330 	 *  the nomem_io list.
1331 	 */
1332 	status[AVAIL] = SPDK_BDEV_IO_STATUS_PENDING;
1333 	rc = spdk_bdev_read_blocks(g_desc, io_ch, NULL, 0, 1, enomem_done, &status[AVAIL]);
1334 	CU_ASSERT(rc == 0);
1335 	SPDK_CU_ASSERT_FATAL(!TAILQ_EMPTY(&shared_resource->nomem_io));
1336 	first_io = TAILQ_FIRST(&shared_resource->nomem_io);
1337 
1338 	/*
1339 	 * Now submit a bunch more I/O.  These should all fail with ENOMEM and get queued behind
1340 	 *  the first_io above.
1341 	 */
1342 	for (i = AVAIL + 1; i < IO_ARRAY_SIZE; i++) {
1343 		status[i] = SPDK_BDEV_IO_STATUS_PENDING;
1344 		rc = spdk_bdev_read_blocks(g_desc, io_ch, NULL, 0, 1, enomem_done, &status[i]);
1345 		CU_ASSERT(rc == 0);
1346 	}
1347 
1348 	/* Assert that first_io is still at the head of the list. */
1349 	CU_ASSERT(TAILQ_FIRST(&shared_resource->nomem_io) == first_io);
1350 	CU_ASSERT(bdev_io_tailq_cnt(&shared_resource->nomem_io) == (IO_ARRAY_SIZE - AVAIL));
1351 	nomem_cnt = bdev_io_tailq_cnt(&shared_resource->nomem_io);
1352 	CU_ASSERT(shared_resource->nomem_threshold == (AVAIL - NOMEM_THRESHOLD_COUNT));
1353 
1354 	/*
1355 	 * Complete 1 I/O only.  The key check here is bdev_io_tailq_cnt - this should not have
1356 	 *  changed since completing just 1 I/O should not trigger retrying the queued nomem_io
1357 	 *  list.
1358 	 */
1359 	stub_complete_io(g_bdev.io_target, 1);
1360 	CU_ASSERT(bdev_io_tailq_cnt(&shared_resource->nomem_io) == nomem_cnt);
1361 
1362 	/*
1363 	 * Complete enough I/O to hit the nomem_threshold.  This should trigger retrying nomem_io,
1364 	 *  and we should see I/O get resubmitted to the test bdev module.
1365 	 */
1366 	stub_complete_io(g_bdev.io_target, NOMEM_THRESHOLD_COUNT - 1);
1367 	CU_ASSERT(bdev_io_tailq_cnt(&shared_resource->nomem_io) < nomem_cnt);
1368 	nomem_cnt = bdev_io_tailq_cnt(&shared_resource->nomem_io);
1369 
1370 	/* Complete 1 I/O only.  This should not trigger retrying the queued nomem_io. */
1371 	stub_complete_io(g_bdev.io_target, 1);
1372 	CU_ASSERT(bdev_io_tailq_cnt(&shared_resource->nomem_io) == nomem_cnt);
1373 
1374 	/*
1375 	 * Send a reset and confirm that all I/O are completed, including the ones that
1376 	 *  were queued on the nomem_io list.
1377 	 */
1378 	status_reset = SPDK_BDEV_IO_STATUS_PENDING;
1379 	rc = spdk_bdev_reset(g_desc, io_ch, enomem_done, &status_reset);
1380 	poll_threads();
1381 	CU_ASSERT(rc == 0);
1382 	/* This will complete the reset. */
1383 	stub_complete_io(g_bdev.io_target, 0);
1384 
1385 	CU_ASSERT(bdev_io_tailq_cnt(&shared_resource->nomem_io) == 0);
1386 	CU_ASSERT(shared_resource->io_outstanding == 0);
1387 
1388 	spdk_put_io_channel(io_ch);
1389 	poll_threads();
1390 	teardown_test();
1391 }
1392 
1393 static void
1394 enomem_multi_bdev(void)
1395 {
1396 	struct spdk_io_channel *io_ch;
1397 	struct spdk_bdev_channel *bdev_ch;
1398 	struct spdk_bdev_shared_resource *shared_resource;
1399 	struct ut_bdev_channel *ut_ch;
1400 	const uint32_t IO_ARRAY_SIZE = 64;
1401 	const uint32_t AVAIL = 20;
1402 	enum spdk_bdev_io_status status[IO_ARRAY_SIZE];
1403 	uint32_t i;
1404 	struct ut_bdev *second_bdev;
1405 	struct spdk_bdev_desc *second_desc = NULL;
1406 	struct spdk_bdev_channel *second_bdev_ch;
1407 	struct spdk_io_channel *second_ch;
1408 	int rc;
1409 
1410 	setup_test();
1411 
1412 	/* Register second bdev with the same io_target  */
1413 	second_bdev = calloc(1, sizeof(*second_bdev));
1414 	SPDK_CU_ASSERT_FATAL(second_bdev != NULL);
1415 	register_bdev(second_bdev, "ut_bdev2", g_bdev.io_target);
1416 	spdk_bdev_open_ext("ut_bdev2", true, _bdev_event_cb, NULL, &second_desc);
1417 	SPDK_CU_ASSERT_FATAL(second_desc != NULL);
1418 
1419 	set_thread(0);
1420 	io_ch = spdk_bdev_get_io_channel(g_desc);
1421 	bdev_ch = spdk_io_channel_get_ctx(io_ch);
1422 	shared_resource = bdev_ch->shared_resource;
1423 	ut_ch = spdk_io_channel_get_ctx(bdev_ch->channel);
1424 	ut_ch->avail_cnt = AVAIL;
1425 
1426 	second_ch = spdk_bdev_get_io_channel(second_desc);
1427 	second_bdev_ch = spdk_io_channel_get_ctx(second_ch);
1428 	SPDK_CU_ASSERT_FATAL(shared_resource == second_bdev_ch->shared_resource);
1429 
1430 	/* Saturate io_target through bdev A. */
1431 	for (i = 0; i < AVAIL; i++) {
1432 		status[i] = SPDK_BDEV_IO_STATUS_PENDING;
1433 		rc = spdk_bdev_read_blocks(g_desc, io_ch, NULL, 0, 1, enomem_done, &status[i]);
1434 		CU_ASSERT(rc == 0);
1435 	}
1436 	CU_ASSERT(TAILQ_EMPTY(&shared_resource->nomem_io));
1437 
1438 	/*
1439 	 * Now submit I/O through the second bdev. This should fail with ENOMEM
1440 	 * and then go onto the nomem_io list.
1441 	 */
1442 	status[AVAIL] = SPDK_BDEV_IO_STATUS_PENDING;
1443 	rc = spdk_bdev_read_blocks(second_desc, second_ch, NULL, 0, 1, enomem_done, &status[AVAIL]);
1444 	CU_ASSERT(rc == 0);
1445 	SPDK_CU_ASSERT_FATAL(!TAILQ_EMPTY(&shared_resource->nomem_io));
1446 
1447 	/* Complete first bdev's I/O. This should retry sending second bdev's nomem_io */
1448 	stub_complete_io(g_bdev.io_target, AVAIL);
1449 
1450 	SPDK_CU_ASSERT_FATAL(TAILQ_EMPTY(&shared_resource->nomem_io));
1451 	CU_ASSERT(shared_resource->io_outstanding == 1);
1452 
1453 	/* Now complete our retried I/O  */
1454 	stub_complete_io(g_bdev.io_target, 1);
1455 	SPDK_CU_ASSERT_FATAL(shared_resource->io_outstanding == 0);
1456 
1457 	spdk_put_io_channel(io_ch);
1458 	spdk_put_io_channel(second_ch);
1459 	spdk_bdev_close(second_desc);
1460 	unregister_bdev(second_bdev);
1461 	poll_threads();
1462 	free(second_bdev);
1463 	teardown_test();
1464 }
1465 
1466 static void
1467 enomem_multi_bdev_unregister(void)
1468 {
1469 	struct spdk_io_channel *io_ch;
1470 	struct spdk_bdev_channel *bdev_ch;
1471 	struct spdk_bdev_shared_resource *shared_resource;
1472 	struct ut_bdev_channel *ut_ch;
1473 	const uint32_t IO_ARRAY_SIZE = 64;
1474 	const uint32_t AVAIL = 20;
1475 	enum spdk_bdev_io_status status[IO_ARRAY_SIZE];
1476 	uint32_t i;
1477 	int rc;
1478 
1479 	setup_test();
1480 
1481 	set_thread(0);
1482 	io_ch = spdk_bdev_get_io_channel(g_desc);
1483 	bdev_ch = spdk_io_channel_get_ctx(io_ch);
1484 	shared_resource = bdev_ch->shared_resource;
1485 	ut_ch = spdk_io_channel_get_ctx(bdev_ch->channel);
1486 	ut_ch->avail_cnt = AVAIL;
1487 
1488 	/* Saturate io_target through the bdev. */
1489 	for (i = 0; i < AVAIL; i++) {
1490 		status[i] = SPDK_BDEV_IO_STATUS_PENDING;
1491 		rc = spdk_bdev_read_blocks(g_desc, io_ch, NULL, 0, 1, enomem_done, &status[i]);
1492 		CU_ASSERT(rc == 0);
1493 	}
1494 	CU_ASSERT(TAILQ_EMPTY(&shared_resource->nomem_io));
1495 
1496 	/*
1497 	 * Now submit I/O through the bdev. This should fail with ENOMEM
1498 	 * and then go onto the nomem_io list.
1499 	 */
1500 	status[AVAIL] = SPDK_BDEV_IO_STATUS_PENDING;
1501 	rc = spdk_bdev_read_blocks(g_desc, io_ch, NULL, 0, 1, enomem_done, &status[AVAIL]);
1502 	CU_ASSERT(rc == 0);
1503 	SPDK_CU_ASSERT_FATAL(!TAILQ_EMPTY(&shared_resource->nomem_io));
1504 
1505 	/* Unregister the bdev to abort the IOs from nomem_io queue. */
1506 	unregister_bdev(&g_bdev);
1507 	CU_ASSERT(status[AVAIL] == SPDK_BDEV_IO_STATUS_FAILED);
1508 	SPDK_CU_ASSERT_FATAL(TAILQ_EMPTY(&shared_resource->nomem_io));
1509 	SPDK_CU_ASSERT_FATAL(shared_resource->io_outstanding == AVAIL);
1510 
1511 	/* Complete the bdev's I/O. */
1512 	stub_complete_io(g_bdev.io_target, AVAIL);
1513 	SPDK_CU_ASSERT_FATAL(shared_resource->io_outstanding == 0);
1514 
1515 	spdk_put_io_channel(io_ch);
1516 	poll_threads();
1517 	teardown_test();
1518 }
1519 
1520 static void
1521 enomem_multi_io_target(void)
1522 {
1523 	struct spdk_io_channel *io_ch;
1524 	struct spdk_bdev_channel *bdev_ch;
1525 	struct ut_bdev_channel *ut_ch;
1526 	const uint32_t IO_ARRAY_SIZE = 64;
1527 	const uint32_t AVAIL = 20;
1528 	enum spdk_bdev_io_status status[IO_ARRAY_SIZE];
1529 	uint32_t i;
1530 	int new_io_device;
1531 	struct ut_bdev *second_bdev;
1532 	struct spdk_bdev_desc *second_desc = NULL;
1533 	struct spdk_bdev_channel *second_bdev_ch;
1534 	struct spdk_io_channel *second_ch;
1535 	int rc;
1536 
1537 	setup_test();
1538 
1539 	/* Create new io_target and a second bdev using it */
1540 	spdk_io_device_register(&new_io_device, stub_create_ch, stub_destroy_ch,
1541 				sizeof(struct ut_bdev_channel), NULL);
1542 	second_bdev = calloc(1, sizeof(*second_bdev));
1543 	SPDK_CU_ASSERT_FATAL(second_bdev != NULL);
1544 	register_bdev(second_bdev, "ut_bdev2", &new_io_device);
1545 	spdk_bdev_open_ext("ut_bdev2", true, _bdev_event_cb, NULL, &second_desc);
1546 	SPDK_CU_ASSERT_FATAL(second_desc != NULL);
1547 
1548 	set_thread(0);
1549 	io_ch = spdk_bdev_get_io_channel(g_desc);
1550 	bdev_ch = spdk_io_channel_get_ctx(io_ch);
1551 	ut_ch = spdk_io_channel_get_ctx(bdev_ch->channel);
1552 	ut_ch->avail_cnt = AVAIL;
1553 
1554 	/* Different io_target should imply a different shared_resource */
1555 	second_ch = spdk_bdev_get_io_channel(second_desc);
1556 	second_bdev_ch = spdk_io_channel_get_ctx(second_ch);
1557 	SPDK_CU_ASSERT_FATAL(bdev_ch->shared_resource != second_bdev_ch->shared_resource);
1558 
1559 	/* Saturate io_target through bdev A. */
1560 	for (i = 0; i < AVAIL; i++) {
1561 		status[i] = SPDK_BDEV_IO_STATUS_PENDING;
1562 		rc = spdk_bdev_read_blocks(g_desc, io_ch, NULL, 0, 1, enomem_done, &status[i]);
1563 		CU_ASSERT(rc == 0);
1564 	}
1565 	CU_ASSERT(TAILQ_EMPTY(&bdev_ch->shared_resource->nomem_io));
1566 
1567 	/* Issue one more I/O to fill ENOMEM list. */
1568 	status[AVAIL] = SPDK_BDEV_IO_STATUS_PENDING;
1569 	rc = spdk_bdev_read_blocks(g_desc, io_ch, NULL, 0, 1, enomem_done, &status[AVAIL]);
1570 	CU_ASSERT(rc == 0);
1571 	SPDK_CU_ASSERT_FATAL(!TAILQ_EMPTY(&bdev_ch->shared_resource->nomem_io));
1572 
1573 	/*
1574 	 * Now submit I/O through the second bdev. This should go through and complete
1575 	 * successfully because we're using a different io_device underneath.
1576 	 */
1577 	status[AVAIL] = SPDK_BDEV_IO_STATUS_PENDING;
1578 	rc = spdk_bdev_read_blocks(second_desc, second_ch, NULL, 0, 1, enomem_done, &status[AVAIL]);
1579 	CU_ASSERT(rc == 0);
1580 	SPDK_CU_ASSERT_FATAL(TAILQ_EMPTY(&second_bdev_ch->shared_resource->nomem_io));
1581 	stub_complete_io(second_bdev->io_target, 1);
1582 
1583 	/* Cleanup; Complete outstanding I/O. */
1584 	stub_complete_io(g_bdev.io_target, AVAIL);
1585 	SPDK_CU_ASSERT_FATAL(TAILQ_EMPTY(&bdev_ch->shared_resource->nomem_io));
1586 	/* Complete the ENOMEM I/O */
1587 	stub_complete_io(g_bdev.io_target, 1);
1588 	CU_ASSERT(bdev_ch->shared_resource->io_outstanding == 0);
1589 
1590 	SPDK_CU_ASSERT_FATAL(TAILQ_EMPTY(&bdev_ch->shared_resource->nomem_io));
1591 	CU_ASSERT(bdev_ch->shared_resource->io_outstanding == 0);
1592 	spdk_put_io_channel(io_ch);
1593 	spdk_put_io_channel(second_ch);
1594 	spdk_bdev_close(second_desc);
1595 	unregister_bdev(second_bdev);
1596 	spdk_io_device_unregister(&new_io_device, NULL);
1597 	poll_threads();
1598 	free(second_bdev);
1599 	teardown_test();
1600 }
1601 
1602 static void
1603 qos_dynamic_enable_done(void *cb_arg, int status)
1604 {
1605 	int *rc = cb_arg;
1606 	*rc = status;
1607 }
1608 
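/*
 * Enable and disable QoS rate limits at runtime via
 * spdk_bdev_set_qos_rate_limits() and verify that queued I/O survive the
 * transition and are resubmitted on their original threads.
 */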
1609 static void
1610 qos_dynamic_enable(void)
1611 {
1612 	struct spdk_io_channel *io_ch[2];
1613 	struct spdk_bdev_channel *bdev_ch[2];
1614 	struct spdk_bdev *bdev;
1615 	enum spdk_bdev_io_status bdev_io_status[2];
1616 	uint64_t limits[SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES] = {};
1617 	int status, second_status, rc, i;
1618 
1619 	setup_test();
1620 	MOCK_SET(spdk_get_ticks, 0);
1621 
1622 	for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) {
1623 		limits[i] = UINT64_MAX;
1624 	}
1625 
1626 	bdev = &g_bdev.bdev;
1627 
1628 	g_get_io_channel = true;
1629 
1630 	/* Create channels */
1631 	set_thread(0);
1632 	io_ch[0] = spdk_bdev_get_io_channel(g_desc);
1633 	bdev_ch[0] = spdk_io_channel_get_ctx(io_ch[0]);
1634 	CU_ASSERT(bdev_ch[0]->flags == 0);
1635 
1636 	set_thread(1);
1637 	io_ch[1] = spdk_bdev_get_io_channel(g_desc);
1638 	bdev_ch[1] = spdk_io_channel_get_ctx(io_ch[1]);
1639 	CU_ASSERT(bdev_ch[1]->flags == 0);
1640 
1641 	set_thread(0);
1642 
1643 	/*
1644 	 * Enable QoS: Read/Write IOPS, Read/Write byte,
1645 	 * Read only byte and Write only byte per second
1646 	 * rate limits.
1647 	 * More than 10 I/Os allowed per timeslice.
1648 	 */
1649 	status = -1;
1650 	limits[SPDK_BDEV_QOS_RW_IOPS_RATE_LIMIT] = 10000;
1651 	limits[SPDK_BDEV_QOS_RW_BPS_RATE_LIMIT] = 100;
1652 	limits[SPDK_BDEV_QOS_R_BPS_RATE_LIMIT] = 100;
1653 	limits[SPDK_BDEV_QOS_W_BPS_RATE_LIMIT] = 10;
1654 	spdk_bdev_set_qos_rate_limits(bdev, limits, qos_dynamic_enable_done, &status);
1655 	poll_threads();
1656 	CU_ASSERT(status == 0);
1657 	CU_ASSERT((bdev_ch[0]->flags & BDEV_CH_QOS_ENABLED) != 0);
1658 	CU_ASSERT((bdev_ch[1]->flags & BDEV_CH_QOS_ENABLED) != 0);
1659 
1660 	/*
1661 	 * Submit and complete 10 I/O to fill the QoS allotment for this timeslice.
1662 	 * Additional I/O will then be queued.
1663 	 */
1664 	set_thread(0);
1665 	for (i = 0; i < 10; i++) {
1666 		bdev_io_status[0] = SPDK_BDEV_IO_STATUS_PENDING;
1667 		rc = spdk_bdev_read_blocks(g_desc, io_ch[0], NULL, 0, 1, io_during_io_done, &bdev_io_status[0]);
1668 		CU_ASSERT(rc == 0);
1669 		CU_ASSERT(bdev_io_status[0] == SPDK_BDEV_IO_STATUS_PENDING);
1670 		poll_thread(0);
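		/* A count of 0 tells the stub to complete all outstanding I/O on this channel. */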
1671 		stub_complete_io(g_bdev.io_target, 0);
1672 		CU_ASSERT(bdev_io_status[0] == SPDK_BDEV_IO_STATUS_SUCCESS);
1673 	}
1674 
1675 	/*
1676 	 * Send two more I/O.  These I/O will be queued since the current timeslice allotment has been
1677 	 * filled already.  We want to test that when QoS is disabled, these two I/O:
1678 	 *  1) are not aborted
1679 	 *  2) are sent back to their original thread for resubmission
1680 	 */
1681 	bdev_io_status[0] = SPDK_BDEV_IO_STATUS_PENDING;
1682 	rc = spdk_bdev_read_blocks(g_desc, io_ch[0], NULL, 0, 1, io_during_io_done, &bdev_io_status[0]);
1683 	CU_ASSERT(rc == 0);
1684 	CU_ASSERT(bdev_io_status[0] == SPDK_BDEV_IO_STATUS_PENDING);
1685 	set_thread(1);
1686 	bdev_io_status[1] = SPDK_BDEV_IO_STATUS_PENDING;
1687 	rc = spdk_bdev_read_blocks(g_desc, io_ch[1], NULL, 0, 1, io_during_io_done, &bdev_io_status[1]);
1688 	CU_ASSERT(rc == 0);
1689 	CU_ASSERT(bdev_io_status[1] == SPDK_BDEV_IO_STATUS_PENDING);
1690 	poll_threads();
1691 
1692 	/*
1693 	 * Disable QoS: Read/Write IOPS, Read/Write byte,
1694 	 * Read only byte rate limits
1695 	 */
1696 	status = -1;
1697 	limits[SPDK_BDEV_QOS_RW_IOPS_RATE_LIMIT] = 0;
1698 	limits[SPDK_BDEV_QOS_RW_BPS_RATE_LIMIT] = 0;
1699 	limits[SPDK_BDEV_QOS_R_BPS_RATE_LIMIT] = 0;
1700 	spdk_bdev_set_qos_rate_limits(bdev, limits, qos_dynamic_enable_done, &status);
1701 	poll_threads();
1702 	CU_ASSERT(status == 0);
1703 	CU_ASSERT((bdev_ch[0]->flags & BDEV_CH_QOS_ENABLED) != 0);
1704 	CU_ASSERT((bdev_ch[1]->flags & BDEV_CH_QOS_ENABLED) != 0);
1705 
1706 	/* Disable QoS: Write only Byte per second rate limit */
1707 	status = -1;
1708 	limits[SPDK_BDEV_QOS_W_BPS_RATE_LIMIT] = 0;
1709 	spdk_bdev_set_qos_rate_limits(bdev, limits, qos_dynamic_enable_done, &status);
1710 	poll_threads();
1711 	CU_ASSERT(status == 0);
1712 	CU_ASSERT((bdev_ch[0]->flags & BDEV_CH_QOS_ENABLED) == 0);
1713 	CU_ASSERT((bdev_ch[1]->flags & BDEV_CH_QOS_ENABLED) == 0);
1714 
1715 	/*
1716 	 * All I/O should have been resubmitted back on their original thread.  Complete
1717 	 *  all I/O on thread 0, and ensure that only the thread 0 I/O was completed.
1718 	 */
1719 	set_thread(0);
1720 	stub_complete_io(g_bdev.io_target, 0);
1721 	poll_threads();
1722 	CU_ASSERT(bdev_io_status[0] == SPDK_BDEV_IO_STATUS_SUCCESS);
1723 	CU_ASSERT(bdev_io_status[1] == SPDK_BDEV_IO_STATUS_PENDING);
1724 
1725 	/* Now complete all I/O on thread 1 and ensure the thread 1 I/O was completed. */
1726 	set_thread(1);
1727 	stub_complete_io(g_bdev.io_target, 0);
1728 	poll_threads();
1729 	CU_ASSERT(bdev_io_status[1] == SPDK_BDEV_IO_STATUS_SUCCESS);
1730 
1731 	/* Disable QoS again */
1732 	status = -1;
1733 	limits[SPDK_BDEV_QOS_RW_IOPS_RATE_LIMIT] = 0;
1734 	spdk_bdev_set_qos_rate_limits(bdev, limits, qos_dynamic_enable_done, &status);
1735 	poll_threads();
1736 	CU_ASSERT(status == 0); /* This should succeed */
1737 	CU_ASSERT((bdev_ch[0]->flags & BDEV_CH_QOS_ENABLED) == 0);
1738 	CU_ASSERT((bdev_ch[1]->flags & BDEV_CH_QOS_ENABLED) == 0);
1739 
1740 	/* Enable QoS on thread 0 */
1741 	status = -1;
1742 	limits[SPDK_BDEV_QOS_RW_IOPS_RATE_LIMIT] = 10000;
1743 	spdk_bdev_set_qos_rate_limits(bdev, limits, qos_dynamic_enable_done, &status);
1744 	poll_threads();
1745 	CU_ASSERT(status == 0);
1746 	CU_ASSERT((bdev_ch[0]->flags & BDEV_CH_QOS_ENABLED) != 0);
1747 	CU_ASSERT((bdev_ch[1]->flags & BDEV_CH_QOS_ENABLED) != 0);
1748 
1749 	/* Disable QoS on thread 1 */
1750 	set_thread(1);
1751 	status = -1;
1752 	limits[SPDK_BDEV_QOS_RW_IOPS_RATE_LIMIT] = 0;
1753 	spdk_bdev_set_qos_rate_limits(bdev, limits, qos_dynamic_enable_done, &status);
1754 	/* Don't poll yet. This should leave the channels with QoS enabled */
1755 	CU_ASSERT(status == -1);
1756 	CU_ASSERT((bdev_ch[0]->flags & BDEV_CH_QOS_ENABLED) != 0);
1757 	CU_ASSERT((bdev_ch[1]->flags & BDEV_CH_QOS_ENABLED) != 0);
1758 
1759 	/* Enable QoS. This should immediately fail because the previous disable QoS hasn't completed. */
1760 	second_status = 0;
1761 	limits[SPDK_BDEV_QOS_RW_BPS_RATE_LIMIT] = 10;
1762 	spdk_bdev_set_qos_rate_limits(bdev, limits, qos_dynamic_enable_done, &second_status);
1763 	poll_threads();
1764 	CU_ASSERT(status == 0); /* The disable should succeed */
1765 	CU_ASSERT(second_status < 0); /* The enable should fail */
1766 	CU_ASSERT((bdev_ch[0]->flags & BDEV_CH_QOS_ENABLED) == 0);
1767 	CU_ASSERT((bdev_ch[1]->flags & BDEV_CH_QOS_ENABLED) == 0);
1768 
1769 	/* Enable QoS on thread 1. This should succeed now that the disable has completed. */
1770 	status = -1;
1771 	limits[SPDK_BDEV_QOS_RW_IOPS_RATE_LIMIT] = 10000;
1772 	spdk_bdev_set_qos_rate_limits(bdev, limits, qos_dynamic_enable_done, &status);
1773 	poll_threads();
1774 	CU_ASSERT(status == 0);
1775 	CU_ASSERT((bdev_ch[0]->flags & BDEV_CH_QOS_ENABLED) != 0);
1776 	CU_ASSERT((bdev_ch[1]->flags & BDEV_CH_QOS_ENABLED) != 0);
1777 
1778 	/* Tear down the channels */
1779 	set_thread(0);
1780 	spdk_put_io_channel(io_ch[0]);
1781 	set_thread(1);
1782 	spdk_put_io_channel(io_ch[1]);
1783 	poll_threads();
1784 
1785 	set_thread(0);
1786 	teardown_test();
1787 }
1788 
1789 static void
1790 histogram_status_cb(void *cb_arg, int status)
1791 {
1792 	g_status = status;
1793 }
1794 
1795 static void
1796 histogram_data_cb(void *cb_arg, int status, struct spdk_histogram_data *histogram)
1797 {
1798 	g_status = status;
1799 	g_histogram = histogram;
1800 }
1801 
1802 static void
1803 histogram_io_count(void *ctx, uint64_t start, uint64_t end, uint64_t count,
1804 		   uint64_t total, uint64_t so_far)
1805 {
1806 	g_count += count;
1807 }
1808 
1809 static void
1810 bdev_histograms_mt(void)
1811 {
1812 	struct spdk_io_channel *ch[2];
1813 	struct spdk_histogram_data *histogram;
1814 	uint8_t buf[4096];
1815 	enum spdk_bdev_io_status status = SPDK_BDEV_IO_STATUS_PENDING;
1816 	int rc;
1817 
1818 
1819 	setup_test();
1820 
1821 	set_thread(0);
1822 	ch[0] = spdk_bdev_get_io_channel(g_desc);
1823 	CU_ASSERT(ch[0] != NULL);
1824 
1825 	set_thread(1);
1826 	ch[1] = spdk_bdev_get_io_channel(g_desc);
1827 	CU_ASSERT(ch[1] != NULL);
1828 
1829 
1830 	/* Enable histogram */
1831 	spdk_bdev_histogram_enable(&g_bdev.bdev, histogram_status_cb, NULL, true);
1832 	poll_threads();
1833 	CU_ASSERT(g_status == 0);
1834 	CU_ASSERT(g_bdev.bdev.internal.histogram_enabled == true);
1835 
1836 	/* Allocate histogram */
1837 	histogram = spdk_histogram_data_alloc();
1838 
1839 	/* Check if histogram is zeroed */
1840 	spdk_bdev_histogram_get(&g_bdev.bdev, histogram, histogram_data_cb, NULL);
1841 	poll_threads();
1842 	CU_ASSERT(g_status == 0);
1843 	SPDK_CU_ASSERT_FATAL(g_histogram != NULL);
1844 
1845 	g_count = 0;
1846 	spdk_histogram_data_iterate(g_histogram, histogram_io_count, NULL);
1847 
1848 	CU_ASSERT(g_count == 0);
1849 
1850 	set_thread(0);
1851 	rc = spdk_bdev_write_blocks(g_desc, ch[0], &buf, 0, 1, io_during_io_done, &status);
1852 	CU_ASSERT(rc == 0);
1853 
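	/* Advance the mocked clock so the completed I/O records a non-zero latency in the histogram. */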
1854 	spdk_delay_us(10);
1855 	stub_complete_io(g_bdev.io_target, 1);
1856 	poll_threads();
1857 	CU_ASSERT(status == SPDK_BDEV_IO_STATUS_SUCCESS);
1858 
1859 
1860 	set_thread(1);
1861 	rc = spdk_bdev_read_blocks(g_desc, ch[1], &buf, 0, 1, io_during_io_done, &status);
1862 	CU_ASSERT(rc == 0);
1863 
1864 	spdk_delay_us(10);
1865 	stub_complete_io(g_bdev.io_target, 1);
1866 	poll_threads();
1867 	CU_ASSERT(status == SPDK_BDEV_IO_STATUS_SUCCESS);
1868 
1869 	set_thread(0);
1870 
1871 	/* Check if histogram gathered data from all I/O channels */
1872 	spdk_bdev_histogram_get(&g_bdev.bdev, histogram, histogram_data_cb, NULL);
1873 	poll_threads();
1874 	CU_ASSERT(g_status == 0);
1875 	CU_ASSERT(g_bdev.bdev.internal.histogram_enabled == true);
1876 	SPDK_CU_ASSERT_FATAL(g_histogram != NULL);
1877 
1878 	g_count = 0;
1879 	spdk_histogram_data_iterate(g_histogram, histogram_io_count, NULL);
1880 	CU_ASSERT(g_count == 2);
1881 
1882 	/* Disable histogram */
1883 	spdk_bdev_histogram_enable(&g_bdev.bdev, histogram_status_cb, NULL, false);
1884 	poll_threads();
1885 	CU_ASSERT(g_status == 0);
1886 	CU_ASSERT(g_bdev.bdev.internal.histogram_enabled == false);
1887 
1888 	spdk_histogram_data_free(histogram);
1889 
1890 	/* Tear down the channels */
1891 	set_thread(0);
1892 	spdk_put_io_channel(ch[0]);
1893 	set_thread(1);
1894 	spdk_put_io_channel(ch[1]);
1895 	poll_threads();
1896 	set_thread(0);
1897 	teardown_test();
1898 
1899 }
1900 
1901 struct timeout_io_cb_arg {
1902 	struct iovec iov;
1903 	uint8_t type;
1904 };
1905 
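/* Count the bdev_io entries currently tracked on the channel's io_submitted list. */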
1906 static int
1907 bdev_channel_count_submitted_io(struct spdk_bdev_channel *ch)
1908 {
1909 	struct spdk_bdev_io *bdev_io;
1910 	int n = 0;
1911 
1912 	if (!ch) {
1913 		return -1;
1914 	}
1915 
1916 	TAILQ_FOREACH(bdev_io, &ch->io_submitted, internal.ch_link) {
1917 		n++;
1918 	}
1919 
1920 	return n;
1921 }
1922 
1923 static void
1924 bdev_channel_io_timeout_cb(void *cb_arg, struct spdk_bdev_io *bdev_io)
1925 {
1926 	struct timeout_io_cb_arg *ctx = cb_arg;
1927 
1928 	ctx->type = bdev_io->type;
1929 	ctx->iov.iov_base = bdev_io->iov.iov_base;
1930 	ctx->iov.iov_len = bdev_io->iov.iov_len;
1931 }
1932 
1933 static bool g_io_done;
1934 
1935 static void
1936 io_done(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg)
1937 {
1938 	g_io_done = true;
1939 	spdk_bdev_free_io(bdev_io);
1940 }
1941 
1942 static void
1943 bdev_set_io_timeout_mt(void)
1944 {
1945 	struct spdk_io_channel *ch[3];
1946 	struct spdk_bdev_channel *bdev_ch[3];
1947 	struct timeout_io_cb_arg cb_arg;
1948 
1949 	setup_test();
1950 
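	/* Split I/O on a 16-block boundary so the read at offset 14, length 8 below produces two child I/O. */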
1951 	g_bdev.bdev.optimal_io_boundary = 16;
1952 	g_bdev.bdev.split_on_optimal_io_boundary = true;
1953 
1954 	set_thread(0);
1955 	ch[0] = spdk_bdev_get_io_channel(g_desc);
1956 	CU_ASSERT(ch[0] != NULL);
1957 
1958 	set_thread(1);
1959 	ch[1] = spdk_bdev_get_io_channel(g_desc);
1960 	CU_ASSERT(ch[1] != NULL);
1961 
1962 	set_thread(2);
1963 	ch[2] = spdk_bdev_get_io_channel(g_desc);
1964 	CU_ASSERT(ch[2] != NULL);
1965 
1966 	/* Multi-thread mode
1967 	 * 1, Check the poller was registered successfully
1968 	 * 2, Check the timeout IO and ensure the IO was submitted by the user
1969 	 * 3, Check that the link in the bdev_ch works correctly.
1970 	 * 4, Close the desc and put the io channel while the timeout poller is polling
1971 	 */
1972 
1973 	/* In desc thread set the timeout */
1974 	set_thread(0);
1975 	CU_ASSERT(spdk_bdev_set_timeout(g_desc, 5, bdev_channel_io_timeout_cb, &cb_arg) == 0);
1976 	CU_ASSERT(g_desc->io_timeout_poller != NULL);
1977 	CU_ASSERT(g_desc->cb_fn == bdev_channel_io_timeout_cb);
1978 	CU_ASSERT(g_desc->cb_arg == &cb_arg);
1979 
1980 	/* check the IO submitted list and timeout handler */
1981 	CU_ASSERT(spdk_bdev_read_blocks(g_desc, ch[0], (void *)0x2000, 0, 1, io_done, NULL) == 0);
1982 	bdev_ch[0] = spdk_io_channel_get_ctx(ch[0]);
1983 	CU_ASSERT(bdev_channel_count_submitted_io(bdev_ch[0]) == 1);
1984 
1985 	set_thread(1);
1986 	CU_ASSERT(spdk_bdev_write_blocks(g_desc, ch[1], (void *)0x1000, 0, 1, io_done, NULL) == 0);
1987 	bdev_ch[1] = spdk_io_channel_get_ctx(ch[1]);
1988 	CU_ASSERT(bdev_channel_count_submitted_io(bdev_ch[1]) == 1);
1989 
1990 	/* Now test that a single-vector command is split correctly.
1991 	 * Offset 14, length 8, payload 0xF000
1992 	 *  Child - Offset 14, length 2, payload 0xF000
1993 	 *  Child - Offset 16, length 6, payload 0xF000 + 2 * 512
1994 	 *
1995 	 * The parent I/O and its two children are all tracked on io_submitted, so three entries are expected.
1996 	 */
1997 	set_thread(2);
1998 	CU_ASSERT(spdk_bdev_read_blocks(g_desc, ch[2], (void *)0xF000, 14, 8, io_done, NULL) == 0);
1999 	bdev_ch[2] = spdk_io_channel_get_ctx(ch[2]);
2000 	CU_ASSERT(bdev_channel_count_submitted_io(bdev_ch[2]) == 3);
2001 
2002 	set_thread(0);
2003 	memset(&cb_arg, 0, sizeof(cb_arg));
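	/* Advance time by only 3 seconds, still below the 5-second timeout, so nothing should be reported yet. */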
2004 	spdk_delay_us(3 * spdk_get_ticks_hz());
2005 	poll_threads();
2006 	CU_ASSERT(cb_arg.type == 0);
2007 	CU_ASSERT(cb_arg.iov.iov_base == (void *)0x0);
2008 	CU_ASSERT(cb_arg.iov.iov_len == 0);
2009 
2010 	/* Now the elapsed time exceeds the timeout limit */
2011 	spdk_delay_us(3 * spdk_get_ticks_hz());
2012 	poll_thread(0);
2013 	CU_ASSERT(cb_arg.type == SPDK_BDEV_IO_TYPE_READ);
2014 	CU_ASSERT(cb_arg.iov.iov_base == (void *)0x2000);
2015 	CU_ASSERT(cb_arg.iov.iov_len == 1 * g_bdev.bdev.blocklen);
2016 	stub_complete_io(g_bdev.io_target, 1);
2017 	CU_ASSERT(bdev_channel_count_submitted_io(bdev_ch[0]) == 0);
2018 
2019 	memset(&cb_arg, 0, sizeof(cb_arg));
2020 	set_thread(1);
2021 	poll_thread(1);
2022 	CU_ASSERT(cb_arg.type == SPDK_BDEV_IO_TYPE_WRITE);
2023 	CU_ASSERT(cb_arg.iov.iov_base == (void *)0x1000);
2024 	CU_ASSERT(cb_arg.iov.iov_len == 1 * g_bdev.bdev.blocklen);
2025 	stub_complete_io(g_bdev.io_target, 1);
2026 	CU_ASSERT(bdev_channel_count_submitted_io(bdev_ch[1]) == 0);
2027 
2028 	memset(&cb_arg, 0, sizeof(cb_arg));
2029 	set_thread(2);
2030 	poll_thread(2);
2031 	CU_ASSERT(cb_arg.type == SPDK_BDEV_IO_TYPE_READ);
2032 	CU_ASSERT(cb_arg.iov.iov_base == (void *)0xF000);
2033 	CU_ASSERT(cb_arg.iov.iov_len == 8 * g_bdev.bdev.blocklen);
2034 	stub_complete_io(g_bdev.io_target, 1);
2035 	CU_ASSERT(bdev_channel_count_submitted_io(bdev_ch[2]) == 2);
2036 	stub_complete_io(g_bdev.io_target, 1);
2037 	CU_ASSERT(bdev_channel_count_submitted_io(bdev_ch[2]) == 0);
2038 
2039 	/* Run poll_timeout_done(), which completes the timeout poller's iteration */
2040 	set_thread(0);
2041 	poll_thread(0);
2042 	CU_ASSERT(g_desc->refs == 0);
2043 	CU_ASSERT(spdk_bdev_read_blocks(g_desc, ch[0], (void *)0x1000, 0, 1, io_done, NULL) == 0);
2044 	set_thread(1);
2045 	CU_ASSERT(spdk_bdev_write_blocks(g_desc, ch[1], (void *)0x2000, 0, 2, io_done, NULL) == 0);
2046 	set_thread(2);
2047 	CU_ASSERT(spdk_bdev_read_blocks(g_desc, ch[2], (void *)0x3000, 0, 3, io_done, NULL) == 0);
2048 
2049 	/* Trigger the timeout poller to run again; desc->refs is incremented.
2050 	 * On thread 0 we destroy the io channel before the timeout poller runs,
2051 	 * so the timeout callback is not called on thread 0.
2052 	 */
2053 	spdk_delay_us(6 * spdk_get_ticks_hz());
2054 	memset(&cb_arg, 0, sizeof(cb_arg));
2055 	set_thread(0);
2056 	stub_complete_io(g_bdev.io_target, 1);
2057 	spdk_put_io_channel(ch[0]);
2058 	poll_thread(0);
2059 	CU_ASSERT(g_desc->refs == 1);
2060 	CU_ASSERT(cb_arg.type == 0);
2061 	CU_ASSERT(cb_arg.iov.iov_base == (void *)0x0);
2062 	CU_ASSERT(cb_arg.iov.iov_len == 0);
2063 
2064 	/* On thread 1 the timeout poller runs first, then we destroy the io channel.
2065 	 * The timeout callback is called on thread 1.
2066 	 */
2067 	memset(&cb_arg, 0, sizeof(cb_arg));
2068 	set_thread(1);
2069 	poll_thread(1);
2070 	stub_complete_io(g_bdev.io_target, 1);
2071 	spdk_put_io_channel(ch[1]);
2072 	poll_thread(1);
2073 	CU_ASSERT(cb_arg.type == SPDK_BDEV_IO_TYPE_WRITE);
2074 	CU_ASSERT(cb_arg.iov.iov_base == (void *)0x2000);
2075 	CU_ASSERT(cb_arg.iov.iov_len == 2 * g_bdev.bdev.blocklen);
2076 
2077 	/* Close the desc.  This unregisters the timeout poller first and then
2078 	 * decrements desc->refs.  The refcount is not zero yet, so the desc is
2079 	 * not freed.
2080 	 */
2081 	set_thread(0);
2082 	spdk_bdev_close(g_desc);
2083 	CU_ASSERT(g_desc->refs == 1);
2084 	CU_ASSERT(g_desc->io_timeout_poller == NULL);
2085 
2086 	/* The timeout poller runs on thread 2, then we destroy the io channel.
2087 	 * The desc is already closed, so the poller exits immediately and the
2088 	 * timeout callback is not called on thread 2.
2089 	 */
2090 	memset(&cb_arg, 0, sizeof(cb_arg));
2091 	set_thread(2);
2092 	poll_thread(2);
2093 	stub_complete_io(g_bdev.io_target, 1);
2094 	spdk_put_io_channel(ch[2]);
2095 	poll_thread(2);
2096 	CU_ASSERT(cb_arg.type == 0);
2097 	CU_ASSERT(cb_arg.iov.iov_base == (void *)0x0);
2098 	CU_ASSERT(cb_arg.iov.iov_len == 0);
2099 
2100 	set_thread(0);
2101 	poll_thread(0);
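	/* g_desc was already closed above, so unwind manually here instead of calling teardown_test(). */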
2102 	g_teardown_done = false;
2103 	unregister_bdev(&g_bdev);
2104 	spdk_io_device_unregister(&g_io_device, NULL);
2105 	spdk_bdev_finish(finish_cb, NULL);
2106 	spdk_iobuf_finish(finish_cb, NULL);
2107 	poll_threads();
2108 	memset(&g_bdev, 0, sizeof(g_bdev));
2109 	CU_ASSERT(g_teardown_done == true);
2110 	g_teardown_done = false;
2111 	free_threads();
2112 	free_cores();
2113 }
2114 
2115 static bool g_io_done2;
2116 static bool g_lock_lba_range_done;
2117 static bool g_unlock_lba_range_done;
2118 
2119 static void
2120 io_done2(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg)
2121 {
2122 	g_io_done2 = true;
2123 	spdk_bdev_free_io(bdev_io);
2124 }
2125 
2126 static void
2127 lock_lba_range_done(void *ctx, int status)
2128 {
2129 	g_lock_lba_range_done = true;
2130 }
2131 
2132 static void
2133 unlock_lba_range_done(void *ctx, int status)
2134 {
2135 	g_unlock_lba_range_done = true;
2136 }
2137 
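/* Peek at the stub channel's outstanding I/O count; spdk_get_io_channel() takes a temporary reference that is dropped before returning. */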
2138 static uint32_t
2139 stub_channel_outstanding_cnt(void *io_target)
2140 {
2141 	struct spdk_io_channel *_ch = spdk_get_io_channel(io_target);
2142 	struct ut_bdev_channel *ch = spdk_io_channel_get_ctx(_ch);
2143 	uint32_t outstanding_cnt;
2144 
2145 	outstanding_cnt = ch->outstanding_cnt;
2146 
2147 	spdk_put_io_channel(_ch);
2148 	return outstanding_cnt;
2149 }
2150 
2151 static void
2152 lock_lba_range_then_submit_io(void)
2153 {
2154 	struct spdk_bdev_desc *desc = NULL;
2155 	void *io_target;
2156 	struct spdk_io_channel *io_ch[3];
2157 	struct spdk_bdev_channel *bdev_ch[3];
2158 	struct lba_range *range;
2159 	char buf[4096];
2160 	int ctx0, ctx1, ctx2;
2161 	int rc;
2162 
2163 	setup_test();
2164 
2165 	io_target = g_bdev.io_target;
2166 	desc = g_desc;
2167 
2168 	set_thread(0);
2169 	io_ch[0] = spdk_bdev_get_io_channel(desc);
2170 	bdev_ch[0] = spdk_io_channel_get_ctx(io_ch[0]);
2171 	CU_ASSERT(io_ch[0] != NULL);
2172 
2173 	set_thread(1);
2174 	io_ch[1] = spdk_bdev_get_io_channel(desc);
2175 	bdev_ch[1] = spdk_io_channel_get_ctx(io_ch[1]);
2176 	CU_ASSERT(io_ch[1] != NULL);
2177 
2178 	set_thread(0);
2179 	g_lock_lba_range_done = false;
2180 	rc = bdev_lock_lba_range(desc, io_ch[0], 20, 10, lock_lba_range_done, &ctx0);
2181 	CU_ASSERT(rc == 0);
2182 	poll_threads();
2183 
2184 	/* The lock should immediately become valid, since there are no outstanding
2185 	 * write I/O.
2186 	 */
2187 	CU_ASSERT(g_lock_lba_range_done == true);
2188 	range = TAILQ_FIRST(&bdev_ch[0]->locked_ranges);
2189 	SPDK_CU_ASSERT_FATAL(range != NULL);
2190 	CU_ASSERT(range->offset == 20);
2191 	CU_ASSERT(range->length == 10);
2192 	CU_ASSERT(range->owner_ch == bdev_ch[0]);
2193 
2194 	g_io_done = false;
2195 	CU_ASSERT(TAILQ_EMPTY(&bdev_ch[0]->io_locked));
2196 	rc = spdk_bdev_read_blocks(desc, io_ch[0], buf, 20, 1, io_done, &ctx0);
2197 	CU_ASSERT(rc == 0);
2198 	CU_ASSERT(stub_channel_outstanding_cnt(io_target) == 1);
2199 
2200 	stub_complete_io(io_target, 1);
2201 	poll_threads();
2202 	CU_ASSERT(g_io_done == true);
2203 	CU_ASSERT(TAILQ_EMPTY(&bdev_ch[0]->io_locked));
2204 
2205 	/* Try a write I/O.  This should actually be allowed to execute, since the channel
2206 	 * holding the lock is submitting the write I/O.
2207 	 */
2208 	g_io_done = false;
2209 	CU_ASSERT(TAILQ_EMPTY(&bdev_ch[0]->io_locked));
2210 	rc = spdk_bdev_write_blocks(desc, io_ch[0], buf, 20, 1, io_done, &ctx0);
2211 	CU_ASSERT(rc == 0);
2212 	CU_ASSERT(stub_channel_outstanding_cnt(io_target) == 1);
2213 
2214 	stub_complete_io(io_target, 1);
2215 	poll_threads();
2216 	CU_ASSERT(g_io_done == true);
2217 	CU_ASSERT(TAILQ_EMPTY(&bdev_ch[0]->io_locked));
2218 
2219 	/* Try a write I/O.  This should get queued in the io_locked tailq. */
2220 	set_thread(1);
2221 	g_io_done = false;
2222 	CU_ASSERT(TAILQ_EMPTY(&bdev_ch[1]->io_locked));
2223 	rc = spdk_bdev_write_blocks(desc, io_ch[1], buf, 20, 1, io_done, &ctx1);
2224 	CU_ASSERT(rc == 0);
2225 	poll_threads();
2226 	CU_ASSERT(stub_channel_outstanding_cnt(io_target) == 0);
2227 	CU_ASSERT(!TAILQ_EMPTY(&bdev_ch[1]->io_locked));
2228 	CU_ASSERT(g_io_done == false);
2229 
2230 	/* Try to unlock the lba range using thread 1's io_ch.  This should fail. */
2231 	rc = bdev_unlock_lba_range(desc, io_ch[1], 20, 10, unlock_lba_range_done, &ctx1);
2232 	CU_ASSERT(rc == -EINVAL);
2233 
2234 	/* Now create a new channel and submit a write I/O with it.  This should also be queued.
2235 	 * The new channel should inherit the active locks from the bdev's internal list.
2236 	 */
2237 	set_thread(2);
2238 	io_ch[2] = spdk_bdev_get_io_channel(desc);
2239 	bdev_ch[2] = spdk_io_channel_get_ctx(io_ch[2]);
2240 	CU_ASSERT(io_ch[2] != NULL);
2241 
2242 	g_io_done2 = false;
2243 	CU_ASSERT(TAILQ_EMPTY(&bdev_ch[2]->io_locked));
2244 	rc = spdk_bdev_write_blocks(desc, io_ch[2], buf, 22, 2, io_done2, &ctx2);
2245 	CU_ASSERT(rc == 0);
2246 	poll_threads();
2247 	CU_ASSERT(stub_channel_outstanding_cnt(io_target) == 0);
2248 	CU_ASSERT(!TAILQ_EMPTY(&bdev_ch[2]->io_locked));
2249 	CU_ASSERT(g_io_done2 == false);
2250 
2251 	set_thread(0);
2252 	rc = bdev_unlock_lba_range(desc, io_ch[0], 20, 10, unlock_lba_range_done, &ctx0);
2253 	CU_ASSERT(rc == 0);
2254 	poll_threads();
2255 	CU_ASSERT(TAILQ_EMPTY(&bdev_ch[0]->locked_ranges));
2256 
2257 	/* The LBA range is unlocked, so the write IOs should now have started execution. */
2258 	CU_ASSERT(TAILQ_EMPTY(&bdev_ch[1]->io_locked));
2259 	CU_ASSERT(TAILQ_EMPTY(&bdev_ch[2]->io_locked));
2260 
2261 	set_thread(1);
2262 	CU_ASSERT(stub_channel_outstanding_cnt(io_target) == 1);
2263 	stub_complete_io(io_target, 1);
2264 	set_thread(2);
2265 	CU_ASSERT(stub_channel_outstanding_cnt(io_target) == 1);
2266 	stub_complete_io(io_target, 1);
2267 
2268 	poll_threads();
2269 	CU_ASSERT(g_io_done == true);
2270 	CU_ASSERT(g_io_done2 == true);
2271 
2272 	/* Tear down the channels */
2273 	set_thread(0);
2274 	spdk_put_io_channel(io_ch[0]);
2275 	set_thread(1);
2276 	spdk_put_io_channel(io_ch[1]);
2277 	set_thread(2);
2278 	spdk_put_io_channel(io_ch[2]);
2279 	poll_threads();
2280 	set_thread(0);
2281 	teardown_test();
2282 }
2283 
2284 /* spdk_bdev_reset() freezes and unfreezes I/O channels by using spdk_for_each_channel().
2285  * spdk_bdev_unregister() calls spdk_io_device_unregister() in the end. However
2286  * spdk_io_device_unregister() fails if it is called while executing spdk_for_each_channel().
2287  * Hence, in this case, spdk_io_device_unregister() is deferred until spdk_bdev_reset()
2288  * completes. Test this behavior.
2289  */
2290 static void
2291 unregister_during_reset(void)
2292 {
2293 	struct spdk_io_channel *io_ch[2];
2294 	bool done_reset = false, done_unregister = false;
2295 	int rc;
2296 
2297 	setup_test();
2298 	set_thread(0);
2299 
2300 	io_ch[0] = spdk_bdev_get_io_channel(g_desc);
2301 	SPDK_CU_ASSERT_FATAL(io_ch[0] != NULL);
2302 
2303 	set_thread(1);
2304 
2305 	io_ch[1] = spdk_bdev_get_io_channel(g_desc);
2306 	SPDK_CU_ASSERT_FATAL(io_ch[1] != NULL);
2307 
2308 	set_thread(0);
2309 
2310 	CU_ASSERT(g_bdev.bdev.internal.reset_in_progress == NULL);
2311 
2312 	rc = spdk_bdev_reset(g_desc, io_ch[0], reset_done, &done_reset);
2313 	CU_ASSERT(rc == 0);
2314 
2315 	set_thread(0);
2316 
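	/* Poll thread 0 just once so the reset's channel iteration has started but not yet completed when the bdev is unregistered below. */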
2317 	poll_thread_times(0, 1);
2318 
2319 	spdk_bdev_close(g_desc);
2320 	spdk_bdev_unregister(&g_bdev.bdev, _bdev_unregistered, &done_unregister);
2321 
2322 	CU_ASSERT(done_reset == false);
2323 	CU_ASSERT(done_unregister == false);
2324 
2325 	poll_threads();
2326 
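	/* Completing the stub's outstanding I/O lets the reset finish; the deferred unregister still waits until the channels are released. */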
2327 	stub_complete_io(g_bdev.io_target, 0);
2328 
2329 	poll_threads();
2330 
2331 	CU_ASSERT(done_reset == true);
2332 	CU_ASSERT(done_unregister == false);
2333 
2334 	spdk_put_io_channel(io_ch[0]);
2335 
2336 	set_thread(1);
2337 
2338 	spdk_put_io_channel(io_ch[1]);
2339 
2340 	poll_threads();
2341 
2342 	CU_ASSERT(done_unregister == true);
2343 
2344 	/* Restore the original g_bdev so that we can use teardown_test(). */
2345 	set_thread(0);
2346 	register_bdev(&g_bdev, "ut_bdev", &g_io_device);
2347 	spdk_bdev_open_ext("ut_bdev", true, _bdev_event_cb, NULL, &g_desc);
2348 	teardown_test();
2349 }
2350 
2351 static void
2352 bdev_init_wt_cb(void *done, int rc)
2353 {
2354 }
2355 
2356 
2357 static int
2358 wrong_thread_setup(void)
2359 {
2360 	allocate_cores(1);
2361 	allocate_threads(2);
2362 	set_thread(0);
2363 
2364 	spdk_bdev_initialize(bdev_init_wt_cb, NULL);
2365 	spdk_io_device_register(&g_io_device, stub_create_ch, stub_destroy_ch,
2366 				sizeof(struct ut_bdev_channel), NULL);
2367 
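	/* Leave thread 1 as the current thread so the tests in this suite run from a non-app thread by default. */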
2368 	set_thread(1);
2369 
2370 	return 0;
2371 }
2372 
2373 static int
2374 wrong_thread_teardown(void)
2375 {
2376 	int rc = 0;
2377 
2378 	set_thread(0);
2379 
2380 	g_teardown_done = false;
2381 	spdk_io_device_unregister(&g_io_device, NULL);
2382 	spdk_bdev_finish(finish_cb, NULL);
2383 	poll_threads();
2384 	memset(&g_bdev, 0, sizeof(g_bdev));
2385 	if (!g_teardown_done) {
2386 		fprintf(stderr, "%s:%d %s: teardown not done\n", __FILE__, __LINE__, __func__);
2387 		rc = -1;
2388 	}
2389 	g_teardown_done = false;
2390 
2391 	free_threads();
2392 	free_cores();
2393 
2394 	return rc;
2395 }
2396 
2397 static void
2398 _bdev_unregistered_wt(void *ctx, int rc)
2399 {
2400 	struct spdk_thread **threadp = ctx;
2401 
2402 	*threadp = spdk_get_thread();
2403 }
2404 
2405 static void
2406 spdk_bdev_register_wt(void)
2407 {
2408 	struct spdk_bdev bdev = { 0 };
2409 	int rc;
2410 	struct spdk_thread *unreg_thread;
2411 
2412 	bdev.name = "wt_bdev";
2413 	bdev.fn_table = &fn_table;
2414 	bdev.module = &bdev_ut_if;
2415 	bdev.blocklen = 4096;
2416 	bdev.blockcnt = 1024;
2417 
2418 	/* Can register only on app thread */
2419 	rc = spdk_bdev_register(&bdev);
2420 	CU_ASSERT(rc == -EINVAL);
2421 
2422 	/* Can unregister on any thread */
2423 	set_thread(0);
2424 	rc = spdk_bdev_register(&bdev);
2425 	CU_ASSERT(rc == 0);
2426 	set_thread(1);
2427 	unreg_thread = NULL;
2428 	spdk_bdev_unregister(&bdev, _bdev_unregistered_wt, &unreg_thread);
2429 	poll_threads();
2430 	CU_ASSERT(unreg_thread == spdk_get_thread());
2431 
2432 	/* Can unregister by name on any thread */
2433 	set_thread(0);
2434 	rc = spdk_bdev_register(&bdev);
2435 	CU_ASSERT(rc == 0);
2436 	set_thread(1);
2437 	unreg_thread = NULL;
2438 	rc = spdk_bdev_unregister_by_name(bdev.name, bdev.module, _bdev_unregistered_wt,
2439 					  &unreg_thread);
2440 	CU_ASSERT(rc == 0);
2441 	poll_threads();
2442 	CU_ASSERT(unreg_thread == spdk_get_thread());
2443 }
2444 
2445 static void
2446 wait_for_examine_cb(void *arg)
2447 {
2448 	struct spdk_thread **thread = arg;
2449 
2450 	*thread = spdk_get_thread();
2451 }
2452 
2453 static void
2454 spdk_bdev_examine_wt(void)
2455 {
2456 	int rc;
2457 	bool save_auto_examine = g_bdev_opts.bdev_auto_examine;
2458 	struct spdk_thread *thread;
2459 
2460 	g_bdev_opts.bdev_auto_examine = false;
2461 
2462 	set_thread(0);
2463 	register_bdev(&g_bdev, "ut_bdev_wt", &g_io_device);
2464 	CU_ASSERT(spdk_bdev_get_by_name("ut_bdev_wt") != NULL);
2465 	set_thread(1);
2466 
2467 	/* Can examine only on the app thread */
2468 	rc = spdk_bdev_examine("ut_bdev_wt");
2469 	CU_ASSERT(rc == -EINVAL);
2470 	unregister_bdev(&g_bdev);
2471 	CU_ASSERT(spdk_bdev_get_by_name("ut_bdev_wt") == NULL);
2472 
2473 	/* Can wait for examine on app thread, callback called on app thread. */
2474 	set_thread(0);
2475 	register_bdev(&g_bdev, "ut_bdev_wt", &g_io_device);
2476 	CU_ASSERT(spdk_bdev_get_by_name("ut_bdev_wt") != NULL);
2477 	thread = NULL;
2478 	rc = spdk_bdev_wait_for_examine(wait_for_examine_cb, &thread);
2479 	CU_ASSERT(rc == 0);
2480 	poll_threads();
2481 	CU_ASSERT(thread == spdk_get_thread());
2482 	unregister_bdev(&g_bdev);
2483 	CU_ASSERT(spdk_bdev_get_by_name("ut_bdev_wt") == NULL);
2484 
2485 	/* Can wait for examine on non-app thread, callback called on same thread. */
2486 	set_thread(0);
2487 	register_bdev(&g_bdev, "ut_bdev_wt", &g_io_device);
2488 	CU_ASSERT(spdk_bdev_get_by_name("ut_bdev_wt") != NULL);
2489 	thread = NULL;
2490 	rc = spdk_bdev_wait_for_examine(wait_for_examine_cb, &thread);
2491 	CU_ASSERT(rc == 0);
2492 	poll_threads();
2493 	CU_ASSERT(thread == spdk_get_thread());
2494 	unregister_bdev(&g_bdev);
2495 	CU_ASSERT(spdk_bdev_get_by_name("ut_bdev_wt") == NULL);
2496 
2497 	unregister_bdev(&g_bdev);
2498 	g_bdev_opts.bdev_auto_examine = save_auto_examine;
2499 }
2500 
2501 static void
2502 event_notify_and_close(void)
2503 {
2504 	int resize_notify_count = 0;
2505 	struct spdk_bdev_desc *desc = NULL;
2506 	struct spdk_bdev *bdev;
2507 	int rc;
2508 
2509 	setup_test();
2510 	set_thread(0);
2511 
2512 	/* setup_test() automatically opens the bdev, but this test needs to do
2513 	 * that in a different way. */
2514 	spdk_bdev_close(g_desc);
2515 	poll_threads();
2516 
2517 	set_thread(1);
2518 
2519 	rc = spdk_bdev_open_ext("ut_bdev", true, _bdev_event_cb, &resize_notify_count, &desc);
2520 	CU_ASSERT(rc == 0);
2521 	SPDK_CU_ASSERT_FATAL(desc != NULL);
2522 
2523 	bdev = spdk_bdev_desc_get_bdev(desc);
2524 	SPDK_CU_ASSERT_FATAL(bdev != NULL);
2525 
2526 	/* Test the normal case where a resize event is notified. */
2527 	set_thread(0);
2528 
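	/* Resizing the bdev sends an event_notify message to the desc opened on thread 1;
	 * desc->refs counts that in-flight message until thread 1 processes it.
	 */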
2529 	rc = spdk_bdev_notify_blockcnt_change(bdev, 1024 * 2);
2530 	CU_ASSERT(rc == 0);
2531 	CU_ASSERT(bdev->blockcnt == 1024 * 2);
2532 	CU_ASSERT(desc->refs == 1);
2533 	CU_ASSERT(resize_notify_count == 0);
2534 
2535 	poll_threads();
2536 
2537 	CU_ASSERT(desc->refs == 0);
2538 	CU_ASSERT(resize_notify_count == 1);
2539 
2540 	/* Test a more complex case: if the bdev is closed after two event_notify messages are sent,
2541 	 * both event_notify messages are discarded and the desc is freed.
2542 	 */
2543 	rc = spdk_bdev_notify_blockcnt_change(bdev, 1024 * 3);
2544 	CU_ASSERT(rc == 0);
2545 	CU_ASSERT(bdev->blockcnt == 1024 * 3);
2546 	CU_ASSERT(desc->refs == 1);
2547 	CU_ASSERT(resize_notify_count == 1);
2548 
2549 	rc = spdk_bdev_notify_blockcnt_change(bdev, 1024 * 4);
2550 	CU_ASSERT(rc == 0);
2551 	CU_ASSERT(bdev->blockcnt == 1024 * 4);
2552 	CU_ASSERT(desc->refs == 2);
2553 	CU_ASSERT(resize_notify_count == 1);
2554 
2555 	set_thread(1);
2556 
2557 	spdk_bdev_close(desc);
2558 	CU_ASSERT(desc->closed == true);
2559 	CU_ASSERT(desc->refs == 2);
2560 	CU_ASSERT(resize_notify_count == 1);
2561 
2562 	poll_threads();
2563 
2564 	CU_ASSERT(resize_notify_count == 1);
2565 
2566 	set_thread(0);
2567 
2568 	/* Restore g_desc. Then, we can execute teardown_test(). */
2569 	spdk_bdev_open_ext("ut_bdev", true, _bdev_event_cb, NULL, &g_desc);
2570 	teardown_test();
2571 }
2572 
2573 int
2574 main(int argc, char **argv)
2575 {
2576 	CU_pSuite	suite = NULL;
2577 	CU_pSuite	suite_wt = NULL;
2578 	unsigned int	num_failures;
2579 
2580 	CU_set_error_action(CUEA_ABORT);
2581 	CU_initialize_registry();
2582 
2583 	suite = CU_add_suite("bdev", NULL, NULL);
2584 	suite_wt = CU_add_suite("bdev_wrong_thread", wrong_thread_setup, wrong_thread_teardown);
2585 
2586 	CU_ADD_TEST(suite, basic);
2587 	CU_ADD_TEST(suite, unregister_and_close);
2588 	CU_ADD_TEST(suite, unregister_and_close_different_threads);
2589 	CU_ADD_TEST(suite, basic_qos);
2590 	CU_ADD_TEST(suite, put_channel_during_reset);
2591 	CU_ADD_TEST(suite, aborted_reset);
2592 	CU_ADD_TEST(suite, aborted_reset_no_outstanding_io);
2593 	CU_ADD_TEST(suite, io_during_reset);
2594 	CU_ADD_TEST(suite, reset_completions);
2595 	CU_ADD_TEST(suite, io_during_qos_queue);
2596 	CU_ADD_TEST(suite, io_during_qos_reset);
2597 	CU_ADD_TEST(suite, enomem);
2598 	CU_ADD_TEST(suite, enomem_multi_bdev);
2599 	CU_ADD_TEST(suite, enomem_multi_bdev_unregister);
2600 	CU_ADD_TEST(suite, enomem_multi_io_target);
2601 	CU_ADD_TEST(suite, qos_dynamic_enable);
2602 	CU_ADD_TEST(suite, bdev_histograms_mt);
2603 	CU_ADD_TEST(suite, bdev_set_io_timeout_mt);
2604 	CU_ADD_TEST(suite, lock_lba_range_then_submit_io);
2605 	CU_ADD_TEST(suite, unregister_during_reset);
2606 	CU_ADD_TEST(suite_wt, spdk_bdev_register_wt);
2607 	CU_ADD_TEST(suite_wt, spdk_bdev_examine_wt);
2608 	CU_ADD_TEST(suite, event_notify_and_close);
2609 
2610 	CU_basic_set_mode(CU_BRM_VERBOSE);
2611 	CU_basic_run_tests();
2612 	num_failures = CU_get_number_of_failures();
2613 	CU_cleanup_registry();
2614 	return num_failures;
2615 }
2616