xref: /spdk/test/unit/lib/bdev/mt/bdev.c/bdev_ut.c (revision bb488d2829a9b7863daab45917dd2174905cc0ae)
1 /*-
2  *   BSD LICENSE
3  *
4  *   Copyright (c) Intel Corporation.
5  *   All rights reserved.
6  *
7  *   Redistribution and use in source and binary forms, with or without
8  *   modification, are permitted provided that the following conditions
9  *   are met:
10  *
11  *     * Redistributions of source code must retain the above copyright
12  *       notice, this list of conditions and the following disclaimer.
13  *     * Redistributions in binary form must reproduce the above copyright
14  *       notice, this list of conditions and the following disclaimer in
15  *       the documentation and/or other materials provided with the
16  *       distribution.
17  *     * Neither the name of Intel Corporation nor the names of its
18  *       contributors may be used to endorse or promote products derived
19  *       from this software without specific prior written permission.
20  *
21  *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22  *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23  *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
24  *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
25  *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
26  *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
27  *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
28  *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
29  *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
30  *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
31  *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
32  */
33 
34 #include "spdk_cunit.h"
35 
36 #include "common/lib/ut_multithread.c"
37 #include "unit/lib/json_mock.c"
38 
39 #include "spdk/config.h"
40 /* HACK: disable VTune integration so the unit test doesn't need VTune headers and libs to build */
41 #undef SPDK_CONFIG_VTUNE
42 
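/* bdev.c is included directly (rather than linked) so the tests can reference
 * internal definitions such as struct spdk_bdev_channel, the BDEV_CH_* channel
 * flags and NOMEM_THRESHOLD_COUNT.
 */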
43 #include "bdev/bdev.c"
44 
45 #define BDEV_UT_NUM_THREADS 3
46 
47 DEFINE_STUB(spdk_conf_find_section, struct spdk_conf_section *, (struct spdk_conf *cp,
48 		const char *name), NULL);
49 DEFINE_STUB(spdk_conf_section_get_nmval, char *,
50 	    (struct spdk_conf_section *sp, const char *key, int idx1, int idx2), NULL);
51 DEFINE_STUB(spdk_conf_section_get_intval, int, (struct spdk_conf_section *sp, const char *key), -1);
52 
53 struct spdk_trace_histories *g_trace_histories;
54 DEFINE_STUB_V(spdk_trace_add_register_fn, (struct spdk_trace_register_fn *reg_fn));
55 DEFINE_STUB_V(spdk_trace_register_owner, (uint8_t type, char id_prefix));
56 DEFINE_STUB_V(spdk_trace_register_object, (uint8_t type, char id_prefix));
57 DEFINE_STUB_V(spdk_trace_register_description, (const char *name, const char *short_name,
58 		uint16_t tpoint_id, uint8_t owner_type,
59 		uint8_t object_type, uint8_t new_object,
60 		uint8_t arg1_is_ptr, const char *arg1_name));
61 DEFINE_STUB_V(_spdk_trace_record, (uint64_t tsc, uint16_t tpoint_id, uint16_t poller_id,
62 				   uint32_t size, uint64_t object_id, uint64_t arg1));
63 DEFINE_STUB(spdk_notify_send, uint64_t, (const char *type, const char *ctx), 0);
64 DEFINE_STUB(spdk_notify_type_register, struct spdk_notify_type *, (const char *type), NULL);
65 
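/*
 * Stub bdev model used throughout these tests: each ut_bdev_channel queues
 * submitted I/O on its outstanding_io list until the test explicitly completes
 * it with stub_complete_io().  This gives the tests full control over when,
 * and on which thread, completions occur.
 */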
66 struct ut_bdev {
67 	struct spdk_bdev	bdev;
68 	void			*io_target;
69 };
70 
71 struct ut_bdev_channel {
72 	TAILQ_HEAD(, spdk_bdev_io)	outstanding_io;
73 	uint32_t			outstanding_cnt;
74 	uint32_t			avail_cnt;
75 };
76 
77 int g_io_device;
78 struct ut_bdev g_bdev;
79 struct spdk_bdev_desc *g_desc;
80 bool g_teardown_done = false;
81 bool g_get_io_channel = true;
82 bool g_create_ch = true;
83 bool g_init_complete_called = false;
84 bool g_fini_start_called = true;
85 int g_status = 0;
86 int g_count = 0;
87 struct spdk_histogram_data *g_histogram = NULL;
88 
89 static int
90 stub_create_ch(void *io_device, void *ctx_buf)
91 {
92 	struct ut_bdev_channel *ch = ctx_buf;
93 
94 	if (g_create_ch == false) {
95 		return -1;
96 	}
97 
98 	TAILQ_INIT(&ch->outstanding_io);
99 	ch->outstanding_cnt = 0;
100 	/*
101 	 * When avail_cnt reaches 0, the stub's submit_request function completes
102 	 *  new I/O with SPDK_BDEV_IO_STATUS_NOMEM.  Most tests do not want ENOMEM
103 	 *  to occur, so by default set this to a big value that won't get hit.
104 	 *  The ENOMEM tests can then override this value to induce ENOMEM conditions.
105 	 */
106 	ch->avail_cnt = 2048;
107 	return 0;
108 }
109 
110 static void
111 stub_destroy_ch(void *io_device, void *ctx_buf)
112 {
113 }
114 
115 static struct spdk_io_channel *
116 stub_get_io_channel(void *ctx)
117 {
118 	struct ut_bdev *ut_bdev = ctx;
119 
120 	if (g_get_io_channel == true) {
121 		return spdk_get_io_channel(ut_bdev->io_target);
122 	} else {
123 		return NULL;
124 	}
125 }
126 
127 static int
128 stub_destruct(void *ctx)
129 {
130 	return 0;
131 }
132 
133 static void
134 stub_submit_request(struct spdk_io_channel *_ch, struct spdk_bdev_io *bdev_io)
135 {
136 	struct ut_bdev_channel *ch = spdk_io_channel_get_ctx(_ch);
137 
138 	if (bdev_io->type == SPDK_BDEV_IO_TYPE_RESET) {
139 		struct spdk_bdev_io *io;
140 
141 		while (!TAILQ_EMPTY(&ch->outstanding_io)) {
142 			io = TAILQ_FIRST(&ch->outstanding_io);
143 			TAILQ_REMOVE(&ch->outstanding_io, io, module_link);
144 			ch->outstanding_cnt--;
145 			spdk_bdev_io_complete(io, SPDK_BDEV_IO_STATUS_FAILED);
146 			ch->avail_cnt++;
147 		}
148 	}
149 
150 	if (ch->avail_cnt > 0) {
151 		TAILQ_INSERT_TAIL(&ch->outstanding_io, bdev_io, module_link);
152 		ch->outstanding_cnt++;
153 		ch->avail_cnt--;
154 	} else {
155 		spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_NOMEM);
156 	}
157 }
158 
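/*
 * Complete up to num_to_complete outstanding I/O on the current thread's
 * channel for io_target, in submission order.  Passing 0 completes all
 * currently outstanding I/O on that channel.
 */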
159 static uint32_t
160 stub_complete_io(void *io_target, uint32_t num_to_complete)
161 {
162 	struct spdk_io_channel *_ch = spdk_get_io_channel(io_target);
163 	struct ut_bdev_channel *ch = spdk_io_channel_get_ctx(_ch);
164 	struct spdk_bdev_io *io;
165 	bool complete_all = (num_to_complete == 0);
166 	uint32_t num_completed = 0;
167 
168 	while (complete_all || num_completed < num_to_complete) {
169 		if (TAILQ_EMPTY(&ch->outstanding_io)) {
170 			break;
171 		}
172 		io = TAILQ_FIRST(&ch->outstanding_io);
173 		TAILQ_REMOVE(&ch->outstanding_io, io, module_link);
174 		ch->outstanding_cnt--;
175 		spdk_bdev_io_complete(io, SPDK_BDEV_IO_STATUS_SUCCESS);
176 		ch->avail_cnt++;
177 		num_completed++;
178 	}
179 
180 	spdk_put_io_channel(_ch);
181 	return num_completed;
182 }
183 
184 static struct spdk_bdev_fn_table fn_table = {
185 	.get_io_channel =	stub_get_io_channel,
186 	.destruct =		stub_destruct,
187 	.submit_request =	stub_submit_request,
188 };
189 
190 static int
191 module_init(void)
192 {
193 	return 0;
194 }
195 
196 static void
197 module_fini(void)
198 {
199 }
200 
201 static void
202 init_complete(void)
203 {
204 	g_init_complete_called = true;
205 }
206 
207 static void
208 fini_start(void)
209 {
210 	g_fini_start_called = true;
211 }
212 
213 struct spdk_bdev_module bdev_ut_if = {
214 	.name = "bdev_ut",
215 	.module_init = module_init,
216 	.module_fini = module_fini,
217 	.init_complete = init_complete,
218 	.fini_start = fini_start,
219 };
220 
221 SPDK_BDEV_MODULE_REGISTER(bdev_ut, &bdev_ut_if)
222 
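/*
 * Fill in a minimal bdev (4096-byte blocks, 1024 blocks) backed by the stub
 * function table above and register it with the bdev layer.
 */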
223 static void
224 register_bdev(struct ut_bdev *ut_bdev, char *name, void *io_target)
225 {
226 	memset(ut_bdev, 0, sizeof(*ut_bdev));
227 
228 	ut_bdev->io_target = io_target;
229 	ut_bdev->bdev.ctxt = ut_bdev;
230 	ut_bdev->bdev.name = name;
231 	ut_bdev->bdev.fn_table = &fn_table;
232 	ut_bdev->bdev.module = &bdev_ut_if;
233 	ut_bdev->bdev.blocklen = 4096;
234 	ut_bdev->bdev.blockcnt = 1024;
235 
236 	spdk_bdev_register(&ut_bdev->bdev);
237 }
238 
239 static void
240 unregister_bdev(struct ut_bdev *ut_bdev)
241 {
242 	/* Handle any deferred messages. */
243 	poll_threads();
244 	spdk_bdev_unregister(&ut_bdev->bdev, NULL, NULL);
245 }
246 
247 static void
248 bdev_init_cb(void *done, int rc)
249 {
250 	CU_ASSERT(rc == 0);
251 	*(bool *)done = true;
252 }
253 
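/*
 * Common test setup: allocate BDEV_UT_NUM_THREADS test threads, initialize the
 * bdev layer, register the stub io_device plus the "ut_bdev" bdev, and open a
 * descriptor to it in g_desc.
 */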
254 static void
255 setup_test(void)
256 {
257 	bool done = false;
258 
259 	allocate_threads(BDEV_UT_NUM_THREADS);
260 	set_thread(0);
261 	spdk_bdev_initialize(bdev_init_cb, &done);
262 	spdk_io_device_register(&g_io_device, stub_create_ch, stub_destroy_ch,
263 				sizeof(struct ut_bdev_channel), NULL);
264 	register_bdev(&g_bdev, "ut_bdev", &g_io_device);
265 	spdk_bdev_open(&g_bdev.bdev, true, NULL, NULL, &g_desc);
266 }
267 
268 static void
269 finish_cb(void *cb_arg)
270 {
271 	g_teardown_done = true;
272 }
273 
274 static void
275 teardown_test(void)
276 {
277 	set_thread(0);
278 	g_teardown_done = false;
279 	spdk_bdev_close(g_desc);
280 	g_desc = NULL;
281 	unregister_bdev(&g_bdev);
282 	spdk_io_device_unregister(&g_io_device, NULL);
283 	spdk_bdev_finish(finish_cb, NULL);
284 	poll_threads();
285 	memset(&g_bdev, 0, sizeof(g_bdev));
286 	CU_ASSERT(g_teardown_done == true);
287 	g_teardown_done = false;
288 	free_threads();
289 }
290 
291 static uint32_t
292 bdev_io_tailq_cnt(bdev_io_tailq_t *tailq)
293 {
294 	struct spdk_bdev_io *io;
295 	uint32_t cnt = 0;
296 
297 	TAILQ_FOREACH(io, tailq, internal.link) {
298 		cnt++;
299 	}
300 
301 	return cnt;
302 }
303 
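/*
 * Verify that the module's init_complete/fini_start callbacks fire, and that
 * spdk_bdev_get_io_channel() fails cleanly when the module returns no channel
 * or channel creation fails, before succeeding in the normal case.
 */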
304 static void
305 basic(void)
306 {
307 	g_init_complete_called = false;
308 	setup_test();
309 	CU_ASSERT(g_init_complete_called == true);
310 
311 	set_thread(0);
312 
313 	g_get_io_channel = false;
314 	g_ut_threads[0].ch = spdk_bdev_get_io_channel(g_desc);
315 	CU_ASSERT(g_ut_threads[0].ch == NULL);
316 
317 	g_get_io_channel = true;
318 	g_create_ch = false;
319 	g_ut_threads[0].ch = spdk_bdev_get_io_channel(g_desc);
320 	CU_ASSERT(g_ut_threads[0].ch == NULL);
321 
322 	g_get_io_channel = true;
323 	g_create_ch = true;
324 	g_ut_threads[0].ch = spdk_bdev_get_io_channel(g_desc);
325 	CU_ASSERT(g_ut_threads[0].ch != NULL);
326 	spdk_put_io_channel(g_ut_threads[0].ch);
327 
328 	g_fini_start_called = false;
329 	teardown_test();
330 	CU_ASSERT(g_fini_start_called == true);
331 }
332 
333 static void
334 _bdev_removed(void *done)
335 {
336 	*(bool *)done = true;
337 }
338 
339 static void
340 _bdev_unregistered(void *done, int rc)
341 {
342 	CU_ASSERT(rc == 0);
343 	*(bool *)done = true;
344 }
345 
346 static void
347 unregister_and_close(void)
348 {
349 	bool done, remove_notify;
350 	struct spdk_bdev_desc *desc;
351 
352 	setup_test();
353 	set_thread(0);
354 
355 	/* setup_test() automatically opens the bdev,
356 	 * but this test needs to do that in a different
357 	 * way. */
358 	spdk_bdev_close(g_desc);
359 	poll_threads();
360 
361 	remove_notify = false;
362 	spdk_bdev_open(&g_bdev.bdev, true, _bdev_removed, &remove_notify, &desc);
363 	CU_ASSERT(remove_notify == false);
364 	CU_ASSERT(desc != NULL);
365 
366 	/* There is an open descriptor on the device. Unregister it,
367 	 * which can't proceed until the descriptor is closed. */
368 	done = false;
369 	spdk_bdev_unregister(&g_bdev.bdev, _bdev_unregistered, &done);
370 	/* No polling has occurred, so neither of these should execute */
371 	CU_ASSERT(remove_notify == false);
372 	CU_ASSERT(done == false);
373 
374 	/* Prior to the unregister completing, close the descriptor */
375 	spdk_bdev_close(desc);
376 
377 	/* Poll the threads to allow all events to be processed */
378 	poll_threads();
379 
380 	/* Remove notify should not have been called because the
381 	 * descriptor is already closed. */
382 	CU_ASSERT(remove_notify == false);
383 
384 	/* The unregister should have completed */
385 	CU_ASSERT(done == true);
386 
387 	/* Restore the original g_bdev so that we can use teardown_test(). */
388 	register_bdev(&g_bdev, "ut_bdev", &g_io_device);
389 	spdk_bdev_open(&g_bdev.bdev, true, NULL, NULL, &g_desc);
390 	teardown_test();
391 }
392 
393 static void
394 reset_done(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg)
395 {
396 	bool *done = cb_arg;
397 
398 	CU_ASSERT(success == true);
399 	*done = true;
400 	spdk_bdev_free_io(bdev_io);
401 }
402 
403 static void
404 put_channel_during_reset(void)
405 {
406 	struct spdk_io_channel *io_ch;
407 	bool done = false;
408 
409 	setup_test();
410 
411 	set_thread(0);
412 	io_ch = spdk_bdev_get_io_channel(g_desc);
413 	CU_ASSERT(io_ch != NULL);
414 
415 	/*
416 	 * Start a reset, but then put the I/O channel before
417 	 *  the deferred messages for the reset get a chance to
418 	 *  execute.
419 	 */
420 	spdk_bdev_reset(g_desc, io_ch, reset_done, &done);
421 	spdk_put_io_channel(io_ch);
422 	poll_threads();
423 	stub_complete_io(g_bdev.io_target, 0);
424 
425 	teardown_test();
426 }
427 
428 static void
429 aborted_reset_done(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg)
430 {
431 	enum spdk_bdev_io_status *status = cb_arg;
432 
433 	*status = success ? SPDK_BDEV_IO_STATUS_SUCCESS : SPDK_BDEV_IO_STATUS_FAILED;
434 	spdk_bdev_free_io(bdev_io);
435 }
436 
437 static void
438 aborted_reset(void)
439 {
440 	struct spdk_io_channel *io_ch[2];
441 	enum spdk_bdev_io_status status1 = SPDK_BDEV_IO_STATUS_PENDING,
442 				 status2 = SPDK_BDEV_IO_STATUS_PENDING;
443 
444 	setup_test();
445 
446 	set_thread(0);
447 	io_ch[0] = spdk_bdev_get_io_channel(g_desc);
448 	CU_ASSERT(io_ch[0] != NULL);
449 	spdk_bdev_reset(g_desc, io_ch[0], aborted_reset_done, &status1);
450 	poll_threads();
451 	CU_ASSERT(g_bdev.bdev.internal.reset_in_progress != NULL);
452 
453 	/*
454 	 * First reset has been submitted on ch0.  Now submit a second
455 	 *  reset on ch1 which will get queued since there is already a
456 	 *  reset in progress.
457 	 */
458 	set_thread(1);
459 	io_ch[1] = spdk_bdev_get_io_channel(g_desc);
460 	CU_ASSERT(io_ch[1] != NULL);
461 	spdk_bdev_reset(g_desc, io_ch[1], aborted_reset_done, &status2);
462 	poll_threads();
463 	CU_ASSERT(g_bdev.bdev.internal.reset_in_progress != NULL);
464 
465 	/*
466 	 * Now destroy ch1.  This will abort the queued reset.  Check that
467 	 *  the second reset was completed with failed status.  Also check
468 	 *  that bdev->internal.reset_in_progress != NULL, since the
469 	 *  original reset has not been completed yet.  This ensures that
470 	 *  the bdev code is correctly noticing that the failed reset is
471 	 *  *not* the one that had been submitted to the bdev module.
472 	 */
473 	set_thread(1);
474 	spdk_put_io_channel(io_ch[1]);
475 	poll_threads();
476 	CU_ASSERT(status2 == SPDK_BDEV_IO_STATUS_FAILED);
477 	CU_ASSERT(g_bdev.bdev.internal.reset_in_progress != NULL);
478 
479 	/*
480 	 * Now complete the first reset, verify that it completed with SUCCESS
481 	 *  status and that bdev->internal.reset_in_progress is also set back to NULL.
482 	 */
483 	set_thread(0);
484 	spdk_put_io_channel(io_ch[0]);
485 	stub_complete_io(g_bdev.io_target, 0);
486 	poll_threads();
487 	CU_ASSERT(status1 == SPDK_BDEV_IO_STATUS_SUCCESS);
488 	CU_ASSERT(g_bdev.bdev.internal.reset_in_progress == NULL);
489 
490 	teardown_test();
491 }
492 
493 static void
494 io_during_io_done(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg)
495 {
496 	enum spdk_bdev_io_status *status = cb_arg;
497 
498 	*status = success ? SPDK_BDEV_IO_STATUS_SUCCESS : SPDK_BDEV_IO_STATUS_FAILED;
499 	spdk_bdev_free_io(bdev_io);
500 }
501 
502 static void
503 io_during_reset(void)
504 {
505 	struct spdk_io_channel *io_ch[2];
506 	struct spdk_bdev_channel *bdev_ch[2];
507 	enum spdk_bdev_io_status status0, status1, status_reset;
508 	int rc;
509 
510 	setup_test();
511 
512 	/*
513 	 * First test normal case - submit an I/O on each of two channels (with no resets)
514 	 *  and verify they complete successfully.
515 	 */
516 	set_thread(0);
517 	io_ch[0] = spdk_bdev_get_io_channel(g_desc);
518 	bdev_ch[0] = spdk_io_channel_get_ctx(io_ch[0]);
519 	CU_ASSERT(bdev_ch[0]->flags == 0);
520 	status0 = SPDK_BDEV_IO_STATUS_PENDING;
521 	rc = spdk_bdev_read_blocks(g_desc, io_ch[0], NULL, 0, 1, io_during_io_done, &status0);
522 	CU_ASSERT(rc == 0);
523 
524 	set_thread(1);
525 	io_ch[1] = spdk_bdev_get_io_channel(g_desc);
526 	bdev_ch[1] = spdk_io_channel_get_ctx(io_ch[1]);
527 	CU_ASSERT(bdev_ch[1]->flags == 0);
528 	status1 = SPDK_BDEV_IO_STATUS_PENDING;
529 	rc = spdk_bdev_read_blocks(g_desc, io_ch[1], NULL, 0, 1, io_during_io_done, &status1);
530 	CU_ASSERT(rc == 0);
531 
532 	poll_threads();
533 	CU_ASSERT(status0 == SPDK_BDEV_IO_STATUS_PENDING);
534 	CU_ASSERT(status1 == SPDK_BDEV_IO_STATUS_PENDING);
535 
536 	set_thread(0);
537 	stub_complete_io(g_bdev.io_target, 0);
538 	CU_ASSERT(status0 == SPDK_BDEV_IO_STATUS_SUCCESS);
539 
540 	set_thread(1);
541 	stub_complete_io(g_bdev.io_target, 0);
542 	CU_ASSERT(status1 == SPDK_BDEV_IO_STATUS_SUCCESS);
543 
544 	/*
545 	 * Now submit a reset, and leave it pending while we submit I/O on two different
546 	 *  channels.  These I/O should be failed by the bdev layer since the reset is in
547 	 *  progress.
548 	 */
549 	set_thread(0);
550 	status_reset = SPDK_BDEV_IO_STATUS_PENDING;
551 	rc = spdk_bdev_reset(g_desc, io_ch[0], io_during_io_done, &status_reset);
552 	CU_ASSERT(rc == 0);
553 
554 	CU_ASSERT(bdev_ch[0]->flags == 0);
555 	CU_ASSERT(bdev_ch[1]->flags == 0);
556 	poll_threads();
557 	CU_ASSERT(bdev_ch[0]->flags == BDEV_CH_RESET_IN_PROGRESS);
558 	CU_ASSERT(bdev_ch[1]->flags == BDEV_CH_RESET_IN_PROGRESS);
559 
560 	set_thread(0);
561 	status0 = SPDK_BDEV_IO_STATUS_PENDING;
562 	rc = spdk_bdev_read_blocks(g_desc, io_ch[0], NULL, 0, 1, io_during_io_done, &status0);
563 	CU_ASSERT(rc == 0);
564 
565 	set_thread(1);
566 	status1 = SPDK_BDEV_IO_STATUS_PENDING;
567 	rc = spdk_bdev_read_blocks(g_desc, io_ch[1], NULL, 0, 1, io_during_io_done, &status1);
568 	CU_ASSERT(rc == 0);
569 
570 	/*
571 	 * A reset is in progress so these read I/O should complete with failure.  Note that we
572 	 *  need to poll_threads() since I/O completed inline have their completion deferred.
573 	 */
574 	poll_threads();
575 	CU_ASSERT(status_reset == SPDK_BDEV_IO_STATUS_PENDING);
576 	CU_ASSERT(status0 == SPDK_BDEV_IO_STATUS_FAILED);
577 	CU_ASSERT(status1 == SPDK_BDEV_IO_STATUS_FAILED);
578 
579 	/*
580 	 * Complete the reset
581 	 */
582 	set_thread(0);
583 	stub_complete_io(g_bdev.io_target, 0);
584 
585 	/*
586 	 * Only poll thread 0. We should not get a completion.
587 	 */
588 	poll_thread(0);
589 	CU_ASSERT(status_reset == SPDK_BDEV_IO_STATUS_PENDING);
590 
591 	/*
592 	 * Poll both thread 0 and 1 so the messages can propagate and we
593 	 * get a completion.
594 	 */
595 	poll_threads();
596 	CU_ASSERT(status_reset == SPDK_BDEV_IO_STATUS_SUCCESS);
597 
598 	spdk_put_io_channel(io_ch[0]);
599 	set_thread(1);
600 	spdk_put_io_channel(io_ch[1]);
601 	poll_threads();
602 
603 	teardown_test();
604 }
605 
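/*
 * Exercise basic QoS behavior by populating bdev->internal.qos directly before
 * any I/O channels are created: I/O submitted from any thread is funneled
 * through the QoS thread, and the QoS channel is torn down and re-created as
 * descriptors and I/O channels come and go.
 */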
606 static void
607 basic_qos(void)
608 {
609 	struct spdk_io_channel *io_ch[2];
610 	struct spdk_bdev_channel *bdev_ch[2];
611 	struct spdk_bdev *bdev;
612 	enum spdk_bdev_io_status status;
613 	int rc;
614 
615 	setup_test();
616 
617 	/* Enable QoS */
618 	bdev = &g_bdev.bdev;
619 	bdev->internal.qos = calloc(1, sizeof(*bdev->internal.qos));
620 	SPDK_CU_ASSERT_FATAL(bdev->internal.qos != NULL);
621 	TAILQ_INIT(&bdev->internal.qos->queued);
622 	/*
623 	 * Enable read/write IOPS, read-only bytes per second and
624 	 * read/write bytes per second rate limits.
625 	 * In this case, with the 4K block size, all rate limits take equal effect.
626 	 */
627 	/* 2000 read/write I/O per second, or 2 per millisecond */
628 	bdev->internal.qos->rate_limits[SPDK_BDEV_QOS_RW_IOPS_RATE_LIMIT].limit = 2000;
629 	/* 8K read/write byte per millisecond with 4K block size */
630 	bdev->internal.qos->rate_limits[SPDK_BDEV_QOS_RW_BPS_RATE_LIMIT].limit = 8192000;
631 	/* 8K read only byte per millisecond with 4K block size */
632 	bdev->internal.qos->rate_limits[SPDK_BDEV_QOS_R_BPS_RATE_LIMIT].limit = 8192000;
633 
634 	g_get_io_channel = true;
635 
636 	set_thread(0);
637 	io_ch[0] = spdk_bdev_get_io_channel(g_desc);
638 	bdev_ch[0] = spdk_io_channel_get_ctx(io_ch[0]);
639 	CU_ASSERT(bdev_ch[0]->flags == BDEV_CH_QOS_ENABLED);
640 
641 	set_thread(1);
642 	io_ch[1] = spdk_bdev_get_io_channel(g_desc);
643 	bdev_ch[1] = spdk_io_channel_get_ctx(io_ch[1]);
644 	CU_ASSERT(bdev_ch[1]->flags == BDEV_CH_QOS_ENABLED);
645 
646 	/*
647 	 * Send an I/O on thread 0, which is where the QoS thread is running.
648 	 */
649 	set_thread(0);
650 	status = SPDK_BDEV_IO_STATUS_PENDING;
651 	rc = spdk_bdev_read_blocks(g_desc, io_ch[0], NULL, 0, 1, io_during_io_done, &status);
652 	CU_ASSERT(rc == 0);
653 	CU_ASSERT(status == SPDK_BDEV_IO_STATUS_PENDING);
654 	poll_threads();
655 	stub_complete_io(g_bdev.io_target, 0);
656 	poll_threads();
657 	CU_ASSERT(status == SPDK_BDEV_IO_STATUS_SUCCESS);
658 
659 	/* Send an I/O on thread 1. The QoS thread is not running here. */
660 	status = SPDK_BDEV_IO_STATUS_PENDING;
661 	set_thread(1);
662 	rc = spdk_bdev_read_blocks(g_desc, io_ch[1], NULL, 0, 1, io_during_io_done, &status);
663 	CU_ASSERT(rc == 0);
664 	CU_ASSERT(status == SPDK_BDEV_IO_STATUS_PENDING);
665 	poll_threads();
666 	/* Complete I/O on thread 1. This should not complete the I/O we submitted */
667 	stub_complete_io(g_bdev.io_target, 0);
668 	poll_threads();
669 	CU_ASSERT(status == SPDK_BDEV_IO_STATUS_PENDING);
670 	/* Now complete I/O on thread 0 */
671 	set_thread(0);
672 	poll_threads();
673 	stub_complete_io(g_bdev.io_target, 0);
674 	poll_threads();
675 	CU_ASSERT(status == SPDK_BDEV_IO_STATUS_SUCCESS);
676 
677 	/*
678 	 * Close the descriptor only, which should stop the qos channel as
679 	 * Close the descriptor only, which should stop the qos channel since
680 	 * this removes the last descriptor.
681 	spdk_bdev_close(g_desc);
682 	poll_threads();
683 	CU_ASSERT(bdev->internal.qos->ch == NULL);
684 
685 	/*
686 	 * Open the bdev again, which should set the qos channel back up since
687 	 * the I/O channels are still valid.
688 	 */
689 	spdk_bdev_open(bdev, true, NULL, NULL, &g_desc);
690 	poll_threads();
691 	CU_ASSERT(bdev->internal.qos->ch != NULL);
692 
693 	/* Tear down the channels */
694 	set_thread(0);
695 	spdk_put_io_channel(io_ch[0]);
696 	set_thread(1);
697 	spdk_put_io_channel(io_ch[1]);
698 	poll_threads();
699 	set_thread(0);
700 
701 	/* Close the descriptor, which should stop the qos channel */
702 	spdk_bdev_close(g_desc);
703 	poll_threads();
704 	CU_ASSERT(bdev->internal.qos->ch == NULL);
705 
706 	/* Open the bdev again; no qos channel is set up because there are no valid I/O channels. */
707 	spdk_bdev_open(bdev, true, NULL, NULL, &g_desc);
708 	poll_threads();
709 	CU_ASSERT(bdev->internal.qos->ch == NULL);
710 
711 	/* Create the channels in reverse order. */
712 	set_thread(1);
713 	io_ch[1] = spdk_bdev_get_io_channel(g_desc);
714 	bdev_ch[1] = spdk_io_channel_get_ctx(io_ch[1]);
715 	CU_ASSERT(bdev_ch[1]->flags == BDEV_CH_QOS_ENABLED);
716 
717 	set_thread(0);
718 	io_ch[0] = spdk_bdev_get_io_channel(g_desc);
719 	bdev_ch[0] = spdk_io_channel_get_ctx(io_ch[0]);
720 	CU_ASSERT(bdev_ch[0]->flags == BDEV_CH_QOS_ENABLED);
721 
722 	/* Confirm that the qos thread is now thread 1 */
723 	CU_ASSERT(bdev->internal.qos->ch == bdev_ch[1]);
724 
725 	/* Tear down the channels */
726 	set_thread(0);
727 	spdk_put_io_channel(io_ch[0]);
728 	set_thread(1);
729 	spdk_put_io_channel(io_ch[1]);
730 	poll_threads();
731 
732 	set_thread(0);
733 
734 	teardown_test();
735 }
736 
737 static void
738 io_during_qos_queue(void)
739 {
740 	struct spdk_io_channel *io_ch[2];
741 	struct spdk_bdev_channel *bdev_ch[2];
742 	struct spdk_bdev *bdev;
743 	enum spdk_bdev_io_status status0, status1, status2;
744 	int rc;
745 
746 	setup_test();
747 	MOCK_SET(spdk_get_ticks, 0);
748 
749 	/* Enable QoS */
750 	bdev = &g_bdev.bdev;
751 	bdev->internal.qos = calloc(1, sizeof(*bdev->internal.qos));
752 	SPDK_CU_ASSERT_FATAL(bdev->internal.qos != NULL);
753 	TAILQ_INIT(&bdev->internal.qos->queued);
754 	/*
755 	 * Enable read/write IOPS, read-only bytes per second, write-only
756 	 * bytes per second and read/write bytes per second rate limits.
757 	 * In this case, both the read-only and write-only bytes per second
758 	 * rate limits will take effect.
759 	 */
760 	/* 4000 read/write I/O per second, or 4 per millisecond */
761 	bdev->internal.qos->rate_limits[SPDK_BDEV_QOS_RW_IOPS_RATE_LIMIT].limit = 4000;
762 	/* 8K byte per millisecond with 4K block size */
763 	bdev->internal.qos->rate_limits[SPDK_BDEV_QOS_RW_BPS_RATE_LIMIT].limit = 8192000;
764 	/* 4K byte per millisecond with 4K block size */
765 	bdev->internal.qos->rate_limits[SPDK_BDEV_QOS_R_BPS_RATE_LIMIT].limit = 4096000;
766 	/* 4K byte per millisecond with 4K block size */
767 	bdev->internal.qos->rate_limits[SPDK_BDEV_QOS_W_BPS_RATE_LIMIT].limit = 4096000;
768 
769 	g_get_io_channel = true;
770 
771 	/* Create channels */
772 	set_thread(0);
773 	io_ch[0] = spdk_bdev_get_io_channel(g_desc);
774 	bdev_ch[0] = spdk_io_channel_get_ctx(io_ch[0]);
775 	CU_ASSERT(bdev_ch[0]->flags == BDEV_CH_QOS_ENABLED);
776 
777 	set_thread(1);
778 	io_ch[1] = spdk_bdev_get_io_channel(g_desc);
779 	bdev_ch[1] = spdk_io_channel_get_ctx(io_ch[1]);
780 	CU_ASSERT(bdev_ch[1]->flags == BDEV_CH_QOS_ENABLED);
781 
782 	/* Send two read I/Os */
783 	status1 = SPDK_BDEV_IO_STATUS_PENDING;
784 	rc = spdk_bdev_read_blocks(g_desc, io_ch[1], NULL, 0, 1, io_during_io_done, &status1);
785 	CU_ASSERT(rc == 0);
786 	CU_ASSERT(status1 == SPDK_BDEV_IO_STATUS_PENDING);
787 	set_thread(0);
788 	status0 = SPDK_BDEV_IO_STATUS_PENDING;
789 	rc = spdk_bdev_read_blocks(g_desc, io_ch[0], NULL, 0, 1, io_during_io_done, &status0);
790 	CU_ASSERT(rc == 0);
791 	CU_ASSERT(status0 == SPDK_BDEV_IO_STATUS_PENDING);
792 	/* Send one write I/O */
793 	status2 = SPDK_BDEV_IO_STATUS_PENDING;
794 	rc = spdk_bdev_write_blocks(g_desc, io_ch[0], NULL, 0, 1, io_during_io_done, &status2);
795 	CU_ASSERT(rc == 0);
796 	CU_ASSERT(status2 == SPDK_BDEV_IO_STATUS_PENDING);
797 
798 	/* Complete any I/O that arrived at the disk */
799 	poll_threads();
800 	set_thread(1);
801 	stub_complete_io(g_bdev.io_target, 0);
802 	set_thread(0);
803 	stub_complete_io(g_bdev.io_target, 0);
804 	poll_threads();
805 
806 	/* Only one of the two read I/Os should complete. (logical XOR) */
807 	if (status0 == SPDK_BDEV_IO_STATUS_SUCCESS) {
808 		CU_ASSERT(status1 == SPDK_BDEV_IO_STATUS_PENDING);
809 	} else {
810 		CU_ASSERT(status1 == SPDK_BDEV_IO_STATUS_SUCCESS);
811 	}
812 	/* The write I/O should complete. */
813 	CU_ASSERT(status2 == SPDK_BDEV_IO_STATUS_SUCCESS);
814 
815 	/* Advance in time by a millisecond */
816 	spdk_delay_us(1000);
817 
818 	/* Complete more I/O */
819 	poll_threads();
820 	set_thread(1);
821 	stub_complete_io(g_bdev.io_target, 0);
822 	set_thread(0);
823 	stub_complete_io(g_bdev.io_target, 0);
824 	poll_threads();
825 
826 	/* Now the second read I/O should be done */
827 	CU_ASSERT(status0 == SPDK_BDEV_IO_STATUS_SUCCESS);
828 	CU_ASSERT(status1 == SPDK_BDEV_IO_STATUS_SUCCESS);
829 
830 	/* Tear down the channels */
831 	set_thread(1);
832 	spdk_put_io_channel(io_ch[1]);
833 	set_thread(0);
834 	spdk_put_io_channel(io_ch[0]);
835 	poll_threads();
836 
837 	teardown_test();
838 }
839 
840 static void
841 io_during_qos_reset(void)
842 {
843 	struct spdk_io_channel *io_ch[2];
844 	struct spdk_bdev_channel *bdev_ch[2];
845 	struct spdk_bdev *bdev;
846 	enum spdk_bdev_io_status status0, status1, reset_status;
847 	int rc;
848 
849 	setup_test();
850 	MOCK_SET(spdk_get_ticks, 0);
851 
852 	/* Enable QoS */
853 	bdev = &g_bdev.bdev;
854 	bdev->internal.qos = calloc(1, sizeof(*bdev->internal.qos));
855 	SPDK_CU_ASSERT_FATAL(bdev->internal.qos != NULL);
856 	TAILQ_INIT(&bdev->internal.qos->queued);
857 	/*
858 	 * Enable read/write IOPS, write-only bytes per second and
859 	 * read/write bytes per second rate limits.
860 	 * In this case, the read/write bytes per second rate limit is the
861 	 * most restrictive, so it will take effect first.
862 	 */
863 	/* 2000 read/write I/O per second, or 2 per millisecond */
864 	bdev->internal.qos->rate_limits[SPDK_BDEV_QOS_RW_IOPS_RATE_LIMIT].limit = 2000;
865 	/* 4K byte per millisecond with 4K block size */
866 	bdev->internal.qos->rate_limits[SPDK_BDEV_QOS_RW_BPS_RATE_LIMIT].limit = 4096000;
867 	/* 8K byte per millisecond with 4K block size */
868 	bdev->internal.qos->rate_limits[SPDK_BDEV_QOS_W_BPS_RATE_LIMIT].limit = 8192000;
869 
870 	g_get_io_channel = true;
871 
872 	/* Create channels */
873 	set_thread(0);
874 	io_ch[0] = spdk_bdev_get_io_channel(g_desc);
875 	bdev_ch[0] = spdk_io_channel_get_ctx(io_ch[0]);
876 	CU_ASSERT(bdev_ch[0]->flags == BDEV_CH_QOS_ENABLED);
877 
878 	set_thread(1);
879 	io_ch[1] = spdk_bdev_get_io_channel(g_desc);
880 	bdev_ch[1] = spdk_io_channel_get_ctx(io_ch[1]);
881 	CU_ASSERT(bdev_ch[1]->flags == BDEV_CH_QOS_ENABLED);
882 
883 	/* Send two I/O. One of these gets queued by QoS. The other is sitting at the disk. */
884 	status1 = SPDK_BDEV_IO_STATUS_PENDING;
885 	rc = spdk_bdev_write_blocks(g_desc, io_ch[1], NULL, 0, 1, io_during_io_done, &status1);
886 	CU_ASSERT(rc == 0);
887 	set_thread(0);
888 	status0 = SPDK_BDEV_IO_STATUS_PENDING;
889 	rc = spdk_bdev_write_blocks(g_desc, io_ch[0], NULL, 0, 1, io_during_io_done, &status0);
890 	CU_ASSERT(rc == 0);
891 
892 	poll_threads();
893 	CU_ASSERT(status1 == SPDK_BDEV_IO_STATUS_PENDING);
894 	CU_ASSERT(status0 == SPDK_BDEV_IO_STATUS_PENDING);
895 
896 	/* Reset the bdev. */
897 	reset_status = SPDK_BDEV_IO_STATUS_PENDING;
898 	rc = spdk_bdev_reset(g_desc, io_ch[0], io_during_io_done, &reset_status);
899 	CU_ASSERT(rc == 0);
900 
901 	/* Complete any I/O that arrived at the disk */
902 	poll_threads();
903 	set_thread(1);
904 	stub_complete_io(g_bdev.io_target, 0);
905 	set_thread(0);
906 	stub_complete_io(g_bdev.io_target, 0);
907 	poll_threads();
908 
909 	CU_ASSERT(reset_status == SPDK_BDEV_IO_STATUS_SUCCESS);
910 	CU_ASSERT(status0 == SPDK_BDEV_IO_STATUS_FAILED);
911 	CU_ASSERT(status1 == SPDK_BDEV_IO_STATUS_FAILED);
912 
913 	/* Tear down the channels */
914 	set_thread(1);
915 	spdk_put_io_channel(io_ch[1]);
916 	set_thread(0);
917 	spdk_put_io_channel(io_ch[0]);
918 	poll_threads();
919 
920 	teardown_test();
921 }
922 
923 static void
924 enomem_done(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg)
925 {
926 	enum spdk_bdev_io_status *status = cb_arg;
927 
928 	*status = success ? SPDK_BDEV_IO_STATUS_SUCCESS : SPDK_BDEV_IO_STATUS_FAILED;
929 	spdk_bdev_free_io(bdev_io);
930 }
931 
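/*
 * Exercise the bdev layer's ENOMEM handling: I/O that the module completes
 * with SPDK_BDEV_IO_STATUS_NOMEM is queued on shared_resource->nomem_io and is
 * only resubmitted once the number of outstanding I/O drops to
 * shared_resource->nomem_threshold.
 */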
932 static void
933 enomem(void)
934 {
935 	struct spdk_io_channel *io_ch;
936 	struct spdk_bdev_channel *bdev_ch;
937 	struct spdk_bdev_shared_resource *shared_resource;
938 	struct ut_bdev_channel *ut_ch;
939 	const uint32_t IO_ARRAY_SIZE = 64;
940 	const uint32_t AVAIL = 20;
941 	enum spdk_bdev_io_status status[IO_ARRAY_SIZE], status_reset;
942 	uint32_t nomem_cnt, i;
943 	struct spdk_bdev_io *first_io;
944 	int rc;
945 
946 	setup_test();
947 
948 	set_thread(0);
949 	io_ch = spdk_bdev_get_io_channel(g_desc);
950 	bdev_ch = spdk_io_channel_get_ctx(io_ch);
951 	shared_resource = bdev_ch->shared_resource;
952 	ut_ch = spdk_io_channel_get_ctx(bdev_ch->channel);
953 	ut_ch->avail_cnt = AVAIL;
954 
955 	/* First submit a number of IOs equal to what the channel can support. */
956 	for (i = 0; i < AVAIL; i++) {
957 		status[i] = SPDK_BDEV_IO_STATUS_PENDING;
958 		rc = spdk_bdev_read_blocks(g_desc, io_ch, NULL, 0, 1, enomem_done, &status[i]);
959 		CU_ASSERT(rc == 0);
960 	}
961 	CU_ASSERT(TAILQ_EMPTY(&shared_resource->nomem_io));
962 
963 	/*
964 	 * Next, submit one additional I/O.  This one should fail with ENOMEM and then go onto
965 	 *  the nomem_io list.
966 	 */
967 	status[AVAIL] = SPDK_BDEV_IO_STATUS_PENDING;
968 	rc = spdk_bdev_read_blocks(g_desc, io_ch, NULL, 0, 1, enomem_done, &status[AVAIL]);
969 	CU_ASSERT(rc == 0);
970 	SPDK_CU_ASSERT_FATAL(!TAILQ_EMPTY(&shared_resource->nomem_io));
971 	first_io = TAILQ_FIRST(&shared_resource->nomem_io);
972 
973 	/*
974 	 * Now submit a bunch more I/O.  These should all fail with ENOMEM and get queued behind
975 	 *  the first_io above.
976 	 */
977 	for (i = AVAIL + 1; i < IO_ARRAY_SIZE; i++) {
978 		status[i] = SPDK_BDEV_IO_STATUS_PENDING;
979 		rc = spdk_bdev_read_blocks(g_desc, io_ch, NULL, 0, 1, enomem_done, &status[i]);
980 		CU_ASSERT(rc == 0);
981 	}
982 
983 	/* Assert that first_io is still at the head of the list. */
984 	CU_ASSERT(TAILQ_FIRST(&shared_resource->nomem_io) == first_io);
985 	CU_ASSERT(bdev_io_tailq_cnt(&shared_resource->nomem_io) == (IO_ARRAY_SIZE - AVAIL));
986 	nomem_cnt = bdev_io_tailq_cnt(&shared_resource->nomem_io);
987 	CU_ASSERT(shared_resource->nomem_threshold == (AVAIL - NOMEM_THRESHOLD_COUNT));
988 
989 	/*
990 	 * Complete 1 I/O only.  The key check here is bdev_io_tailq_cnt - this should not have
991 	 *  changed since completing just 1 I/O should not trigger retrying the queued nomem_io
992 	 *  list.
993 	 */
994 	stub_complete_io(g_bdev.io_target, 1);
995 	CU_ASSERT(bdev_io_tailq_cnt(&shared_resource->nomem_io) == nomem_cnt);
996 
997 	/*
998 	 * Complete enough I/O to hit the nomem_threshold.  This should trigger retrying nomem_io,
999 	 *  and we should see I/O get resubmitted to the test bdev module.
1000 	 */
1001 	stub_complete_io(g_bdev.io_target, NOMEM_THRESHOLD_COUNT - 1);
1002 	CU_ASSERT(bdev_io_tailq_cnt(&shared_resource->nomem_io) < nomem_cnt);
1003 	nomem_cnt = bdev_io_tailq_cnt(&shared_resource->nomem_io);
1004 
1005 	/* Complete 1 I/O only.  This should not trigger retrying the queued nomem_io. */
1006 	stub_complete_io(g_bdev.io_target, 1);
1007 	CU_ASSERT(bdev_io_tailq_cnt(&shared_resource->nomem_io) == nomem_cnt);
1008 
1009 	/*
1010 	 * Send a reset and confirm that all I/O are completed, including the ones that
1011 	 *  were queued on the nomem_io list.
1012 	 */
1013 	status_reset = SPDK_BDEV_IO_STATUS_PENDING;
1014 	rc = spdk_bdev_reset(g_desc, io_ch, enomem_done, &status_reset);
1015 	poll_threads();
1016 	CU_ASSERT(rc == 0);
1017 	/* This will complete the reset. */
1018 	stub_complete_io(g_bdev.io_target, 0);
1019 
1020 	CU_ASSERT(bdev_io_tailq_cnt(&shared_resource->nomem_io) == 0);
1021 	CU_ASSERT(shared_resource->io_outstanding == 0);
1022 
1023 	spdk_put_io_channel(io_ch);
1024 	poll_threads();
1025 	teardown_test();
1026 }
1027 
1028 static void
1029 enomem_multi_bdev(void)
1030 {
1031 	struct spdk_io_channel *io_ch;
1032 	struct spdk_bdev_channel *bdev_ch;
1033 	struct spdk_bdev_shared_resource *shared_resource;
1034 	struct ut_bdev_channel *ut_ch;
1035 	const uint32_t IO_ARRAY_SIZE = 64;
1036 	const uint32_t AVAIL = 20;
1037 	enum spdk_bdev_io_status status[IO_ARRAY_SIZE];
1038 	uint32_t i;
1039 	struct ut_bdev *second_bdev;
1040 	struct spdk_bdev_desc *second_desc = NULL;
1041 	struct spdk_bdev_channel *second_bdev_ch;
1042 	struct spdk_io_channel *second_ch;
1043 	int rc;
1044 
1045 	setup_test();
1046 
1047 	/* Register second bdev with the same io_target  */
1048 	second_bdev = calloc(1, sizeof(*second_bdev));
1049 	SPDK_CU_ASSERT_FATAL(second_bdev != NULL);
1050 	register_bdev(second_bdev, "ut_bdev2", g_bdev.io_target);
1051 	spdk_bdev_open(&second_bdev->bdev, true, NULL, NULL, &second_desc);
1052 	SPDK_CU_ASSERT_FATAL(second_desc != NULL);
1053 
1054 	set_thread(0);
1055 	io_ch = spdk_bdev_get_io_channel(g_desc);
1056 	bdev_ch = spdk_io_channel_get_ctx(io_ch);
1057 	shared_resource = bdev_ch->shared_resource;
1058 	ut_ch = spdk_io_channel_get_ctx(bdev_ch->channel);
1059 	ut_ch->avail_cnt = AVAIL;
1060 
1061 	second_ch = spdk_bdev_get_io_channel(second_desc);
1062 	second_bdev_ch = spdk_io_channel_get_ctx(second_ch);
1063 	SPDK_CU_ASSERT_FATAL(shared_resource == second_bdev_ch->shared_resource);
1064 
1065 	/* Saturate io_target through bdev A. */
1066 	for (i = 0; i < AVAIL; i++) {
1067 		status[i] = SPDK_BDEV_IO_STATUS_PENDING;
1068 		rc = spdk_bdev_read_blocks(g_desc, io_ch, NULL, 0, 1, enomem_done, &status[i]);
1069 		CU_ASSERT(rc == 0);
1070 	}
1071 	CU_ASSERT(TAILQ_EMPTY(&shared_resource->nomem_io));
1072 
1073 	/*
1074 	 * Now submit I/O through the second bdev. This should fail with ENOMEM
1075 	 * and then go onto the nomem_io list.
1076 	 */
1077 	status[AVAIL] = SPDK_BDEV_IO_STATUS_PENDING;
1078 	rc = spdk_bdev_read_blocks(second_desc, second_ch, NULL, 0, 1, enomem_done, &status[AVAIL]);
1079 	CU_ASSERT(rc == 0);
1080 	SPDK_CU_ASSERT_FATAL(!TAILQ_EMPTY(&shared_resource->nomem_io));
1081 
1082 	/* Complete first bdev's I/O. This should retry sending second bdev's nomem_io */
1083 	stub_complete_io(g_bdev.io_target, AVAIL);
1084 
1085 	SPDK_CU_ASSERT_FATAL(TAILQ_EMPTY(&shared_resource->nomem_io));
1086 	CU_ASSERT(shared_resource->io_outstanding == 1);
1087 
1088 	/* Now complete our retried I/O  */
1089 	stub_complete_io(g_bdev.io_target, 1);
1090 	SPDK_CU_ASSERT_FATAL(shared_resource->io_outstanding == 0);
1091 
1092 	spdk_put_io_channel(io_ch);
1093 	spdk_put_io_channel(second_ch);
1094 	spdk_bdev_close(second_desc);
1095 	unregister_bdev(second_bdev);
1096 	poll_threads();
1097 	free(second_bdev);
1098 	teardown_test();
1099 }
1100 
1101 
1102 static void
1103 enomem_multi_io_target(void)
1104 {
1105 	struct spdk_io_channel *io_ch;
1106 	struct spdk_bdev_channel *bdev_ch;
1107 	struct ut_bdev_channel *ut_ch;
1108 	const uint32_t IO_ARRAY_SIZE = 64;
1109 	const uint32_t AVAIL = 20;
1110 	enum spdk_bdev_io_status status[IO_ARRAY_SIZE];
1111 	uint32_t i;
1112 	int new_io_device;
1113 	struct ut_bdev *second_bdev;
1114 	struct spdk_bdev_desc *second_desc = NULL;
1115 	struct spdk_bdev_channel *second_bdev_ch;
1116 	struct spdk_io_channel *second_ch;
1117 	int rc;
1118 
1119 	setup_test();
1120 
1121 	/* Create new io_target and a second bdev using it */
1122 	spdk_io_device_register(&new_io_device, stub_create_ch, stub_destroy_ch,
1123 				sizeof(struct ut_bdev_channel), NULL);
1124 	second_bdev = calloc(1, sizeof(*second_bdev));
1125 	SPDK_CU_ASSERT_FATAL(second_bdev != NULL);
1126 	register_bdev(second_bdev, "ut_bdev2", &new_io_device);
1127 	spdk_bdev_open(&second_bdev->bdev, true, NULL, NULL, &second_desc);
1128 	SPDK_CU_ASSERT_FATAL(second_desc != NULL);
1129 
1130 	set_thread(0);
1131 	io_ch = spdk_bdev_get_io_channel(g_desc);
1132 	bdev_ch = spdk_io_channel_get_ctx(io_ch);
1133 	ut_ch = spdk_io_channel_get_ctx(bdev_ch->channel);
1134 	ut_ch->avail_cnt = AVAIL;
1135 
1136 	/* Different io_target should imply a different shared_resource */
1137 	second_ch = spdk_bdev_get_io_channel(second_desc);
1138 	second_bdev_ch = spdk_io_channel_get_ctx(second_ch);
1139 	SPDK_CU_ASSERT_FATAL(bdev_ch->shared_resource != second_bdev_ch->shared_resource);
1140 
1141 	/* Saturate io_target through bdev A. */
1142 	for (i = 0; i < AVAIL; i++) {
1143 		status[i] = SPDK_BDEV_IO_STATUS_PENDING;
1144 		rc = spdk_bdev_read_blocks(g_desc, io_ch, NULL, 0, 1, enomem_done, &status[i]);
1145 		CU_ASSERT(rc == 0);
1146 	}
1147 	CU_ASSERT(TAILQ_EMPTY(&bdev_ch->shared_resource->nomem_io));
1148 
1149 	/* Issue one more I/O, which should get queued on the nomem_io (ENOMEM) list. */
1150 	status[AVAIL] = SPDK_BDEV_IO_STATUS_PENDING;
1151 	rc = spdk_bdev_read_blocks(g_desc, io_ch, NULL, 0, 1, enomem_done, &status[AVAIL]);
1152 	CU_ASSERT(rc == 0);
1153 	SPDK_CU_ASSERT_FATAL(!TAILQ_EMPTY(&bdev_ch->shared_resource->nomem_io));
1154 
1155 	/*
1156 	 * Now submit I/O through the second bdev. This should go through and complete
1157 	 * successfully because we're using a different io_device underneath.
1158 	 */
1159 	status[AVAIL] = SPDK_BDEV_IO_STATUS_PENDING;
1160 	rc = spdk_bdev_read_blocks(second_desc, second_ch, NULL, 0, 1, enomem_done, &status[AVAIL]);
1161 	CU_ASSERT(rc == 0);
1162 	SPDK_CU_ASSERT_FATAL(TAILQ_EMPTY(&second_bdev_ch->shared_resource->nomem_io));
1163 	stub_complete_io(second_bdev->io_target, 1);
1164 
1165 	/* Cleanup; Complete outstanding I/O. */
1166 	stub_complete_io(g_bdev.io_target, AVAIL);
1167 	SPDK_CU_ASSERT_FATAL(TAILQ_EMPTY(&bdev_ch->shared_resource->nomem_io));
1168 	/* Complete the ENOMEM I/O */
1169 	stub_complete_io(g_bdev.io_target, 1);
1170 	CU_ASSERT(bdev_ch->shared_resource->io_outstanding == 0);
1171 
1172 	SPDK_CU_ASSERT_FATAL(TAILQ_EMPTY(&bdev_ch->shared_resource->nomem_io));
1173 	CU_ASSERT(bdev_ch->shared_resource->io_outstanding == 0);
1174 	spdk_put_io_channel(io_ch);
1175 	spdk_put_io_channel(second_ch);
1176 	spdk_bdev_close(second_desc);
1177 	unregister_bdev(second_bdev);
1178 	spdk_io_device_unregister(&new_io_device, NULL);
1179 	poll_threads();
1180 	free(second_bdev);
1181 	teardown_test();
1182 }
1183 
1184 static void
1185 qos_dynamic_enable_done(void *cb_arg, int status)
1186 {
1187 	int *rc = cb_arg;
1188 	*rc = status;
1189 }
1190 
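/*
 * Exercise enabling and disabling QoS at runtime via
 * spdk_bdev_set_qos_rate_limits(), including the cases where queued I/O must
 * be resubmitted on its original thread and where an enable is rejected
 * because a disable is still in progress.
 */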
1191 static void
1192 qos_dynamic_enable(void)
1193 {
1194 	struct spdk_io_channel *io_ch[2];
1195 	struct spdk_bdev_channel *bdev_ch[2];
1196 	struct spdk_bdev *bdev;
1197 	enum spdk_bdev_io_status bdev_io_status[2];
1198 	uint64_t limits[SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES] = {};
1199 	int status, second_status, rc, i;
1200 
1201 	setup_test();
1202 	MOCK_SET(spdk_get_ticks, 0);
1203 
1204 	for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) {
1205 		limits[i] = UINT64_MAX;
1206 	}
1207 
1208 	bdev = &g_bdev.bdev;
1209 
1210 	g_get_io_channel = true;
1211 
1212 	/* Create channels */
1213 	set_thread(0);
1214 	io_ch[0] = spdk_bdev_get_io_channel(g_desc);
1215 	bdev_ch[0] = spdk_io_channel_get_ctx(io_ch[0]);
1216 	CU_ASSERT(bdev_ch[0]->flags == 0);
1217 
1218 	set_thread(1);
1219 	io_ch[1] = spdk_bdev_get_io_channel(g_desc);
1220 	bdev_ch[1] = spdk_io_channel_get_ctx(io_ch[1]);
1221 	CU_ASSERT(bdev_ch[1]->flags == 0);
1222 
1223 	set_thread(0);
1224 
1225 	/*
1226 	 * Enable QoS: Read/Write IOPS, Read/Write byte,
1227 	 * Read only byte and Write only byte per second
1228 	 * rate limits.
1229 	 * More than 10 I/Os allowed per timeslice.
1230 	 */
1231 	status = -1;
1232 	limits[SPDK_BDEV_QOS_RW_IOPS_RATE_LIMIT] = 10000;
1233 	limits[SPDK_BDEV_QOS_RW_BPS_RATE_LIMIT] = 100;
1234 	limits[SPDK_BDEV_QOS_R_BPS_RATE_LIMIT] = 100;
1235 	limits[SPDK_BDEV_QOS_W_BPS_RATE_LIMIT] = 10;
1236 	spdk_bdev_set_qos_rate_limits(bdev, limits, qos_dynamic_enable_done, &status);
1237 	poll_threads();
1238 	CU_ASSERT(status == 0);
1239 	CU_ASSERT((bdev_ch[0]->flags & BDEV_CH_QOS_ENABLED) != 0);
1240 	CU_ASSERT((bdev_ch[1]->flags & BDEV_CH_QOS_ENABLED) != 0);
1241 
1242 	/*
1243 	 * Submit and complete 10 I/O to fill the QoS allotment for this timeslice.
1244 	 * Additional I/O will then be queued.
1245 	 */
1246 	set_thread(0);
1247 	for (i = 0; i < 10; i++) {
1248 		bdev_io_status[0] = SPDK_BDEV_IO_STATUS_PENDING;
1249 		rc = spdk_bdev_read_blocks(g_desc, io_ch[0], NULL, 0, 1, io_during_io_done, &bdev_io_status[0]);
1250 		CU_ASSERT(rc == 0);
1251 		CU_ASSERT(bdev_io_status[0] == SPDK_BDEV_IO_STATUS_PENDING);
1252 		poll_thread(0);
1253 		stub_complete_io(g_bdev.io_target, 0);
1254 		CU_ASSERT(bdev_io_status[0] == SPDK_BDEV_IO_STATUS_SUCCESS);
1255 	}
1256 
1257 	/*
1258 	 * Send two more I/O.  These I/O will be queued since the current timeslice allotment has been
1259 	 *  filled already.  We want to test that, when QoS is disabled, these two I/O:
1260 	 *  1) are not aborted
1261 	 *  2) are sent back to their original thread for resubmission
1262 	 */
1263 	bdev_io_status[0] = SPDK_BDEV_IO_STATUS_PENDING;
1264 	rc = spdk_bdev_read_blocks(g_desc, io_ch[0], NULL, 0, 1, io_during_io_done, &bdev_io_status[0]);
1265 	CU_ASSERT(rc == 0);
1266 	CU_ASSERT(bdev_io_status[0] == SPDK_BDEV_IO_STATUS_PENDING);
1267 	set_thread(1);
1268 	bdev_io_status[1] = SPDK_BDEV_IO_STATUS_PENDING;
1269 	rc = spdk_bdev_read_blocks(g_desc, io_ch[1], NULL, 0, 1, io_during_io_done, &bdev_io_status[1]);
1270 	CU_ASSERT(rc == 0);
1271 	CU_ASSERT(bdev_io_status[1] == SPDK_BDEV_IO_STATUS_PENDING);
1272 	poll_threads();
1273 
1274 	/*
1275 	 * Disable QoS: Read/Write IOPS, Read/Write byte,
1276 	 * Read only byte rate limits
1277 	 */
1278 	status = -1;
1279 	limits[SPDK_BDEV_QOS_RW_IOPS_RATE_LIMIT] = 0;
1280 	limits[SPDK_BDEV_QOS_RW_BPS_RATE_LIMIT] = 0;
1281 	limits[SPDK_BDEV_QOS_R_BPS_RATE_LIMIT] = 0;
1282 	spdk_bdev_set_qos_rate_limits(bdev, limits, qos_dynamic_enable_done, &status);
1283 	poll_threads();
1284 	CU_ASSERT(status == 0);
1285 	CU_ASSERT((bdev_ch[0]->flags & BDEV_CH_QOS_ENABLED) != 0);
1286 	CU_ASSERT((bdev_ch[1]->flags & BDEV_CH_QOS_ENABLED) != 0);
1287 
1288 	/* Disable QoS: Write only Byte per second rate limit */
1289 	status = -1;
1290 	limits[SPDK_BDEV_QOS_W_BPS_RATE_LIMIT] = 0;
1291 	spdk_bdev_set_qos_rate_limits(bdev, limits, qos_dynamic_enable_done, &status);
1292 	poll_threads();
1293 	CU_ASSERT(status == 0);
1294 	CU_ASSERT((bdev_ch[0]->flags & BDEV_CH_QOS_ENABLED) == 0);
1295 	CU_ASSERT((bdev_ch[1]->flags & BDEV_CH_QOS_ENABLED) == 0);
1296 
1297 	/*
1298 	 * All I/O should have been resubmitted back on their original thread.  Complete
1299 	 *  all I/O on thread 0, and ensure that only the thread 0 I/O was completed.
1300 	 */
1301 	set_thread(0);
1302 	stub_complete_io(g_bdev.io_target, 0);
1303 	poll_threads();
1304 	CU_ASSERT(bdev_io_status[0] == SPDK_BDEV_IO_STATUS_SUCCESS);
1305 	CU_ASSERT(bdev_io_status[1] == SPDK_BDEV_IO_STATUS_PENDING);
1306 
1307 	/* Now complete all I/O on thread 1 and ensure the thread 1 I/O was completed. */
1308 	set_thread(1);
1309 	stub_complete_io(g_bdev.io_target, 0);
1310 	poll_threads();
1311 	CU_ASSERT(bdev_io_status[1] == SPDK_BDEV_IO_STATUS_SUCCESS);
1312 
1313 	/* Disable QoS again */
1314 	status = -1;
1315 	limits[SPDK_BDEV_QOS_RW_IOPS_RATE_LIMIT] = 0;
1316 	spdk_bdev_set_qos_rate_limits(bdev, limits, qos_dynamic_enable_done, &status);
1317 	poll_threads();
1318 	CU_ASSERT(status == 0); /* This should succeed */
1319 	CU_ASSERT((bdev_ch[0]->flags & BDEV_CH_QOS_ENABLED) == 0);
1320 	CU_ASSERT((bdev_ch[1]->flags & BDEV_CH_QOS_ENABLED) == 0);
1321 
1322 	/* Enable QoS on thread 0 */
1323 	status = -1;
1324 	limits[SPDK_BDEV_QOS_RW_IOPS_RATE_LIMIT] = 10000;
1325 	spdk_bdev_set_qos_rate_limits(bdev, limits, qos_dynamic_enable_done, &status);
1326 	poll_threads();
1327 	CU_ASSERT(status == 0);
1328 	CU_ASSERT((bdev_ch[0]->flags & BDEV_CH_QOS_ENABLED) != 0);
1329 	CU_ASSERT((bdev_ch[1]->flags & BDEV_CH_QOS_ENABLED) != 0);
1330 
1331 	/* Disable QoS on thread 1 */
1332 	set_thread(1);
1333 	status = -1;
1334 	limits[SPDK_BDEV_QOS_RW_IOPS_RATE_LIMIT] = 0;
1335 	spdk_bdev_set_qos_rate_limits(bdev, limits, qos_dynamic_enable_done, &status);
1336 	/* Don't poll yet. This should leave the channels with QoS enabled */
1337 	CU_ASSERT(status == -1);
1338 	CU_ASSERT((bdev_ch[0]->flags & BDEV_CH_QOS_ENABLED) != 0);
1339 	CU_ASSERT((bdev_ch[1]->flags & BDEV_CH_QOS_ENABLED) != 0);
1340 
1341 	/* Enable QoS. This should immediately fail because the previous disable QoS hasn't completed. */
1342 	second_status = 0;
1343 	limits[SPDK_BDEV_QOS_RW_BPS_RATE_LIMIT] = 10;
1344 	spdk_bdev_set_qos_rate_limits(bdev, limits, qos_dynamic_enable_done, &second_status);
1345 	poll_threads();
1346 	CU_ASSERT(status == 0); /* The disable should succeed */
1347 	CU_ASSERT(second_status < 0); /* The enable should fail */
1348 	CU_ASSERT((bdev_ch[0]->flags & BDEV_CH_QOS_ENABLED) == 0);
1349 	CU_ASSERT((bdev_ch[1]->flags & BDEV_CH_QOS_ENABLED) == 0);
1350 
1351 	/* Enable QoS on thread 1. This should succeed now that the disable has completed. */
1352 	status = -1;
1353 	limits[SPDK_BDEV_QOS_RW_IOPS_RATE_LIMIT] = 10000;
1354 	spdk_bdev_set_qos_rate_limits(bdev, limits, qos_dynamic_enable_done, &status);
1355 	poll_threads();
1356 	CU_ASSERT(status == 0);
1357 	CU_ASSERT((bdev_ch[0]->flags & BDEV_CH_QOS_ENABLED) != 0);
1358 	CU_ASSERT((bdev_ch[1]->flags & BDEV_CH_QOS_ENABLED) != 0);
1359 
1360 	/* Tear down the channels */
1361 	set_thread(0);
1362 	spdk_put_io_channel(io_ch[0]);
1363 	set_thread(1);
1364 	spdk_put_io_channel(io_ch[1]);
1365 	poll_threads();
1366 
1367 	set_thread(0);
1368 	teardown_test();
1369 }
1370 
1371 static void
1372 histogram_status_cb(void *cb_arg, int status)
1373 {
1374 	g_status = status;
1375 }
1376 
1377 static void
1378 histogram_data_cb(void *cb_arg, int status, struct spdk_histogram_data *histogram)
1379 {
1380 	g_status = status;
1381 	g_histogram = histogram;
1382 }
1383 
1384 static void
1385 histogram_io_count(void *ctx, uint64_t start, uint64_t end, uint64_t count,
1386 		   uint64_t total, uint64_t so_far)
1387 {
1388 	g_count += count;
1389 }
1390 
1391 static void
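/*
 * Enable per-bdev histograms, issue one write on thread 0 and one read on
 * thread 1, and verify that spdk_bdev_histogram_get() aggregates the counts
 * from both I/O channels.
 */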
1392 bdev_histograms_mt(void)
1393 {
1394 	struct spdk_io_channel *ch[2];
1395 	struct spdk_histogram_data *histogram;
1396 	uint8_t buf[4096];
1397 	int status = false;
1398 	int rc;
1399 
1400 
1401 	setup_test();
1402 
1403 	set_thread(0);
1404 	ch[0] = spdk_bdev_get_io_channel(g_desc);
1405 	CU_ASSERT(ch[0] != NULL);
1406 
1407 	set_thread(1);
1408 	ch[1] = spdk_bdev_get_io_channel(g_desc);
1409 	CU_ASSERT(ch[1] != NULL);
1410 
1411 
1412 	/* Enable histogram */
1413 	spdk_bdev_histogram_enable(&g_bdev.bdev, histogram_status_cb, NULL, true);
1414 	poll_threads();
1415 	CU_ASSERT(g_status == 0);
1416 	CU_ASSERT(g_bdev.bdev.internal.histogram_enabled == true);
1417 
1418 	/* Allocate histogram */
1419 	histogram = spdk_histogram_data_alloc();
1420 
1421 	/* Check if histogram is zeroed */
1422 	spdk_bdev_histogram_get(&g_bdev.bdev, histogram, histogram_data_cb, NULL);
1423 	poll_threads();
1424 	CU_ASSERT(g_status == 0);
1425 	SPDK_CU_ASSERT_FATAL(g_histogram != NULL);
1426 
1427 	g_count = 0;
1428 	spdk_histogram_data_iterate(g_histogram, histogram_io_count, NULL);
1429 
1430 	CU_ASSERT(g_count == 0);
1431 
1432 	set_thread(0);
1433 	rc = spdk_bdev_write_blocks(g_desc, ch[0], &buf, 0, 1, io_during_io_done, &status);
1434 	CU_ASSERT(rc == 0);
1435 
1436 	spdk_delay_us(10);
1437 	stub_complete_io(g_bdev.io_target, 1);
1438 	poll_threads();
1439 	CU_ASSERT(status == true);
1440 
1441 
1442 	set_thread(1);
1443 	rc = spdk_bdev_read_blocks(g_desc, ch[1], &buf, 0, 1, io_during_io_done, &status);
1444 	CU_ASSERT(rc == 0);
1445 
1446 	spdk_delay_us(10);
1447 	stub_complete_io(g_bdev.io_target, 1);
1448 	poll_threads();
1449 	CU_ASSERT(status == true);
1450 
1451 	set_thread(0);
1452 
1453 	/* Check if histogram gathered data from all I/O channels */
1454 	spdk_bdev_histogram_get(&g_bdev.bdev, histogram, histogram_data_cb, NULL);
1455 	poll_threads();
1456 	CU_ASSERT(g_status == 0);
1457 	CU_ASSERT(g_bdev.bdev.internal.histogram_enabled == true);
1458 	SPDK_CU_ASSERT_FATAL(g_histogram != NULL);
1459 
1460 	g_count = 0;
1461 	spdk_histogram_data_iterate(g_histogram, histogram_io_count, NULL);
1462 	CU_ASSERT(g_count == 2);
1463 
1464 	/* Disable histogram */
1465 	spdk_bdev_histogram_enable(&g_bdev.bdev, histogram_status_cb, NULL, false);
1466 	poll_threads();
1467 	CU_ASSERT(g_status == 0);
1468 	CU_ASSERT(g_bdev.bdev.internal.histogram_enabled == false);
1469 
1470 	spdk_histogram_data_free(g_histogram);

	/* Tear down the channels and the test environment so the threads and
	 *  channels created by this test are released.
	 */
	set_thread(0);
	spdk_put_io_channel(ch[0]);
	set_thread(1);
	spdk_put_io_channel(ch[1]);
	poll_threads();
	set_thread(0);
	teardown_test();
1471 }
1472 
1473 int
1474 main(int argc, char **argv)
1475 {
1476 	CU_pSuite	suite = NULL;
1477 	unsigned int	num_failures;
1478 
1479 	if (CU_initialize_registry() != CUE_SUCCESS) {
1480 		return CU_get_error();
1481 	}
1482 
1483 	suite = CU_add_suite("bdev", NULL, NULL);
1484 	if (suite == NULL) {
1485 		CU_cleanup_registry();
1486 		return CU_get_error();
1487 	}
1488 
1489 	if (
1490 		CU_add_test(suite, "basic", basic) == NULL ||
1491 		CU_add_test(suite, "unregister_and_close", unregister_and_close) == NULL ||
1492 		CU_add_test(suite, "basic_qos", basic_qos) == NULL ||
1493 		CU_add_test(suite, "put_channel_during_reset", put_channel_during_reset) == NULL ||
1494 		CU_add_test(suite, "aborted_reset", aborted_reset) == NULL ||
1495 		CU_add_test(suite, "io_during_reset", io_during_reset) == NULL ||
1496 		CU_add_test(suite, "io_during_qos_queue", io_during_qos_queue) == NULL ||
1497 		CU_add_test(suite, "io_during_qos_reset", io_during_qos_reset) == NULL ||
1498 		CU_add_test(suite, "enomem", enomem) == NULL ||
1499 		CU_add_test(suite, "enomem_multi_bdev", enomem_multi_bdev) == NULL ||
1500 		CU_add_test(suite, "enomem_multi_io_target", enomem_multi_io_target) == NULL ||
1501 		CU_add_test(suite, "qos_dynamic_enable", qos_dynamic_enable) == NULL ||
1502 		CU_add_test(suite, "bdev_histograms_mt", bdev_histograms_mt) == NULL
1503 	) {
1504 		CU_cleanup_registry();
1505 		return CU_get_error();
1506 	}
1507 
1508 	CU_basic_set_mode(CU_BRM_VERBOSE);
1509 	CU_basic_run_tests();
1510 	num_failures = CU_get_number_of_failures();
1511 	CU_cleanup_registry();
1512 	return num_failures;
1513 }
1514