1 /*   SPDX-License-Identifier: BSD-3-Clause
2  *   Copyright (C) 2017 Intel Corporation.
3  *   All rights reserved.
4  *   Copyright (c) 2022 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
5  */
6 
7 #include "spdk_cunit.h"
8 
9 #include "common/lib/ut_multithread.c"
10 #include "unit/lib/json_mock.c"
11 
12 #include "spdk/config.h"
13 /* HACK: disable VTune integration so the unit test doesn't need VTune headers and libs to build */
14 #undef SPDK_CONFIG_VTUNE
15 
16 #include "bdev/bdev.c"
17 
18 #define BDEV_UT_NUM_THREADS 3
19 
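/*
 * Stubs for external symbols referenced by bdev.c (notify, SCSI/NVMe sense
 * translation, memory-domain queries) so this test links without the real
 * libraries.
 */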
20 DEFINE_STUB(spdk_notify_send, uint64_t, (const char *type, const char *ctx), 0);
21 DEFINE_STUB(spdk_notify_type_register, struct spdk_notify_type *, (const char *type), NULL);
22 DEFINE_STUB_V(spdk_scsi_nvme_translate, (const struct spdk_bdev_io *bdev_io, int *sc, int *sk,
23 		int *asc, int *ascq));
24 DEFINE_STUB(spdk_memory_domain_get_dma_device_id, const char *, (struct spdk_memory_domain *domain),
25 	    "test_domain");
26 DEFINE_STUB(spdk_memory_domain_get_dma_device_type, enum spdk_dma_device_type,
27 	    (struct spdk_memory_domain *domain), 0);
28 
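/*
 * Memory-domain data-movement mocks: unless a test overrides the return
 * value with MOCK_SET(), they invoke the completion callback inline and
 * report success.
 */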
29 DEFINE_RETURN_MOCK(spdk_memory_domain_pull_data, int);
30 int
31 spdk_memory_domain_pull_data(struct spdk_memory_domain *src_domain, void *src_domain_ctx,
32 			     struct iovec *src_iov, uint32_t src_iov_cnt, struct iovec *dst_iov, uint32_t dst_iov_cnt,
33 			     spdk_memory_domain_data_cpl_cb cpl_cb, void *cpl_cb_arg)
34 {
35 	HANDLE_RETURN_MOCK(spdk_memory_domain_pull_data);
36 
37 	cpl_cb(cpl_cb_arg, 0);
38 	return 0;
39 }
40 
41 DEFINE_RETURN_MOCK(spdk_memory_domain_push_data, int);
42 int
43 spdk_memory_domain_push_data(struct spdk_memory_domain *dst_domain, void *dst_domain_ctx,
44 			     struct iovec *dst_iov, uint32_t dst_iovcnt, struct iovec *src_iov, uint32_t src_iovcnt,
45 			     spdk_memory_domain_data_cpl_cb cpl_cb, void *cpl_cb_arg)
46 {
47 	HANDLE_RETURN_MOCK(spdk_memory_domain_push_data);
48 
49 	cpl_cb(cpl_cb_arg, 0);
50 	return 0;
51 }
52 
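/*
 * Test bdev: wraps an spdk_bdev plus the io_device pointer that acts as the
 * backing I/O target for the stub channel below.
 */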
53 struct ut_bdev {
54 	struct spdk_bdev	bdev;
55 	void			*io_target;
56 };
57 
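/*
 * Per-channel state for the stub backend: a queue of I/O waiting for
 * stub_complete_io() and a credit count used to induce ENOMEM.
 */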
58 struct ut_bdev_channel {
59 	TAILQ_HEAD(, spdk_bdev_io)	outstanding_io;
60 	uint32_t			outstanding_cnt;
61 	uint32_t			avail_cnt;
62 };
63 
64 int g_io_device;
65 struct ut_bdev g_bdev;
66 struct spdk_bdev_desc *g_desc;
67 bool g_teardown_done = false;
68 bool g_get_io_channel = true;
69 bool g_create_ch = true;
70 bool g_init_complete_called = false;
71 bool g_fini_start_called = true;
72 int g_status = 0;
73 int g_count = 0;
74 struct spdk_histogram_data *g_histogram = NULL;
75 
76 static int
77 stub_create_ch(void *io_device, void *ctx_buf)
78 {
79 	struct ut_bdev_channel *ch = ctx_buf;
80 
81 	if (g_create_ch == false) {
82 		return -1;
83 	}
84 
85 	TAILQ_INIT(&ch->outstanding_io);
86 	ch->outstanding_cnt = 0;
87 	/*
88 	 * When avail gets to 0, the submit_request function will return ENOMEM.
89 	 *  Most tests do not want ENOMEM to occur, so by default set this to a
90 	 *  big value that won't get hit.  The ENOMEM tests can then override this
91 	 *  value to something much smaller to induce ENOMEM conditions.
92 	 */
93 	ch->avail_cnt = 2048;
94 	return 0;
95 }
96 
97 static void
98 stub_destroy_ch(void *io_device, void *ctx_buf)
99 {
100 }
101 
102 static struct spdk_io_channel *
103 stub_get_io_channel(void *ctx)
104 {
105 	struct ut_bdev *ut_bdev = ctx;
106 
107 	if (g_get_io_channel == true) {
108 		return spdk_get_io_channel(ut_bdev->io_target);
109 	} else {
110 		return NULL;
111 	}
112 }
113 
114 static int
115 stub_destruct(void *ctx)
116 {
117 	return 0;
118 }
119 
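/*
 * Stub submit handler: RESET aborts everything outstanding on the channel,
 * ABORT completes the target I/O if it is found, and all other I/O are held
 * on outstanding_io until stub_complete_io() runs, or completed with NOMEM
 * once avail_cnt is exhausted.
 */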
120 static void
121 stub_submit_request(struct spdk_io_channel *_ch, struct spdk_bdev_io *bdev_io)
122 {
123 	struct ut_bdev_channel *ch = spdk_io_channel_get_ctx(_ch);
124 	struct spdk_bdev_io *io;
125 
126 	if (bdev_io->type == SPDK_BDEV_IO_TYPE_RESET) {
127 		while (!TAILQ_EMPTY(&ch->outstanding_io)) {
128 			io = TAILQ_FIRST(&ch->outstanding_io);
129 			TAILQ_REMOVE(&ch->outstanding_io, io, module_link);
130 			ch->outstanding_cnt--;
131 			spdk_bdev_io_complete(io, SPDK_BDEV_IO_STATUS_ABORTED);
132 			ch->avail_cnt++;
133 		}
134 	} else if (bdev_io->type == SPDK_BDEV_IO_TYPE_ABORT) {
135 		TAILQ_FOREACH(io, &ch->outstanding_io, module_link) {
136 			if (io == bdev_io->u.abort.bio_to_abort) {
137 				TAILQ_REMOVE(&ch->outstanding_io, io, module_link);
138 				ch->outstanding_cnt--;
139 				spdk_bdev_io_complete(io, SPDK_BDEV_IO_STATUS_ABORTED);
140 				ch->avail_cnt++;
141 
142 				spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_SUCCESS);
143 				return;
144 			}
145 		}
146 
147 		spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
148 		return;
149 	}
150 
151 	if (ch->avail_cnt > 0) {
152 		TAILQ_INSERT_TAIL(&ch->outstanding_io, bdev_io, module_link);
153 		ch->outstanding_cnt++;
154 		ch->avail_cnt--;
155 	} else {
156 		spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_NOMEM);
157 	}
158 }
159 
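/*
 * Complete up to num_to_complete outstanding I/O with SUCCESS (0 means
 * complete everything) and return how many were completed.
 */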
160 static uint32_t
161 stub_complete_io(void *io_target, uint32_t num_to_complete)
162 {
163 	struct spdk_io_channel *_ch = spdk_get_io_channel(io_target);
164 	struct ut_bdev_channel *ch = spdk_io_channel_get_ctx(_ch);
165 	struct spdk_bdev_io *io;
166 	bool complete_all = (num_to_complete == 0);
167 	uint32_t num_completed = 0;
168 
169 	while (complete_all || num_completed < num_to_complete) {
170 		if (TAILQ_EMPTY(&ch->outstanding_io)) {
171 			break;
172 		}
173 		io = TAILQ_FIRST(&ch->outstanding_io);
174 		TAILQ_REMOVE(&ch->outstanding_io, io, module_link);
175 		ch->outstanding_cnt--;
176 		spdk_bdev_io_complete(io, SPDK_BDEV_IO_STATUS_SUCCESS);
177 		ch->avail_cnt++;
178 		num_completed++;
179 	}
180 	spdk_put_io_channel(_ch);
181 	return num_completed;
182 }
183 
184 static bool
185 stub_io_type_supported(void *ctx, enum spdk_bdev_io_type type)
186 {
187 	return true;
188 }
189 
190 static struct spdk_bdev_fn_table fn_table = {
191 	.get_io_channel =	stub_get_io_channel,
192 	.destruct =		stub_destruct,
193 	.submit_request =	stub_submit_request,
194 	.io_type_supported =	stub_io_type_supported,
195 };
196 
197 struct spdk_bdev_module bdev_ut_if;
198 
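/*
 * The module registers with async_init set, so module_init() must explicitly
 * signal completion via spdk_bdev_module_init_done().
 */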
199 static int
200 module_init(void)
201 {
202 	spdk_bdev_module_init_done(&bdev_ut_if);
203 	return 0;
204 }
205 
206 static void
207 module_fini(void)
208 {
209 }
210 
211 static void
212 init_complete(void)
213 {
214 	g_init_complete_called = true;
215 }
216 
217 static void
218 fini_start(void)
219 {
220 	g_fini_start_called = true;
221 }
222 
223 struct spdk_bdev_module bdev_ut_if = {
224 	.name = "bdev_ut",
225 	.module_init = module_init,
226 	.module_fini = module_fini,
227 	.async_init = true,
228 	.init_complete = init_complete,
229 	.fini_start = fini_start,
230 };
231 
232 SPDK_BDEV_MODULE_REGISTER(bdev_ut, &bdev_ut_if)
233 
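/* Register a 1024-block, 4 KiB-block-size test bdev backed by io_target. */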
234 static void
235 register_bdev(struct ut_bdev *ut_bdev, char *name, void *io_target)
236 {
237 	memset(ut_bdev, 0, sizeof(*ut_bdev));
238 
239 	ut_bdev->io_target = io_target;
240 	ut_bdev->bdev.ctxt = ut_bdev;
241 	ut_bdev->bdev.name = name;
242 	ut_bdev->bdev.fn_table = &fn_table;
243 	ut_bdev->bdev.module = &bdev_ut_if;
244 	ut_bdev->bdev.blocklen = 4096;
245 	ut_bdev->bdev.blockcnt = 1024;
246 
247 	spdk_bdev_register(&ut_bdev->bdev);
248 }
249 
250 static void
251 unregister_bdev(struct ut_bdev *ut_bdev)
252 {
253 	/* Handle any deferred messages. */
254 	poll_threads();
255 	spdk_bdev_unregister(&ut_bdev->bdev, NULL, NULL);
256 	/* Handle the async bdev unregister. */
257 	poll_threads();
258 }
259 
260 static void
261 bdev_init_cb(void *done, int rc)
262 {
263 	CU_ASSERT(rc == 0);
264 	*(bool *)done = true;
265 }
266 
267 static void
268 _bdev_event_cb(enum spdk_bdev_event_type type, struct spdk_bdev *bdev,
269 	       void *event_ctx)
270 {
271 	switch (type) {
272 	case SPDK_BDEV_EVENT_REMOVE:
273 		if (event_ctx != NULL) {
274 			*(bool *)event_ctx = true;
275 		}
276 		break;
277 	default:
278 		CU_ASSERT(false);
279 		break;
280 	}
281 }
282 
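/*
 * Common test setup: create BDEV_UT_NUM_THREADS ut threads, initialize the
 * bdev layer, register "ut_bdev" and open a descriptor to it on thread 0.
 */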
283 static void
284 setup_test(void)
285 {
286 	bool done = false;
287 	int rc;
288 
289 	allocate_cores(BDEV_UT_NUM_THREADS);
290 	allocate_threads(BDEV_UT_NUM_THREADS);
291 	set_thread(0);
292 
293 	rc = spdk_iobuf_initialize();
294 	CU_ASSERT(rc == 0);
295 	spdk_bdev_initialize(bdev_init_cb, &done);
296 	spdk_io_device_register(&g_io_device, stub_create_ch, stub_destroy_ch,
297 				sizeof(struct ut_bdev_channel), NULL);
298 	register_bdev(&g_bdev, "ut_bdev", &g_io_device);
299 	spdk_bdev_open_ext("ut_bdev", true, _bdev_event_cb, NULL, &g_desc);
300 }
301 
302 static void
303 finish_cb(void *cb_arg)
304 {
305 	g_teardown_done = true;
306 }
307 
308 static void
309 teardown_test(void)
310 {
311 	set_thread(0);
312 	g_teardown_done = false;
313 	spdk_bdev_close(g_desc);
314 	g_desc = NULL;
315 	unregister_bdev(&g_bdev);
316 	spdk_io_device_unregister(&g_io_device, NULL);
317 	spdk_bdev_finish(finish_cb, NULL);
318 	spdk_iobuf_finish(finish_cb, NULL);
319 	poll_threads();
320 	memset(&g_bdev, 0, sizeof(g_bdev));
321 	CU_ASSERT(g_teardown_done == true);
322 	g_teardown_done = false;
323 	free_threads();
324 	free_cores();
325 }
326 
327 static uint32_t
328 bdev_io_tailq_cnt(bdev_io_tailq_t *tailq)
329 {
330 	struct spdk_bdev_io *io;
331 	uint32_t cnt = 0;
332 
333 	TAILQ_FOREACH(io, tailq, internal.link) {
334 		cnt++;
335 	}
336 
337 	return cnt;
338 }
339 
340 static void
341 basic(void)
342 {
343 	g_init_complete_called = false;
344 	setup_test();
345 	CU_ASSERT(g_init_complete_called == true);
346 
347 	set_thread(0);
348 
349 	g_get_io_channel = false;
350 	g_ut_threads[0].ch = spdk_bdev_get_io_channel(g_desc);
351 	CU_ASSERT(g_ut_threads[0].ch == NULL);
352 
353 	g_get_io_channel = true;
354 	g_create_ch = false;
355 	g_ut_threads[0].ch = spdk_bdev_get_io_channel(g_desc);
356 	CU_ASSERT(g_ut_threads[0].ch == NULL);
357 
358 	g_get_io_channel = true;
359 	g_create_ch = true;
360 	g_ut_threads[0].ch = spdk_bdev_get_io_channel(g_desc);
361 	CU_ASSERT(g_ut_threads[0].ch != NULL);
362 	spdk_put_io_channel(g_ut_threads[0].ch);
363 
364 	g_fini_start_called = false;
365 	teardown_test();
366 	CU_ASSERT(g_fini_start_called == true);
367 }
368 
369 static void
370 _bdev_unregistered(void *done, int rc)
371 {
372 	CU_ASSERT(rc == 0);
373 	*(bool *)done = true;
374 }
375 
376 static void
377 unregister_and_close(void)
378 {
379 	bool done, remove_notify;
380 	struct spdk_bdev_desc *desc = NULL;
381 
382 	setup_test();
383 	set_thread(0);
384 
385 	/* setup_test() automatically opens the bdev,
386 	 * but this test needs to do that in a different
387 	 * way. */
388 	spdk_bdev_close(g_desc);
389 	poll_threads();
390 
391 	/* Try hotremoving a bdev with descriptors which don't provide
392 	 * any context to the notification callback */
393 	spdk_bdev_open_ext("ut_bdev", true, _bdev_event_cb, NULL, &desc);
394 	SPDK_CU_ASSERT_FATAL(desc != NULL);
395 
396 	/* There is an open descriptor on the device. Unregister it,
397 	 * which can't proceed until the descriptor is closed. */
398 	done = false;
399 	spdk_bdev_unregister(&g_bdev.bdev, _bdev_unregistered, &done);
400 
401 	/* Poll the threads to allow all events to be processed */
402 	poll_threads();
403 
404 	/* Make sure the bdev was not unregistered. We still have a
405 	 * descriptor open */
406 	CU_ASSERT(done == false);
407 
408 	spdk_bdev_close(desc);
409 	poll_threads();
410 	desc = NULL;
411 
412 	/* The unregister should have completed */
413 	CU_ASSERT(done == true);
414 
415 
416 	/* Register the bdev again */
417 	register_bdev(&g_bdev, "ut_bdev", &g_io_device);
418 
419 	remove_notify = false;
420 	spdk_bdev_open_ext("ut_bdev", true, _bdev_event_cb, &remove_notify, &desc);
421 	SPDK_CU_ASSERT_FATAL(desc != NULL);
422 	CU_ASSERT(remove_notify == false);
423 
424 	/* There is an open descriptor on the device. Unregister it,
425 	 * which can't proceed until the descriptor is closed. */
426 	done = false;
427 	spdk_bdev_unregister(&g_bdev.bdev, _bdev_unregistered, &done);
428 	/* No polling has occurred, so neither of these should execute */
429 	CU_ASSERT(remove_notify == false);
430 	CU_ASSERT(done == false);
431 
432 	/* Prior to the unregister completing, close the descriptor */
433 	spdk_bdev_close(desc);
434 
435 	/* Poll the threads to allow all events to be processed */
436 	poll_threads();
437 
438 	/* Remove notify should not have been called because the
439 	 * descriptor is already closed. */
440 	CU_ASSERT(remove_notify == false);
441 
442 	/* The unregister should have completed */
443 	CU_ASSERT(done == true);
444 
445 	/* Restore the original g_bdev so that we can use teardown_test(). */
446 	register_bdev(&g_bdev, "ut_bdev", &g_io_device);
447 	spdk_bdev_open_ext("ut_bdev", true, _bdev_event_cb, NULL, &g_desc);
448 	teardown_test();
449 }
450 
451 static void
452 unregister_and_close_different_threads(void)
453 {
454 	bool done;
455 	struct spdk_bdev_desc *desc = NULL;
456 
457 	setup_test();
458 	set_thread(0);
459 
460 	/* setup_test() automatically opens the bdev,
461 	 * but this test needs to do that in a different
462 	 * way. */
463 	spdk_bdev_close(g_desc);
464 	poll_threads();
465 
466 	set_thread(1);
467 	spdk_bdev_open_ext("ut_bdev", true, _bdev_event_cb, NULL, &desc);
468 	SPDK_CU_ASSERT_FATAL(desc != NULL);
469 	done = false;
470 
471 	set_thread(0);
472 	spdk_bdev_unregister(&g_bdev.bdev, _bdev_unregistered, &done);
473 
474 	/* Poll the threads to allow all events to be processed */
475 	poll_threads();
476 
477 	/* Make sure the bdev was not unregistered. We still have a
478 	 * descriptor open */
479 	CU_ASSERT(done == false);
480 
481 	/* Close the descriptor on thread 1.  Poll the thread and confirm the
482 	 * unregister did not complete, since it was unregistered on thread 0.
483 	 */
484 	set_thread(1);
485 	spdk_bdev_close(desc);
486 	poll_thread(1);
487 	CU_ASSERT(done == false);
488 
489 	/* Now poll thread 0 and confirm the unregister completed. */
490 	set_thread(0);
491 	poll_thread(0);
492 	CU_ASSERT(done == true);
493 
494 	/* Restore the original g_bdev so that we can use teardown_test(). */
495 	register_bdev(&g_bdev, "ut_bdev", &g_io_device);
496 	spdk_bdev_open_ext("ut_bdev", true, _bdev_event_cb, NULL, &g_desc);
497 	teardown_test();
498 }
499 
500 static void
501 reset_done(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg)
502 {
503 	bool *done = cb_arg;
504 
505 	CU_ASSERT(success == true);
506 	*done = true;
507 	spdk_bdev_free_io(bdev_io);
508 }
509 
510 static void
511 put_channel_during_reset(void)
512 {
513 	struct spdk_io_channel *io_ch;
514 	bool done = false;
515 
516 	setup_test();
517 
518 	set_thread(0);
519 	io_ch = spdk_bdev_get_io_channel(g_desc);
520 	CU_ASSERT(io_ch != NULL);
521 
522 	/*
523 	 * Start a reset, but then put the I/O channel before
524 	 *  the deferred messages for the reset get a chance to
525 	 *  execute.
526 	 */
527 	spdk_bdev_reset(g_desc, io_ch, reset_done, &done);
528 	spdk_put_io_channel(io_ch);
529 	poll_threads();
530 	stub_complete_io(g_bdev.io_target, 0);
531 
532 	teardown_test();
533 }
534 
535 static void
536 aborted_reset_done(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg)
537 {
538 	enum spdk_bdev_io_status *status = cb_arg;
539 
540 	*status = success ? SPDK_BDEV_IO_STATUS_SUCCESS : SPDK_BDEV_IO_STATUS_FAILED;
541 	spdk_bdev_free_io(bdev_io);
542 }
543 
544 static void io_done(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg);
545 
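/*
 * A second reset queued behind an in-progress reset is failed when its
 * channel goes away, while the original reset still completes normally.
 */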
546 static void
547 aborted_reset(void)
548 {
549 	struct spdk_io_channel *io_ch[2];
550 	enum spdk_bdev_io_status status1 = SPDK_BDEV_IO_STATUS_PENDING,
551 				 status2 = SPDK_BDEV_IO_STATUS_PENDING;
552 
553 	setup_test();
554 
555 	set_thread(0);
556 	io_ch[0] = spdk_bdev_get_io_channel(g_desc);
557 	CU_ASSERT(io_ch[0] != NULL);
558 	spdk_bdev_reset(g_desc, io_ch[0], aborted_reset_done, &status1);
559 	poll_threads();
560 	CU_ASSERT(g_bdev.bdev.internal.reset_in_progress != NULL);
561 
562 	/*
563 	 * First reset has been submitted on ch0.  Now submit a second
564 	 *  reset on ch1 which will get queued since there is already a
565 	 *  reset in progress.
566 	 */
567 	set_thread(1);
568 	io_ch[1] = spdk_bdev_get_io_channel(g_desc);
569 	CU_ASSERT(io_ch[1] != NULL);
570 	spdk_bdev_reset(g_desc, io_ch[1], aborted_reset_done, &status2);
571 	poll_threads();
572 	CU_ASSERT(g_bdev.bdev.internal.reset_in_progress != NULL);
573 
574 	/*
575 	 * Now destroy ch1.  This will abort the queued reset.  Check that
576 	 *  the second reset was completed with failed status.  Also check
577 	 *  that bdev->internal.reset_in_progress != NULL, since the
578 	 *  original reset has not been completed yet.  This ensures that
579 	 *  the bdev code is correctly noticing that the failed reset is
580 	 *  *not* the one that had been submitted to the bdev module.
581 	 */
582 	set_thread(1);
583 	spdk_put_io_channel(io_ch[1]);
584 	poll_threads();
585 	CU_ASSERT(status2 == SPDK_BDEV_IO_STATUS_FAILED);
586 	CU_ASSERT(g_bdev.bdev.internal.reset_in_progress != NULL);
587 
588 	/*
589 	 * Now complete the first reset, verify that it completed with SUCCESS
590 	 *  status and that bdev->internal.reset_in_progress is also set back to NULL.
591 	 */
592 	set_thread(0);
593 	spdk_put_io_channel(io_ch[0]);
594 	stub_complete_io(g_bdev.io_target, 0);
595 	poll_threads();
596 	CU_ASSERT(status1 == SPDK_BDEV_IO_STATUS_SUCCESS);
597 	CU_ASSERT(g_bdev.bdev.internal.reset_in_progress == NULL);
598 
599 	teardown_test();
600 }
601 
602 static void
603 aborted_reset_no_outstanding_io(void)
604 {
605 	struct spdk_io_channel *io_ch[2];
606 	struct spdk_bdev_channel *bdev_ch[2];
607 	struct spdk_bdev *bdev[2];
608 	enum spdk_bdev_io_status status1 = SPDK_BDEV_IO_STATUS_PENDING,
609 				 status2 = SPDK_BDEV_IO_STATUS_PENDING;
610 
611 	setup_test();
612 
613 	/*
614 	 * This time we test the reset without any outstanding IO
615 	 * present on the bdev channel, so both resets should finish
616 	 * immediately.
617 	 */
618 
619 	set_thread(0);
620 	/* Set reset_io_drain_timeout so the reset takes the drain path
621 	 * instead of being sent to the module immediately. */
622 	io_ch[0] = spdk_bdev_get_io_channel(g_desc);
623 	bdev_ch[0] = spdk_io_channel_get_ctx(io_ch[0]);
624 	bdev[0] = bdev_ch[0]->bdev;
625 	bdev[0]->reset_io_drain_timeout = SPDK_BDEV_RESET_IO_DRAIN_RECOMMENDED_VALUE;
626 	CU_ASSERT(io_ch[0] != NULL);
627 	spdk_bdev_reset(g_desc, io_ch[0], aborted_reset_done, &status1);
628 	poll_threads();
629 	CU_ASSERT(g_bdev.bdev.internal.reset_in_progress == NULL);
630 	CU_ASSERT(status1 == SPDK_BDEV_IO_STATUS_SUCCESS);
631 	spdk_put_io_channel(io_ch[0]);
632 
633 	set_thread(1);
634 	/* Set reset_io_drain_timeout so the reset takes the drain path
635 	 * instead of being sent to the module immediately. */
636 	io_ch[1] = spdk_bdev_get_io_channel(g_desc);
637 	bdev_ch[1] = spdk_io_channel_get_ctx(io_ch[1]);
638 	bdev[1] = bdev_ch[1]->bdev;
639 	bdev[1]->reset_io_drain_timeout = SPDK_BDEV_RESET_IO_DRAIN_RECOMMENDED_VALUE;
640 	CU_ASSERT(io_ch[1] != NULL);
641 	spdk_bdev_reset(g_desc, io_ch[1], aborted_reset_done, &status2);
642 	poll_threads();
643 	CU_ASSERT(g_bdev.bdev.internal.reset_in_progress == NULL);
644 	CU_ASSERT(status2 == SPDK_BDEV_IO_STATUS_SUCCESS);
645 	spdk_put_io_channel(io_ch[1]);
646 
647 	stub_complete_io(g_bdev.io_target, 0);
648 	poll_threads();
649 
650 	teardown_test();
651 }
652 
653 
654 static void
655 io_during_io_done(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg)
656 {
657 	enum spdk_bdev_io_status *status = cb_arg;
658 
659 	*status = bdev_io->internal.status;
660 	spdk_bdev_free_io(bdev_io);
661 }
662 
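/*
 * I/O submitted while a reset is in progress are aborted by the bdev layer;
 * completing the reset itself requires polling every thread so the
 * per-channel messages can propagate.
 */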
663 static void
664 io_during_reset(void)
665 {
666 	struct spdk_io_channel *io_ch[2];
667 	struct spdk_bdev_channel *bdev_ch[2];
668 	enum spdk_bdev_io_status status0, status1, status_reset;
669 	int rc;
670 
671 	setup_test();
672 
673 	/*
674 	 * First test normal case - submit an I/O on each of two channels (with no resets)
675 	 *  and verify they complete successfully.
676 	 */
677 	set_thread(0);
678 	io_ch[0] = spdk_bdev_get_io_channel(g_desc);
679 	bdev_ch[0] = spdk_io_channel_get_ctx(io_ch[0]);
680 	CU_ASSERT(bdev_ch[0]->flags == 0);
681 	status0 = SPDK_BDEV_IO_STATUS_PENDING;
682 	rc = spdk_bdev_read_blocks(g_desc, io_ch[0], NULL, 0, 1, io_during_io_done, &status0);
683 	CU_ASSERT(rc == 0);
684 
685 	set_thread(1);
686 	io_ch[1] = spdk_bdev_get_io_channel(g_desc);
687 	bdev_ch[1] = spdk_io_channel_get_ctx(io_ch[1]);
688 	CU_ASSERT(bdev_ch[1]->flags == 0);
689 	status1 = SPDK_BDEV_IO_STATUS_PENDING;
690 	rc = spdk_bdev_read_blocks(g_desc, io_ch[1], NULL, 0, 1, io_during_io_done, &status1);
691 	CU_ASSERT(rc == 0);
692 
693 	poll_threads();
694 	CU_ASSERT(status0 == SPDK_BDEV_IO_STATUS_PENDING);
695 	CU_ASSERT(status1 == SPDK_BDEV_IO_STATUS_PENDING);
696 
697 	set_thread(0);
698 	stub_complete_io(g_bdev.io_target, 0);
699 	CU_ASSERT(status0 == SPDK_BDEV_IO_STATUS_SUCCESS);
700 
701 	set_thread(1);
702 	stub_complete_io(g_bdev.io_target, 0);
703 	CU_ASSERT(status1 == SPDK_BDEV_IO_STATUS_SUCCESS);
704 
705 	/*
706 	 * Now submit a reset, and leave it pending while we submit I/O on two different
707 	 *  channels.  These I/O should be failed by the bdev layer since the reset is in
708 	 *  progress.
709 	 */
710 	set_thread(0);
711 	status_reset = SPDK_BDEV_IO_STATUS_PENDING;
712 	rc = spdk_bdev_reset(g_desc, io_ch[0], io_during_io_done, &status_reset);
713 	CU_ASSERT(rc == 0);
714 
715 	CU_ASSERT(bdev_ch[0]->flags == 0);
716 	CU_ASSERT(bdev_ch[1]->flags == 0);
717 	poll_threads();
718 	CU_ASSERT(bdev_ch[0]->flags == BDEV_CH_RESET_IN_PROGRESS);
719 	CU_ASSERT(bdev_ch[1]->flags == BDEV_CH_RESET_IN_PROGRESS);
720 
721 	set_thread(0);
722 	status0 = SPDK_BDEV_IO_STATUS_PENDING;
723 	rc = spdk_bdev_read_blocks(g_desc, io_ch[0], NULL, 0, 1, io_during_io_done, &status0);
724 	CU_ASSERT(rc == 0);
725 
726 	set_thread(1);
727 	status1 = SPDK_BDEV_IO_STATUS_PENDING;
728 	rc = spdk_bdev_read_blocks(g_desc, io_ch[1], NULL, 0, 1, io_during_io_done, &status1);
729 	CU_ASSERT(rc == 0);
730 
731 	/*
732 	 * A reset is in progress so these read I/O should complete with aborted.  Note that we
733 	 *  need to poll_threads() since I/O completed inline have their completion deferred.
734 	 */
735 	poll_threads();
736 	CU_ASSERT(status_reset == SPDK_BDEV_IO_STATUS_PENDING);
737 	CU_ASSERT(status0 == SPDK_BDEV_IO_STATUS_ABORTED);
738 	CU_ASSERT(status1 == SPDK_BDEV_IO_STATUS_ABORTED);
739 
740 	/*
741 	 * Complete the reset
742 	 */
743 	set_thread(0);
744 	stub_complete_io(g_bdev.io_target, 0);
745 
746 	/*
747 	 * Only poll thread 0. We should not get a completion.
748 	 */
749 	poll_thread(0);
750 	CU_ASSERT(status_reset == SPDK_BDEV_IO_STATUS_PENDING);
751 
752 	/*
753 	 * Poll both thread 0 and 1 so the messages can propagate and we
754 	 * get a completion.
755 	 */
756 	poll_threads();
757 	CU_ASSERT(status_reset == SPDK_BDEV_IO_STATUS_SUCCESS);
758 
759 	spdk_put_io_channel(io_ch[0]);
760 	set_thread(1);
761 	spdk_put_io_channel(io_ch[1]);
762 	poll_threads();
763 
764 	teardown_test();
765 }
766 
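/* Count the RESET I/O currently held by the stub backend for this io_target. */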
767 static uint32_t
768 count_queued_resets(void *io_target)
769 {
770 	struct spdk_io_channel *_ch = spdk_get_io_channel(io_target);
771 	struct ut_bdev_channel *ch = spdk_io_channel_get_ctx(_ch);
772 	struct spdk_bdev_io *io;
773 	uint32_t submitted_resets = 0;
774 
775 	TAILQ_FOREACH(io, &ch->outstanding_io, module_link) {
776 		if (io->type == SPDK_BDEV_IO_TYPE_RESET) {
777 			submitted_resets++;
778 		}
779 	}
780 
781 	spdk_put_io_channel(_ch);
782 
783 	return submitted_resets;
784 }
785 
786 static void
787 reset_completions(void)
788 {
789 	struct spdk_io_channel *io_ch;
790 	struct spdk_bdev_channel *bdev_ch;
791 	struct spdk_bdev *bdev;
792 	enum spdk_bdev_io_status status0, status_reset;
793 	int rc, iter;
794 
795 	setup_test();
796 
797 	/* This test covers four test cases:
798 	 * 1) reset_io_drain_timeout of a bdev is greater than 0
799 	 * 2) No outstanding IO are present on any bdev channel
800 	 * 3) Outstanding IO finish during bdev reset
801 	 * 4) Outstanding IO do not finish before reset is done waiting
802 	 *    for them.
803 	 *
804 	 * Above conditions mainly affect the timing of bdev reset completion
805 	 * and whether a reset should be skipped via spdk_bdev_io_complete()
806 	 * or sent down to the underlying bdev module via bdev_io_submit_reset(). */
807 
808 	/* Test preparation */
809 	set_thread(0);
810 	io_ch = spdk_bdev_get_io_channel(g_desc);
811 	bdev_ch = spdk_io_channel_get_ctx(io_ch);
812 	CU_ASSERT(bdev_ch->flags == 0);
813 
814 
815 	/* Test case 1) reset_io_drain_timeout set to 0. Reset should be sent down immediately. */
816 	bdev = &g_bdev.bdev;
817 	bdev->reset_io_drain_timeout = 0;
818 
819 	status_reset = SPDK_BDEV_IO_STATUS_PENDING;
820 	rc = spdk_bdev_reset(g_desc, io_ch, io_during_io_done, &status_reset);
821 	CU_ASSERT(rc == 0);
822 	poll_threads();
823 	CU_ASSERT(count_queued_resets(g_bdev.io_target) == 1);
824 
825 	/* Call reset completion inside bdev module. */
826 	stub_complete_io(g_bdev.io_target, 0);
827 	poll_threads();
828 	CU_ASSERT(count_queued_resets(g_bdev.io_target) == 0);
829 	CU_ASSERT(status_reset == SPDK_BDEV_IO_STATUS_SUCCESS);
830 	CU_ASSERT(g_bdev.bdev.internal.reset_in_progress == NULL);
831 
832 
833 	/* Test case 2) no outstanding IO are present. Reset should perform one iteration over
834 	* channels and then be skipped. */
835 	bdev->reset_io_drain_timeout = SPDK_BDEV_RESET_IO_DRAIN_RECOMMENDED_VALUE;
836 	status_reset = SPDK_BDEV_IO_STATUS_PENDING;
837 
838 	rc = spdk_bdev_reset(g_desc, io_ch, io_during_io_done, &status_reset);
839 	CU_ASSERT(rc == 0);
840 	poll_threads();
841 	/* Reset was never submitted to the bdev module. */
842 	CU_ASSERT(count_queued_resets(g_bdev.io_target) == 0);
843 	CU_ASSERT(status_reset == SPDK_BDEV_IO_STATUS_SUCCESS);
844 	CU_ASSERT(g_bdev.bdev.internal.reset_in_progress == NULL);
845 
846 
847 	/* Test case 3) outstanding IO finish during bdev reset procedure. Reset should initiate
848 	* wait poller to check for IO completions every second, until reset_io_drain_timeout is
849 	* reached, but finish earlier than this threshold. */
850 	status0 = SPDK_BDEV_IO_STATUS_PENDING;
851 	status_reset = SPDK_BDEV_IO_STATUS_PENDING;
852 	rc = spdk_bdev_read_blocks(g_desc, io_ch, NULL, 0, 1, io_during_io_done, &status0);
853 	CU_ASSERT(rc == 0);
854 
855 	rc = spdk_bdev_reset(g_desc, io_ch, io_during_io_done, &status_reset);
856 	CU_ASSERT(rc == 0);
857 	poll_threads();
858 	/* The reset just started and should not have been submitted yet. */
859 	CU_ASSERT(count_queued_resets(g_bdev.io_target) == 0);
860 
861 	poll_threads();
862 	CU_ASSERT(status_reset == SPDK_BDEV_IO_STATUS_PENDING);
863 	/* Let the poller wait for about half the time then complete outstanding IO. */
864 	for (iter = 0; iter < 2; iter++) {
865 		/* Reset is still processing and not submitted at this point. */
866 		CU_ASSERT(count_queued_resets(g_bdev.io_target) == 0);
867 		spdk_delay_us(1000 * 1000);
868 		poll_threads();
869 		poll_threads();
870 	}
871 	CU_ASSERT(status_reset == SPDK_BDEV_IO_STATUS_PENDING);
872 	stub_complete_io(g_bdev.io_target, 0);
873 	poll_threads();
874 	spdk_delay_us(BDEV_RESET_CHECK_OUTSTANDING_IO_PERIOD);
875 	poll_threads();
876 	poll_threads();
877 	CU_ASSERT(status_reset == SPDK_BDEV_IO_STATUS_SUCCESS);
878 	/* Sending reset to the bdev module has been skipped. */
879 	CU_ASSERT(count_queued_resets(g_bdev.io_target) == 0);
880 	CU_ASSERT(g_bdev.bdev.internal.reset_in_progress == NULL);
881 
882 
883 	/* Test case 4) outstanding IO are still present after reset_io_drain_timeout
884 	* seconds have passed. */
885 	status0 = SPDK_BDEV_IO_STATUS_PENDING;
886 	status_reset = SPDK_BDEV_IO_STATUS_PENDING;
887 	rc = spdk_bdev_read_blocks(g_desc, io_ch, NULL, 0, 1, io_during_io_done, &status0);
888 	CU_ASSERT(rc == 0);
889 
890 	rc = spdk_bdev_reset(g_desc, io_ch, io_during_io_done, &status_reset);
891 	CU_ASSERT(rc == 0);
892 	poll_threads();
893 	/* The reset just started and should not have been submitted yet. */
894 	CU_ASSERT(count_queued_resets(g_bdev.io_target) == 0);
895 
896 	poll_threads();
897 	CU_ASSERT(status_reset == SPDK_BDEV_IO_STATUS_PENDING);
898 	/* Let the poller wait for reset_io_drain_timeout seconds. */
899 	for (iter = 0; iter < bdev->reset_io_drain_timeout; iter++) {
900 		CU_ASSERT(count_queued_resets(g_bdev.io_target) == 0);
901 		spdk_delay_us(BDEV_RESET_CHECK_OUTSTANDING_IO_PERIOD);
902 		poll_threads();
903 		poll_threads();
904 	}
905 
906 	/* After timing out, the reset should have been sent to the module. */
907 	CU_ASSERT(count_queued_resets(g_bdev.io_target) == 1);
908 	/* Complete reset submitted to the module and the read IO. */
909 	stub_complete_io(g_bdev.io_target, 0);
910 	poll_threads();
911 	CU_ASSERT(status_reset == SPDK_BDEV_IO_STATUS_SUCCESS);
912 	CU_ASSERT(g_bdev.bdev.internal.reset_in_progress == NULL);
913 
914 
915 	/* Destroy the channel and end the test. */
916 	spdk_put_io_channel(io_ch);
917 	poll_threads();
918 
919 	teardown_test();
920 }
921 
922 
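/*
 * Exercise basic QoS: I/O from the QoS thread and from another thread both
 * complete, aborts work with QoS enabled, and the QoS channel is destroyed
 * and recreated as descriptors and I/O channels come and go.
 */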
923 static void
924 basic_qos(void)
925 {
926 	struct spdk_io_channel *io_ch[2];
927 	struct spdk_bdev_channel *bdev_ch[2];
928 	struct spdk_bdev *bdev;
929 	enum spdk_bdev_io_status status, abort_status;
930 	int rc;
931 
932 	setup_test();
933 
934 	/* Enable QoS */
935 	bdev = &g_bdev.bdev;
936 	bdev->internal.qos = calloc(1, sizeof(*bdev->internal.qos));
937 	SPDK_CU_ASSERT_FATAL(bdev->internal.qos != NULL);
938 	TAILQ_INIT(&bdev->internal.qos->queued);
939 	/*
940 	 * Enable read/write IOPS, read only byte per second and
941 	 * read/write byte per second rate limits.
942 	 * In this case, all rate limits will take equal effect.
943 	 */
944 	/* 2000 read/write I/O per second, or 2 per millisecond */
945 	bdev->internal.qos->rate_limits[SPDK_BDEV_QOS_RW_IOPS_RATE_LIMIT].limit = 2000;
946 	/* 8K read/write byte per millisecond with 4K block size */
947 	bdev->internal.qos->rate_limits[SPDK_BDEV_QOS_RW_BPS_RATE_LIMIT].limit = 8192000;
948 	/* 8K read only byte per millisecond with 4K block size */
949 	bdev->internal.qos->rate_limits[SPDK_BDEV_QOS_R_BPS_RATE_LIMIT].limit = 8192000;
950 
951 	g_get_io_channel = true;
952 
953 	set_thread(0);
954 	io_ch[0] = spdk_bdev_get_io_channel(g_desc);
955 	bdev_ch[0] = spdk_io_channel_get_ctx(io_ch[0]);
956 	CU_ASSERT(bdev_ch[0]->flags == BDEV_CH_QOS_ENABLED);
957 
958 	set_thread(1);
959 	io_ch[1] = spdk_bdev_get_io_channel(g_desc);
960 	bdev_ch[1] = spdk_io_channel_get_ctx(io_ch[1]);
961 	CU_ASSERT(bdev_ch[1]->flags == BDEV_CH_QOS_ENABLED);
962 
963 	/*
964 	 * Send an I/O on thread 0, which is where the QoS thread is running.
965 	 */
966 	set_thread(0);
967 	status = SPDK_BDEV_IO_STATUS_PENDING;
968 	rc = spdk_bdev_read_blocks(g_desc, io_ch[0], NULL, 0, 1, io_during_io_done, &status);
969 	CU_ASSERT(rc == 0);
970 	CU_ASSERT(status == SPDK_BDEV_IO_STATUS_PENDING);
971 	poll_threads();
972 	stub_complete_io(g_bdev.io_target, 0);
973 	poll_threads();
974 	CU_ASSERT(status == SPDK_BDEV_IO_STATUS_SUCCESS);
975 
976 	/* Send an I/O on thread 1. The QoS thread is not running here. */
977 	status = SPDK_BDEV_IO_STATUS_PENDING;
978 	set_thread(1);
979 	rc = spdk_bdev_read_blocks(g_desc, io_ch[1], NULL, 0, 1, io_during_io_done, &status);
980 	CU_ASSERT(rc == 0);
981 	CU_ASSERT(status == SPDK_BDEV_IO_STATUS_PENDING);
982 	poll_threads();
983 	/* Complete I/O on thread 0. This should not complete the I/O we submitted. */
984 	set_thread(0);
985 	stub_complete_io(g_bdev.io_target, 0);
986 	poll_threads();
987 	CU_ASSERT(status == SPDK_BDEV_IO_STATUS_PENDING);
988 	/* Now complete I/O on original thread 1. */
989 	set_thread(1);
990 	poll_threads();
991 	stub_complete_io(g_bdev.io_target, 0);
992 	poll_threads();
993 	CU_ASSERT(status == SPDK_BDEV_IO_STATUS_SUCCESS);
994 
995 	/* Reset rate limit for the next test cases. */
996 	spdk_delay_us(SPDK_BDEV_QOS_TIMESLICE_IN_USEC);
997 	poll_threads();
998 
999 	/*
1000 	 * Test abort request when QoS is enabled.
1001 	 */
1002 
1003 	/* Send an I/O on thread 0, which is where the QoS thread is running. */
1004 	set_thread(0);
1005 	status = SPDK_BDEV_IO_STATUS_PENDING;
1006 	rc = spdk_bdev_read_blocks(g_desc, io_ch[0], NULL, 0, 1, io_during_io_done, &status);
1007 	CU_ASSERT(rc == 0);
1008 	CU_ASSERT(status == SPDK_BDEV_IO_STATUS_PENDING);
1009 	/* Send an abort to the I/O on the same thread. */
1010 	abort_status = SPDK_BDEV_IO_STATUS_PENDING;
1011 	rc = spdk_bdev_abort(g_desc, io_ch[0], &status, io_during_io_done, &abort_status);
1012 	CU_ASSERT(rc == 0);
1013 	CU_ASSERT(abort_status == SPDK_BDEV_IO_STATUS_PENDING);
1014 	poll_threads();
1015 	CU_ASSERT(abort_status == SPDK_BDEV_IO_STATUS_SUCCESS);
1016 	CU_ASSERT(status == SPDK_BDEV_IO_STATUS_ABORTED);
1017 
1018 	/* Send an I/O on thread 1. The QoS thread is not running here. */
1019 	status = SPDK_BDEV_IO_STATUS_PENDING;
1020 	set_thread(1);
1021 	rc = spdk_bdev_read_blocks(g_desc, io_ch[1], NULL, 0, 1, io_during_io_done, &status);
1022 	CU_ASSERT(rc == 0);
1023 	CU_ASSERT(status == SPDK_BDEV_IO_STATUS_PENDING);
1024 	poll_threads();
1025 	/* Send an abort to the I/O on the same thread. */
1026 	abort_status = SPDK_BDEV_IO_STATUS_PENDING;
1027 	rc = spdk_bdev_abort(g_desc, io_ch[1], &status, io_during_io_done, &abort_status);
1028 	CU_ASSERT(rc == 0);
1029 	CU_ASSERT(abort_status == SPDK_BDEV_IO_STATUS_PENDING);
1030 	poll_threads();
1031 	/* Complete the I/O with failure and the abort with success on thread 1. */
1032 	CU_ASSERT(abort_status == SPDK_BDEV_IO_STATUS_SUCCESS);
1033 	CU_ASSERT(status == SPDK_BDEV_IO_STATUS_ABORTED);
1034 
1035 	set_thread(0);
1036 
1037 	/*
1038 	 * Close the descriptor only, which should stop the qos channel since
1039 	 * this is the last descriptor being removed.
1040 	 */
1041 	spdk_bdev_close(g_desc);
1042 	poll_threads();
1043 	CU_ASSERT(bdev->internal.qos->ch == NULL);
1044 
1045 	/*
1046 	 * Open the bdev again, which should set up the qos channel again
1047 	 * since the I/O channels are still valid.
1048 	 */
1049 	spdk_bdev_open_ext("ut_bdev", true, _bdev_event_cb, NULL, &g_desc);
1050 	poll_threads();
1051 	CU_ASSERT(bdev->internal.qos->ch != NULL);
1052 
1053 	/* Tear down the channels */
1054 	set_thread(0);
1055 	spdk_put_io_channel(io_ch[0]);
1056 	set_thread(1);
1057 	spdk_put_io_channel(io_ch[1]);
1058 	poll_threads();
1059 	set_thread(0);
1060 
1061 	/* Close the descriptor, which should stop the qos channel */
1062 	spdk_bdev_close(g_desc);
1063 	poll_threads();
1064 	CU_ASSERT(bdev->internal.qos->ch == NULL);
1065 
1066 	/* Open the bdev again; no qos channel is set up since there are no valid channels. */
1067 	spdk_bdev_open_ext("ut_bdev", true, _bdev_event_cb, NULL, &g_desc);
1068 	poll_threads();
1069 	CU_ASSERT(bdev->internal.qos->ch == NULL);
1070 
1071 	/* Create the channels in reverse order. */
1072 	set_thread(1);
1073 	io_ch[1] = spdk_bdev_get_io_channel(g_desc);
1074 	bdev_ch[1] = spdk_io_channel_get_ctx(io_ch[1]);
1075 	CU_ASSERT(bdev_ch[1]->flags == BDEV_CH_QOS_ENABLED);
1076 
1077 	set_thread(0);
1078 	io_ch[0] = spdk_bdev_get_io_channel(g_desc);
1079 	bdev_ch[0] = spdk_io_channel_get_ctx(io_ch[0]);
1080 	CU_ASSERT(bdev_ch[0]->flags == BDEV_CH_QOS_ENABLED);
1081 
1082 	/* Confirm that the qos thread is now thread 1 */
1083 	CU_ASSERT(bdev->internal.qos->ch == bdev_ch[1]);
1084 
1085 	/* Tear down the channels */
1086 	set_thread(0);
1087 	spdk_put_io_channel(io_ch[0]);
1088 	set_thread(1);
1089 	spdk_put_io_channel(io_ch[1]);
1090 	poll_threads();
1091 
1092 	set_thread(0);
1093 
1094 	teardown_test();
1095 }
1096 
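/*
 * With per-timeslice limits in place, one of the two reads must wait for the
 * next timeslice while the write proceeds under its own write-only limit.
 */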
1097 static void
1098 io_during_qos_queue(void)
1099 {
1100 	struct spdk_io_channel *io_ch[2];
1101 	struct spdk_bdev_channel *bdev_ch[2];
1102 	struct spdk_bdev *bdev;
1103 	enum spdk_bdev_io_status status0, status1, status2;
1104 	int rc;
1105 
1106 	setup_test();
1107 	MOCK_SET(spdk_get_ticks, 0);
1108 
1109 	/* Enable QoS */
1110 	bdev = &g_bdev.bdev;
1111 	bdev->internal.qos = calloc(1, sizeof(*bdev->internal.qos));
1112 	SPDK_CU_ASSERT_FATAL(bdev->internal.qos != NULL);
1113 	TAILQ_INIT(&bdev->internal.qos->queued);
1114 	/*
1115 	 * Enable read/write IOPS, read only byte per sec, write only
1116 	 * byte per sec and read/write byte per sec rate limits.
1117 	 * In this case, both read only and write only byte per sec
1118 	 * rate limit will take effect.
1119 	 */
1120 	/* 4000 read/write I/O per second, or 4 per millisecond */
1121 	bdev->internal.qos->rate_limits[SPDK_BDEV_QOS_RW_IOPS_RATE_LIMIT].limit = 4000;
1122 	/* 8K byte per millisecond with 4K block size */
1123 	bdev->internal.qos->rate_limits[SPDK_BDEV_QOS_RW_BPS_RATE_LIMIT].limit = 8192000;
1124 	/* 4K byte per millisecond with 4K block size */
1125 	bdev->internal.qos->rate_limits[SPDK_BDEV_QOS_R_BPS_RATE_LIMIT].limit = 4096000;
1126 	/* 4K byte per millisecond with 4K block size */
1127 	bdev->internal.qos->rate_limits[SPDK_BDEV_QOS_W_BPS_RATE_LIMIT].limit = 4096000;
1128 
1129 	g_get_io_channel = true;
1130 
1131 	/* Create channels */
1132 	set_thread(0);
1133 	io_ch[0] = spdk_bdev_get_io_channel(g_desc);
1134 	bdev_ch[0] = spdk_io_channel_get_ctx(io_ch[0]);
1135 	CU_ASSERT(bdev_ch[0]->flags == BDEV_CH_QOS_ENABLED);
1136 
1137 	set_thread(1);
1138 	io_ch[1] = spdk_bdev_get_io_channel(g_desc);
1139 	bdev_ch[1] = spdk_io_channel_get_ctx(io_ch[1]);
1140 	CU_ASSERT(bdev_ch[1]->flags == BDEV_CH_QOS_ENABLED);
1141 
1142 	/* Send two read I/Os */
1143 	status1 = SPDK_BDEV_IO_STATUS_PENDING;
1144 	rc = spdk_bdev_read_blocks(g_desc, io_ch[1], NULL, 0, 1, io_during_io_done, &status1);
1145 	CU_ASSERT(rc == 0);
1146 	CU_ASSERT(status1 == SPDK_BDEV_IO_STATUS_PENDING);
1147 	set_thread(0);
1148 	status0 = SPDK_BDEV_IO_STATUS_PENDING;
1149 	rc = spdk_bdev_read_blocks(g_desc, io_ch[0], NULL, 0, 1, io_during_io_done, &status0);
1150 	CU_ASSERT(rc == 0);
1151 	CU_ASSERT(status0 == SPDK_BDEV_IO_STATUS_PENDING);
1152 	/* Send one write I/O */
1153 	status2 = SPDK_BDEV_IO_STATUS_PENDING;
1154 	rc = spdk_bdev_write_blocks(g_desc, io_ch[0], NULL, 0, 1, io_during_io_done, &status2);
1155 	CU_ASSERT(rc == 0);
1156 	CU_ASSERT(status2 == SPDK_BDEV_IO_STATUS_PENDING);
1157 
1158 	/* Complete any I/O that arrived at the disk */
1159 	poll_threads();
1160 	set_thread(1);
1161 	stub_complete_io(g_bdev.io_target, 0);
1162 	set_thread(0);
1163 	stub_complete_io(g_bdev.io_target, 0);
1164 	poll_threads();
1165 
1166 	/* Only one of the two read I/Os should complete. (logical XOR) */
1167 	if (status0 == SPDK_BDEV_IO_STATUS_SUCCESS) {
1168 		CU_ASSERT(status1 == SPDK_BDEV_IO_STATUS_PENDING);
1169 	} else {
1170 		CU_ASSERT(status1 == SPDK_BDEV_IO_STATUS_SUCCESS);
1171 	}
1172 	/* The write I/O should complete. */
1173 	CU_ASSERT(status2 == SPDK_BDEV_IO_STATUS_SUCCESS);
1174 
1175 	/* Advance in time by a millisecond */
1176 	spdk_delay_us(1000);
1177 
1178 	/* Complete more I/O */
1179 	poll_threads();
1180 	set_thread(1);
1181 	stub_complete_io(g_bdev.io_target, 0);
1182 	set_thread(0);
1183 	stub_complete_io(g_bdev.io_target, 0);
1184 	poll_threads();
1185 
1186 	/* Now the second read I/O should be done */
1187 	CU_ASSERT(status0 == SPDK_BDEV_IO_STATUS_SUCCESS);
1188 	CU_ASSERT(status1 == SPDK_BDEV_IO_STATUS_SUCCESS);
1189 
1190 	/* Tear down the channels */
1191 	set_thread(1);
1192 	spdk_put_io_channel(io_ch[1]);
1193 	set_thread(0);
1194 	spdk_put_io_channel(io_ch[0]);
1195 	poll_threads();
1196 
1197 	teardown_test();
1198 }
1199 
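/*
 * A reset issued while QoS is throttling aborts both the I/O queued by QoS
 * and the I/O already submitted to the disk.
 */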
1200 static void
1201 io_during_qos_reset(void)
1202 {
1203 	struct spdk_io_channel *io_ch[2];
1204 	struct spdk_bdev_channel *bdev_ch[2];
1205 	struct spdk_bdev *bdev;
1206 	enum spdk_bdev_io_status status0, status1, reset_status;
1207 	int rc;
1208 
1209 	setup_test();
1210 	MOCK_SET(spdk_get_ticks, 0);
1211 
1212 	/* Enable QoS */
1213 	bdev = &g_bdev.bdev;
1214 	bdev->internal.qos = calloc(1, sizeof(*bdev->internal.qos));
1215 	SPDK_CU_ASSERT_FATAL(bdev->internal.qos != NULL);
1216 	TAILQ_INIT(&bdev->internal.qos->queued);
1217 	/*
1218 	 * Enable read/write IOPS, write only byte per sec and
1219 	 * read/write byte per second rate limits.
1220 	 * In this case, read/write byte per second rate limit will
1221 	 * take effect first.
1222 	 */
1223 	/* 2000 read/write I/O per second, or 2 per millisecond */
1224 	bdev->internal.qos->rate_limits[SPDK_BDEV_QOS_RW_IOPS_RATE_LIMIT].limit = 2000;
1225 	/* 4K byte per millisecond with 4K block size */
1226 	bdev->internal.qos->rate_limits[SPDK_BDEV_QOS_RW_BPS_RATE_LIMIT].limit = 4096000;
1227 	/* 8K byte per millisecond with 4K block size */
1228 	bdev->internal.qos->rate_limits[SPDK_BDEV_QOS_W_BPS_RATE_LIMIT].limit = 8192000;
1229 
1230 	g_get_io_channel = true;
1231 
1232 	/* Create channels */
1233 	set_thread(0);
1234 	io_ch[0] = spdk_bdev_get_io_channel(g_desc);
1235 	bdev_ch[0] = spdk_io_channel_get_ctx(io_ch[0]);
1236 	CU_ASSERT(bdev_ch[0]->flags == BDEV_CH_QOS_ENABLED);
1237 
1238 	set_thread(1);
1239 	io_ch[1] = spdk_bdev_get_io_channel(g_desc);
1240 	bdev_ch[1] = spdk_io_channel_get_ctx(io_ch[1]);
1241 	CU_ASSERT(bdev_ch[1]->flags == BDEV_CH_QOS_ENABLED);
1242 
1243 	/* Send two I/O. One of these gets queued by QoS. The other is sitting at the disk. */
1244 	status1 = SPDK_BDEV_IO_STATUS_PENDING;
1245 	rc = spdk_bdev_write_blocks(g_desc, io_ch[1], NULL, 0, 1, io_during_io_done, &status1);
1246 	CU_ASSERT(rc == 0);
1247 	set_thread(0);
1248 	status0 = SPDK_BDEV_IO_STATUS_PENDING;
1249 	rc = spdk_bdev_write_blocks(g_desc, io_ch[0], NULL, 0, 1, io_during_io_done, &status0);
1250 	CU_ASSERT(rc == 0);
1251 
1252 	poll_threads();
1253 	CU_ASSERT(status1 == SPDK_BDEV_IO_STATUS_PENDING);
1254 	CU_ASSERT(status0 == SPDK_BDEV_IO_STATUS_PENDING);
1255 
1256 	/* Reset the bdev. */
1257 	reset_status = SPDK_BDEV_IO_STATUS_PENDING;
1258 	rc = spdk_bdev_reset(g_desc, io_ch[0], io_during_io_done, &reset_status);
1259 	CU_ASSERT(rc == 0);
1260 
1261 	/* Complete any I/O that arrived at the disk */
1262 	poll_threads();
1263 	set_thread(1);
1264 	stub_complete_io(g_bdev.io_target, 0);
1265 	set_thread(0);
1266 	stub_complete_io(g_bdev.io_target, 0);
1267 	poll_threads();
1268 
1269 	CU_ASSERT(reset_status == SPDK_BDEV_IO_STATUS_SUCCESS);
1270 	CU_ASSERT(status0 == SPDK_BDEV_IO_STATUS_ABORTED);
1271 	CU_ASSERT(status1 == SPDK_BDEV_IO_STATUS_ABORTED);
1272 
1273 	/* Tear down the channels */
1274 	set_thread(1);
1275 	spdk_put_io_channel(io_ch[1]);
1276 	set_thread(0);
1277 	spdk_put_io_channel(io_ch[0]);
1278 	poll_threads();
1279 
1280 	teardown_test();
1281 }
1282 
1283 static void
1284 enomem_done(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg)
1285 {
1286 	enum spdk_bdev_io_status *status = cb_arg;
1287 
1288 	*status = success ? SPDK_BDEV_IO_STATUS_SUCCESS : SPDK_BDEV_IO_STATUS_FAILED;
1289 	spdk_bdev_free_io(bdev_io);
1290 }
1291 
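/*
 * Exercise the nomem_io queue: once the stub runs out of credits, new I/O
 * are queued on the shared resource and only retried after enough
 * completions to cross nomem_threshold (or when a reset flushes the queue).
 */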
1292 static void
1293 enomem(void)
1294 {
1295 	struct spdk_io_channel *io_ch;
1296 	struct spdk_bdev_channel *bdev_ch;
1297 	struct spdk_bdev_shared_resource *shared_resource;
1298 	struct ut_bdev_channel *ut_ch;
1299 	const uint32_t IO_ARRAY_SIZE = 64;
1300 	const uint32_t AVAIL = 20;
1301 	enum spdk_bdev_io_status status[IO_ARRAY_SIZE], status_reset;
1302 	uint32_t nomem_cnt, i;
1303 	struct spdk_bdev_io *first_io;
1304 	int rc;
1305 
1306 	setup_test();
1307 
1308 	set_thread(0);
1309 	io_ch = spdk_bdev_get_io_channel(g_desc);
1310 	bdev_ch = spdk_io_channel_get_ctx(io_ch);
1311 	shared_resource = bdev_ch->shared_resource;
1312 	ut_ch = spdk_io_channel_get_ctx(bdev_ch->channel);
1313 	ut_ch->avail_cnt = AVAIL;
1314 
1315 	/* First submit a number of IOs equal to what the channel can support. */
1316 	for (i = 0; i < AVAIL; i++) {
1317 		status[i] = SPDK_BDEV_IO_STATUS_PENDING;
1318 		rc = spdk_bdev_read_blocks(g_desc, io_ch, NULL, 0, 1, enomem_done, &status[i]);
1319 		CU_ASSERT(rc == 0);
1320 	}
1321 	CU_ASSERT(TAILQ_EMPTY(&shared_resource->nomem_io));
1322 
1323 	/*
1324 	 * Next, submit one additional I/O.  This one should fail with ENOMEM and then go onto
1325 	 *  the enomem_io list.
1326 	 */
1327 	status[AVAIL] = SPDK_BDEV_IO_STATUS_PENDING;
1328 	rc = spdk_bdev_read_blocks(g_desc, io_ch, NULL, 0, 1, enomem_done, &status[AVAIL]);
1329 	CU_ASSERT(rc == 0);
1330 	SPDK_CU_ASSERT_FATAL(!TAILQ_EMPTY(&shared_resource->nomem_io));
1331 	first_io = TAILQ_FIRST(&shared_resource->nomem_io);
1332 
1333 	/*
1334 	 * Now submit a bunch more I/O.  These should all fail with ENOMEM and get queued behind
1335 	 *  the first_io above.
1336 	 */
1337 	for (i = AVAIL + 1; i < IO_ARRAY_SIZE; i++) {
1338 		status[i] = SPDK_BDEV_IO_STATUS_PENDING;
1339 		rc = spdk_bdev_read_blocks(g_desc, io_ch, NULL, 0, 1, enomem_done, &status[i]);
1340 		CU_ASSERT(rc == 0);
1341 	}
1342 
1343 	/* Assert that first_io is still at the head of the list. */
1344 	CU_ASSERT(TAILQ_FIRST(&shared_resource->nomem_io) == first_io);
1345 	CU_ASSERT(bdev_io_tailq_cnt(&shared_resource->nomem_io) == (IO_ARRAY_SIZE - AVAIL));
1346 	nomem_cnt = bdev_io_tailq_cnt(&shared_resource->nomem_io);
1347 	CU_ASSERT(shared_resource->nomem_threshold == (AVAIL - NOMEM_THRESHOLD_COUNT));
1348 
1349 	/*
1350 	 * Complete 1 I/O only.  The key check here is bdev_io_tailq_cnt - this should not have
1351 	 *  changed since completing just 1 I/O should not trigger retrying the queued nomem_io
1352 	 *  list.
1353 	 */
1354 	stub_complete_io(g_bdev.io_target, 1);
1355 	CU_ASSERT(bdev_io_tailq_cnt(&shared_resource->nomem_io) == nomem_cnt);
1356 
1357 	/*
1358 	 * Complete enough I/O to hit the nomem_threshold.  This should trigger retrying nomem_io,
1359 	 *  and we should see I/O get resubmitted to the test bdev module.
1360 	 */
1361 	stub_complete_io(g_bdev.io_target, NOMEM_THRESHOLD_COUNT - 1);
1362 	CU_ASSERT(bdev_io_tailq_cnt(&shared_resource->nomem_io) < nomem_cnt);
1363 	nomem_cnt = bdev_io_tailq_cnt(&shared_resource->nomem_io);
1364 
1365 	/* Complete 1 I/O only.  This should not trigger retrying the queued nomem_io. */
1366 	stub_complete_io(g_bdev.io_target, 1);
1367 	CU_ASSERT(bdev_io_tailq_cnt(&shared_resource->nomem_io) == nomem_cnt);
1368 
1369 	/*
1370 	 * Send a reset and confirm that all I/O are completed, including the ones that
1371 	 *  were queued on the nomem_io list.
1372 	 */
1373 	status_reset = SPDK_BDEV_IO_STATUS_PENDING;
1374 	rc = spdk_bdev_reset(g_desc, io_ch, enomem_done, &status_reset);
1375 	poll_threads();
1376 	CU_ASSERT(rc == 0);
1377 	/* This will complete the reset. */
1378 	stub_complete_io(g_bdev.io_target, 0);
1379 
1380 	CU_ASSERT(bdev_io_tailq_cnt(&shared_resource->nomem_io) == 0);
1381 	CU_ASSERT(shared_resource->io_outstanding == 0);
1382 
1383 	spdk_put_io_channel(io_ch);
1384 	poll_threads();
1385 	teardown_test();
1386 }
1387 
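/*
 * Two bdevs on the same io_target share one spdk_bdev_shared_resource, so
 * completing I/O on the first bdev lets the second bdev's queued nomem_io
 * retry.
 */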
1388 static void
1389 enomem_multi_bdev(void)
1390 {
1391 	struct spdk_io_channel *io_ch;
1392 	struct spdk_bdev_channel *bdev_ch;
1393 	struct spdk_bdev_shared_resource *shared_resource;
1394 	struct ut_bdev_channel *ut_ch;
1395 	const uint32_t IO_ARRAY_SIZE = 64;
1396 	const uint32_t AVAIL = 20;
1397 	enum spdk_bdev_io_status status[IO_ARRAY_SIZE];
1398 	uint32_t i;
1399 	struct ut_bdev *second_bdev;
1400 	struct spdk_bdev_desc *second_desc = NULL;
1401 	struct spdk_bdev_channel *second_bdev_ch;
1402 	struct spdk_io_channel *second_ch;
1403 	int rc;
1404 
1405 	setup_test();
1406 
1407 	/* Register second bdev with the same io_target  */
1408 	second_bdev = calloc(1, sizeof(*second_bdev));
1409 	SPDK_CU_ASSERT_FATAL(second_bdev != NULL);
1410 	register_bdev(second_bdev, "ut_bdev2", g_bdev.io_target);
1411 	spdk_bdev_open_ext("ut_bdev2", true, _bdev_event_cb, NULL, &second_desc);
1412 	SPDK_CU_ASSERT_FATAL(second_desc != NULL);
1413 
1414 	set_thread(0);
1415 	io_ch = spdk_bdev_get_io_channel(g_desc);
1416 	bdev_ch = spdk_io_channel_get_ctx(io_ch);
1417 	shared_resource = bdev_ch->shared_resource;
1418 	ut_ch = spdk_io_channel_get_ctx(bdev_ch->channel);
1419 	ut_ch->avail_cnt = AVAIL;
1420 
1421 	second_ch = spdk_bdev_get_io_channel(second_desc);
1422 	second_bdev_ch = spdk_io_channel_get_ctx(second_ch);
1423 	SPDK_CU_ASSERT_FATAL(shared_resource == second_bdev_ch->shared_resource);
1424 
1425 	/* Saturate io_target through bdev A. */
1426 	for (i = 0; i < AVAIL; i++) {
1427 		status[i] = SPDK_BDEV_IO_STATUS_PENDING;
1428 		rc = spdk_bdev_read_blocks(g_desc, io_ch, NULL, 0, 1, enomem_done, &status[i]);
1429 		CU_ASSERT(rc == 0);
1430 	}
1431 	CU_ASSERT(TAILQ_EMPTY(&shared_resource->nomem_io));
1432 
1433 	/*
1434 	 * Now submit I/O through the second bdev. This should fail with ENOMEM
1435 	 * and then go onto the nomem_io list.
1436 	 */
1437 	status[AVAIL] = SPDK_BDEV_IO_STATUS_PENDING;
1438 	rc = spdk_bdev_read_blocks(second_desc, second_ch, NULL, 0, 1, enomem_done, &status[AVAIL]);
1439 	CU_ASSERT(rc == 0);
1440 	SPDK_CU_ASSERT_FATAL(!TAILQ_EMPTY(&shared_resource->nomem_io));
1441 
1442 	/* Complete first bdev's I/O. This should retry sending second bdev's nomem_io */
1443 	stub_complete_io(g_bdev.io_target, AVAIL);
1444 
1445 	SPDK_CU_ASSERT_FATAL(TAILQ_EMPTY(&shared_resource->nomem_io));
1446 	CU_ASSERT(shared_resource->io_outstanding == 1);
1447 
1448 	/* Now complete our retried I/O  */
1449 	stub_complete_io(g_bdev.io_target, 1);
1450 	SPDK_CU_ASSERT_FATAL(shared_resource->io_outstanding == 0);
1451 
1452 	spdk_put_io_channel(io_ch);
1453 	spdk_put_io_channel(second_ch);
1454 	spdk_bdev_close(second_desc);
1455 	unregister_bdev(second_bdev);
1456 	poll_threads();
1457 	free(second_bdev);
1458 	teardown_test();
1459 }
1460 
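/*
 * Unregistering a bdev while I/O sit on the nomem_io queue must fail those
 * queued I/O without disturbing the I/O already outstanding at the module.
 */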
1461 static void
1462 enomem_multi_bdev_unregister(void)
1463 {
1464 	struct spdk_io_channel *io_ch;
1465 	struct spdk_bdev_channel *bdev_ch;
1466 	struct spdk_bdev_shared_resource *shared_resource;
1467 	struct ut_bdev_channel *ut_ch;
1468 	const uint32_t IO_ARRAY_SIZE = 64;
1469 	const uint32_t AVAIL = 20;
1470 	enum spdk_bdev_io_status status[IO_ARRAY_SIZE];
1471 	uint32_t i;
1472 	int rc;
1473 
1474 	setup_test();
1475 
1476 	set_thread(0);
1477 	io_ch = spdk_bdev_get_io_channel(g_desc);
1478 	bdev_ch = spdk_io_channel_get_ctx(io_ch);
1479 	shared_resource = bdev_ch->shared_resource;
1480 	ut_ch = spdk_io_channel_get_ctx(bdev_ch->channel);
1481 	ut_ch->avail_cnt = AVAIL;
1482 
1483 	/* Saturate io_target through the bdev. */
1484 	for (i = 0; i < AVAIL; i++) {
1485 		status[i] = SPDK_BDEV_IO_STATUS_PENDING;
1486 		rc = spdk_bdev_read_blocks(g_desc, io_ch, NULL, 0, 1, enomem_done, &status[i]);
1487 		CU_ASSERT(rc == 0);
1488 	}
1489 	CU_ASSERT(TAILQ_EMPTY(&shared_resource->nomem_io));
1490 
1491 	/*
1492 	 * Now submit I/O through the bdev. This should fail with ENOMEM
1493 	 * and then go onto the nomem_io list.
1494 	 */
1495 	status[AVAIL] = SPDK_BDEV_IO_STATUS_PENDING;
1496 	rc = spdk_bdev_read_blocks(g_desc, io_ch, NULL, 0, 1, enomem_done, &status[AVAIL]);
1497 	CU_ASSERT(rc == 0);
1498 	SPDK_CU_ASSERT_FATAL(!TAILQ_EMPTY(&shared_resource->nomem_io));
1499 
1500 	/* Unregister the bdev to abort the IOs from nomem_io queue. */
1501 	unregister_bdev(&g_bdev);
1502 	CU_ASSERT(status[AVAIL] == SPDK_BDEV_IO_STATUS_FAILED);
1503 	SPDK_CU_ASSERT_FATAL(TAILQ_EMPTY(&shared_resource->nomem_io));
1504 	SPDK_CU_ASSERT_FATAL(shared_resource->io_outstanding == AVAIL);
1505 
1506 	/* Complete the bdev's I/O. */
1507 	stub_complete_io(g_bdev.io_target, AVAIL);
1508 	SPDK_CU_ASSERT_FATAL(shared_resource->io_outstanding == 0);
1509 
1510 	spdk_put_io_channel(io_ch);
1511 	poll_threads();
1512 	teardown_test();
1513 }
1514 
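/*
 * Bdevs on different io_targets get distinct shared resources, so ENOMEM on
 * one target does not throttle I/O submitted to the other.
 */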
1515 static void
1516 enomem_multi_io_target(void)
1517 {
1518 	struct spdk_io_channel *io_ch;
1519 	struct spdk_bdev_channel *bdev_ch;
1520 	struct ut_bdev_channel *ut_ch;
1521 	const uint32_t IO_ARRAY_SIZE = 64;
1522 	const uint32_t AVAIL = 20;
1523 	enum spdk_bdev_io_status status[IO_ARRAY_SIZE];
1524 	uint32_t i;
1525 	int new_io_device;
1526 	struct ut_bdev *second_bdev;
1527 	struct spdk_bdev_desc *second_desc = NULL;
1528 	struct spdk_bdev_channel *second_bdev_ch;
1529 	struct spdk_io_channel *second_ch;
1530 	int rc;
1531 
1532 	setup_test();
1533 
1534 	/* Create new io_target and a second bdev using it */
1535 	spdk_io_device_register(&new_io_device, stub_create_ch, stub_destroy_ch,
1536 				sizeof(struct ut_bdev_channel), NULL);
1537 	second_bdev = calloc(1, sizeof(*second_bdev));
1538 	SPDK_CU_ASSERT_FATAL(second_bdev != NULL);
1539 	register_bdev(second_bdev, "ut_bdev2", &new_io_device);
1540 	spdk_bdev_open_ext("ut_bdev2", true, _bdev_event_cb, NULL, &second_desc);
1541 	SPDK_CU_ASSERT_FATAL(second_desc != NULL);
1542 
1543 	set_thread(0);
1544 	io_ch = spdk_bdev_get_io_channel(g_desc);
1545 	bdev_ch = spdk_io_channel_get_ctx(io_ch);
1546 	ut_ch = spdk_io_channel_get_ctx(bdev_ch->channel);
1547 	ut_ch->avail_cnt = AVAIL;
1548 
1549 	/* Different io_target should imply a different shared_resource */
1550 	second_ch = spdk_bdev_get_io_channel(second_desc);
1551 	second_bdev_ch = spdk_io_channel_get_ctx(second_ch);
1552 	SPDK_CU_ASSERT_FATAL(bdev_ch->shared_resource != second_bdev_ch->shared_resource);
1553 
1554 	/* Saturate io_target through bdev A. */
1555 	for (i = 0; i < AVAIL; i++) {
1556 		status[i] = SPDK_BDEV_IO_STATUS_PENDING;
1557 		rc = spdk_bdev_read_blocks(g_desc, io_ch, NULL, 0, 1, enomem_done, &status[i]);
1558 		CU_ASSERT(rc == 0);
1559 	}
1560 	CU_ASSERT(TAILQ_EMPTY(&bdev_ch->shared_resource->nomem_io));
1561 
1562 	/* Issue one more I/O to fill ENOMEM list. */
1563 	status[AVAIL] = SPDK_BDEV_IO_STATUS_PENDING;
1564 	rc = spdk_bdev_read_blocks(g_desc, io_ch, NULL, 0, 1, enomem_done, &status[AVAIL]);
1565 	CU_ASSERT(rc == 0);
1566 	SPDK_CU_ASSERT_FATAL(!TAILQ_EMPTY(&bdev_ch->shared_resource->nomem_io));
1567 
1568 	/*
1569 	 * Now submit I/O through the second bdev. This should go through and complete
1570 	 * successfully because we're using a different io_device underneath.
1571 	 */
1572 	status[AVAIL] = SPDK_BDEV_IO_STATUS_PENDING;
1573 	rc = spdk_bdev_read_blocks(second_desc, second_ch, NULL, 0, 1, enomem_done, &status[AVAIL]);
1574 	CU_ASSERT(rc == 0);
1575 	SPDK_CU_ASSERT_FATAL(TAILQ_EMPTY(&second_bdev_ch->shared_resource->nomem_io));
1576 	stub_complete_io(second_bdev->io_target, 1);
1577 
1578 	/* Cleanup; Complete outstanding I/O. */
1579 	stub_complete_io(g_bdev.io_target, AVAIL);
1580 	SPDK_CU_ASSERT_FATAL(TAILQ_EMPTY(&bdev_ch->shared_resource->nomem_io));
1581 	/* Complete the ENOMEM I/O */
1582 	stub_complete_io(g_bdev.io_target, 1);
1583 	CU_ASSERT(bdev_ch->shared_resource->io_outstanding == 0);
1584 
1585 	SPDK_CU_ASSERT_FATAL(TAILQ_EMPTY(&bdev_ch->shared_resource->nomem_io));
1586 	CU_ASSERT(bdev_ch->shared_resource->io_outstanding == 0);
1587 	spdk_put_io_channel(io_ch);
1588 	spdk_put_io_channel(second_ch);
1589 	spdk_bdev_close(second_desc);
1590 	unregister_bdev(second_bdev);
1591 	spdk_io_device_unregister(&new_io_device, NULL);
1592 	poll_threads();
1593 	free(second_bdev);
1594 	teardown_test();
1595 }
1596 
1597 static void
1598 qos_dynamic_enable_done(void *cb_arg, int status)
1599 {
1600 	int *rc = cb_arg;
1601 	*rc = status;
1602 }
1603 
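/*
 * Toggle QoS at runtime with spdk_bdev_set_qos_rate_limits() and verify that
 * I/O queued by QoS are neither aborted nor lost when QoS is disabled; they
 * are resubmitted on their original threads.
 */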
1604 static void
1605 qos_dynamic_enable(void)
1606 {
1607 	struct spdk_io_channel *io_ch[2];
1608 	struct spdk_bdev_channel *bdev_ch[2];
1609 	struct spdk_bdev *bdev;
1610 	enum spdk_bdev_io_status bdev_io_status[2];
1611 	uint64_t limits[SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES] = {};
1612 	int status, second_status, rc, i;
1613 
1614 	setup_test();
1615 	MOCK_SET(spdk_get_ticks, 0);
1616 
1617 	for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) {
1618 		limits[i] = UINT64_MAX;
1619 	}
1620 
1621 	bdev = &g_bdev.bdev;
1622 
1623 	g_get_io_channel = true;
1624 
1625 	/* Create channels */
1626 	set_thread(0);
1627 	io_ch[0] = spdk_bdev_get_io_channel(g_desc);
1628 	bdev_ch[0] = spdk_io_channel_get_ctx(io_ch[0]);
1629 	CU_ASSERT(bdev_ch[0]->flags == 0);
1630 
1631 	set_thread(1);
1632 	io_ch[1] = spdk_bdev_get_io_channel(g_desc);
1633 	bdev_ch[1] = spdk_io_channel_get_ctx(io_ch[1]);
1634 	CU_ASSERT(bdev_ch[1]->flags == 0);
1635 
1636 	set_thread(0);
1637 
1638 	/*
1639 	 * Enable QoS: Read/Write IOPS, Read/Write byte,
1640 	 * Read only byte and Write only byte per second
1641 	 * rate limits.
1642 	 * More than 10 I/Os allowed per timeslice.
1643 	 */
1644 	status = -1;
1645 	limits[SPDK_BDEV_QOS_RW_IOPS_RATE_LIMIT] = 10000;
1646 	limits[SPDK_BDEV_QOS_RW_BPS_RATE_LIMIT] = 100;
1647 	limits[SPDK_BDEV_QOS_R_BPS_RATE_LIMIT] = 100;
1648 	limits[SPDK_BDEV_QOS_W_BPS_RATE_LIMIT] = 10;
1649 	spdk_bdev_set_qos_rate_limits(bdev, limits, qos_dynamic_enable_done, &status);
1650 	poll_threads();
1651 	CU_ASSERT(status == 0);
1652 	CU_ASSERT((bdev_ch[0]->flags & BDEV_CH_QOS_ENABLED) != 0);
1653 	CU_ASSERT((bdev_ch[1]->flags & BDEV_CH_QOS_ENABLED) != 0);
1654 
1655 	/*
1656 	 * Submit and complete 10 I/O to fill the QoS allotment for this timeslice.
1657 	 * Additional I/O will then be queued.
1658 	 */
1659 	set_thread(0);
1660 	for (i = 0; i < 10; i++) {
1661 		bdev_io_status[0] = SPDK_BDEV_IO_STATUS_PENDING;
1662 		rc = spdk_bdev_read_blocks(g_desc, io_ch[0], NULL, 0, 1, io_during_io_done, &bdev_io_status[0]);
1663 		CU_ASSERT(rc == 0);
1664 		CU_ASSERT(bdev_io_status[0] == SPDK_BDEV_IO_STATUS_PENDING);
1665 		poll_thread(0);
1666 		stub_complete_io(g_bdev.io_target, 0);
1667 		CU_ASSERT(bdev_io_status[0] == SPDK_BDEV_IO_STATUS_SUCCESS);
1668 	}
1669 
1670 	/*
1671 	 * Send two more I/O.  These I/O will be queued since the current timeslice allotment has been
1672 	 * filled already.  We want to test that when QoS is disabled, these two I/O:
1673 	 *  1) are not aborted
1674 	 *  2) are sent back to their original thread for resubmission
1675 	 */
1676 	bdev_io_status[0] = SPDK_BDEV_IO_STATUS_PENDING;
1677 	rc = spdk_bdev_read_blocks(g_desc, io_ch[0], NULL, 0, 1, io_during_io_done, &bdev_io_status[0]);
1678 	CU_ASSERT(rc == 0);
1679 	CU_ASSERT(bdev_io_status[0] == SPDK_BDEV_IO_STATUS_PENDING);
1680 	set_thread(1);
1681 	bdev_io_status[1] = SPDK_BDEV_IO_STATUS_PENDING;
1682 	rc = spdk_bdev_read_blocks(g_desc, io_ch[1], NULL, 0, 1, io_during_io_done, &bdev_io_status[1]);
1683 	CU_ASSERT(rc == 0);
1684 	CU_ASSERT(bdev_io_status[1] == SPDK_BDEV_IO_STATUS_PENDING);
1685 	poll_threads();
1686 
1687 	/*
1688 	 * Disable QoS: Read/Write IOPS, Read/Write byte,
1689 	 * Read only byte rate limits
1690 	 */
1691 	status = -1;
1692 	limits[SPDK_BDEV_QOS_RW_IOPS_RATE_LIMIT] = 0;
1693 	limits[SPDK_BDEV_QOS_RW_BPS_RATE_LIMIT] = 0;
1694 	limits[SPDK_BDEV_QOS_R_BPS_RATE_LIMIT] = 0;
1695 	spdk_bdev_set_qos_rate_limits(bdev, limits, qos_dynamic_enable_done, &status);
1696 	poll_threads();
1697 	CU_ASSERT(status == 0);
1698 	CU_ASSERT((bdev_ch[0]->flags & BDEV_CH_QOS_ENABLED) != 0);
1699 	CU_ASSERT((bdev_ch[1]->flags & BDEV_CH_QOS_ENABLED) != 0);
1700 
1701 	/* Disable QoS: Write only Byte per second rate limit */
1702 	status = -1;
1703 	limits[SPDK_BDEV_QOS_W_BPS_RATE_LIMIT] = 0;
1704 	spdk_bdev_set_qos_rate_limits(bdev, limits, qos_dynamic_enable_done, &status);
1705 	poll_threads();
1706 	CU_ASSERT(status == 0);
1707 	CU_ASSERT((bdev_ch[0]->flags & BDEV_CH_QOS_ENABLED) == 0);
1708 	CU_ASSERT((bdev_ch[1]->flags & BDEV_CH_QOS_ENABLED) == 0);
1709 
1710 	/*
1711 	 * All I/O should have been resubmitted back on their original thread.  Complete
1712 	 *  all I/O on thread 0, and ensure that only the thread 0 I/O was completed.
1713 	 */
1714 	set_thread(0);
1715 	stub_complete_io(g_bdev.io_target, 0);
1716 	poll_threads();
1717 	CU_ASSERT(bdev_io_status[0] == SPDK_BDEV_IO_STATUS_SUCCESS);
1718 	CU_ASSERT(bdev_io_status[1] == SPDK_BDEV_IO_STATUS_PENDING);
1719 
1720 	/* Now complete all I/O on thread 1 and ensure the thread 1 I/O was completed. */
1721 	set_thread(1);
1722 	stub_complete_io(g_bdev.io_target, 0);
1723 	poll_threads();
1724 	CU_ASSERT(bdev_io_status[1] == SPDK_BDEV_IO_STATUS_SUCCESS);
1725 
1726 	/* Disable QoS again */
1727 	status = -1;
1728 	limits[SPDK_BDEV_QOS_RW_IOPS_RATE_LIMIT] = 0;
1729 	spdk_bdev_set_qos_rate_limits(bdev, limits, qos_dynamic_enable_done, &status);
1730 	poll_threads();
1731 	CU_ASSERT(status == 0); /* This should succeed */
1732 	CU_ASSERT((bdev_ch[0]->flags & BDEV_CH_QOS_ENABLED) == 0);
1733 	CU_ASSERT((bdev_ch[1]->flags & BDEV_CH_QOS_ENABLED) == 0);
1734 
1735 	/* Enable QoS on thread 0 */
1736 	status = -1;
1737 	limits[SPDK_BDEV_QOS_RW_IOPS_RATE_LIMIT] = 10000;
1738 	spdk_bdev_set_qos_rate_limits(bdev, limits, qos_dynamic_enable_done, &status);
1739 	poll_threads();
1740 	CU_ASSERT(status == 0);
1741 	CU_ASSERT((bdev_ch[0]->flags & BDEV_CH_QOS_ENABLED) != 0);
1742 	CU_ASSERT((bdev_ch[1]->flags & BDEV_CH_QOS_ENABLED) != 0);
1743 
1744 	/* Disable QoS on thread 1 */
1745 	set_thread(1);
1746 	status = -1;
1747 	limits[SPDK_BDEV_QOS_RW_IOPS_RATE_LIMIT] = 0;
1748 	spdk_bdev_set_qos_rate_limits(bdev, limits, qos_dynamic_enable_done, &status);
1749 	/* Don't poll yet. This should leave the channels with QoS enabled */
1750 	CU_ASSERT(status == -1);
1751 	CU_ASSERT((bdev_ch[0]->flags & BDEV_CH_QOS_ENABLED) != 0);
1752 	CU_ASSERT((bdev_ch[1]->flags & BDEV_CH_QOS_ENABLED) != 0);
1753 
1754 	/* Enable QoS. This should immediately fail because the previous QoS disable hasn't completed. */
1755 	second_status = 0;
1756 	limits[SPDK_BDEV_QOS_RW_BPS_RATE_LIMIT] = 10;
1757 	spdk_bdev_set_qos_rate_limits(bdev, limits, qos_dynamic_enable_done, &second_status);
1758 	poll_threads();
1759 	CU_ASSERT(status == 0); /* The disable should succeed */
1760 	CU_ASSERT(second_status < 0); /* The enable should fail */
1761 	CU_ASSERT((bdev_ch[0]->flags & BDEV_CH_QOS_ENABLED) == 0);
1762 	CU_ASSERT((bdev_ch[1]->flags & BDEV_CH_QOS_ENABLED) == 0);
1763 
1764 	/* Enable QoS on thread 1. This should succeed now that the disable has completed. */
1765 	status = -1;
1766 	limits[SPDK_BDEV_QOS_RW_IOPS_RATE_LIMIT] = 10000;
1767 	spdk_bdev_set_qos_rate_limits(bdev, limits, qos_dynamic_enable_done, &status);
1768 	poll_threads();
1769 	CU_ASSERT(status == 0);
1770 	CU_ASSERT((bdev_ch[0]->flags & BDEV_CH_QOS_ENABLED) != 0);
1771 	CU_ASSERT((bdev_ch[1]->flags & BDEV_CH_QOS_ENABLED) != 0);
1772 
1773 	/* Tear down the channels */
1774 	set_thread(0);
1775 	spdk_put_io_channel(io_ch[0]);
1776 	set_thread(1);
1777 	spdk_put_io_channel(io_ch[1]);
1778 	poll_threads();
1779 
1780 	set_thread(0);
1781 	teardown_test();
1782 }
1783 
1784 static void
1785 histogram_status_cb(void *cb_arg, int status)
1786 {
1787 	g_status = status;
1788 }
1789 
1790 static void
1791 histogram_data_cb(void *cb_arg, int status, struct spdk_histogram_data *histogram)
1792 {
1793 	g_status = status;
1794 	g_histogram = histogram;
1795 }
1796 
1797 static void
1798 histogram_io_count(void *ctx, uint64_t start, uint64_t end, uint64_t count,
1799 		   uint64_t total, uint64_t so_far)
1800 {
1801 	g_count += count;
1802 }
1803 
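/* Verify that enabling a histogram on the bdev aggregates latency data from I/O
 * completed on channels owned by different threads.
 */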
1804 static void
1805 bdev_histograms_mt(void)
1806 {
1807 	struct spdk_io_channel *ch[2];
1808 	struct spdk_histogram_data *histogram;
1809 	uint8_t buf[4096];
1810 	enum spdk_bdev_io_status status = SPDK_BDEV_IO_STATUS_PENDING;
1811 	int rc;
1812 
1813 
1814 	setup_test();
1815 
1816 	set_thread(0);
1817 	ch[0] = spdk_bdev_get_io_channel(g_desc);
1818 	CU_ASSERT(ch[0] != NULL);
1819 
1820 	set_thread(1);
1821 	ch[1] = spdk_bdev_get_io_channel(g_desc);
1822 	CU_ASSERT(ch[1] != NULL);
1823 
1824 
1825 	/* Enable histogram */
1826 	spdk_bdev_histogram_enable(&g_bdev.bdev, histogram_status_cb, NULL, true);
1827 	poll_threads();
1828 	CU_ASSERT(g_status == 0);
1829 	CU_ASSERT(g_bdev.bdev.internal.histogram_enabled == true);
1830 
1831 	/* Allocate histogram */
1832 	histogram = spdk_histogram_data_alloc();
1833 
1834 	/* Check if histogram is zeroed */
1835 	spdk_bdev_histogram_get(&g_bdev.bdev, histogram, histogram_data_cb, NULL);
1836 	poll_threads();
1837 	CU_ASSERT(g_status == 0);
1838 	SPDK_CU_ASSERT_FATAL(g_histogram != NULL);
1839 
1840 	g_count = 0;
1841 	spdk_histogram_data_iterate(g_histogram, histogram_io_count, NULL);
1842 
1843 	CU_ASSERT(g_count == 0);
1844 
1845 	set_thread(0);
1846 	rc = spdk_bdev_write_blocks(g_desc, ch[0], &buf, 0, 1, io_during_io_done, &status);
1847 	CU_ASSERT(rc == 0);
1848 
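	/* Let some mocked time pass before completing the I/O so a latency value is recorded. */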
1849 	spdk_delay_us(10);
1850 	stub_complete_io(g_bdev.io_target, 1);
1851 	poll_threads();
1852 	CU_ASSERT(status == SPDK_BDEV_IO_STATUS_SUCCESS);
1853 
1854 
1855 	set_thread(1);
1856 	rc = spdk_bdev_read_blocks(g_desc, ch[1], &buf, 0, 1, io_during_io_done, &status);
1857 	CU_ASSERT(rc == 0);
1858 
1859 	spdk_delay_us(10);
1860 	stub_complete_io(g_bdev.io_target, 1);
1861 	poll_threads();
1862 	CU_ASSERT(status == SPDK_BDEV_IO_STATUS_SUCCESS);
1863 
1864 	set_thread(0);
1865 
1866 	/* Check if histogram gathered data from all I/O channels */
1867 	spdk_bdev_histogram_get(&g_bdev.bdev, histogram, histogram_data_cb, NULL);
1868 	poll_threads();
1869 	CU_ASSERT(g_status == 0);
1870 	CU_ASSERT(g_bdev.bdev.internal.histogram_enabled == true);
1871 	SPDK_CU_ASSERT_FATAL(g_histogram != NULL);
1872 
1873 	g_count = 0;
1874 	spdk_histogram_data_iterate(g_histogram, histogram_io_count, NULL);
1875 	CU_ASSERT(g_count == 2);
1876 
1877 	/* Disable histogram */
1878 	spdk_bdev_histogram_enable(&g_bdev.bdev, histogram_status_cb, NULL, false);
1879 	poll_threads();
1880 	CU_ASSERT(g_status == 0);
1881 	CU_ASSERT(g_bdev.bdev.internal.histogram_enabled == false);
1882 
1883 	spdk_histogram_data_free(histogram);
1884 
1885 	/* Tear down the channels */
1886 	set_thread(0);
1887 	spdk_put_io_channel(ch[0]);
1888 	set_thread(1);
1889 	spdk_put_io_channel(ch[1]);
1890 	poll_threads();
1891 	set_thread(0);
1892 	teardown_test();
1893 
1894 }
1895 
1896 struct timeout_io_cb_arg {
1897 	struct iovec iov;
1898 	uint8_t type;
1899 };
1900 
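/* Count the I/O currently tracked on the channel's io_submitted list. */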
1901 static int
1902 bdev_channel_count_submitted_io(struct spdk_bdev_channel *ch)
1903 {
1904 	struct spdk_bdev_io *bdev_io;
1905 	int n = 0;
1906 
1907 	if (!ch) {
1908 		return -1;
1909 	}
1910 
1911 	TAILQ_FOREACH(bdev_io, &ch->io_submitted, internal.ch_link) {
1912 		n++;
1913 	}
1914 
1915 	return n;
1916 }
1917 
1918 static void
1919 bdev_channel_io_timeout_cb(void *cb_arg, struct spdk_bdev_io *bdev_io)
1920 {
1921 	struct timeout_io_cb_arg *ctx = cb_arg;
1922 
1923 	ctx->type = bdev_io->type;
1924 	ctx->iov.iov_base = bdev_io->iov.iov_base;
1925 	ctx->iov.iov_len = bdev_io->iov.iov_len;
1926 }
1927 
1928 static bool g_io_done;
1929 
1930 static void
1931 io_done(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg)
1932 {
1933 	g_io_done = true;
1934 	spdk_bdev_free_io(bdev_io);
1935 }
1936 
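/* Verify spdk_bdev_set_timeout() when I/O channels exist on multiple threads: the
 * timeout callback is invoked on the thread that submitted the timed-out I/O.
 */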
1937 static void
1938 bdev_set_io_timeout_mt(void)
1939 {
1940 	struct spdk_io_channel *ch[3];
1941 	struct spdk_bdev_channel *bdev_ch[3];
1942 	struct timeout_io_cb_arg cb_arg;
1943 
1944 	setup_test();
1945 
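	/* Enable splitting on a 16-block optimal I/O boundary so that the 8-block read at
	 * offset 14 submitted later in this test is split into two child I/O.
	 */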
1946 	g_bdev.bdev.optimal_io_boundary = 16;
1947 	g_bdev.bdev.split_on_optimal_io_boundary = true;
1948 
1949 	set_thread(0);
1950 	ch[0] = spdk_bdev_get_io_channel(g_desc);
1951 	CU_ASSERT(ch[0] != NULL);
1952 
1953 	set_thread(1);
1954 	ch[1] = spdk_bdev_get_io_channel(g_desc);
1955 	CU_ASSERT(ch[1] != NULL);
1956 
1957 	set_thread(2);
1958 	ch[2] = spdk_bdev_get_io_channel(g_desc);
1959 	CU_ASSERT(ch[2] != NULL);
1960 
1961 	/* Multi-thread mode:
1962 	 * 1. Check that the timeout poller was registered successfully.
1963 	 * 2. Check the timed-out I/O and ensure it is the I/O submitted by the user.
1964 	 * 3. Check that the io_submitted linkage in the bdev_ch works correctly.
1965 	 * 4. Close the desc and put the io channels while the timeout poller is polling.
1966 	 */
1967 
1968 	/* Set the timeout on the desc's thread */
1969 	set_thread(0);
1970 	CU_ASSERT(spdk_bdev_set_timeout(g_desc, 5, bdev_channel_io_timeout_cb, &cb_arg) == 0);
1971 	CU_ASSERT(g_desc->io_timeout_poller != NULL);
1972 	CU_ASSERT(g_desc->cb_fn == bdev_channel_io_timeout_cb);
1973 	CU_ASSERT(g_desc->cb_arg == &cb_arg);
1974 
1975 	/* check the IO submitted list and timeout handler */
1976 	CU_ASSERT(spdk_bdev_read_blocks(g_desc, ch[0], (void *)0x2000, 0, 1, io_done, NULL) == 0);
1977 	bdev_ch[0] = spdk_io_channel_get_ctx(ch[0]);
1978 	CU_ASSERT(bdev_channel_count_submitted_io(bdev_ch[0]) == 1);
1979 
1980 	set_thread(1);
1981 	CU_ASSERT(spdk_bdev_write_blocks(g_desc, ch[1], (void *)0x1000, 0, 1, io_done, NULL) == 0);
1982 	bdev_ch[1] = spdk_io_channel_get_ctx(ch[1]);
1983 	CU_ASSERT(bdev_channel_count_submitted_io(bdev_ch[1]) == 1);
1984 
1985 	/* Now test that a single-vector command is split correctly.
1986 	 * Offset 14, length 8, payload 0xF000
1987 	 *  Child - Offset 14, length 2, payload 0xF000
1988 	 *  Child - Offset 16, length 6, payload 0xF000 + 2 * blocklen
1989 	 *
1990 	 * The parent I/O and both children are tracked on the channel's io_submitted list.
1991 	 */
1992 	set_thread(2);
1993 	CU_ASSERT(spdk_bdev_read_blocks(g_desc, ch[2], (void *)0xF000, 14, 8, io_done, NULL) == 0);
1994 	bdev_ch[2] = spdk_io_channel_get_ctx(ch[2]);
1995 	CU_ASSERT(bdev_channel_count_submitted_io(bdev_ch[2]) == 3);
1996 
1997 	set_thread(0);
1998 	memset(&cb_arg, 0, sizeof(cb_arg));
1999 	spdk_delay_us(3 * spdk_get_ticks_hz());
2000 	poll_threads();
2001 	CU_ASSERT(cb_arg.type == 0);
2002 	CU_ASSERT(cb_arg.iov.iov_base == (void *)0x0);
2003 	CU_ASSERT(cb_arg.iov.iov_len == 0);
2004 
2005 	/* Now the elapsed time reaches the timeout limit */
2006 	spdk_delay_us(3 * spdk_get_ticks_hz());
2007 	poll_thread(0);
2008 	CU_ASSERT(cb_arg.type == SPDK_BDEV_IO_TYPE_READ);
2009 	CU_ASSERT(cb_arg.iov.iov_base == (void *)0x2000);
2010 	CU_ASSERT(cb_arg.iov.iov_len == 1 * g_bdev.bdev.blocklen);
2011 	stub_complete_io(g_bdev.io_target, 1);
2012 	CU_ASSERT(bdev_channel_count_submitted_io(bdev_ch[0]) == 0);
2013 
2014 	memset(&cb_arg, 0, sizeof(cb_arg));
2015 	set_thread(1);
2016 	poll_thread(1);
2017 	CU_ASSERT(cb_arg.type == SPDK_BDEV_IO_TYPE_WRITE);
2018 	CU_ASSERT(cb_arg.iov.iov_base == (void *)0x1000);
2019 	CU_ASSERT(cb_arg.iov.iov_len == 1 * g_bdev.bdev.blocklen);
2020 	stub_complete_io(g_bdev.io_target, 1);
2021 	CU_ASSERT(bdev_channel_count_submitted_io(bdev_ch[1]) == 0);
2022 
2023 	memset(&cb_arg, 0, sizeof(cb_arg));
2024 	set_thread(2);
2025 	poll_thread(2);
2026 	CU_ASSERT(cb_arg.type == SPDK_BDEV_IO_TYPE_READ);
2027 	CU_ASSERT(cb_arg.iov.iov_base == (void *)0xF000);
2028 	CU_ASSERT(cb_arg.iov.iov_len == 8 * g_bdev.bdev.blocklen);
2029 	stub_complete_io(g_bdev.io_target, 1);
2030 	CU_ASSERT(bdev_channel_count_submitted_io(bdev_ch[2]) == 2);
2031 	stub_complete_io(g_bdev.io_target, 1);
2032 	CU_ASSERT(bdev_channel_count_submitted_io(bdev_ch[2]) == 0);
2033 
2034 	/* Run poll_timeout_done(), i.e. let the timeout poller complete */
2035 	set_thread(0);
2036 	poll_thread(0);
2037 	CU_ASSERT(g_desc->refs == 0);
2038 	CU_ASSERT(spdk_bdev_read_blocks(g_desc, ch[0], (void *)0x1000, 0, 1, io_done, NULL) == 0);
2039 	set_thread(1);
2040 	CU_ASSERT(spdk_bdev_write_blocks(g_desc, ch[1], (void *)0x2000, 0, 2, io_done, NULL) == 0);
2041 	set_thread(2);
2042 	CU_ASSERT(spdk_bdev_read_blocks(g_desc, ch[2], (void *)0x3000, 0, 3, io_done, NULL) == 0);
2043 
2044 	/* Trigger the timeout poller to run again; desc->refs is incremented.
2045 	 * On thread 0 we destroy the io channel before the timeout poller runs,
2046 	 * so the timeout callback is not called on thread 0.
2047 	 */
2048 	spdk_delay_us(6 * spdk_get_ticks_hz());
2049 	memset(&cb_arg, 0, sizeof(cb_arg));
2050 	set_thread(0);
2051 	stub_complete_io(g_bdev.io_target, 1);
2052 	spdk_put_io_channel(ch[0]);
2053 	poll_thread(0);
2054 	CU_ASSERT(g_desc->refs == 1);
2055 	CU_ASSERT(cb_arg.type == 0);
2056 	CU_ASSERT(cb_arg.iov.iov_base == (void *)0x0);
2057 	CU_ASSERT(cb_arg.iov.iov_len == 0);
2058 
2059 	/* On thread 1 the timeout poller runs and then we destroy the io channel,
2060 	 * so the timeout callback is called on thread 1.
2061 	 */
2062 	memset(&cb_arg, 0, sizeof(cb_arg));
2063 	set_thread(1);
2064 	poll_thread(1);
2065 	stub_complete_io(g_bdev.io_target, 1);
2066 	spdk_put_io_channel(ch[1]);
2067 	poll_thread(1);
2068 	CU_ASSERT(cb_arg.type == SPDK_BDEV_IO_TYPE_WRITE);
2069 	CU_ASSERT(cb_arg.iov.iov_base == (void *)0x2000);
2070 	CU_ASSERT(cb_arg.iov.iov_len == 2 * g_bdev.bdev.blocklen);
2071 
2072 	/* Close the desc.
2073 	 * This unregisters the timeout poller first, then decrements desc->refs;
2074 	 * refs is not zero yet, so the desc is not freed.
2075 	 */
2076 	set_thread(0);
2077 	spdk_bdev_close(g_desc);
2078 	CU_ASSERT(g_desc->refs == 1);
2079 	CU_ASSERT(g_desc->io_timeout_poller == NULL);
2080 
2081 	/* The timeout poller runs on thread 2 and then we destroy the io channel.
2082 	 * The desc is already closed, so the timeout poller exits immediately and
2083 	 * the timeout callback is not called on thread 2.
2084 	 */
2085 	memset(&cb_arg, 0, sizeof(cb_arg));
2086 	set_thread(2);
2087 	poll_thread(2);
2088 	stub_complete_io(g_bdev.io_target, 1);
2089 	spdk_put_io_channel(ch[2]);
2090 	poll_thread(2);
2091 	CU_ASSERT(cb_arg.type == 0);
2092 	CU_ASSERT(cb_arg.iov.iov_base == (void *)0x0);
2093 	CU_ASSERT(cb_arg.iov.iov_len == 0);
2094 
2095 	set_thread(0);
2096 	poll_thread(0);
2097 	g_teardown_done = false;
2098 	unregister_bdev(&g_bdev);
2099 	spdk_io_device_unregister(&g_io_device, NULL);
2100 	spdk_bdev_finish(finish_cb, NULL);
2101 	spdk_iobuf_finish(finish_cb, NULL);
2102 	poll_threads();
2103 	memset(&g_bdev, 0, sizeof(g_bdev));
2104 	CU_ASSERT(g_teardown_done == true);
2105 	g_teardown_done = false;
2106 	free_threads();
2107 	free_cores();
2108 }
2109 
2110 static bool g_io_done2;
2111 static bool g_lock_lba_range_done;
2112 static bool g_unlock_lba_range_done;
2113 
2114 static void
2115 io_done2(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg)
2116 {
2117 	g_io_done2 = true;
2118 	spdk_bdev_free_io(bdev_io);
2119 }
2120 
2121 static void
2122 lock_lba_range_done(void *ctx, int status)
2123 {
2124 	g_lock_lba_range_done = true;
2125 }
2126 
2127 static void
2128 unlock_lba_range_done(void *ctx, int status)
2129 {
2130 	g_unlock_lba_range_done = true;
2131 }
2132 
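/* Return the current outstanding I/O count on the stub io_target's channel. */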
2133 static uint32_t
2134 stub_channel_outstanding_cnt(void *io_target)
2135 {
2136 	struct spdk_io_channel *_ch = spdk_get_io_channel(io_target);
2137 	struct ut_bdev_channel *ch = spdk_io_channel_get_ctx(_ch);
2138 	uint32_t outstanding_cnt;
2139 
2140 	outstanding_cnt = ch->outstanding_cnt;
2141 
2142 	spdk_put_io_channel(_ch);
2143 	return outstanding_cnt;
2144 }
2145 
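/* Verify LBA range locking: I/O from the channel holding the lock may proceed, while
 * write I/O from other channels is queued on io_locked until the range is unlocked.
 */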
2146 static void
2147 lock_lba_range_then_submit_io(void)
2148 {
2149 	struct spdk_bdev_desc *desc = NULL;
2150 	void *io_target;
2151 	struct spdk_io_channel *io_ch[3];
2152 	struct spdk_bdev_channel *bdev_ch[3];
2153 	struct lba_range *range;
2154 	char buf[4096];
2155 	int ctx0, ctx1, ctx2;
2156 	int rc;
2157 
2158 	setup_test();
2159 
2160 	io_target = g_bdev.io_target;
2161 	desc = g_desc;
2162 
2163 	set_thread(0);
2164 	io_ch[0] = spdk_bdev_get_io_channel(desc);
2165 	bdev_ch[0] = spdk_io_channel_get_ctx(io_ch[0]);
2166 	CU_ASSERT(io_ch[0] != NULL);
2167 
2168 	set_thread(1);
2169 	io_ch[1] = spdk_bdev_get_io_channel(desc);
2170 	bdev_ch[1] = spdk_io_channel_get_ctx(io_ch[1]);
2171 	CU_ASSERT(io_ch[1] != NULL);
2172 
2173 	set_thread(0);
2174 	g_lock_lba_range_done = false;
2175 	rc = bdev_lock_lba_range(desc, io_ch[0], 20, 10, lock_lba_range_done, &ctx0);
2176 	CU_ASSERT(rc == 0);
2177 	poll_threads();
2178 
2179 	/* The lock should immediately become valid, since there are no outstanding
2180 	 * write I/O.
2181 	 */
2182 	CU_ASSERT(g_lock_lba_range_done == true);
2183 	range = TAILQ_FIRST(&bdev_ch[0]->locked_ranges);
2184 	SPDK_CU_ASSERT_FATAL(range != NULL);
2185 	CU_ASSERT(range->offset == 20);
2186 	CU_ASSERT(range->length == 10);
2187 	CU_ASSERT(range->owner_ch == bdev_ch[0]);
2188 
2189 	g_io_done = false;
2190 	CU_ASSERT(TAILQ_EMPTY(&bdev_ch[0]->io_locked));
2191 	rc = spdk_bdev_read_blocks(desc, io_ch[0], buf, 20, 1, io_done, &ctx0);
2192 	CU_ASSERT(rc == 0);
2193 	CU_ASSERT(stub_channel_outstanding_cnt(io_target) == 1);
2194 
2195 	stub_complete_io(io_target, 1);
2196 	poll_threads();
2197 	CU_ASSERT(g_io_done == true);
2198 	CU_ASSERT(TAILQ_EMPTY(&bdev_ch[0]->io_locked));
2199 
2200 	/* Try a write I/O.  This should actually be allowed to execute, since the channel
2201 	 * holding the lock is submitting the write I/O.
2202 	 */
2203 	g_io_done = false;
2204 	CU_ASSERT(TAILQ_EMPTY(&bdev_ch[0]->io_locked));
2205 	rc = spdk_bdev_write_blocks(desc, io_ch[0], buf, 20, 1, io_done, &ctx0);
2206 	CU_ASSERT(rc == 0);
2207 	CU_ASSERT(stub_channel_outstanding_cnt(io_target) == 1);
2208 
2209 	stub_complete_io(io_target, 1);
2210 	poll_threads();
2211 	CU_ASSERT(g_io_done == true);
2212 	CU_ASSERT(TAILQ_EMPTY(&bdev_ch[0]->io_locked));
2213 
2214 	/* Try a write I/O.  This should get queued in the io_locked tailq. */
2215 	set_thread(1);
2216 	g_io_done = false;
2217 	CU_ASSERT(TAILQ_EMPTY(&bdev_ch[1]->io_locked));
2218 	rc = spdk_bdev_write_blocks(desc, io_ch[1], buf, 20, 1, io_done, &ctx1);
2219 	CU_ASSERT(rc == 0);
2220 	poll_threads();
2221 	CU_ASSERT(stub_channel_outstanding_cnt(io_target) == 0);
2222 	CU_ASSERT(!TAILQ_EMPTY(&bdev_ch[1]->io_locked));
2223 	CU_ASSERT(g_io_done == false);
2224 
2225 	/* Try to unlock the lba range using thread 1's io_ch.  This should fail. */
2226 	rc = bdev_unlock_lba_range(desc, io_ch[1], 20, 10, unlock_lba_range_done, &ctx1);
2227 	CU_ASSERT(rc == -EINVAL);
2228 
2229 	/* Now create a new channel and submit a write I/O with it.  This should also be queued.
2230 	 * The new channel should inherit the active locks from the bdev's internal list.
2231 	 */
2232 	set_thread(2);
2233 	io_ch[2] = spdk_bdev_get_io_channel(desc);
2234 	bdev_ch[2] = spdk_io_channel_get_ctx(io_ch[2]);
2235 	CU_ASSERT(io_ch[2] != NULL);
2236 
2237 	g_io_done2 = false;
2238 	CU_ASSERT(TAILQ_EMPTY(&bdev_ch[2]->io_locked));
2239 	rc = spdk_bdev_write_blocks(desc, io_ch[2], buf, 22, 2, io_done2, &ctx2);
2240 	CU_ASSERT(rc == 0);
2241 	poll_threads();
2242 	CU_ASSERT(stub_channel_outstanding_cnt(io_target) == 0);
2243 	CU_ASSERT(!TAILQ_EMPTY(&bdev_ch[2]->io_locked));
2244 	CU_ASSERT(g_io_done2 == false);
2245 
2246 	set_thread(0);
2247 	rc = bdev_unlock_lba_range(desc, io_ch[0], 20, 10, unlock_lba_range_done, &ctx0);
2248 	CU_ASSERT(rc == 0);
2249 	poll_threads();
2250 	CU_ASSERT(TAILQ_EMPTY(&bdev_ch[0]->locked_ranges));
2251 
2252 	/* The LBA range is unlocked, so the write IOs should now have started execution. */
2253 	CU_ASSERT(TAILQ_EMPTY(&bdev_ch[1]->io_locked));
2254 	CU_ASSERT(TAILQ_EMPTY(&bdev_ch[2]->io_locked));
2255 
2256 	set_thread(1);
2257 	CU_ASSERT(stub_channel_outstanding_cnt(io_target) == 1);
2258 	stub_complete_io(io_target, 1);
2259 	set_thread(2);
2260 	CU_ASSERT(stub_channel_outstanding_cnt(io_target) == 1);
2261 	stub_complete_io(io_target, 1);
2262 
2263 	poll_threads();
2264 	CU_ASSERT(g_io_done == true);
2265 	CU_ASSERT(g_io_done2 == true);
2266 
2267 	/* Tear down the channels */
2268 	set_thread(0);
2269 	spdk_put_io_channel(io_ch[0]);
2270 	set_thread(1);
2271 	spdk_put_io_channel(io_ch[1]);
2272 	set_thread(2);
2273 	spdk_put_io_channel(io_ch[2]);
2274 	poll_threads();
2275 	set_thread(0);
2276 	teardown_test();
2277 }
2278 
2279 /* spdk_bdev_reset() freezes and unfreezes I/O channels by using spdk_for_each_channel().
2280  * spdk_bdev_unregister() calls spdk_io_device_unregister() at the end. However
2281  * spdk_io_device_unregister() fails if it is called while executing spdk_for_each_channel().
2282  * Hence, in this case, spdk_io_device_unregister() is deferred until spdk_bdev_reset()
2283  * completes. Test this behavior.
2284  */
2285 static void
2286 unregister_during_reset(void)
2287 {
2288 	struct spdk_io_channel *io_ch[2];
2289 	bool done_reset = false, done_unregister = false;
2290 	int rc;
2291 
2292 	setup_test();
2293 	set_thread(0);
2294 
2295 	io_ch[0] = spdk_bdev_get_io_channel(g_desc);
2296 	SPDK_CU_ASSERT_FATAL(io_ch[0] != NULL);
2297 
2298 	set_thread(1);
2299 
2300 	io_ch[1] = spdk_bdev_get_io_channel(g_desc);
2301 	SPDK_CU_ASSERT_FATAL(io_ch[1] != NULL);
2302 
2303 	set_thread(0);
2304 
2305 	CU_ASSERT(g_bdev.bdev.internal.reset_in_progress == NULL);
2306 
2307 	rc = spdk_bdev_reset(g_desc, io_ch[0], reset_done, &done_reset);
2308 	CU_ASSERT(rc == 0);
2309 
2310 	set_thread(0);
2311 
2312 	poll_thread_times(0, 1);
2313 
2314 	spdk_bdev_close(g_desc);
2315 	spdk_bdev_unregister(&g_bdev.bdev, _bdev_unregistered, &done_unregister);
2316 
2317 	CU_ASSERT(done_reset == false);
2318 	CU_ASSERT(done_unregister == false);
2319 
2320 	poll_threads();
2321 
2322 	stub_complete_io(g_bdev.io_target, 0);
2323 
2324 	poll_threads();
2325 
2326 	CU_ASSERT(done_reset == true);
2327 	CU_ASSERT(done_unregister == false);
2328 
2329 	spdk_put_io_channel(io_ch[0]);
2330 
2331 	set_thread(1);
2332 
2333 	spdk_put_io_channel(io_ch[1]);
2334 
2335 	poll_threads();
2336 
2337 	CU_ASSERT(done_unregister == true);
2338 
2339 	/* Restore the original g_bdev so that we can use teardown_test(). */
2340 	set_thread(0);
2341 	register_bdev(&g_bdev, "ut_bdev", &g_io_device);
2342 	spdk_bdev_open_ext("ut_bdev", true, _bdev_event_cb, NULL, &g_desc);
2343 	teardown_test();
2344 }
2345 
2346 static void
2347 bdev_init_wt_cb(void *done, int rc)
2348 {
2349 }
2350 
2351 
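/* Return the number of wrong-thread deprecation hits recorded since the previous call. */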
2352 static uint64_t
2353 get_wrong_thread_hits(void)
2354 {
2355 	static uint64_t previous = 0;
2356 	uint64_t ret, current;
2357 
2358 	current = spdk_deprecation_get_hits(_deprecated_bdev_register_examine_thread);
2359 	ret = current - previous;
2360 	previous = current;
2361 
2362 	return ret;
2363 }
2364 
2365 static int
2366 wrong_thread_setup(void)
2367 {
2368 	allocate_cores(1);
2369 	allocate_threads(2);
2370 	set_thread(0);
2371 
2372 	spdk_bdev_initialize(bdev_init_wt_cb, NULL);
2373 	spdk_io_device_register(&g_io_device, stub_create_ch, stub_destroy_ch,
2374 				sizeof(struct ut_bdev_channel), NULL);
2375 
2376 	set_thread(1);
2377 
2378 	/* Ignore return, just setting the base for the next time it is called. */
2379 	get_wrong_thread_hits();
2380 
2381 	return 0;
2382 }
2383 
2384 static int
2385 wrong_thread_teardown(void)
2386 {
2387 	int rc;
2388 
2389 	rc = get_wrong_thread_hits();
2390 
2391 	set_thread(0);
2392 
2393 	g_teardown_done = false;
2394 	spdk_io_device_unregister(&g_io_device, NULL);
2395 	spdk_bdev_finish(finish_cb, NULL);
2396 	poll_threads();
2397 	memset(&g_bdev, 0, sizeof(g_bdev));
2398 	if (!g_teardown_done) {
2399 		fprintf(stderr, "%s:%d %s: teardown not done\n", __FILE__, __LINE__, __func__);
2400 		rc = -1;
2401 	}
2402 	g_teardown_done = false;
2403 
2404 	free_threads();
2405 	free_cores();
2406 
2407 	return rc;
2408 }
2409 
2410 static void
2411 spdk_bdev_register_wt(void)
2412 {
2413 	struct spdk_bdev bdev = { 0 };
2414 	int rc;
2415 	bool done;
2416 
2417 	bdev.name = "wt_bdev";
2418 	bdev.fn_table = &fn_table;
2419 	bdev.module = &bdev_ut_if;
2420 	bdev.blocklen = 4096;
2421 	bdev.blockcnt = 1024;
2422 
2423 	/* Can register only on app thread */
2424 	rc = spdk_bdev_register(&bdev);
2425 	CU_ASSERT(rc == 0);
2426 	CU_ASSERT(get_wrong_thread_hits() == 1);
2427 
2428 	/* Can unregister on any thread */
2429 	done = false;
2430 	spdk_bdev_unregister(&bdev, _bdev_unregistered, &done);
2431 	poll_threads();
2432 	CU_ASSERT(get_wrong_thread_hits() == 0);
2433 	CU_ASSERT(done);
2434 
2435 	/* Can unregister by name on any thread */
2436 	rc = spdk_bdev_register(&bdev);
2437 	CU_ASSERT(rc == 0);
2438 	CU_ASSERT(get_wrong_thread_hits() == 1);
2439 	done = false;
2440 	rc = spdk_bdev_unregister_by_name(bdev.name, bdev.module, _bdev_unregistered, &done);
2441 	CU_ASSERT(rc == 0);
2442 	poll_threads();
2443 	CU_ASSERT(get_wrong_thread_hits() == 0);
2444 	CU_ASSERT(done);
2445 }
2446 
2447 static void
2448 wait_for_examine_cb(void *arg)
2449 {
2450 	struct spdk_thread **thread = arg;
2451 
2452 	*thread = spdk_get_thread();
2453 }
2454 
2455 static void
2456 spdk_bdev_examine_wt(void)
2457 {
2458 	int rc;
2459 	bool save_auto_examine = g_bdev_opts.bdev_auto_examine;
2460 	struct spdk_thread *thread;
2461 
2462 	g_bdev_opts.bdev_auto_examine = false;
2463 
2464 	set_thread(0);
2465 	register_bdev(&g_bdev, "ut_bdev_wt", &g_io_device);
2466 	CU_ASSERT(spdk_bdev_get_by_name("ut_bdev_wt") != NULL);
2467 	set_thread(1);
2468 
2469 	/* Can examine only on the app thread */
2470 	rc = spdk_bdev_examine("ut_bdev_wt");
2471 	CU_ASSERT(rc == 0);
2472 	poll_threads();
2473 	CU_ASSERT(get_wrong_thread_hits() == 1);
2474 	unregister_bdev(&g_bdev);
2475 	CU_ASSERT(spdk_bdev_get_by_name("ut_bdev_wt") == NULL);
2476 	CU_ASSERT(get_wrong_thread_hits() == 0);
2477 
2478 	/* Can wait for examine on app thread, callback called on app thread. */
2479 	set_thread(0);
2480 	register_bdev(&g_bdev, "ut_bdev_wt", &g_io_device);
2481 	CU_ASSERT(spdk_bdev_get_by_name("ut_bdev_wt") != NULL);
2482 	thread = NULL;
2483 	rc = spdk_bdev_wait_for_examine(wait_for_examine_cb, &thread);
2484 	CU_ASSERT(rc == 0);
2485 	poll_threads();
2486 	CU_ASSERT(get_wrong_thread_hits() == 0);
2487 	CU_ASSERT(thread == spdk_get_thread());
2488 	unregister_bdev(&g_bdev);
2489 	CU_ASSERT(spdk_bdev_get_by_name("ut_bdev_wt") == NULL);
2490 	CU_ASSERT(get_wrong_thread_hits() == 0);
2491 
2492 	/* Can wait for examine on non-app thread, callback called on same thread. */
2493 	set_thread(0);
2494 	register_bdev(&g_bdev, "ut_bdev_wt", &g_io_device);
2495 	CU_ASSERT(spdk_bdev_get_by_name("ut_bdev_wt") != NULL);
2496 	thread = NULL;
2497 	rc = spdk_bdev_wait_for_examine(wait_for_examine_cb, &thread);
2498 	CU_ASSERT(rc == 0);
2499 	poll_threads();
2500 	CU_ASSERT(get_wrong_thread_hits() == 0);
2501 	CU_ASSERT(thread == spdk_get_thread());
2502 	unregister_bdev(&g_bdev);
2503 	CU_ASSERT(spdk_bdev_get_by_name("ut_bdev_wt") == NULL);
2504 	CU_ASSERT(get_wrong_thread_hits() == 0);
2505 
2506 	unregister_bdev(&g_bdev);
2507 	g_bdev_opts.bdev_auto_examine = save_auto_examine;
2508 }
2509 
2510 int
2511 main(int argc, char **argv)
2512 {
2513 	CU_pSuite	suite = NULL;
2514 	CU_pSuite	suite_wt = NULL;
2515 	unsigned int	num_failures;
2516 
2517 	CU_set_error_action(CUEA_ABORT);
2518 	CU_initialize_registry();
2519 
2520 	suite = CU_add_suite("bdev", NULL, NULL);
2521 	suite_wt = CU_add_suite("bdev_wrong_thread", wrong_thread_setup, wrong_thread_teardown);
2522 
2523 	CU_ADD_TEST(suite, basic);
2524 	CU_ADD_TEST(suite, unregister_and_close);
2525 	CU_ADD_TEST(suite, unregister_and_close_different_threads);
2526 	CU_ADD_TEST(suite, basic_qos);
2527 	CU_ADD_TEST(suite, put_channel_during_reset);
2528 	CU_ADD_TEST(suite, aborted_reset);
2529 	CU_ADD_TEST(suite, aborted_reset_no_outstanding_io);
2530 	CU_ADD_TEST(suite, io_during_reset);
2531 	CU_ADD_TEST(suite, reset_completions);
2532 	CU_ADD_TEST(suite, io_during_qos_queue);
2533 	CU_ADD_TEST(suite, io_during_qos_reset);
2534 	CU_ADD_TEST(suite, enomem);
2535 	CU_ADD_TEST(suite, enomem_multi_bdev);
2536 	CU_ADD_TEST(suite, enomem_multi_bdev_unregister);
2537 	CU_ADD_TEST(suite, enomem_multi_io_target);
2538 	CU_ADD_TEST(suite, qos_dynamic_enable);
2539 	CU_ADD_TEST(suite, bdev_histograms_mt);
2540 	CU_ADD_TEST(suite, bdev_set_io_timeout_mt);
2541 	CU_ADD_TEST(suite, lock_lba_range_then_submit_io);
2542 	CU_ADD_TEST(suite, unregister_during_reset);
2543 	CU_ADD_TEST(suite_wt, spdk_bdev_register_wt);
2544 	CU_ADD_TEST(suite_wt, spdk_bdev_examine_wt);
2545 
2546 	CU_basic_set_mode(CU_BRM_VERBOSE);
2547 	CU_basic_run_tests();
2548 	num_failures = CU_get_number_of_failures();
2549 	CU_cleanup_registry();
2550 	return num_failures;
2551 }
2552