xref: /spdk/test/unit/lib/bdev/mt/bdev.c/bdev_ut.c (revision eb7506a1b4fb1589911dbb1ffb5ac60a048202be)
1 /*   SPDX-License-Identifier: BSD-3-Clause
2  *   Copyright (C) 2017 Intel Corporation.
3  *   All rights reserved.
4  *   Copyright (c) 2022 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
5  */
6 
7 #include "spdk_cunit.h"
8 
9 #include "common/lib/ut_multithread.c"
10 #include "unit/lib/json_mock.c"
11 
12 #include "spdk/config.h"
13 /* HACK: disable VTune integration so the unit test doesn't need VTune headers and libs to build */
14 #undef SPDK_CONFIG_VTUNE
15 
16 #include "bdev/bdev.c"
17 
18 #define BDEV_UT_NUM_THREADS 3
19 
20 DEFINE_STUB(spdk_notify_send, uint64_t, (const char *type, const char *ctx), 0);
21 DEFINE_STUB(spdk_notify_type_register, struct spdk_notify_type *, (const char *type), NULL);
22 DEFINE_STUB_V(spdk_scsi_nvme_translate, (const struct spdk_bdev_io *bdev_io, int *sc, int *sk,
23 		int *asc, int *ascq));
24 DEFINE_STUB(spdk_memory_domain_get_dma_device_id, const char *, (struct spdk_memory_domain *domain),
25 	    "test_domain");
26 DEFINE_STUB(spdk_memory_domain_get_dma_device_type, enum spdk_dma_device_type,
27 	    (struct spdk_memory_domain *domain), 0);
28 
29 DEFINE_RETURN_MOCK(spdk_memory_domain_pull_data, int);
30 int
31 spdk_memory_domain_pull_data(struct spdk_memory_domain *src_domain, void *src_domain_ctx,
32 			     struct iovec *src_iov, uint32_t src_iov_cnt, struct iovec *dst_iov, uint32_t dst_iov_cnt,
33 			     spdk_memory_domain_data_cpl_cb cpl_cb, void *cpl_cb_arg)
34 {
35 	HANDLE_RETURN_MOCK(spdk_memory_domain_pull_data);
36 
37 	cpl_cb(cpl_cb_arg, 0);
38 	return 0;
39 }
40 
41 DEFINE_RETURN_MOCK(spdk_memory_domain_push_data, int);
42 int
43 spdk_memory_domain_push_data(struct spdk_memory_domain *dst_domain, void *dst_domain_ctx,
44 			     struct iovec *dst_iov, uint32_t dst_iovcnt, struct iovec *src_iov, uint32_t src_iovcnt,
45 			     spdk_memory_domain_data_cpl_cb cpl_cb, void *cpl_cb_arg)
46 {
47 	HANDLE_RETURN_MOCK(spdk_memory_domain_push_data);
48 
49 	cpl_cb(cpl_cb_arg, 0);
50 	return 0;
51 }
52 
53 struct ut_bdev {
54 	struct spdk_bdev	bdev;
55 	void			*io_target;
56 };
57 
58 struct ut_bdev_channel {
59 	TAILQ_HEAD(, spdk_bdev_io)	outstanding_io;
60 	uint32_t			outstanding_cnt;
61 	uint32_t			avail_cnt;
62 };
63 
64 int g_io_device;
65 struct ut_bdev g_bdev;
66 struct spdk_bdev_desc *g_desc;
67 bool g_teardown_done = false;
68 bool g_get_io_channel = true;
69 bool g_create_ch = true;
70 bool g_init_complete_called = false;
71 bool g_fini_start_called = true;
72 int g_status = 0;
73 int g_count = 0;
74 struct spdk_histogram_data *g_histogram = NULL;
75 
76 static int
77 stub_create_ch(void *io_device, void *ctx_buf)
78 {
79 	struct ut_bdev_channel *ch = ctx_buf;
80 
81 	if (g_create_ch == false) {
82 		return -1;
83 	}
84 
85 	TAILQ_INIT(&ch->outstanding_io);
86 	ch->outstanding_cnt = 0;
87 	/*
88 	 * When avail gets to 0, the submit_request function will return ENOMEM.
89 	 *  Most tests do not want ENOMEM to occur, so by default set this to a
90 	 *  big value that won't get hit.  The ENOMEM tests can then override this
91 	 *  value to something much smaller to induce ENOMEM conditions.
92 	 */
93 	ch->avail_cnt = 2048;
94 	return 0;
95 }
96 
97 static void
98 stub_destroy_ch(void *io_device, void *ctx_buf)
99 {
100 }
101 
102 static struct spdk_io_channel *
103 stub_get_io_channel(void *ctx)
104 {
105 	struct ut_bdev *ut_bdev = ctx;
106 
107 	if (g_get_io_channel == true) {
108 		return spdk_get_io_channel(ut_bdev->io_target);
109 	} else {
110 		return NULL;
111 	}
112 }
113 
114 static int
115 stub_destruct(void *ctx)
116 {
117 	return 0;
118 }
119 
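/*
 * Stub submit_request handler.  A RESET aborts all currently queued I/O and is then
 * queued itself; an ABORT completes immediately after aborting (or failing to find)
 * the target I/O; any other I/O is queued while avail_cnt > 0, otherwise it is
 * completed with NOMEM.
 */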
120 static void
121 stub_submit_request(struct spdk_io_channel *_ch, struct spdk_bdev_io *bdev_io)
122 {
123 	struct ut_bdev_channel *ch = spdk_io_channel_get_ctx(_ch);
124 	struct spdk_bdev_io *io;
125 
126 	if (bdev_io->type == SPDK_BDEV_IO_TYPE_RESET) {
127 		while (!TAILQ_EMPTY(&ch->outstanding_io)) {
128 			io = TAILQ_FIRST(&ch->outstanding_io);
129 			TAILQ_REMOVE(&ch->outstanding_io, io, module_link);
130 			ch->outstanding_cnt--;
131 			spdk_bdev_io_complete(io, SPDK_BDEV_IO_STATUS_ABORTED);
132 			ch->avail_cnt++;
133 		}
134 	} else if (bdev_io->type == SPDK_BDEV_IO_TYPE_ABORT) {
135 		TAILQ_FOREACH(io, &ch->outstanding_io, module_link) {
136 			if (io == bdev_io->u.abort.bio_to_abort) {
137 				TAILQ_REMOVE(&ch->outstanding_io, io, module_link);
138 				ch->outstanding_cnt--;
139 				spdk_bdev_io_complete(io, SPDK_BDEV_IO_STATUS_ABORTED);
140 				ch->avail_cnt++;
141 
142 				spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_SUCCESS);
143 				return;
144 			}
145 		}
146 
147 		spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
148 		return;
149 	}
150 
151 	if (ch->avail_cnt > 0) {
152 		TAILQ_INSERT_TAIL(&ch->outstanding_io, bdev_io, module_link);
153 		ch->outstanding_cnt++;
154 		ch->avail_cnt--;
155 	} else {
156 		spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_NOMEM);
157 	}
158 }
159 
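/*
 * Complete up to num_to_complete queued I/Os on the given io_target's channel with
 * SUCCESS status.  Passing 0 completes every outstanding I/O.
 */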
160 static uint32_t
161 stub_complete_io(void *io_target, uint32_t num_to_complete)
162 {
163 	struct spdk_io_channel *_ch = spdk_get_io_channel(io_target);
164 	struct ut_bdev_channel *ch = spdk_io_channel_get_ctx(_ch);
165 	struct spdk_bdev_io *io;
166 	bool complete_all = (num_to_complete == 0);
167 	uint32_t num_completed = 0;
168 
169 	while (complete_all || num_completed < num_to_complete) {
170 		if (TAILQ_EMPTY(&ch->outstanding_io)) {
171 			break;
172 		}
173 		io = TAILQ_FIRST(&ch->outstanding_io);
174 		TAILQ_REMOVE(&ch->outstanding_io, io, module_link);
175 		ch->outstanding_cnt--;
176 		spdk_bdev_io_complete(io, SPDK_BDEV_IO_STATUS_SUCCESS);
177 		ch->avail_cnt++;
178 		num_completed++;
179 	}
180 	spdk_put_io_channel(_ch);
181 	return num_completed;
182 }
183 
184 static bool
185 stub_io_type_supported(void *ctx, enum spdk_bdev_io_type type)
186 {
187 	return true;
188 }
189 
190 static struct spdk_bdev_fn_table fn_table = {
191 	.get_io_channel =	stub_get_io_channel,
192 	.destruct =		stub_destruct,
193 	.submit_request =	stub_submit_request,
194 	.io_type_supported =	stub_io_type_supported,
195 };
196 
197 struct spdk_bdev_module bdev_ut_if;
198 
199 static int
200 module_init(void)
201 {
202 	spdk_bdev_module_init_done(&bdev_ut_if);
203 	return 0;
204 }
205 
206 static void
207 module_fini(void)
208 {
209 }
210 
211 static void
212 init_complete(void)
213 {
214 	g_init_complete_called = true;
215 }
216 
217 static void
218 fini_start(void)
219 {
220 	g_fini_start_called = true;
221 }
222 
223 struct spdk_bdev_module bdev_ut_if = {
224 	.name = "bdev_ut",
225 	.module_init = module_init,
226 	.module_fini = module_fini,
227 	.async_init = true,
228 	.init_complete = init_complete,
229 	.fini_start = fini_start,
230 };
231 
232 SPDK_BDEV_MODULE_REGISTER(bdev_ut, &bdev_ut_if)
233 
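/* Initialize a ut_bdev backed by the given io_target and register it with the bdev layer. */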
234 static void
235 register_bdev(struct ut_bdev *ut_bdev, char *name, void *io_target)
236 {
237 	memset(ut_bdev, 0, sizeof(*ut_bdev));
238 
239 	ut_bdev->io_target = io_target;
240 	ut_bdev->bdev.ctxt = ut_bdev;
241 	ut_bdev->bdev.name = name;
242 	ut_bdev->bdev.fn_table = &fn_table;
243 	ut_bdev->bdev.module = &bdev_ut_if;
244 	ut_bdev->bdev.blocklen = 4096;
245 	ut_bdev->bdev.blockcnt = 1024;
246 
247 	spdk_bdev_register(&ut_bdev->bdev);
248 }
249 
250 static void
251 unregister_bdev(struct ut_bdev *ut_bdev)
252 {
253 	/* Handle any deferred messages. */
254 	poll_threads();
255 	spdk_bdev_unregister(&ut_bdev->bdev, NULL, NULL);
256 	/* Handle the async bdev unregister. */
257 	poll_threads();
258 }
259 
260 static void
261 bdev_init_cb(void *done, int rc)
262 {
263 	CU_ASSERT(rc == 0);
264 	*(bool *)done = true;
265 }
266 
267 static void
268 _bdev_event_cb(enum spdk_bdev_event_type type, struct spdk_bdev *bdev,
269 	       void *event_ctx)
270 {
271 	switch (type) {
272 	case SPDK_BDEV_EVENT_REMOVE:
273 		if (event_ctx != NULL) {
274 			*(bool *)event_ctx = true;
275 		}
276 		break;
277 	default:
278 		CU_ASSERT(false);
279 		break;
280 	}
281 }
282 
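/*
 * Common test setup: allocate the unit test threads, initialize the iobuf and bdev
 * layers, register the stub io_device plus a "ut_bdev" on top of it, and open a
 * descriptor to that bdev in g_desc.
 */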
283 static void
284 setup_test(void)
285 {
286 	bool done = false;
287 	int rc;
288 
289 	allocate_cores(BDEV_UT_NUM_THREADS);
290 	allocate_threads(BDEV_UT_NUM_THREADS);
291 	set_thread(0);
292 
293 	rc = spdk_iobuf_initialize();
294 	CU_ASSERT(rc == 0);
295 	spdk_bdev_initialize(bdev_init_cb, &done);
296 	spdk_io_device_register(&g_io_device, stub_create_ch, stub_destroy_ch,
297 				sizeof(struct ut_bdev_channel), NULL);
298 	register_bdev(&g_bdev, "ut_bdev", &g_io_device);
299 	spdk_bdev_open_ext("ut_bdev", true, _bdev_event_cb, NULL, &g_desc);
300 }
301 
302 static void
303 finish_cb(void *cb_arg)
304 {
305 	g_teardown_done = true;
306 }
307 
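/*
 * Common test teardown: close g_desc, unregister the bdev and io_device, shut down
 * the bdev and iobuf layers, and free the unit test threads.
 */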
308 static void
309 teardown_test(void)
310 {
311 	set_thread(0);
312 	g_teardown_done = false;
313 	spdk_bdev_close(g_desc);
314 	g_desc = NULL;
315 	unregister_bdev(&g_bdev);
316 	spdk_io_device_unregister(&g_io_device, NULL);
317 	spdk_bdev_finish(finish_cb, NULL);
318 	spdk_iobuf_finish(finish_cb, NULL);
319 	poll_threads();
320 	memset(&g_bdev, 0, sizeof(g_bdev));
321 	CU_ASSERT(g_teardown_done == true);
322 	g_teardown_done = false;
323 	free_threads();
324 	free_cores();
325 }
326 
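/* Count the bdev I/Os currently linked on the given internal tailq. */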
327 static uint32_t
328 bdev_io_tailq_cnt(bdev_io_tailq_t *tailq)
329 {
330 	struct spdk_bdev_io *io;
331 	uint32_t cnt = 0;
332 
333 	TAILQ_FOREACH(io, tailq, internal.link) {
334 		cnt++;
335 	}
336 
337 	return cnt;
338 }
339 
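/*
 * Verify the basic channel paths: getting an I/O channel fails when either the bdev
 * or the io_device refuses to create one and succeeds otherwise, and the module's
 * init_complete/fini_start callbacks fire during setup and teardown.
 */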
340 static void
341 basic(void)
342 {
343 	g_init_complete_called = false;
344 	setup_test();
345 	CU_ASSERT(g_init_complete_called == true);
346 
347 	set_thread(0);
348 
349 	g_get_io_channel = false;
350 	g_ut_threads[0].ch = spdk_bdev_get_io_channel(g_desc);
351 	CU_ASSERT(g_ut_threads[0].ch == NULL);
352 
353 	g_get_io_channel = true;
354 	g_create_ch = false;
355 	g_ut_threads[0].ch = spdk_bdev_get_io_channel(g_desc);
356 	CU_ASSERT(g_ut_threads[0].ch == NULL);
357 
358 	g_get_io_channel = true;
359 	g_create_ch = true;
360 	g_ut_threads[0].ch = spdk_bdev_get_io_channel(g_desc);
361 	CU_ASSERT(g_ut_threads[0].ch != NULL);
362 	spdk_put_io_channel(g_ut_threads[0].ch);
363 
364 	g_fini_start_called = false;
365 	teardown_test();
366 	CU_ASSERT(g_fini_start_called == true);
367 }
368 
369 static void
370 _bdev_unregistered(void *done, int rc)
371 {
372 	CU_ASSERT(rc == 0);
373 	*(bool *)done = true;
374 }
375 
376 static void
377 unregister_and_close(void)
378 {
379 	bool done, remove_notify;
380 	struct spdk_bdev_desc *desc = NULL;
381 
382 	setup_test();
383 	set_thread(0);
384 
385 	/* setup_test() automatically opens the bdev,
386 	 * but this test needs to do that in a different
387 	 * way. */
388 	spdk_bdev_close(g_desc);
389 	poll_threads();
390 
391 	/* Try hotremoving a bdev with descriptors which don't provide
392 	 * any context to the notification callback */
393 	spdk_bdev_open_ext("ut_bdev", true, _bdev_event_cb, NULL, &desc);
394 	SPDK_CU_ASSERT_FATAL(desc != NULL);
395 
396 	/* There is an open descriptor on the device. Unregister it,
397 	 * which can't proceed until the descriptor is closed. */
398 	done = false;
399 	spdk_bdev_unregister(&g_bdev.bdev, _bdev_unregistered, &done);
400 
401 	/* Poll the threads to allow all events to be processed */
402 	poll_threads();
403 
404 	/* Make sure the bdev was not unregistered. We still have a
405 	 * descriptor open */
406 	CU_ASSERT(done == false);
407 
408 	spdk_bdev_close(desc);
409 	poll_threads();
410 	desc = NULL;
411 
412 	/* The unregister should have completed */
413 	CU_ASSERT(done == true);
414 
415 
416 	/* Register the bdev again */
417 	register_bdev(&g_bdev, "ut_bdev", &g_io_device);
418 
419 	remove_notify = false;
420 	spdk_bdev_open_ext("ut_bdev", true, _bdev_event_cb, &remove_notify, &desc);
421 	SPDK_CU_ASSERT_FATAL(desc != NULL);
422 	CU_ASSERT(remove_notify == false);
423 
424 	/* There is an open descriptor on the device. Unregister it,
425 	 * which can't proceed until the descriptor is closed. */
426 	done = false;
427 	spdk_bdev_unregister(&g_bdev.bdev, _bdev_unregistered, &done);
428 	/* No polling has occurred, so neither of these should execute */
429 	CU_ASSERT(remove_notify == false);
430 	CU_ASSERT(done == false);
431 
432 	/* Prior to the unregister completing, close the descriptor */
433 	spdk_bdev_close(desc);
434 
435 	/* Poll the threads to allow all events to be processed */
436 	poll_threads();
437 
438 	/* Remove notify should not have been called because the
439 	 * descriptor is already closed. */
440 	CU_ASSERT(remove_notify == false);
441 
442 	/* The unregister should have completed */
443 	CU_ASSERT(done == true);
444 
445 	/* Restore the original g_bdev so that we can use teardown_test(). */
446 	register_bdev(&g_bdev, "ut_bdev", &g_io_device);
447 	spdk_bdev_open_ext("ut_bdev", true, _bdev_event_cb, NULL, &g_desc);
448 	teardown_test();
449 }
450 
451 static void
452 reset_done(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg)
453 {
454 	bool *done = cb_arg;
455 
456 	CU_ASSERT(success == true);
457 	*done = true;
458 	spdk_bdev_free_io(bdev_io);
459 }
460 
461 static void
462 put_channel_during_reset(void)
463 {
464 	struct spdk_io_channel *io_ch;
465 	bool done = false;
466 
467 	setup_test();
468 
469 	set_thread(0);
470 	io_ch = spdk_bdev_get_io_channel(g_desc);
471 	CU_ASSERT(io_ch != NULL);
472 
473 	/*
474 	 * Start a reset, but then put the I/O channel before
475 	 *  the deferred messages for the reset get a chance to
476 	 *  execute.
477 	 */
478 	spdk_bdev_reset(g_desc, io_ch, reset_done, &done);
479 	spdk_put_io_channel(io_ch);
480 	poll_threads();
481 	stub_complete_io(g_bdev.io_target, 0);
482 
483 	teardown_test();
484 }
485 
486 static void
487 aborted_reset_done(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg)
488 {
489 	enum spdk_bdev_io_status *status = cb_arg;
490 
491 	*status = success ? SPDK_BDEV_IO_STATUS_SUCCESS : SPDK_BDEV_IO_STATUS_FAILED;
492 	spdk_bdev_free_io(bdev_io);
493 }
494 
495 static void io_done(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg);
496 
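/*
 * Queue a second reset behind one that is already in progress, then destroy the
 * second channel.  The queued reset must fail while the original reset, once the
 * module completes it, still finishes with SUCCESS.
 */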
497 static void
498 aborted_reset(void)
499 {
500 	struct spdk_io_channel *io_ch[2];
501 	enum spdk_bdev_io_status status1 = SPDK_BDEV_IO_STATUS_PENDING,
502 				 status2 = SPDK_BDEV_IO_STATUS_PENDING;
503 
504 	setup_test();
505 
506 	set_thread(0);
507 	io_ch[0] = spdk_bdev_get_io_channel(g_desc);
508 	CU_ASSERT(io_ch[0] != NULL);
509 	spdk_bdev_reset(g_desc, io_ch[0], aborted_reset_done, &status1);
510 	poll_threads();
511 	CU_ASSERT(g_bdev.bdev.internal.reset_in_progress != NULL);
512 
513 	/*
514 	 * First reset has been submitted on ch0.  Now submit a second
515 	 *  reset on ch1 which will get queued since there is already a
516 	 *  reset in progress.
517 	 */
518 	set_thread(1);
519 	io_ch[1] = spdk_bdev_get_io_channel(g_desc);
520 	CU_ASSERT(io_ch[1] != NULL);
521 	spdk_bdev_reset(g_desc, io_ch[1], aborted_reset_done, &status2);
522 	poll_threads();
523 	CU_ASSERT(g_bdev.bdev.internal.reset_in_progress != NULL);
524 
525 	/*
526 	 * Now destroy ch1.  This will abort the queued reset.  Check that
527 	 *  the second reset was completed with failed status.  Also check
528 	 *  that bdev->internal.reset_in_progress != NULL, since the
529 	 *  original reset has not been completed yet.  This ensures that
530 	 *  the bdev code is correctly noticing that the failed reset is
531 	 *  *not* the one that had been submitted to the bdev module.
532 	 */
533 	set_thread(1);
534 	spdk_put_io_channel(io_ch[1]);
535 	poll_threads();
536 	CU_ASSERT(status2 == SPDK_BDEV_IO_STATUS_FAILED);
537 	CU_ASSERT(g_bdev.bdev.internal.reset_in_progress != NULL);
538 
539 	/*
540 	 * Now complete the first reset, verify that it completed with SUCCESS
541 	 *  status and that bdev->internal.reset_in_progress is also set back to NULL.
542 	 */
543 	set_thread(0);
544 	spdk_put_io_channel(io_ch[0]);
545 	stub_complete_io(g_bdev.io_target, 0);
546 	poll_threads();
547 	CU_ASSERT(status1 == SPDK_BDEV_IO_STATUS_SUCCESS);
548 	CU_ASSERT(g_bdev.bdev.internal.reset_in_progress == NULL);
549 
550 	teardown_test();
551 }
552 
553 static void
554 aborted_reset_no_outstanding_io(void)
555 {
556 	struct spdk_io_channel *io_ch[2];
557 	struct spdk_bdev_channel *bdev_ch[2];
558 	struct spdk_bdev *bdev[2];
559 	enum spdk_bdev_io_status status1 = SPDK_BDEV_IO_STATUS_PENDING,
560 				 status2 = SPDK_BDEV_IO_STATUS_PENDING;
561 
562 	setup_test();
563 
564 	/*
565 	 * This time we test the reset without any outstanding IO
566 	 * present on the bdev channel, so both resets should finish
567 	 * immediately.
568 	 */
569 
570 	set_thread(0);
571 	/* Set reset_io_drain_timeout so the reset takes the drain path;
572 	 * with no outstanding IO it should complete immediately. */
573 	io_ch[0] = spdk_bdev_get_io_channel(g_desc);
574 	bdev_ch[0] = spdk_io_channel_get_ctx(io_ch[0]);
575 	bdev[0] = bdev_ch[0]->bdev;
576 	bdev[0]->reset_io_drain_timeout = SPDK_BDEV_RESET_IO_DRAIN_RECOMMENDED_VALUE;
577 	CU_ASSERT(io_ch[0] != NULL);
578 	spdk_bdev_reset(g_desc, io_ch[0], aborted_reset_done, &status1);
579 	poll_threads();
580 	CU_ASSERT(g_bdev.bdev.internal.reset_in_progress == NULL);
581 	CU_ASSERT(status1 == SPDK_BDEV_IO_STATUS_SUCCESS);
582 	spdk_put_io_channel(io_ch[0]);
583 
584 	set_thread(1);
585 	/* Set reset_io_drain_timeout so the reset takes the drain path;
586 	 * with no outstanding IO it should complete immediately. */
587 	io_ch[1] = spdk_bdev_get_io_channel(g_desc);
588 	bdev_ch[1] = spdk_io_channel_get_ctx(io_ch[1]);
589 	bdev[1] = bdev_ch[1]->bdev;
590 	bdev[1]->reset_io_drain_timeout = SPDK_BDEV_RESET_IO_DRAIN_RECOMMENDED_VALUE;
591 	CU_ASSERT(io_ch[1] != NULL);
592 	spdk_bdev_reset(g_desc, io_ch[1], aborted_reset_done, &status2);
593 	poll_threads();
594 	CU_ASSERT(g_bdev.bdev.internal.reset_in_progress == NULL);
595 	CU_ASSERT(status2 == SPDK_BDEV_IO_STATUS_SUCCESS);
596 	spdk_put_io_channel(io_ch[1]);
597 
598 	stub_complete_io(g_bdev.io_target, 0);
599 	poll_threads();
600 
601 	teardown_test();
602 }
603 
604 
605 static void
606 io_during_io_done(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg)
607 {
608 	enum spdk_bdev_io_status *status = cb_arg;
609 
610 	*status = bdev_io->internal.status;
611 	spdk_bdev_free_io(bdev_io);
612 }
613 
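/*
 * Verify that I/O submitted on any channel while a reset is in progress is completed
 * with ABORTED status, and that the reset itself only completes after both threads
 * have been polled.
 */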
614 static void
615 io_during_reset(void)
616 {
617 	struct spdk_io_channel *io_ch[2];
618 	struct spdk_bdev_channel *bdev_ch[2];
619 	enum spdk_bdev_io_status status0, status1, status_reset;
620 	int rc;
621 
622 	setup_test();
623 
624 	/*
625 	 * First test normal case - submit an I/O on each of two channels (with no resets)
626 	 *  and verify they complete successfully.
627 	 */
628 	set_thread(0);
629 	io_ch[0] = spdk_bdev_get_io_channel(g_desc);
630 	bdev_ch[0] = spdk_io_channel_get_ctx(io_ch[0]);
631 	CU_ASSERT(bdev_ch[0]->flags == 0);
632 	status0 = SPDK_BDEV_IO_STATUS_PENDING;
633 	rc = spdk_bdev_read_blocks(g_desc, io_ch[0], NULL, 0, 1, io_during_io_done, &status0);
634 	CU_ASSERT(rc == 0);
635 
636 	set_thread(1);
637 	io_ch[1] = spdk_bdev_get_io_channel(g_desc);
638 	bdev_ch[1] = spdk_io_channel_get_ctx(io_ch[1]);
639 	CU_ASSERT(bdev_ch[1]->flags == 0);
640 	status1 = SPDK_BDEV_IO_STATUS_PENDING;
641 	rc = spdk_bdev_read_blocks(g_desc, io_ch[1], NULL, 0, 1, io_during_io_done, &status1);
642 	CU_ASSERT(rc == 0);
643 
644 	poll_threads();
645 	CU_ASSERT(status0 == SPDK_BDEV_IO_STATUS_PENDING);
646 	CU_ASSERT(status1 == SPDK_BDEV_IO_STATUS_PENDING);
647 
648 	set_thread(0);
649 	stub_complete_io(g_bdev.io_target, 0);
650 	CU_ASSERT(status0 == SPDK_BDEV_IO_STATUS_SUCCESS);
651 
652 	set_thread(1);
653 	stub_complete_io(g_bdev.io_target, 0);
654 	CU_ASSERT(status1 == SPDK_BDEV_IO_STATUS_SUCCESS);
655 
656 	/*
657 	 * Now submit a reset, and leave it pending while we submit I/O on two different
658 	 *  channels.  These I/O should be failed by the bdev layer since the reset is in
659 	 *  progress.
660 	 */
661 	set_thread(0);
662 	status_reset = SPDK_BDEV_IO_STATUS_PENDING;
663 	rc = spdk_bdev_reset(g_desc, io_ch[0], io_during_io_done, &status_reset);
664 	CU_ASSERT(rc == 0);
665 
666 	CU_ASSERT(bdev_ch[0]->flags == 0);
667 	CU_ASSERT(bdev_ch[1]->flags == 0);
668 	poll_threads();
669 	CU_ASSERT(bdev_ch[0]->flags == BDEV_CH_RESET_IN_PROGRESS);
670 	CU_ASSERT(bdev_ch[1]->flags == BDEV_CH_RESET_IN_PROGRESS);
671 
672 	set_thread(0);
673 	status0 = SPDK_BDEV_IO_STATUS_PENDING;
674 	rc = spdk_bdev_read_blocks(g_desc, io_ch[0], NULL, 0, 1, io_during_io_done, &status0);
675 	CU_ASSERT(rc == 0);
676 
677 	set_thread(1);
678 	status1 = SPDK_BDEV_IO_STATUS_PENDING;
679 	rc = spdk_bdev_read_blocks(g_desc, io_ch[1], NULL, 0, 1, io_during_io_done, &status1);
680 	CU_ASSERT(rc == 0);
681 
682 	/*
683 	 * A reset is in progress, so these read I/Os should complete with ABORTED status.  Note that we
684 	 *  need to poll_threads() since I/Os completed inline have their completions deferred.
685 	 */
686 	poll_threads();
687 	CU_ASSERT(status_reset == SPDK_BDEV_IO_STATUS_PENDING);
688 	CU_ASSERT(status0 == SPDK_BDEV_IO_STATUS_ABORTED);
689 	CU_ASSERT(status1 == SPDK_BDEV_IO_STATUS_ABORTED);
690 
691 	/*
692 	 * Complete the reset
693 	 */
694 	set_thread(0);
695 	stub_complete_io(g_bdev.io_target, 0);
696 
697 	/*
698 	 * Only poll thread 0. We should not get a completion.
699 	 */
700 	poll_thread(0);
701 	CU_ASSERT(status_reset == SPDK_BDEV_IO_STATUS_PENDING);
702 
703 	/*
704 	 * Poll both thread 0 and 1 so the messages can propagate and we
705 	 * get a completion.
706 	 */
707 	poll_threads();
708 	CU_ASSERT(status_reset == SPDK_BDEV_IO_STATUS_SUCCESS);
709 
710 	spdk_put_io_channel(io_ch[0]);
711 	set_thread(1);
712 	spdk_put_io_channel(io_ch[1]);
713 	poll_threads();
714 
715 	teardown_test();
716 }
717 
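/* Count how many RESET I/Os are currently queued in the stub module for this io_target. */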
718 static uint32_t
719 count_queued_resets(void *io_target)
720 {
721 	struct spdk_io_channel *_ch = spdk_get_io_channel(io_target);
722 	struct ut_bdev_channel *ch = spdk_io_channel_get_ctx(_ch);
723 	struct spdk_bdev_io *io;
724 	uint32_t submitted_resets = 0;
725 
726 	TAILQ_FOREACH(io, &ch->outstanding_io, module_link) {
727 		if (io->type == SPDK_BDEV_IO_TYPE_RESET) {
728 			submitted_resets++;
729 		}
730 	}
731 
732 	spdk_put_io_channel(_ch);
733 
734 	return submitted_resets;
735 }
736 
737 static void
738 reset_completions(void)
739 {
740 	struct spdk_io_channel *io_ch;
741 	struct spdk_bdev_channel *bdev_ch;
742 	struct spdk_bdev *bdev;
743 	enum spdk_bdev_io_status status0, status_reset;
744 	int rc, iter;
745 
746 	setup_test();
747 
748 	/* This test covers four test cases:
749 	 * 1) reset_io_drain_timeout of a bdev is greater than 0
750 	 * 2) No outstanding IO are present on any bdev channel
751 	 * 3) Outstanding IO finish during bdev reset
752 	 * 4) Outstanding IO do not finish before reset is done waiting
753 	 *    for them.
754 	 *
755 	 * Above conditions mainly affect the timing of bdev reset completion
756 	 * and whether a reset should be skipped via spdk_bdev_io_complete()
757 	 * or sent down to the underlying bdev module via bdev_io_submit_reset(). */
758 
759 	/* Test preparation */
760 	set_thread(0);
761 	io_ch = spdk_bdev_get_io_channel(g_desc);
762 	bdev_ch = spdk_io_channel_get_ctx(io_ch);
763 	CU_ASSERT(bdev_ch->flags == 0);
764 
765 
766 	/* Test case 1) reset_io_drain_timeout set to 0. Reset should be sent down immediately. */
767 	bdev = &g_bdev.bdev;
768 	bdev->reset_io_drain_timeout = 0;
769 
770 	status_reset = SPDK_BDEV_IO_STATUS_PENDING;
771 	rc = spdk_bdev_reset(g_desc, io_ch, io_during_io_done, &status_reset);
772 	CU_ASSERT(rc == 0);
773 	poll_threads();
774 	CU_ASSERT(count_queued_resets(g_bdev.io_target) == 1);
775 
776 	/* Call reset completion inside bdev module. */
777 	stub_complete_io(g_bdev.io_target, 0);
778 	poll_threads();
779 	CU_ASSERT(count_queued_resets(g_bdev.io_target) == 0);
780 	CU_ASSERT(status_reset == SPDK_BDEV_IO_STATUS_SUCCESS);
781 	CU_ASSERT(g_bdev.bdev.internal.reset_in_progress == NULL);
782 
783 
784 	/* Test case 2) no outstanding IO are present. Reset should perform one iteration over
785 	* channels and then be skipped. */
786 	bdev->reset_io_drain_timeout = SPDK_BDEV_RESET_IO_DRAIN_RECOMMENDED_VALUE;
787 	status_reset = SPDK_BDEV_IO_STATUS_PENDING;
788 
789 	rc = spdk_bdev_reset(g_desc, io_ch, io_during_io_done, &status_reset);
790 	CU_ASSERT(rc == 0);
791 	poll_threads();
792 	/* Reset was never submitted to the bdev module. */
793 	CU_ASSERT(count_queued_resets(g_bdev.io_target) == 0);
794 	CU_ASSERT(status_reset == SPDK_BDEV_IO_STATUS_SUCCESS);
795 	CU_ASSERT(g_bdev.bdev.internal.reset_in_progress == NULL);
796 
797 
798 	/* Test case 3) outstanding IO finish during bdev reset procedure. Reset should initiate
799 	* a wait poller to check for IO completions every second, until reset_io_drain_timeout is
800 	* reached, but finish earlier than this threshold. */
801 	status0 = SPDK_BDEV_IO_STATUS_PENDING;
802 	status_reset = SPDK_BDEV_IO_STATUS_PENDING;
803 	rc = spdk_bdev_read_blocks(g_desc, io_ch, NULL, 0, 1, io_during_io_done, &status0);
804 	CU_ASSERT(rc == 0);
805 
806 	rc = spdk_bdev_reset(g_desc, io_ch, io_during_io_done, &status_reset);
807 	CU_ASSERT(rc == 0);
808 	poll_threads();
809 	/* The reset just started and should not have been submitted yet. */
810 	CU_ASSERT(count_queued_resets(g_bdev.io_target) == 0);
811 
812 	poll_threads();
813 	CU_ASSERT(status_reset == SPDK_BDEV_IO_STATUS_PENDING);
814 	/* Let the poller wait for about half the time then complete outstanding IO. */
815 	for (iter = 0; iter < 2; iter++) {
816 		/* Reset is still processing and not submitted at this point. */
817 		CU_ASSERT(count_queued_resets(g_bdev.io_target) == 0);
818 		spdk_delay_us(1000 * 1000);
819 		poll_threads();
820 		poll_threads();
821 	}
822 	CU_ASSERT(status_reset == SPDK_BDEV_IO_STATUS_PENDING);
823 	stub_complete_io(g_bdev.io_target, 0);
824 	poll_threads();
825 	spdk_delay_us(BDEV_RESET_CHECK_OUTSTANDING_IO_PERIOD);
826 	poll_threads();
827 	poll_threads();
828 	CU_ASSERT(status_reset == SPDK_BDEV_IO_STATUS_SUCCESS);
829 	/* Sending reset to the bdev module has been skipped. */
830 	CU_ASSERT(count_queued_resets(g_bdev.io_target) == 0);
831 	CU_ASSERT(g_bdev.bdev.internal.reset_in_progress == NULL);
832 
833 
834 	/* Test case 4) outstanding IO are still present after reset_io_drain_timeout
835 	* seconds have passed. */
836 	status0 = SPDK_BDEV_IO_STATUS_PENDING;
837 	status_reset = SPDK_BDEV_IO_STATUS_PENDING;
838 	rc = spdk_bdev_read_blocks(g_desc, io_ch, NULL, 0, 1, io_during_io_done, &status0);
839 	CU_ASSERT(rc == 0);
840 
841 	rc = spdk_bdev_reset(g_desc, io_ch, io_during_io_done, &status_reset);
842 	CU_ASSERT(rc == 0);
843 	poll_threads();
844 	/* The reset just started and should not have been submitted yet. */
845 	CU_ASSERT(count_queued_resets(g_bdev.io_target) == 0);
846 
847 	poll_threads();
848 	CU_ASSERT(status_reset == SPDK_BDEV_IO_STATUS_PENDING);
849 	/* Let the poller wait for reset_io_drain_timeout seconds. */
850 	for (iter = 0; iter < bdev->reset_io_drain_timeout; iter++) {
851 		CU_ASSERT(count_queued_resets(g_bdev.io_target) == 0);
852 		spdk_delay_us(BDEV_RESET_CHECK_OUTSTANDING_IO_PERIOD);
853 		poll_threads();
854 		poll_threads();
855 	}
856 
857 	/* After timing out, the reset should have been sent to the module. */
858 	CU_ASSERT(count_queued_resets(g_bdev.io_target) == 1);
859 	/* Complete reset submitted to the module and the read IO. */
860 	stub_complete_io(g_bdev.io_target, 0);
861 	poll_threads();
862 	CU_ASSERT(status_reset == SPDK_BDEV_IO_STATUS_SUCCESS);
863 	CU_ASSERT(g_bdev.bdev.internal.reset_in_progress == NULL);
864 
865 
866 	/* Destroy the channel and end the test. */
867 	spdk_put_io_channel(io_ch);
868 	poll_threads();
869 
870 	teardown_test();
871 }
872 
873 
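/*
 * Enable QoS rate limits directly on the bdev and verify that I/O and abort requests
 * from both threads are funneled through the QoS thread, and that the QoS channel is
 * torn down and rebuilt as descriptors and channels are closed and reopened.
 */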
874 static void
875 basic_qos(void)
876 {
877 	struct spdk_io_channel *io_ch[2];
878 	struct spdk_bdev_channel *bdev_ch[2];
879 	struct spdk_bdev *bdev;
880 	enum spdk_bdev_io_status status, abort_status;
881 	int rc;
882 
883 	setup_test();
884 
885 	/* Enable QoS */
886 	bdev = &g_bdev.bdev;
887 	bdev->internal.qos = calloc(1, sizeof(*bdev->internal.qos));
888 	SPDK_CU_ASSERT_FATAL(bdev->internal.qos != NULL);
889 	TAILQ_INIT(&bdev->internal.qos->queued);
890 	/*
891 	 * Enable read/write IOPS, read only byte per second and
892 	 * read/write byte per second rate limits.
893 	 * In this case, all rate limits will take equal effect.
894 	 */
895 	/* 2000 read/write I/O per second, or 2 per millisecond */
896 	bdev->internal.qos->rate_limits[SPDK_BDEV_QOS_RW_IOPS_RATE_LIMIT].limit = 2000;
897 	/* 8K read/write byte per millisecond with 4K block size */
898 	bdev->internal.qos->rate_limits[SPDK_BDEV_QOS_RW_BPS_RATE_LIMIT].limit = 8192000;
899 	/* 8K read only byte per millisecond with 4K block size */
900 	bdev->internal.qos->rate_limits[SPDK_BDEV_QOS_R_BPS_RATE_LIMIT].limit = 8192000;
901 
902 	g_get_io_channel = true;
903 
904 	set_thread(0);
905 	io_ch[0] = spdk_bdev_get_io_channel(g_desc);
906 	bdev_ch[0] = spdk_io_channel_get_ctx(io_ch[0]);
907 	CU_ASSERT(bdev_ch[0]->flags == BDEV_CH_QOS_ENABLED);
908 
909 	set_thread(1);
910 	io_ch[1] = spdk_bdev_get_io_channel(g_desc);
911 	bdev_ch[1] = spdk_io_channel_get_ctx(io_ch[1]);
912 	CU_ASSERT(bdev_ch[1]->flags == BDEV_CH_QOS_ENABLED);
913 
914 	/*
915 	 * Send an I/O on thread 0, which is where the QoS thread is running.
916 	 */
917 	set_thread(0);
918 	status = SPDK_BDEV_IO_STATUS_PENDING;
919 	rc = spdk_bdev_read_blocks(g_desc, io_ch[0], NULL, 0, 1, io_during_io_done, &status);
920 	CU_ASSERT(rc == 0);
921 	CU_ASSERT(status == SPDK_BDEV_IO_STATUS_PENDING);
922 	poll_threads();
923 	stub_complete_io(g_bdev.io_target, 0);
924 	poll_threads();
925 	CU_ASSERT(status == SPDK_BDEV_IO_STATUS_SUCCESS);
926 
927 	/* Send an I/O on thread 1. The QoS thread is not running here. */
928 	status = SPDK_BDEV_IO_STATUS_PENDING;
929 	set_thread(1);
930 	rc = spdk_bdev_read_blocks(g_desc, io_ch[1], NULL, 0, 1, io_during_io_done, &status);
931 	CU_ASSERT(rc == 0);
932 	CU_ASSERT(status == SPDK_BDEV_IO_STATUS_PENDING);
933 	poll_threads();
934 	/* Complete I/O on thread 0. This should not complete the I/O we submitted. */
935 	set_thread(0);
936 	stub_complete_io(g_bdev.io_target, 0);
937 	poll_threads();
938 	CU_ASSERT(status == SPDK_BDEV_IO_STATUS_PENDING);
939 	/* Now complete I/O on original thread 1. */
940 	set_thread(1);
941 	poll_threads();
942 	stub_complete_io(g_bdev.io_target, 0);
943 	poll_threads();
944 	CU_ASSERT(status == SPDK_BDEV_IO_STATUS_SUCCESS);
945 
946 	/* Reset rate limit for the next test cases. */
947 	spdk_delay_us(SPDK_BDEV_QOS_TIMESLICE_IN_USEC);
948 	poll_threads();
949 
950 	/*
951 	 * Test abort request when QoS is enabled.
952 	 */
953 
954 	/* Send an I/O on thread 0, which is where the QoS thread is running. */
955 	set_thread(0);
956 	status = SPDK_BDEV_IO_STATUS_PENDING;
957 	rc = spdk_bdev_read_blocks(g_desc, io_ch[0], NULL, 0, 1, io_during_io_done, &status);
958 	CU_ASSERT(rc == 0);
959 	CU_ASSERT(status == SPDK_BDEV_IO_STATUS_PENDING);
960 	/* Send an abort to the I/O on the same thread. */
961 	abort_status = SPDK_BDEV_IO_STATUS_PENDING;
962 	rc = spdk_bdev_abort(g_desc, io_ch[0], &status, io_during_io_done, &abort_status);
963 	CU_ASSERT(rc == 0);
964 	CU_ASSERT(abort_status == SPDK_BDEV_IO_STATUS_PENDING);
965 	poll_threads();
966 	CU_ASSERT(abort_status == SPDK_BDEV_IO_STATUS_SUCCESS);
967 	CU_ASSERT(status == SPDK_BDEV_IO_STATUS_ABORTED);
968 
969 	/* Send an I/O on thread 1. The QoS thread is not running here. */
970 	status = SPDK_BDEV_IO_STATUS_PENDING;
971 	set_thread(1);
972 	rc = spdk_bdev_read_blocks(g_desc, io_ch[1], NULL, 0, 1, io_during_io_done, &status);
973 	CU_ASSERT(rc == 0);
974 	CU_ASSERT(status == SPDK_BDEV_IO_STATUS_PENDING);
975 	poll_threads();
976 	/* Send an abort to the I/O on the same thread. */
977 	abort_status = SPDK_BDEV_IO_STATUS_PENDING;
978 	rc = spdk_bdev_abort(g_desc, io_ch[1], &status, io_during_io_done, &abort_status);
979 	CU_ASSERT(rc == 0);
980 	CU_ASSERT(abort_status == SPDK_BDEV_IO_STATUS_PENDING);
981 	poll_threads();
982 	/* Complete the I/O with failure and the abort with success on thread 1. */
983 	CU_ASSERT(abort_status == SPDK_BDEV_IO_STATUS_SUCCESS);
984 	CU_ASSERT(status == SPDK_BDEV_IO_STATUS_ABORTED);
985 
986 	set_thread(0);
987 
988 	/*
989 	 * Close the descriptor only, which should stop the qos channel since
990 	 * the last descriptor has been removed.
991 	 */
992 	spdk_bdev_close(g_desc);
993 	poll_threads();
994 	CU_ASSERT(bdev->internal.qos->ch == NULL);
995 
996 	/*
997 	 * Open the bdev again, which should set up the qos channel since the
998 	 * channels are still valid.
999 	 */
1000 	spdk_bdev_open_ext("ut_bdev", true, _bdev_event_cb, NULL, &g_desc);
1001 	poll_threads();
1002 	CU_ASSERT(bdev->internal.qos->ch != NULL);
1003 
1004 	/* Tear down the channels */
1005 	set_thread(0);
1006 	spdk_put_io_channel(io_ch[0]);
1007 	set_thread(1);
1008 	spdk_put_io_channel(io_ch[1]);
1009 	poll_threads();
1010 	set_thread(0);
1011 
1012 	/* Close the descriptor, which should stop the qos channel */
1013 	spdk_bdev_close(g_desc);
1014 	poll_threads();
1015 	CU_ASSERT(bdev->internal.qos->ch == NULL);
1016 
1017 	/* Open the bdev again, no qos channel setup without valid channels. */
1018 	spdk_bdev_open_ext("ut_bdev", true, _bdev_event_cb, NULL, &g_desc);
1019 	poll_threads();
1020 	CU_ASSERT(bdev->internal.qos->ch == NULL);
1021 
1022 	/* Create the channels in reverse order. */
1023 	set_thread(1);
1024 	io_ch[1] = spdk_bdev_get_io_channel(g_desc);
1025 	bdev_ch[1] = spdk_io_channel_get_ctx(io_ch[1]);
1026 	CU_ASSERT(bdev_ch[1]->flags == BDEV_CH_QOS_ENABLED);
1027 
1028 	set_thread(0);
1029 	io_ch[0] = spdk_bdev_get_io_channel(g_desc);
1030 	bdev_ch[0] = spdk_io_channel_get_ctx(io_ch[0]);
1031 	CU_ASSERT(bdev_ch[0]->flags == BDEV_CH_QOS_ENABLED);
1032 
1033 	/* Confirm that the qos thread is now thread 1 */
1034 	CU_ASSERT(bdev->internal.qos->ch == bdev_ch[1]);
1035 
1036 	/* Tear down the channels */
1037 	set_thread(0);
1038 	spdk_put_io_channel(io_ch[0]);
1039 	set_thread(1);
1040 	spdk_put_io_channel(io_ch[1]);
1041 	poll_threads();
1042 
1043 	set_thread(0);
1044 
1045 	teardown_test();
1046 }
1047 
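/*
 * Submit more I/O than the per-timeslice QoS allotment allows and verify that the
 * excess I/O stays queued until time advances into the next timeslice.
 */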
1048 static void
1049 io_during_qos_queue(void)
1050 {
1051 	struct spdk_io_channel *io_ch[2];
1052 	struct spdk_bdev_channel *bdev_ch[2];
1053 	struct spdk_bdev *bdev;
1054 	enum spdk_bdev_io_status status0, status1, status2;
1055 	int rc;
1056 
1057 	setup_test();
1058 	MOCK_SET(spdk_get_ticks, 0);
1059 
1060 	/* Enable QoS */
1061 	bdev = &g_bdev.bdev;
1062 	bdev->internal.qos = calloc(1, sizeof(*bdev->internal.qos));
1063 	SPDK_CU_ASSERT_FATAL(bdev->internal.qos != NULL);
1064 	TAILQ_INIT(&bdev->internal.qos->queued);
1065 	/*
1066 	 * Enable read/write IOPS, read only byte per sec, write only
1067 	 * byte per sec and read/write byte per sec rate limits.
1068 	 * In this case, both read only and write only byte per sec
1069 	 * rate limit will take effect.
1070 	 */
1071 	/* 4000 read/write I/O per second, or 4 per millisecond */
1072 	bdev->internal.qos->rate_limits[SPDK_BDEV_QOS_RW_IOPS_RATE_LIMIT].limit = 4000;
1073 	/* 8K byte per millisecond with 4K block size */
1074 	bdev->internal.qos->rate_limits[SPDK_BDEV_QOS_RW_BPS_RATE_LIMIT].limit = 8192000;
1075 	/* 4K byte per millisecond with 4K block size */
1076 	bdev->internal.qos->rate_limits[SPDK_BDEV_QOS_R_BPS_RATE_LIMIT].limit = 4096000;
1077 	/* 4K byte per millisecond with 4K block size */
1078 	bdev->internal.qos->rate_limits[SPDK_BDEV_QOS_W_BPS_RATE_LIMIT].limit = 4096000;
1079 
1080 	g_get_io_channel = true;
1081 
1082 	/* Create channels */
1083 	set_thread(0);
1084 	io_ch[0] = spdk_bdev_get_io_channel(g_desc);
1085 	bdev_ch[0] = spdk_io_channel_get_ctx(io_ch[0]);
1086 	CU_ASSERT(bdev_ch[0]->flags == BDEV_CH_QOS_ENABLED);
1087 
1088 	set_thread(1);
1089 	io_ch[1] = spdk_bdev_get_io_channel(g_desc);
1090 	bdev_ch[1] = spdk_io_channel_get_ctx(io_ch[1]);
1091 	CU_ASSERT(bdev_ch[1]->flags == BDEV_CH_QOS_ENABLED);
1092 
1093 	/* Send two read I/Os */
1094 	status1 = SPDK_BDEV_IO_STATUS_PENDING;
1095 	rc = spdk_bdev_read_blocks(g_desc, io_ch[1], NULL, 0, 1, io_during_io_done, &status1);
1096 	CU_ASSERT(rc == 0);
1097 	CU_ASSERT(status1 == SPDK_BDEV_IO_STATUS_PENDING);
1098 	set_thread(0);
1099 	status0 = SPDK_BDEV_IO_STATUS_PENDING;
1100 	rc = spdk_bdev_read_blocks(g_desc, io_ch[0], NULL, 0, 1, io_during_io_done, &status0);
1101 	CU_ASSERT(rc == 0);
1102 	CU_ASSERT(status0 == SPDK_BDEV_IO_STATUS_PENDING);
1103 	/* Send one write I/O */
1104 	status2 = SPDK_BDEV_IO_STATUS_PENDING;
1105 	rc = spdk_bdev_write_blocks(g_desc, io_ch[0], NULL, 0, 1, io_during_io_done, &status2);
1106 	CU_ASSERT(rc == 0);
1107 	CU_ASSERT(status2 == SPDK_BDEV_IO_STATUS_PENDING);
1108 
1109 	/* Complete any I/O that arrived at the disk */
1110 	poll_threads();
1111 	set_thread(1);
1112 	stub_complete_io(g_bdev.io_target, 0);
1113 	set_thread(0);
1114 	stub_complete_io(g_bdev.io_target, 0);
1115 	poll_threads();
1116 
1117 	/* Only one of the two read I/Os should complete. (logical XOR) */
1118 	if (status0 == SPDK_BDEV_IO_STATUS_SUCCESS) {
1119 		CU_ASSERT(status1 == SPDK_BDEV_IO_STATUS_PENDING);
1120 	} else {
1121 		CU_ASSERT(status1 == SPDK_BDEV_IO_STATUS_SUCCESS);
1122 	}
1123 	/* The write I/O should complete. */
1124 	CU_ASSERT(status2 == SPDK_BDEV_IO_STATUS_SUCCESS);
1125 
1126 	/* Advance in time by a millisecond */
1127 	spdk_delay_us(1000);
1128 
1129 	/* Complete more I/O */
1130 	poll_threads();
1131 	set_thread(1);
1132 	stub_complete_io(g_bdev.io_target, 0);
1133 	set_thread(0);
1134 	stub_complete_io(g_bdev.io_target, 0);
1135 	poll_threads();
1136 
1137 	/* Now the second read I/O should be done */
1138 	CU_ASSERT(status0 == SPDK_BDEV_IO_STATUS_SUCCESS);
1139 	CU_ASSERT(status1 == SPDK_BDEV_IO_STATUS_SUCCESS);
1140 
1141 	/* Tear down the channels */
1142 	set_thread(1);
1143 	spdk_put_io_channel(io_ch[1]);
1144 	set_thread(0);
1145 	spdk_put_io_channel(io_ch[0]);
1146 	poll_threads();
1147 
1148 	teardown_test();
1149 }
1150 
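/*
 * Verify that a reset aborts both the I/O held back by QoS and the I/O that already
 * reached the disk.
 */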
1151 static void
1152 io_during_qos_reset(void)
1153 {
1154 	struct spdk_io_channel *io_ch[2];
1155 	struct spdk_bdev_channel *bdev_ch[2];
1156 	struct spdk_bdev *bdev;
1157 	enum spdk_bdev_io_status status0, status1, reset_status;
1158 	int rc;
1159 
1160 	setup_test();
1161 	MOCK_SET(spdk_get_ticks, 0);
1162 
1163 	/* Enable QoS */
1164 	bdev = &g_bdev.bdev;
1165 	bdev->internal.qos = calloc(1, sizeof(*bdev->internal.qos));
1166 	SPDK_CU_ASSERT_FATAL(bdev->internal.qos != NULL);
1167 	TAILQ_INIT(&bdev->internal.qos->queued);
1168 	/*
1169 	 * Enable read/write IOPS, write only byte per sec and
1170 	 * read/write byte per second rate limits.
1171 	 * In this case, read/write byte per second rate limit will
1172 	 * take effect first.
1173 	 */
1174 	/* 2000 read/write I/O per second, or 2 per millisecond */
1175 	bdev->internal.qos->rate_limits[SPDK_BDEV_QOS_RW_IOPS_RATE_LIMIT].limit = 2000;
1176 	/* 4K byte per millisecond with 4K block size */
1177 	bdev->internal.qos->rate_limits[SPDK_BDEV_QOS_RW_BPS_RATE_LIMIT].limit = 4096000;
1178 	/* 8K byte per millisecond with 4K block size */
1179 	bdev->internal.qos->rate_limits[SPDK_BDEV_QOS_W_BPS_RATE_LIMIT].limit = 8192000;
1180 
1181 	g_get_io_channel = true;
1182 
1183 	/* Create channels */
1184 	set_thread(0);
1185 	io_ch[0] = spdk_bdev_get_io_channel(g_desc);
1186 	bdev_ch[0] = spdk_io_channel_get_ctx(io_ch[0]);
1187 	CU_ASSERT(bdev_ch[0]->flags == BDEV_CH_QOS_ENABLED);
1188 
1189 	set_thread(1);
1190 	io_ch[1] = spdk_bdev_get_io_channel(g_desc);
1191 	bdev_ch[1] = spdk_io_channel_get_ctx(io_ch[1]);
1192 	CU_ASSERT(bdev_ch[1]->flags == BDEV_CH_QOS_ENABLED);
1193 
1194 	/* Send two I/O. One of these gets queued by QoS. The other is sitting at the disk. */
1195 	status1 = SPDK_BDEV_IO_STATUS_PENDING;
1196 	rc = spdk_bdev_write_blocks(g_desc, io_ch[1], NULL, 0, 1, io_during_io_done, &status1);
1197 	CU_ASSERT(rc == 0);
1198 	set_thread(0);
1199 	status0 = SPDK_BDEV_IO_STATUS_PENDING;
1200 	rc = spdk_bdev_write_blocks(g_desc, io_ch[0], NULL, 0, 1, io_during_io_done, &status0);
1201 	CU_ASSERT(rc == 0);
1202 
1203 	poll_threads();
1204 	CU_ASSERT(status1 == SPDK_BDEV_IO_STATUS_PENDING);
1205 	CU_ASSERT(status0 == SPDK_BDEV_IO_STATUS_PENDING);
1206 
1207 	/* Reset the bdev. */
1208 	reset_status = SPDK_BDEV_IO_STATUS_PENDING;
1209 	rc = spdk_bdev_reset(g_desc, io_ch[0], io_during_io_done, &reset_status);
1210 	CU_ASSERT(rc == 0);
1211 
1212 	/* Complete any I/O that arrived at the disk */
1213 	poll_threads();
1214 	set_thread(1);
1215 	stub_complete_io(g_bdev.io_target, 0);
1216 	set_thread(0);
1217 	stub_complete_io(g_bdev.io_target, 0);
1218 	poll_threads();
1219 
1220 	CU_ASSERT(reset_status == SPDK_BDEV_IO_STATUS_SUCCESS);
1221 	CU_ASSERT(status0 == SPDK_BDEV_IO_STATUS_ABORTED);
1222 	CU_ASSERT(status1 == SPDK_BDEV_IO_STATUS_ABORTED);
1223 
1224 	/* Tear down the channels */
1225 	set_thread(1);
1226 	spdk_put_io_channel(io_ch[1]);
1227 	set_thread(0);
1228 	spdk_put_io_channel(io_ch[0]);
1229 	poll_threads();
1230 
1231 	teardown_test();
1232 }
1233 
1234 static void
1235 enomem_done(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg)
1236 {
1237 	enum spdk_bdev_io_status *status = cb_arg;
1238 
1239 	*status = success ? SPDK_BDEV_IO_STATUS_SUCCESS : SPDK_BDEV_IO_STATUS_FAILED;
1240 	spdk_bdev_free_io(bdev_io);
1241 }
1242 
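/*
 * Drive the shared resource into the NOMEM state and verify that queued I/O is only
 * retried once enough completions drop io_outstanding to nomem_threshold, and that a
 * reset flushes everything, including the nomem_io list.
 */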
1243 static void
1244 enomem(void)
1245 {
1246 	struct spdk_io_channel *io_ch;
1247 	struct spdk_bdev_channel *bdev_ch;
1248 	struct spdk_bdev_shared_resource *shared_resource;
1249 	struct ut_bdev_channel *ut_ch;
1250 	const uint32_t IO_ARRAY_SIZE = 64;
1251 	const uint32_t AVAIL = 20;
1252 	enum spdk_bdev_io_status status[IO_ARRAY_SIZE], status_reset;
1253 	uint32_t nomem_cnt, i;
1254 	struct spdk_bdev_io *first_io;
1255 	int rc;
1256 
1257 	setup_test();
1258 
1259 	set_thread(0);
1260 	io_ch = spdk_bdev_get_io_channel(g_desc);
1261 	bdev_ch = spdk_io_channel_get_ctx(io_ch);
1262 	shared_resource = bdev_ch->shared_resource;
1263 	ut_ch = spdk_io_channel_get_ctx(bdev_ch->channel);
1264 	ut_ch->avail_cnt = AVAIL;
1265 
1266 	/* First submit a number of IOs equal to what the channel can support. */
1267 	for (i = 0; i < AVAIL; i++) {
1268 		status[i] = SPDK_BDEV_IO_STATUS_PENDING;
1269 		rc = spdk_bdev_read_blocks(g_desc, io_ch, NULL, 0, 1, enomem_done, &status[i]);
1270 		CU_ASSERT(rc == 0);
1271 	}
1272 	CU_ASSERT(TAILQ_EMPTY(&shared_resource->nomem_io));
1273 
1274 	/*
1275 	 * Next, submit one additional I/O.  This one should fail with ENOMEM and then go onto
1276 	 *  the nomem_io list.
1277 	 */
1278 	status[AVAIL] = SPDK_BDEV_IO_STATUS_PENDING;
1279 	rc = spdk_bdev_read_blocks(g_desc, io_ch, NULL, 0, 1, enomem_done, &status[AVAIL]);
1280 	CU_ASSERT(rc == 0);
1281 	SPDK_CU_ASSERT_FATAL(!TAILQ_EMPTY(&shared_resource->nomem_io));
1282 	first_io = TAILQ_FIRST(&shared_resource->nomem_io);
1283 
1284 	/*
1285 	 * Now submit a bunch more I/O.  These should all fail with ENOMEM and get queued behind
1286 	 *  the first_io above.
1287 	 */
1288 	for (i = AVAIL + 1; i < IO_ARRAY_SIZE; i++) {
1289 		status[i] = SPDK_BDEV_IO_STATUS_PENDING;
1290 		rc = spdk_bdev_read_blocks(g_desc, io_ch, NULL, 0, 1, enomem_done, &status[i]);
1291 		CU_ASSERT(rc == 0);
1292 	}
1293 
1294 	/* Assert that first_io is still at the head of the list. */
1295 	CU_ASSERT(TAILQ_FIRST(&shared_resource->nomem_io) == first_io);
1296 	CU_ASSERT(bdev_io_tailq_cnt(&shared_resource->nomem_io) == (IO_ARRAY_SIZE - AVAIL));
1297 	nomem_cnt = bdev_io_tailq_cnt(&shared_resource->nomem_io);
1298 	CU_ASSERT(shared_resource->nomem_threshold == (AVAIL - NOMEM_THRESHOLD_COUNT));
1299 
1300 	/*
1301 	 * Complete 1 I/O only.  The key check here is bdev_io_tailq_cnt - this should not have
1302 	 *  changed since completing just 1 I/O should not trigger retrying the queued nomem_io
1303 	 *  list.
1304 	 */
1305 	stub_complete_io(g_bdev.io_target, 1);
1306 	CU_ASSERT(bdev_io_tailq_cnt(&shared_resource->nomem_io) == nomem_cnt);
1307 
1308 	/*
1309 	 * Complete enough I/O to hit the nomem_threshold.  This should trigger retrying nomem_io,
1310 	 *  and we should see I/O get resubmitted to the test bdev module.
1311 	 */
1312 	stub_complete_io(g_bdev.io_target, NOMEM_THRESHOLD_COUNT - 1);
1313 	CU_ASSERT(bdev_io_tailq_cnt(&shared_resource->nomem_io) < nomem_cnt);
1314 	nomem_cnt = bdev_io_tailq_cnt(&shared_resource->nomem_io);
1315 
1316 	/* Complete 1 I/O only.  This should not trigger retrying the queued nomem_io. */
1317 	stub_complete_io(g_bdev.io_target, 1);
1318 	CU_ASSERT(bdev_io_tailq_cnt(&shared_resource->nomem_io) == nomem_cnt);
1319 
1320 	/*
1321 	 * Send a reset and confirm that all I/O are completed, including the ones that
1322 	 *  were queued on the nomem_io list.
1323 	 */
1324 	status_reset = SPDK_BDEV_IO_STATUS_PENDING;
1325 	rc = spdk_bdev_reset(g_desc, io_ch, enomem_done, &status_reset);
1326 	poll_threads();
1327 	CU_ASSERT(rc == 0);
1328 	/* This will complete the reset. */
1329 	stub_complete_io(g_bdev.io_target, 0);
1330 
1331 	CU_ASSERT(bdev_io_tailq_cnt(&shared_resource->nomem_io) == 0);
1332 	CU_ASSERT(shared_resource->io_outstanding == 0);
1333 
1334 	spdk_put_io_channel(io_ch);
1335 	poll_threads();
1336 	teardown_test();
1337 }
1338 
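/*
 * Two bdevs sharing one io_target share a single shared_resource, so saturating the
 * target through the first bdev forces the second bdev's I/O onto the shared
 * nomem_io list until completions free up resources.
 */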
1339 static void
1340 enomem_multi_bdev(void)
1341 {
1342 	struct spdk_io_channel *io_ch;
1343 	struct spdk_bdev_channel *bdev_ch;
1344 	struct spdk_bdev_shared_resource *shared_resource;
1345 	struct ut_bdev_channel *ut_ch;
1346 	const uint32_t IO_ARRAY_SIZE = 64;
1347 	const uint32_t AVAIL = 20;
1348 	enum spdk_bdev_io_status status[IO_ARRAY_SIZE];
1349 	uint32_t i;
1350 	struct ut_bdev *second_bdev;
1351 	struct spdk_bdev_desc *second_desc = NULL;
1352 	struct spdk_bdev_channel *second_bdev_ch;
1353 	struct spdk_io_channel *second_ch;
1354 	int rc;
1355 
1356 	setup_test();
1357 
1358 	/* Register second bdev with the same io_target  */
1359 	second_bdev = calloc(1, sizeof(*second_bdev));
1360 	SPDK_CU_ASSERT_FATAL(second_bdev != NULL);
1361 	register_bdev(second_bdev, "ut_bdev2", g_bdev.io_target);
1362 	spdk_bdev_open_ext("ut_bdev2", true, _bdev_event_cb, NULL, &second_desc);
1363 	SPDK_CU_ASSERT_FATAL(second_desc != NULL);
1364 
1365 	set_thread(0);
1366 	io_ch = spdk_bdev_get_io_channel(g_desc);
1367 	bdev_ch = spdk_io_channel_get_ctx(io_ch);
1368 	shared_resource = bdev_ch->shared_resource;
1369 	ut_ch = spdk_io_channel_get_ctx(bdev_ch->channel);
1370 	ut_ch->avail_cnt = AVAIL;
1371 
1372 	second_ch = spdk_bdev_get_io_channel(second_desc);
1373 	second_bdev_ch = spdk_io_channel_get_ctx(second_ch);
1374 	SPDK_CU_ASSERT_FATAL(shared_resource == second_bdev_ch->shared_resource);
1375 
1376 	/* Saturate io_target through bdev A. */
1377 	for (i = 0; i < AVAIL; i++) {
1378 		status[i] = SPDK_BDEV_IO_STATUS_PENDING;
1379 		rc = spdk_bdev_read_blocks(g_desc, io_ch, NULL, 0, 1, enomem_done, &status[i]);
1380 		CU_ASSERT(rc == 0);
1381 	}
1382 	CU_ASSERT(TAILQ_EMPTY(&shared_resource->nomem_io));
1383 
1384 	/*
1385 	 * Now submit I/O through the second bdev. This should fail with ENOMEM
1386 	 * and then go onto the nomem_io list.
1387 	 */
1388 	status[AVAIL] = SPDK_BDEV_IO_STATUS_PENDING;
1389 	rc = spdk_bdev_read_blocks(second_desc, second_ch, NULL, 0, 1, enomem_done, &status[AVAIL]);
1390 	CU_ASSERT(rc == 0);
1391 	SPDK_CU_ASSERT_FATAL(!TAILQ_EMPTY(&shared_resource->nomem_io));
1392 
1393 	/* Complete first bdev's I/O. This should retry sending second bdev's nomem_io */
1394 	stub_complete_io(g_bdev.io_target, AVAIL);
1395 
1396 	SPDK_CU_ASSERT_FATAL(TAILQ_EMPTY(&shared_resource->nomem_io));
1397 	CU_ASSERT(shared_resource->io_outstanding == 1);
1398 
1399 	/* Now complete our retried I/O  */
1400 	stub_complete_io(g_bdev.io_target, 1);
1401 	SPDK_CU_ASSERT_FATAL(shared_resource->io_outstanding == 0);
1402 
1403 	spdk_put_io_channel(io_ch);
1404 	spdk_put_io_channel(second_ch);
1405 	spdk_bdev_close(second_desc);
1406 	unregister_bdev(second_bdev);
1407 	poll_threads();
1408 	free(second_bdev);
1409 	teardown_test();
1410 }
1411 
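/*
 * Verify that unregistering a bdev fails its I/O still waiting on the nomem_io list
 * while leaving the I/O already submitted to the module outstanding.
 */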
1412 static void
1413 enomem_multi_bdev_unregister(void)
1414 {
1415 	struct spdk_io_channel *io_ch;
1416 	struct spdk_bdev_channel *bdev_ch;
1417 	struct spdk_bdev_shared_resource *shared_resource;
1418 	struct ut_bdev_channel *ut_ch;
1419 	const uint32_t IO_ARRAY_SIZE = 64;
1420 	const uint32_t AVAIL = 20;
1421 	enum spdk_bdev_io_status status[IO_ARRAY_SIZE];
1422 	uint32_t i;
1423 	int rc;
1424 
1425 	setup_test();
1426 
1427 	set_thread(0);
1428 	io_ch = spdk_bdev_get_io_channel(g_desc);
1429 	bdev_ch = spdk_io_channel_get_ctx(io_ch);
1430 	shared_resource = bdev_ch->shared_resource;
1431 	ut_ch = spdk_io_channel_get_ctx(bdev_ch->channel);
1432 	ut_ch->avail_cnt = AVAIL;
1433 
1434 	/* Saturate io_target through the bdev. */
1435 	for (i = 0; i < AVAIL; i++) {
1436 		status[i] = SPDK_BDEV_IO_STATUS_PENDING;
1437 		rc = spdk_bdev_read_blocks(g_desc, io_ch, NULL, 0, 1, enomem_done, &status[i]);
1438 		CU_ASSERT(rc == 0);
1439 	}
1440 	CU_ASSERT(TAILQ_EMPTY(&shared_resource->nomem_io));
1441 
1442 	/*
1443 	 * Now submit I/O through the bdev. This should fail with ENOMEM
1444 	 * and then go onto the nomem_io list.
1445 	 */
1446 	status[AVAIL] = SPDK_BDEV_IO_STATUS_PENDING;
1447 	rc = spdk_bdev_read_blocks(g_desc, io_ch, NULL, 0, 1, enomem_done, &status[AVAIL]);
1448 	CU_ASSERT(rc == 0);
1449 	SPDK_CU_ASSERT_FATAL(!TAILQ_EMPTY(&shared_resource->nomem_io));
1450 
1451 	/* Unregister the bdev to abort the IOs from the nomem_io queue. */
1452 	unregister_bdev(&g_bdev);
1453 	CU_ASSERT(status[AVAIL] == SPDK_BDEV_IO_STATUS_FAILED);
1454 	SPDK_CU_ASSERT_FATAL(TAILQ_EMPTY(&shared_resource->nomem_io));
1455 	SPDK_CU_ASSERT_FATAL(shared_resource->io_outstanding == AVAIL);
1456 
1457 	/* Complete the bdev's I/O. */
1458 	stub_complete_io(g_bdev.io_target, AVAIL);
1459 	SPDK_CU_ASSERT_FATAL(shared_resource->io_outstanding == 0);
1460 
1461 	spdk_put_io_channel(io_ch);
1462 	poll_threads();
1463 	teardown_test();
1464 }
1465 
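/*
 * Bdevs on different io_targets get distinct shared_resources, so exhausting one
 * target must not cause ENOMEM queueing for I/O submitted to the other.
 */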
1466 static void
1467 enomem_multi_io_target(void)
1468 {
1469 	struct spdk_io_channel *io_ch;
1470 	struct spdk_bdev_channel *bdev_ch;
1471 	struct ut_bdev_channel *ut_ch;
1472 	const uint32_t IO_ARRAY_SIZE = 64;
1473 	const uint32_t AVAIL = 20;
1474 	enum spdk_bdev_io_status status[IO_ARRAY_SIZE];
1475 	uint32_t i;
1476 	int new_io_device;
1477 	struct ut_bdev *second_bdev;
1478 	struct spdk_bdev_desc *second_desc = NULL;
1479 	struct spdk_bdev_channel *second_bdev_ch;
1480 	struct spdk_io_channel *second_ch;
1481 	int rc;
1482 
1483 	setup_test();
1484 
1485 	/* Create new io_target and a second bdev using it */
1486 	spdk_io_device_register(&new_io_device, stub_create_ch, stub_destroy_ch,
1487 				sizeof(struct ut_bdev_channel), NULL);
1488 	second_bdev = calloc(1, sizeof(*second_bdev));
1489 	SPDK_CU_ASSERT_FATAL(second_bdev != NULL);
1490 	register_bdev(second_bdev, "ut_bdev2", &new_io_device);
1491 	spdk_bdev_open_ext("ut_bdev2", true, _bdev_event_cb, NULL, &second_desc);
1492 	SPDK_CU_ASSERT_FATAL(second_desc != NULL);
1493 
1494 	set_thread(0);
1495 	io_ch = spdk_bdev_get_io_channel(g_desc);
1496 	bdev_ch = spdk_io_channel_get_ctx(io_ch);
1497 	ut_ch = spdk_io_channel_get_ctx(bdev_ch->channel);
1498 	ut_ch->avail_cnt = AVAIL;
1499 
1500 	/* Different io_target should imply a different shared_resource */
1501 	second_ch = spdk_bdev_get_io_channel(second_desc);
1502 	second_bdev_ch = spdk_io_channel_get_ctx(second_ch);
1503 	SPDK_CU_ASSERT_FATAL(bdev_ch->shared_resource != second_bdev_ch->shared_resource);
1504 
1505 	/* Saturate io_target through bdev A. */
1506 	for (i = 0; i < AVAIL; i++) {
1507 		status[i] = SPDK_BDEV_IO_STATUS_PENDING;
1508 		rc = spdk_bdev_read_blocks(g_desc, io_ch, NULL, 0, 1, enomem_done, &status[i]);
1509 		CU_ASSERT(rc == 0);
1510 	}
1511 	CU_ASSERT(TAILQ_EMPTY(&bdev_ch->shared_resource->nomem_io));
1512 
1513 	/* Issue one more I/O to fill ENOMEM list. */
1514 	status[AVAIL] = SPDK_BDEV_IO_STATUS_PENDING;
1515 	rc = spdk_bdev_read_blocks(g_desc, io_ch, NULL, 0, 1, enomem_done, &status[AVAIL]);
1516 	CU_ASSERT(rc == 0);
1517 	SPDK_CU_ASSERT_FATAL(!TAILQ_EMPTY(&bdev_ch->shared_resource->nomem_io));
1518 
1519 	/*
1520 	 * Now submit I/O through the second bdev. This should go through and complete
1521 	 * successfully because we're using a different io_device underneath.
1522 	 */
1523 	status[AVAIL] = SPDK_BDEV_IO_STATUS_PENDING;
1524 	rc = spdk_bdev_read_blocks(second_desc, second_ch, NULL, 0, 1, enomem_done, &status[AVAIL]);
1525 	CU_ASSERT(rc == 0);
1526 	SPDK_CU_ASSERT_FATAL(TAILQ_EMPTY(&second_bdev_ch->shared_resource->nomem_io));
1527 	stub_complete_io(second_bdev->io_target, 1);
1528 
1529 	/* Cleanup; Complete outstanding I/O. */
1530 	stub_complete_io(g_bdev.io_target, AVAIL);
1531 	SPDK_CU_ASSERT_FATAL(TAILQ_EMPTY(&bdev_ch->shared_resource->nomem_io));
1532 	/* Complete the ENOMEM I/O */
1533 	stub_complete_io(g_bdev.io_target, 1);
1534 	CU_ASSERT(bdev_ch->shared_resource->io_outstanding == 0);
1535 
1536 	SPDK_CU_ASSERT_FATAL(TAILQ_EMPTY(&bdev_ch->shared_resource->nomem_io));
1537 	CU_ASSERT(bdev_ch->shared_resource->io_outstanding == 0);
1538 	spdk_put_io_channel(io_ch);
1539 	spdk_put_io_channel(second_ch);
1540 	spdk_bdev_close(second_desc);
1541 	unregister_bdev(second_bdev);
1542 	spdk_io_device_unregister(&new_io_device, NULL);
1543 	poll_threads();
1544 	free(second_bdev);
1545 	teardown_test();
1546 }
1547 
1548 static void
1549 qos_dynamic_enable_done(void *cb_arg, int status)
1550 {
1551 	int *rc = cb_arg;
1552 	*rc = status;
1553 }
1554 
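/*
 * Toggle QoS rate limits at runtime with spdk_bdev_set_qos_rate_limits() and verify
 * that I/O queued by QoS survives a disable (it is resubmitted on its original
 * thread), and that an enable issued while a disable is still pending fails.
 */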
1555 static void
1556 qos_dynamic_enable(void)
1557 {
1558 	struct spdk_io_channel *io_ch[2];
1559 	struct spdk_bdev_channel *bdev_ch[2];
1560 	struct spdk_bdev *bdev;
1561 	enum spdk_bdev_io_status bdev_io_status[2];
1562 	uint64_t limits[SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES] = {};
1563 	int status, second_status, rc, i;
1564 
1565 	setup_test();
1566 	MOCK_SET(spdk_get_ticks, 0);
1567 
1568 	for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) {
1569 		limits[i] = UINT64_MAX;
1570 	}
1571 
1572 	bdev = &g_bdev.bdev;
1573 
1574 	g_get_io_channel = true;
1575 
1576 	/* Create channels */
1577 	set_thread(0);
1578 	io_ch[0] = spdk_bdev_get_io_channel(g_desc);
1579 	bdev_ch[0] = spdk_io_channel_get_ctx(io_ch[0]);
1580 	CU_ASSERT(bdev_ch[0]->flags == 0);
1581 
1582 	set_thread(1);
1583 	io_ch[1] = spdk_bdev_get_io_channel(g_desc);
1584 	bdev_ch[1] = spdk_io_channel_get_ctx(io_ch[1]);
1585 	CU_ASSERT(bdev_ch[1]->flags == 0);
1586 
1587 	set_thread(0);
1588 
1589 	/*
1590 	 * Enable QoS: Read/Write IOPS, Read/Write byte,
1591 	 * Read only byte and Write only byte per second
1592 	 * rate limits.
1593 	 * More than 10 I/Os allowed per timeslice.
1594 	 */
1595 	status = -1;
1596 	limits[SPDK_BDEV_QOS_RW_IOPS_RATE_LIMIT] = 10000;
1597 	limits[SPDK_BDEV_QOS_RW_BPS_RATE_LIMIT] = 100;
1598 	limits[SPDK_BDEV_QOS_R_BPS_RATE_LIMIT] = 100;
1599 	limits[SPDK_BDEV_QOS_W_BPS_RATE_LIMIT] = 10;
1600 	spdk_bdev_set_qos_rate_limits(bdev, limits, qos_dynamic_enable_done, &status);
1601 	poll_threads();
1602 	CU_ASSERT(status == 0);
1603 	CU_ASSERT((bdev_ch[0]->flags & BDEV_CH_QOS_ENABLED) != 0);
1604 	CU_ASSERT((bdev_ch[1]->flags & BDEV_CH_QOS_ENABLED) != 0);
1605 
1606 	/*
1607 	 * Submit and complete 10 I/O to fill the QoS allotment for this timeslice.
1608 	 * Additional I/O will then be queued.
1609 	 */
1610 	set_thread(0);
1611 	for (i = 0; i < 10; i++) {
1612 		bdev_io_status[0] = SPDK_BDEV_IO_STATUS_PENDING;
1613 		rc = spdk_bdev_read_blocks(g_desc, io_ch[0], NULL, 0, 1, io_during_io_done, &bdev_io_status[0]);
1614 		CU_ASSERT(rc == 0);
1615 		CU_ASSERT(bdev_io_status[0] == SPDK_BDEV_IO_STATUS_PENDING);
1616 		poll_thread(0);
1617 		stub_complete_io(g_bdev.io_target, 0);
1618 		CU_ASSERT(bdev_io_status[0] == SPDK_BDEV_IO_STATUS_SUCCESS);
1619 	}
1620 
1621 	/*
1622 	 * Send two more I/O.  These I/O will be queued since the current timeslice allotment has been
1623 	 *  filled already.  We want to test that when QoS is disabled, these two I/O:
1624 	 *  1) are not aborted
1625 	 *  2) are sent back to their original thread for resubmission
1626 	 */
1627 	bdev_io_status[0] = SPDK_BDEV_IO_STATUS_PENDING;
1628 	rc = spdk_bdev_read_blocks(g_desc, io_ch[0], NULL, 0, 1, io_during_io_done, &bdev_io_status[0]);
1629 	CU_ASSERT(rc == 0);
1630 	CU_ASSERT(bdev_io_status[0] == SPDK_BDEV_IO_STATUS_PENDING);
1631 	set_thread(1);
1632 	bdev_io_status[1] = SPDK_BDEV_IO_STATUS_PENDING;
1633 	rc = spdk_bdev_read_blocks(g_desc, io_ch[1], NULL, 0, 1, io_during_io_done, &bdev_io_status[1]);
1634 	CU_ASSERT(rc == 0);
1635 	CU_ASSERT(bdev_io_status[1] == SPDK_BDEV_IO_STATUS_PENDING);
1636 	poll_threads();
1637 
1638 	/*
1639 	 * Disable QoS: Read/Write IOPS, Read/Write byte,
1640 	 * Read only byte rate limits
1641 	 */
1642 	status = -1;
1643 	limits[SPDK_BDEV_QOS_RW_IOPS_RATE_LIMIT] = 0;
1644 	limits[SPDK_BDEV_QOS_RW_BPS_RATE_LIMIT] = 0;
1645 	limits[SPDK_BDEV_QOS_R_BPS_RATE_LIMIT] = 0;
1646 	spdk_bdev_set_qos_rate_limits(bdev, limits, qos_dynamic_enable_done, &status);
1647 	poll_threads();
1648 	CU_ASSERT(status == 0);
1649 	CU_ASSERT((bdev_ch[0]->flags & BDEV_CH_QOS_ENABLED) != 0);
1650 	CU_ASSERT((bdev_ch[1]->flags & BDEV_CH_QOS_ENABLED) != 0);
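	/*
	 * QoS is still enabled at this point: zeroing a subset of the limits only clears those
	 * limits, and the channels keep BDEV_CH_QOS_ENABLED as long as at least one limit (here
	 * the write-only byte per second limit) remains nonzero.
	 */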
1651 
1652 	/* Disable QoS: Write only byte per second rate limit */
1653 	status = -1;
1654 	limits[SPDK_BDEV_QOS_W_BPS_RATE_LIMIT] = 0;
1655 	spdk_bdev_set_qos_rate_limits(bdev, limits, qos_dynamic_enable_done, &status);
1656 	poll_threads();
1657 	CU_ASSERT(status == 0);
1658 	CU_ASSERT((bdev_ch[0]->flags & BDEV_CH_QOS_ENABLED) == 0);
1659 	CU_ASSERT((bdev_ch[1]->flags & BDEV_CH_QOS_ENABLED) == 0);
1660 
1661 	/*
1662 	 * All I/O should have been resubmitted back on their original thread.  Complete
1663 	 *  all I/O on thread 0, and ensure that only the thread 0 I/O was completed.
1664 	 */
1665 	set_thread(0);
1666 	stub_complete_io(g_bdev.io_target, 0);
1667 	poll_threads();
1668 	CU_ASSERT(bdev_io_status[0] == SPDK_BDEV_IO_STATUS_SUCCESS);
1669 	CU_ASSERT(bdev_io_status[1] == SPDK_BDEV_IO_STATUS_PENDING);
1670 
1671 	/* Now complete all I/O on thread 1 and ensure the thread 1 I/O was completed. */
1672 	set_thread(1);
1673 	stub_complete_io(g_bdev.io_target, 0);
1674 	poll_threads();
1675 	CU_ASSERT(bdev_io_status[1] == SPDK_BDEV_IO_STATUS_SUCCESS);
1676 
1677 	/* Disable QoS again */
1678 	status = -1;
1679 	limits[SPDK_BDEV_QOS_RW_IOPS_RATE_LIMIT] = 0;
1680 	spdk_bdev_set_qos_rate_limits(bdev, limits, qos_dynamic_enable_done, &status);
1681 	poll_threads();
1682 	CU_ASSERT(status == 0); /* This should succeed */
1683 	CU_ASSERT((bdev_ch[0]->flags & BDEV_CH_QOS_ENABLED) == 0);
1684 	CU_ASSERT((bdev_ch[1]->flags & BDEV_CH_QOS_ENABLED) == 0);
1685 
1686 	/* Enable QoS on thread 0 */
1687 	status = -1;
1688 	limits[SPDK_BDEV_QOS_RW_IOPS_RATE_LIMIT] = 10000;
1689 	spdk_bdev_set_qos_rate_limits(bdev, limits, qos_dynamic_enable_done, &status);
1690 	poll_threads();
1691 	CU_ASSERT(status == 0);
1692 	CU_ASSERT((bdev_ch[0]->flags & BDEV_CH_QOS_ENABLED) != 0);
1693 	CU_ASSERT((bdev_ch[1]->flags & BDEV_CH_QOS_ENABLED) != 0);
1694 
1695 	/* Disable QoS on thread 1 */
1696 	set_thread(1);
1697 	status = -1;
1698 	limits[SPDK_BDEV_QOS_RW_IOPS_RATE_LIMIT] = 0;
1699 	spdk_bdev_set_qos_rate_limits(bdev, limits, qos_dynamic_enable_done, &status);
1700 	/* Don't poll yet. This should leave the channels with QoS enabled */
1701 	CU_ASSERT(status == -1);
1702 	CU_ASSERT((bdev_ch[0]->flags & BDEV_CH_QOS_ENABLED) != 0);
1703 	CU_ASSERT((bdev_ch[1]->flags & BDEV_CH_QOS_ENABLED) != 0);
1704 
1705 	/* Enable QoS. This should immediately fail because the previous QoS disable hasn't completed. */
1706 	second_status = 0;
1707 	limits[SPDK_BDEV_QOS_RW_BPS_RATE_LIMIT] = 10;
1708 	spdk_bdev_set_qos_rate_limits(bdev, limits, qos_dynamic_enable_done, &second_status);
1709 	poll_threads();
1710 	CU_ASSERT(status == 0); /* The disable should succeed */
1711 	CU_ASSERT(second_status < 0); /* The enable should fail */
1712 	CU_ASSERT((bdev_ch[0]->flags & BDEV_CH_QOS_ENABLED) == 0);
1713 	CU_ASSERT((bdev_ch[1]->flags & BDEV_CH_QOS_ENABLED) == 0);
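	/*
	 * The test only checks that second_status went negative; the bdev layer presumably
	 * rejects the overlapping request with something like -EAGAIN while the earlier
	 * disable is still in flight, but the exact errno is not asserted here.
	 */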
1714 
1715 	/* Enable QoS on thread 1. This should succeed now that the disable has completed. */
1716 	status = -1;
1717 	limits[SPDK_BDEV_QOS_RW_IOPS_RATE_LIMIT] = 10000;
1718 	spdk_bdev_set_qos_rate_limits(bdev, limits, qos_dynamic_enable_done, &status);
1719 	poll_threads();
1720 	CU_ASSERT(status == 0);
1721 	CU_ASSERT((bdev_ch[0]->flags & BDEV_CH_QOS_ENABLED) != 0);
1722 	CU_ASSERT((bdev_ch[1]->flags & BDEV_CH_QOS_ENABLED) != 0);
1723 
1724 	/* Tear down the channels */
1725 	set_thread(0);
1726 	spdk_put_io_channel(io_ch[0]);
1727 	set_thread(1);
1728 	spdk_put_io_channel(io_ch[1]);
1729 	poll_threads();
1730 
1731 	set_thread(0);
1732 	teardown_test();
1733 }
1734 
1735 static void
1736 histogram_status_cb(void *cb_arg, int status)
1737 {
1738 	g_status = status;
1739 }
1740 
1741 static void
1742 histogram_data_cb(void *cb_arg, int status, struct spdk_histogram_data *histogram)
1743 {
1744 	g_status = status;
1745 	g_histogram = histogram;
1746 }
1747 
1748 static void
1749 histogram_io_count(void *ctx, uint64_t start, uint64_t end, uint64_t count,
1750 		   uint64_t total, uint64_t so_far)
1751 {
1752 	g_count += count;
1753 }
1754 
1755 static void
1756 bdev_histograms_mt(void)
1757 {
1758 	struct spdk_io_channel *ch[2];
1759 	struct spdk_histogram_data *histogram;
1760 	uint8_t buf[4096];
1761 	enum spdk_bdev_io_status status = SPDK_BDEV_IO_STATUS_PENDING;
1762 	int rc;
1763 
1764 
1765 	setup_test();
1766 
1767 	set_thread(0);
1768 	ch[0] = spdk_bdev_get_io_channel(g_desc);
1769 	CU_ASSERT(ch[0] != NULL);
1770 
1771 	set_thread(1);
1772 	ch[1] = spdk_bdev_get_io_channel(g_desc);
1773 	CU_ASSERT(ch[1] != NULL);
1774 
1775 
1776 	/* Enable histogram */
1777 	spdk_bdev_histogram_enable(&g_bdev.bdev, histogram_status_cb, NULL, true);
1778 	poll_threads();
1779 	CU_ASSERT(g_status == 0);
1780 	CU_ASSERT(g_bdev.bdev.internal.histogram_enabled == true);
1781 
1782 	/* Allocate histogram */
1783 	histogram = spdk_histogram_data_alloc();
1784 
1785 	/* Check if histogram is zeroed */
1786 	spdk_bdev_histogram_get(&g_bdev.bdev, histogram, histogram_data_cb, NULL);
1787 	poll_threads();
1788 	CU_ASSERT(g_status == 0);
1789 	SPDK_CU_ASSERT_FATAL(g_histogram != NULL);
1790 
1791 	g_count = 0;
1792 	spdk_histogram_data_iterate(g_histogram, histogram_io_count, NULL);
1793 
1794 	CU_ASSERT(g_count == 0);
1795 
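	/*
	 * Issue one write from thread 0 and one read from thread 1 below.  The spdk_delay_us(10)
	 * between submission and completion gives each I/O a small nonzero latency, so the
	 * histogram has something measurable to record on each channel.
	 */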
1796 	set_thread(0);
1797 	rc = spdk_bdev_write_blocks(g_desc, ch[0], &buf, 0, 1, io_during_io_done, &status);
1798 	CU_ASSERT(rc == 0);
1799 
1800 	spdk_delay_us(10);
1801 	stub_complete_io(g_bdev.io_target, 1);
1802 	poll_threads();
1803 	CU_ASSERT(status == SPDK_BDEV_IO_STATUS_SUCCESS);
1804 
1805 
1806 	set_thread(1);
1807 	rc = spdk_bdev_read_blocks(g_desc, ch[1], &buf, 0, 1, io_during_io_done, &status);
1808 	CU_ASSERT(rc == 0);
1809 
1810 	spdk_delay_us(10);
1811 	stub_complete_io(g_bdev.io_target, 1);
1812 	poll_threads();
1813 	CU_ASSERT(status == SPDK_BDEV_IO_STATUS_SUCCESS);
1814 
1815 	set_thread(0);
1816 
1817 	/* Check if histogram gathered data from all I/O channels */
1818 	spdk_bdev_histogram_get(&g_bdev.bdev, histogram, histogram_data_cb, NULL);
1819 	poll_threads();
1820 	CU_ASSERT(g_status == 0);
1821 	CU_ASSERT(g_bdev.bdev.internal.histogram_enabled == true);
1822 	SPDK_CU_ASSERT_FATAL(g_histogram != NULL);
1823 
1824 	g_count = 0;
1825 	spdk_histogram_data_iterate(g_histogram, histogram_io_count, NULL);
1826 	CU_ASSERT(g_count == 2);
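	/*
	 * g_count == 2 confirms that spdk_bdev_histogram_get() returns data aggregated across
	 * channels: the write recorded on thread 0's channel and the read recorded on thread 1's
	 * channel both show up in the merged histogram.
	 */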
1827 
1828 	/* Disable histogram */
1829 	spdk_bdev_histogram_enable(&g_bdev.bdev, histogram_status_cb, NULL, false);
1830 	poll_threads();
1831 	CU_ASSERT(g_status == 0);
1832 	CU_ASSERT(g_bdev.bdev.internal.histogram_enabled == false);
1833 
1834 	spdk_histogram_data_free(histogram);
1835 
1836 	/* Tear down the channels */
1837 	set_thread(0);
1838 	spdk_put_io_channel(ch[0]);
1839 	set_thread(1);
1840 	spdk_put_io_channel(ch[1]);
1841 	poll_threads();
1842 	set_thread(0);
1843 	teardown_test();
1844 
1845 }
1846 
1847 struct timeout_io_cb_arg {
1848 	struct iovec iov;
1849 	uint8_t type;
1850 };
1851 
1852 static int
1853 bdev_channel_count_submitted_io(struct spdk_bdev_channel *ch)
1854 {
1855 	struct spdk_bdev_io *bdev_io;
1856 	int n = 0;
1857 
1858 	if (!ch) {
1859 		return -1;
1860 	}
1861 
1862 	TAILQ_FOREACH(bdev_io, &ch->io_submitted, internal.ch_link) {
1863 		n++;
1864 	}
1865 
1866 	return n;
1867 }
1868 
1869 static void
1870 bdev_channel_io_timeout_cb(void *cb_arg, struct spdk_bdev_io *bdev_io)
1871 {
1872 	struct timeout_io_cb_arg *ctx = cb_arg;
1873 
1874 	ctx->type = bdev_io->type;
1875 	ctx->iov.iov_base = bdev_io->iov.iov_base;
1876 	ctx->iov.iov_len = bdev_io->iov.iov_len;
1877 }
1878 
1879 static bool g_io_done;
1880 
1881 static void
1882 io_done(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg)
1883 {
1884 	g_io_done = true;
1885 	spdk_bdev_free_io(bdev_io);
1886 }
1887 
1888 static void
1889 bdev_set_io_timeout_mt(void)
1890 {
1891 	struct spdk_io_channel *ch[3];
1892 	struct spdk_bdev_channel *bdev_ch[3];
1893 	struct timeout_io_cb_arg cb_arg;
1894 
1895 	setup_test();
1896 
1897 	g_bdev.bdev.optimal_io_boundary = 16;
1898 	g_bdev.bdev.split_on_optimal_io_boundary = true;
1899 
1900 	set_thread(0);
1901 	ch[0] = spdk_bdev_get_io_channel(g_desc);
1902 	CU_ASSERT(ch[0] != NULL);
1903 
1904 	set_thread(1);
1905 	ch[1] = spdk_bdev_get_io_channel(g_desc);
1906 	CU_ASSERT(ch[1] != NULL);
1907 
1908 	set_thread(2);
1909 	ch[2] = spdk_bdev_get_io_channel(g_desc);
1910 	CU_ASSERT(ch[2] != NULL);
1911 
1912 	/* Multi-thread mode:
1913 	 * 1. Check that the timeout poller was registered successfully.
1914 	 * 2. Check the timed-out IO and ensure it is the IO that was submitted by the user.
1915 	 * 3. Check that the link in the bdev_ch works correctly.
1916 	 * 4. Close the desc and put the io channel while the timeout poller is polling.
1917 	 */
1918 
1919 	/* Set the timeout in the desc's thread */
1920 	set_thread(0);
1921 	CU_ASSERT(spdk_bdev_set_timeout(g_desc, 5, bdev_channel_io_timeout_cb, &cb_arg) == 0);
1922 	CU_ASSERT(g_desc->io_timeout_poller != NULL);
1923 	CU_ASSERT(g_desc->cb_fn == bdev_channel_io_timeout_cb);
1924 	CU_ASSERT(g_desc->cb_arg == &cb_arg);
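	/*
	 * Rough sketch of what is being exercised here (based on the fields checked above):
	 * spdk_bdev_set_timeout() records cb_fn/cb_arg on the desc and starts a poller on the
	 * desc's thread.  Each time the poller fires it presumably walks every channel's
	 * io_submitted list and invokes the callback for any I/O that has been outstanding
	 * longer than the timeout (5 seconds in this test).
	 */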
1925 
1926 	/* check the IO submitted list and timeout handler */
1927 	CU_ASSERT(spdk_bdev_read_blocks(g_desc, ch[0], (void *)0x2000, 0, 1, io_done, NULL) == 0);
1928 	bdev_ch[0] = spdk_io_channel_get_ctx(ch[0]);
1929 	CU_ASSERT(bdev_channel_count_submitted_io(bdev_ch[0]) == 1);
1930 
1931 	set_thread(1);
1932 	CU_ASSERT(spdk_bdev_write_blocks(g_desc, ch[1], (void *)0x1000, 0, 1, io_done, NULL) == 0);
1933 	bdev_ch[1] = spdk_io_channel_get_ctx(ch[1]);
1934 	CU_ASSERT(bdev_channel_count_submitted_io(bdev_ch[1]) == 1);
1935 
1936 	/* Now test that a single-vector command is split correctly.
1937 	 * Offset 14, length 8, payload 0xF000
1938 	 *  Child - Offset 14, length 2, payload 0xF000
1939 	 *  Child - Offset 16, length 6, payload 0xF000 + 2 * 512
1940 	 *
1941 	 * Submit the read on thread 2 and check how many I/Os land on its submitted list.
1942 	 */
1943 	set_thread(2);
1944 	CU_ASSERT(spdk_bdev_read_blocks(g_desc, ch[2], (void *)0xF000, 14, 8, io_done, NULL) == 0);
1945 	bdev_ch[2] = spdk_io_channel_get_ctx(ch[2]);
1946 	CU_ASSERT(bdev_channel_count_submitted_io(bdev_ch[2]) == 3);
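	/*
	 * The count of 3 presumably reflects the parent I/O plus its two split children, all of
	 * which are linked into this channel's io_submitted list.  Completing the children one at
	 * a time later in the test drops the count from 3 to 2, and then to 0 once the last child
	 * (and with it the parent) finishes.
	 */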
1947 
1948 	set_thread(0);
1949 	memset(&cb_arg, 0, sizeof(cb_arg));
1950 	spdk_delay_us(3 * spdk_get_ticks_hz());
1951 	poll_threads();
1952 	CU_ASSERT(cb_arg.type == 0);
1953 	CU_ASSERT(cb_arg.iov.iov_base == (void *)0x0);
1954 	CU_ASSERT(cb_arg.iov.iov_len == 0);
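	/*
	 * With the unit-test clock, spdk_delay_us(3 * spdk_get_ticks_hz()) advances time by
	 * roughly 3 seconds, which is still below the 5 second timeout set above, so the
	 * callback has not fired and cb_arg is still zeroed.
	 */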
1955 
1956 	/* Now the elapsed time reaches the timeout limit */
1957 	spdk_delay_us(3 * spdk_get_ticks_hz());
1958 	poll_thread(0);
1959 	CU_ASSERT(cb_arg.type == SPDK_BDEV_IO_TYPE_READ);
1960 	CU_ASSERT(cb_arg.iov.iov_base == (void *)0x2000);
1961 	CU_ASSERT(cb_arg.iov.iov_len == 1 * g_bdev.bdev.blocklen);
1962 	stub_complete_io(g_bdev.io_target, 1);
1963 	CU_ASSERT(bdev_channel_count_submitted_io(bdev_ch[0]) == 0);
1964 
1965 	memset(&cb_arg, 0, sizeof(cb_arg));
1966 	set_thread(1);
1967 	poll_thread(1);
1968 	CU_ASSERT(cb_arg.type == SPDK_BDEV_IO_TYPE_WRITE);
1969 	CU_ASSERT(cb_arg.iov.iov_base == (void *)0x1000);
1970 	CU_ASSERT(cb_arg.iov.iov_len == 1 * g_bdev.bdev.blocklen);
1971 	stub_complete_io(g_bdev.io_target, 1);
1972 	CU_ASSERT(bdev_channel_count_submitted_io(bdev_ch[1]) == 0);
1973 
1974 	memset(&cb_arg, 0, sizeof(cb_arg));
1975 	set_thread(2);
1976 	poll_thread(2);
1977 	CU_ASSERT(cb_arg.type == SPDK_BDEV_IO_TYPE_READ);
1978 	CU_ASSERT(cb_arg.iov.iov_base == (void *)0xF000);
1979 	CU_ASSERT(cb_arg.iov.iov_len == 8 * g_bdev.bdev.blocklen);
1980 	stub_complete_io(g_bdev.io_target, 1);
1981 	CU_ASSERT(bdev_channel_count_submitted_io(bdev_ch[2]) == 2);
1982 	stub_complete_io(g_bdev.io_target, 1);
1983 	CU_ASSERT(bdev_channel_count_submitted_io(bdev_ch[2]) == 0);
1984 
1985 	/* Run poll_timeout_done(), i.e. let the timeout poller finish its current round */
1986 	set_thread(0);
1987 	poll_thread(0);
1988 	CU_ASSERT(g_desc->refs == 0);
1989 	CU_ASSERT(spdk_bdev_read_blocks(g_desc, ch[0], (void *)0x1000, 0, 1, io_done, NULL) == 0);
1990 	set_thread(1);
1991 	CU_ASSERT(spdk_bdev_write_blocks(g_desc, ch[1], (void *)0x2000, 0, 2, io_done, NULL) == 0);
1992 	set_thread(2);
1993 	CU_ASSERT(spdk_bdev_read_blocks(g_desc, ch[2], (void *)0x3000, 0, 3, io_done, NULL) == 0);
1994 
1995 	/* Trigger the timeout poller to run again; desc->refs is incremented while it runs.
1996 	 * In thread 0 we destroy the io channel before the timeout poller runs, so the
1997 	 * timeout callback is not called on thread 0.
1998 	 */
1999 	spdk_delay_us(6 * spdk_get_ticks_hz());
2000 	memset(&cb_arg, 0, sizeof(cb_arg));
2001 	set_thread(0);
2002 	stub_complete_io(g_bdev.io_target, 1);
2003 	spdk_put_io_channel(ch[0]);
2004 	poll_thread(0);
2005 	CU_ASSERT(g_desc->refs == 1);
2006 	CU_ASSERT(cb_arg.type == 0);
2007 	CU_ASSERT(cb_arg.iov.iov_base == (void *)0x0);
2008 	CU_ASSERT(cb_arg.iov.iov_len == 0);
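	/*
	 * refs == 1 here because the in-flight timeout poller presumably holds a reference on
	 * the desc for the duration of its per-channel walk; that reference is what keeps the
	 * desc alive even though ch[0] has already been released on this thread.
	 */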
2009 
2010 	/* In thread 1 the timeout poller runs first and then we destroy the io channel,
2011 	 * so the timeout callback is called on thread 1.
2012 	 */
2013 	memset(&cb_arg, 0, sizeof(cb_arg));
2014 	set_thread(1);
2015 	poll_thread(1);
2016 	stub_complete_io(g_bdev.io_target, 1);
2017 	spdk_put_io_channel(ch[1]);
2018 	poll_thread(1);
2019 	CU_ASSERT(cb_arg.type == SPDK_BDEV_IO_TYPE_WRITE);
2020 	CU_ASSERT(cb_arg.iov.iov_base == (void *)0x2000);
2021 	CU_ASSERT(cb_arg.iov.iov_len == 2 * g_bdev.bdev.blocklen);
2022 
2023 	/* Close the desc.
2024 	 * This unregisters the timeout poller first and then decrements desc->refs.
2025 	 * The refcount is not zero yet, so the desc is not freed.
2026 	 */
2027 	set_thread(0);
2028 	spdk_bdev_close(g_desc);
2029 	CU_ASSERT(g_desc->refs == 1);
2030 	CU_ASSERT(g_desc->io_timeout_poller == NULL);
2031 
2032 	/* The timeout poller runs on thread 2 and then we destroy the io channel.
2033 	 * The desc is already closed, so the timeout poller exits immediately and the
2034 	 * timeout callback is not called on thread 2.
2035 	 */
2036 	memset(&cb_arg, 0, sizeof(cb_arg));
2037 	set_thread(2);
2038 	poll_thread(2);
2039 	stub_complete_io(g_bdev.io_target, 1);
2040 	spdk_put_io_channel(ch[2]);
2041 	poll_thread(2);
2042 	CU_ASSERT(cb_arg.type == 0);
2043 	CU_ASSERT(cb_arg.iov.iov_base == (void *)0x0);
2044 	CU_ASSERT(cb_arg.iov.iov_len == 0);
2045 
2046 	set_thread(0);
2047 	poll_thread(0);
2048 	g_teardown_done = false;
2049 	unregister_bdev(&g_bdev);
2050 	spdk_io_device_unregister(&g_io_device, NULL);
2051 	spdk_bdev_finish(finish_cb, NULL);
2052 	spdk_iobuf_finish(finish_cb, NULL);
2053 	poll_threads();
2054 	memset(&g_bdev, 0, sizeof(g_bdev));
2055 	CU_ASSERT(g_teardown_done == true);
2056 	g_teardown_done = false;
2057 	free_threads();
2058 	free_cores();
2059 }
2060 
2061 static bool g_io_done2;
2062 static bool g_lock_lba_range_done;
2063 static bool g_unlock_lba_range_done;
2064 
2065 static void
2066 io_done2(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg)
2067 {
2068 	g_io_done2 = true;
2069 	spdk_bdev_free_io(bdev_io);
2070 }
2071 
2072 static void
2073 lock_lba_range_done(void *ctx, int status)
2074 {
2075 	g_lock_lba_range_done = true;
2076 }
2077 
2078 static void
2079 unlock_lba_range_done(void *ctx, int status)
2080 {
2081 	g_unlock_lba_range_done = true;
2082 }
2083 
2084 static uint32_t
2085 stub_channel_outstanding_cnt(void *io_target)
2086 {
2087 	struct spdk_io_channel *_ch = spdk_get_io_channel(io_target);
2088 	struct ut_bdev_channel *ch = spdk_io_channel_get_ctx(_ch);
2089 	uint32_t outstanding_cnt;
2090 
2091 	outstanding_cnt = ch->outstanding_cnt;
2092 
2093 	spdk_put_io_channel(_ch);
2094 	return outstanding_cnt;
2095 }
2096 
2097 static void
2098 lock_lba_range_then_submit_io(void)
2099 {
2100 	struct spdk_bdev_desc *desc = NULL;
2101 	void *io_target;
2102 	struct spdk_io_channel *io_ch[3];
2103 	struct spdk_bdev_channel *bdev_ch[3];
2104 	struct lba_range *range;
2105 	char buf[4096];
2106 	int ctx0, ctx1, ctx2;
2107 	int rc;
2108 
2109 	setup_test();
2110 
2111 	io_target = g_bdev.io_target;
2112 	desc = g_desc;
2113 
2114 	set_thread(0);
2115 	io_ch[0] = spdk_bdev_get_io_channel(desc);
2116 	bdev_ch[0] = spdk_io_channel_get_ctx(io_ch[0]);
2117 	CU_ASSERT(io_ch[0] != NULL);
2118 
2119 	set_thread(1);
2120 	io_ch[1] = spdk_bdev_get_io_channel(desc);
2121 	bdev_ch[1] = spdk_io_channel_get_ctx(io_ch[1]);
2122 	CU_ASSERT(io_ch[1] != NULL);
2123 
2124 	set_thread(0);
2125 	g_lock_lba_range_done = false;
2126 	rc = bdev_lock_lba_range(desc, io_ch[0], 20, 10, lock_lba_range_done, &ctx0);
2127 	CU_ASSERT(rc == 0);
2128 	poll_threads();
2129 
2130 	/* The lock should immediately become valid, since there are no outstanding
2131 	 * write I/O.
2132 	 */
2133 	CU_ASSERT(g_lock_lba_range_done == true);
2134 	range = TAILQ_FIRST(&bdev_ch[0]->locked_ranges);
2135 	SPDK_CU_ASSERT_FATAL(range != NULL);
2136 	CU_ASSERT(range->offset == 20);
2137 	CU_ASSERT(range->length == 10);
2138 	CU_ASSERT(range->owner_ch == bdev_ch[0]);
2139 
2140 	g_io_done = false;
2141 	CU_ASSERT(TAILQ_EMPTY(&bdev_ch[0]->io_locked));
2142 	rc = spdk_bdev_read_blocks(desc, io_ch[0], buf, 20, 1, io_done, &ctx0);
2143 	CU_ASSERT(rc == 0);
2144 	CU_ASSERT(stub_channel_outstanding_cnt(io_target) == 1);
2145 
2146 	stub_complete_io(io_target, 1);
2147 	poll_threads();
2148 	CU_ASSERT(g_io_done == true);
2149 	CU_ASSERT(TAILQ_EMPTY(&bdev_ch[0]->io_locked));
2150 
2151 	/* Try a write I/O.  This should actually be allowed to execute, since the channel
2152 	 * holding the lock is submitting the write I/O.
2153 	 */
2154 	g_io_done = false;
2155 	CU_ASSERT(TAILQ_EMPTY(&bdev_ch[0]->io_locked));
2156 	rc = spdk_bdev_write_blocks(desc, io_ch[0], buf, 20, 1, io_done, &ctx0);
2157 	CU_ASSERT(rc == 0);
2158 	CU_ASSERT(stub_channel_outstanding_cnt(io_target) == 1);
2159 
2160 	stub_complete_io(io_target, 1);
2161 	poll_threads();
2162 	CU_ASSERT(g_io_done == true);
2163 	CU_ASSERT(TAILQ_EMPTY(&bdev_ch[0]->io_locked));
2164 
2165 	/* Try a write I/O.  This should get queued in the io_locked tailq. */
2166 	set_thread(1);
2167 	g_io_done = false;
2168 	CU_ASSERT(TAILQ_EMPTY(&bdev_ch[1]->io_locked));
2169 	rc = spdk_bdev_write_blocks(desc, io_ch[1], buf, 20, 1, io_done, &ctx1);
2170 	CU_ASSERT(rc == 0);
2171 	poll_threads();
2172 	CU_ASSERT(stub_channel_outstanding_cnt(io_target) == 0);
2173 	CU_ASSERT(!TAILQ_EMPTY(&bdev_ch[1]->io_locked));
2174 	CU_ASSERT(g_io_done == false);
2175 
2176 	/* Try to unlock the lba range using thread 1's io_ch.  This should fail. */
2177 	rc = bdev_unlock_lba_range(desc, io_ch[1], 20, 10, unlock_lba_range_done, &ctx1);
2178 	CU_ASSERT(rc == -EINVAL);
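	/*
	 * Only the channel that acquired the lock (bdev_ch[0]) is allowed to release it, so the
	 * unlock attempt through io_ch[1] is rejected with -EINVAL and the queued write stays
	 * queued.
	 */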
2179 
2180 	/* Now create a new channel and submit a write I/O with it.  This should also be queued.
2181 	 * The new channel should inherit the active locks from the bdev's internal list.
2182 	 */
2183 	set_thread(2);
2184 	io_ch[2] = spdk_bdev_get_io_channel(desc);
2185 	bdev_ch[2] = spdk_io_channel_get_ctx(io_ch[2]);
2186 	CU_ASSERT(io_ch[2] != NULL);
2187 
2188 	g_io_done2 = false;
2189 	CU_ASSERT(TAILQ_EMPTY(&bdev_ch[2]->io_locked));
2190 	rc = spdk_bdev_write_blocks(desc, io_ch[2], buf, 22, 2, io_done2, &ctx2);
2191 	CU_ASSERT(rc == 0);
2192 	poll_threads();
2193 	CU_ASSERT(stub_channel_outstanding_cnt(io_target) == 0);
2194 	CU_ASSERT(!TAILQ_EMPTY(&bdev_ch[2]->io_locked));
2195 	CU_ASSERT(g_io_done2 == false);
2196 
2197 	set_thread(0);
2198 	rc = bdev_unlock_lba_range(desc, io_ch[0], 20, 10, unlock_lba_range_done, &ctx0);
2199 	CU_ASSERT(rc == 0);
2200 	poll_threads();
2201 	CU_ASSERT(TAILQ_EMPTY(&bdev_ch[0]->locked_ranges));
2202 
2203 	/* The LBA range is unlocked, so the write IOs should now have started execution. */
2204 	CU_ASSERT(TAILQ_EMPTY(&bdev_ch[1]->io_locked));
2205 	CU_ASSERT(TAILQ_EMPTY(&bdev_ch[2]->io_locked));
2206 
2207 	set_thread(1);
2208 	CU_ASSERT(stub_channel_outstanding_cnt(io_target) == 1);
2209 	stub_complete_io(io_target, 1);
2210 	set_thread(2);
2211 	CU_ASSERT(stub_channel_outstanding_cnt(io_target) == 1);
2212 	stub_complete_io(io_target, 1);
2213 
2214 	poll_threads();
2215 	CU_ASSERT(g_io_done == true);
2216 	CU_ASSERT(g_io_done2 == true);
2217 
2218 	/* Tear down the channels */
2219 	set_thread(0);
2220 	spdk_put_io_channel(io_ch[0]);
2221 	set_thread(1);
2222 	spdk_put_io_channel(io_ch[1]);
2223 	set_thread(2);
2224 	spdk_put_io_channel(io_ch[2]);
2225 	poll_threads();
2226 	set_thread(0);
2227 	teardown_test();
2228 }
2229 
2230 /* spdk_bdev_reset() freezes and unfreezes I/O channels by using spdk_for_each_channel().
2231  * spdk_bdev_unregister() calls spdk_io_device_unregister() at the end.  However,
2232  * spdk_io_device_unregister() fails if it is called while spdk_for_each_channel() is still executing.
2233  * Hence, in this case, spdk_io_device_unregister() is deferred until spdk_bdev_reset()
2234  * completes. Test this behavior.
2235  */
2236 static void
2237 unregister_during_reset(void)
2238 {
2239 	struct spdk_io_channel *io_ch[2];
2240 	bool done_reset = false, done_unregister = false;
2241 	int rc;
2242 
2243 	setup_test();
2244 	set_thread(0);
2245 
2246 	io_ch[0] = spdk_bdev_get_io_channel(g_desc);
2247 	SPDK_CU_ASSERT_FATAL(io_ch[0] != NULL);
2248 
2249 	set_thread(1);
2250 
2251 	io_ch[1] = spdk_bdev_get_io_channel(g_desc);
2252 	SPDK_CU_ASSERT_FATAL(io_ch[1] != NULL);
2253 
2254 	set_thread(0);
2255 
2256 	CU_ASSERT(g_bdev.bdev.internal.reset_in_progress == NULL);
2257 
2258 	rc = spdk_bdev_reset(g_desc, io_ch[0], reset_done, &done_reset);
2259 	CU_ASSERT(rc == 0);
2260 
2261 	set_thread(0);
2262 
2263 	poll_thread_times(0, 1);
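	/*
	 * A single poll is enough to start the reset's channel-freeze iteration but not to finish
	 * it, so the close/unregister below happens while spdk_for_each_channel() is presumably
	 * still in flight, which is exactly the deferral case described in the comment above this
	 * test.
	 */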
2264 
2265 	spdk_bdev_close(g_desc);
2266 	spdk_bdev_unregister(&g_bdev.bdev, _bdev_unregistered, &done_unregister);
2267 
2268 	CU_ASSERT(done_reset == false);
2269 	CU_ASSERT(done_unregister == false);
2270 
2271 	poll_threads();
2272 
2273 	stub_complete_io(g_bdev.io_target, 0);
2274 
2275 	poll_threads();
2276 
2277 	CU_ASSERT(done_reset == true);
2278 	CU_ASSERT(done_unregister == false);
2279 
2280 	spdk_put_io_channel(io_ch[0]);
2281 
2282 	set_thread(1);
2283 
2284 	spdk_put_io_channel(io_ch[1]);
2285 
2286 	poll_threads();
2287 
2288 	CU_ASSERT(done_unregister == true);
2289 
2290 	/* Restore the original g_bdev so that we can use teardown_test(). */
2291 	set_thread(0);
2292 	register_bdev(&g_bdev, "ut_bdev", &g_io_device);
2293 	spdk_bdev_open_ext("ut_bdev", true, _bdev_event_cb, NULL, &g_desc);
2294 	teardown_test();
2295 }
2296 
2297 static void
2298 bdev_init_wt_cb(void *done, int rc)
2299 {
2300 }
2301 
2302 
2303 static uint64_t
2304 get_wrong_thread_hits(void)
2305 {
2306 	static uint64_t previous = 0;
2307 	uint64_t ret, current;
2308 
2309 	current = spdk_deprecation_get_hits(_deprecated_bdev_register_examine_thread);
2310 	ret = current - previous;
2311 	previous = current;
2312 
2313 	return ret;
2314 }
2315 
2316 static int
2317 wrong_thread_setup(void)
2318 {
2319 	allocate_cores(1);
2320 	allocate_threads(2);
2321 	set_thread(0);
2322 
2323 	spdk_bdev_initialize(bdev_init_wt_cb, NULL);
2324 	spdk_io_device_register(&g_io_device, stub_create_ch, stub_destroy_ch,
2325 				sizeof(struct ut_bdev_channel), NULL);
2326 
2327 	set_thread(1);
2328 
2329 	/* Ignore return, just setting the base for the next time it is called. */
2330 	get_wrong_thread_hits();
2331 
2332 	return 0;
2333 }
2334 
2335 static int
2336 wrong_thread_teardown(void)
2337 {
2338 	int rc;
2339 
2340 	rc = get_wrong_thread_hits();
2341 
2342 	set_thread(0);
2343 
2344 	g_teardown_done = false;
2345 	spdk_io_device_unregister(&g_io_device, NULL);
2346 	spdk_bdev_finish(finish_cb, NULL);
2347 	poll_threads();
2348 	memset(&g_bdev, 0, sizeof(g_bdev));
2349 	if (!g_teardown_done) {
2350 		fprintf(stderr, "%s:%d %s: teardown not done\n", __FILE__, __LINE__, __func__);
2351 		rc = -1;
2352 	}
2353 	g_teardown_done = false;
2354 
2355 	free_threads();
2356 	free_cores();
2357 
2358 	return rc;
2359 }
2360 
2361 static void
2362 spdk_bdev_register_wt(void)
2363 {
2364 	struct spdk_bdev bdev = { 0 };
2365 	int rc;
2366 	bool done;
2367 
2368 	bdev.name = "wt_bdev";
2369 	bdev.fn_table = &fn_table;
2370 	bdev.module = &bdev_ut_if;
2371 	bdev.blocklen = 4096;
2372 	bdev.blockcnt = 1024;
2373 
2374 	/* Can register only on app thread */
2375 	rc = spdk_bdev_register(&bdev);
2376 	CU_ASSERT(rc == 0);
2377 	CU_ASSERT(get_wrong_thread_hits() == 1);
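	/*
	 * Registering from thread 1 (not the thread that called spdk_bdev_initialize()) still
	 * returns 0, but it bumps the _deprecated_bdev_register_examine_thread counter, which is
	 * what get_wrong_thread_hits() observes here.
	 */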
2378 
2379 	/* Can unregister on any thread */
2380 	done = false;
2381 	spdk_bdev_unregister(&bdev, _bdev_unregistered, &done);
2382 	poll_threads();
2383 	CU_ASSERT(get_wrong_thread_hits() == 0);
2384 	CU_ASSERT(done);
2385 
2386 	/* Can unregister by name on any thread */
2387 	rc = spdk_bdev_register(&bdev);
2388 	CU_ASSERT(rc == 0);
2389 	CU_ASSERT(get_wrong_thread_hits() == 1);
2390 	done = false;
2391 	rc = spdk_bdev_unregister_by_name(bdev.name, bdev.module, _bdev_unregistered, &done);
2392 	CU_ASSERT(rc == 0);
2393 	poll_threads();
2394 	CU_ASSERT(get_wrong_thread_hits() == 0);
2395 	CU_ASSERT(done);
2396 }
2397 
2398 static void
2399 wait_for_examine_cb(void *arg)
2400 {
2401 	struct spdk_thread **thread = arg;
2402 
2403 	*thread = spdk_get_thread();
2404 }
2405 
2406 static void
2407 spdk_bdev_examine_wt(void)
2408 {
2409 	int rc;
2410 	bool save_auto_examine = g_bdev_opts.bdev_auto_examine;
2411 	struct spdk_thread *thread;
2412 
2413 	g_bdev_opts.bdev_auto_examine = false;
2414 
2415 	set_thread(0);
2416 	register_bdev(&g_bdev, "ut_bdev_wt", &g_io_device);
2417 	CU_ASSERT(spdk_bdev_get_by_name("ut_bdev_wt") != NULL);
2418 	set_thread(1);
2419 
2420 	/* Can examine only on the app thread */
2421 	rc = spdk_bdev_examine("ut_bdev_wt");
2422 	CU_ASSERT(rc == 0);
2423 	poll_threads();
2424 	CU_ASSERT(get_wrong_thread_hits() == 1);
2425 	unregister_bdev(&g_bdev);
2426 	CU_ASSERT(spdk_bdev_get_by_name("ut_bdev_wt") == NULL);
2427 	CU_ASSERT(get_wrong_thread_hits() == 0);
2428 
2429 	/* Can wait for examine on app thread, callback called on app thread. */
2430 	set_thread(0);
2431 	register_bdev(&g_bdev, "ut_bdev_wt", &g_io_device);
2432 	CU_ASSERT(spdk_bdev_get_by_name("ut_bdev_wt") != NULL);
2433 	thread = NULL;
2434 	rc = spdk_bdev_wait_for_examine(wait_for_examine_cb, &thread);
2435 	CU_ASSERT(rc == 0);
2436 	poll_threads();
2437 	CU_ASSERT(get_wrong_thread_hits() == 0);
2438 	CU_ASSERT(thread == spdk_get_thread());
2439 	unregister_bdev(&g_bdev);
2440 	CU_ASSERT(spdk_bdev_get_by_name("ut_bdev_wt") == NULL);
2441 	CU_ASSERT(get_wrong_thread_hits() == 0);
2442 
2443 	/* Can wait for examine on non-app thread, callback called on same thread. */
2444 	set_thread(0);
2445 	register_bdev(&g_bdev, "ut_bdev_wt", &g_io_device);
2446 	CU_ASSERT(spdk_bdev_get_by_name("ut_bdev_wt") != NULL);
2447 	thread = NULL;
2448 	rc = spdk_bdev_wait_for_examine(wait_for_examine_cb, &thread);
2449 	CU_ASSERT(rc == 0);
2450 	poll_threads();
2451 	CU_ASSERT(get_wrong_thread_hits() == 0);
2452 	CU_ASSERT(thread == spdk_get_thread());
2453 	unregister_bdev(&g_bdev);
2454 	CU_ASSERT(spdk_bdev_get_by_name("ut_bdev_wt") == NULL);
2455 	CU_ASSERT(get_wrong_thread_hits() == 0);
2456 
2457 	unregister_bdev(&g_bdev);
2458 	g_bdev_opts.bdev_auto_examine = save_auto_examine;
2459 }
2460 
2461 int
2462 main(int argc, char **argv)
2463 {
2464 	CU_pSuite	suite = NULL;
2465 	CU_pSuite	suite_wt = NULL;
2466 	unsigned int	num_failures;
2467 
2468 	CU_set_error_action(CUEA_ABORT);
2469 	CU_initialize_registry();
2470 
2471 	suite = CU_add_suite("bdev", NULL, NULL);
2472 	suite_wt = CU_add_suite("bdev_wrong_thread", wrong_thread_setup, wrong_thread_teardown);
2473 
2474 	CU_ADD_TEST(suite, basic);
2475 	CU_ADD_TEST(suite, unregister_and_close);
2476 	CU_ADD_TEST(suite, basic_qos);
2477 	CU_ADD_TEST(suite, put_channel_during_reset);
2478 	CU_ADD_TEST(suite, aborted_reset);
2479 	CU_ADD_TEST(suite, aborted_reset_no_outstanding_io);
2480 	CU_ADD_TEST(suite, io_during_reset);
2481 	CU_ADD_TEST(suite, reset_completions);
2482 	CU_ADD_TEST(suite, io_during_qos_queue);
2483 	CU_ADD_TEST(suite, io_during_qos_reset);
2484 	CU_ADD_TEST(suite, enomem);
2485 	CU_ADD_TEST(suite, enomem_multi_bdev);
2486 	CU_ADD_TEST(suite, enomem_multi_bdev_unregister);
2487 	CU_ADD_TEST(suite, enomem_multi_io_target);
2488 	CU_ADD_TEST(suite, qos_dynamic_enable);
2489 	CU_ADD_TEST(suite, bdev_histograms_mt);
2490 	CU_ADD_TEST(suite, bdev_set_io_timeout_mt);
2491 	CU_ADD_TEST(suite, lock_lba_range_then_submit_io);
2492 	CU_ADD_TEST(suite, unregister_during_reset);
2493 	CU_ADD_TEST(suite_wt, spdk_bdev_register_wt);
2494 	CU_ADD_TEST(suite_wt, spdk_bdev_examine_wt);
2495 
2496 	CU_basic_set_mode(CU_BRM_VERBOSE);
2497 	CU_basic_run_tests();
2498 	num_failures = CU_get_number_of_failures();
2499 	CU_cleanup_registry();
2500 	return num_failures;
2501 }
2502