xref: /spdk/lib/bdev/bdev.c (revision 22898a91b9b6f289933db19b0175821cfb7e7820)
1 /*-
2  *   BSD LICENSE
3  *
4  *   Copyright (C) 2008-2012 Daisuke Aoyama <aoyama@peach.ne.jp>.
5  *   Copyright (c) Intel Corporation.
6  *   All rights reserved.
7  *
8  *   Redistribution and use in source and binary forms, with or without
9  *   modification, are permitted provided that the following conditions
10  *   are met:
11  *
12  *     * Redistributions of source code must retain the above copyright
13  *       notice, this list of conditions and the following disclaimer.
14  *     * Redistributions in binary form must reproduce the above copyright
15  *       notice, this list of conditions and the following disclaimer in
16  *       the documentation and/or other materials provided with the
17  *       distribution.
18  *     * Neither the name of Intel Corporation nor the names of its
19  *       contributors may be used to endorse or promote products derived
20  *       from this software without specific prior written permission.
21  *
22  *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
23  *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
24  *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
25  *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
26  *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
27  *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
28  *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
29  *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
30  *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
31  *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
32  *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
33  */
34 
35 #include "spdk/stdinc.h"
36 
37 #include "spdk/bdev.h"
38 #include "spdk/conf.h"
39 
40 #include "spdk/env.h"
41 #include "spdk/event.h"
42 #include "spdk/io_channel.h"
43 #include "spdk/likely.h"
44 #include "spdk/queue.h"
45 #include "spdk/nvme_spec.h"
46 #include "spdk/scsi_spec.h"
47 #include "spdk/util.h"
48 
49 #include "spdk_internal/bdev.h"
50 #include "spdk_internal/log.h"
51 #include "spdk/string.h"
52 
53 #ifdef SPDK_CONFIG_VTUNE
54 #include "ittnotify.h"
55 #include "ittnotify_types.h"
56 int __itt_init_ittlib(const char *, __itt_group_id);
57 #endif
58 
59 #define SPDK_BDEV_IO_POOL_SIZE			(64 * 1024)
60 #define SPDK_BDEV_IO_CACHE_SIZE			256
61 #define BUF_SMALL_POOL_SIZE			8192
62 #define BUF_LARGE_POOL_SIZE			1024
63 #define NOMEM_THRESHOLD_COUNT			8
64 #define ZERO_BUFFER_SIZE			0x100000
65 #define SPDK_BDEV_QOS_TIMESLICE_IN_USEC		1000
66 #define SPDK_BDEV_SEC_TO_USEC			1000000ULL
67 #define SPDK_BDEV_QOS_MIN_IO_PER_TIMESLICE	1
68 #define SPDK_BDEV_QOS_MIN_IOS_PER_SEC		10000
69 
70 struct spdk_bdev_mgr {
71 	struct spdk_mempool *bdev_io_pool;
72 
73 	struct spdk_mempool *buf_small_pool;
74 	struct spdk_mempool *buf_large_pool;
75 
76 	void *zero_buffer;
77 
78 	TAILQ_HEAD(, spdk_bdev_module) bdev_modules;
79 
80 	TAILQ_HEAD(, spdk_bdev) bdevs;
81 
82 	bool init_complete;
83 	bool module_init_complete;
84 
85 #ifdef SPDK_CONFIG_VTUNE
86 	__itt_domain	*domain;
87 #endif
88 };
89 
90 static struct spdk_bdev_mgr g_bdev_mgr = {
91 	.bdev_modules = TAILQ_HEAD_INITIALIZER(g_bdev_mgr.bdev_modules),
92 	.bdevs = TAILQ_HEAD_INITIALIZER(g_bdev_mgr.bdevs),
93 	.init_complete = false,
94 	.module_init_complete = false,
95 };
96 
97 static spdk_bdev_init_cb	g_init_cb_fn = NULL;
98 static void			*g_init_cb_arg = NULL;
99 
100 static spdk_bdev_fini_cb	g_fini_cb_fn = NULL;
101 static void			*g_fini_cb_arg = NULL;
102 static struct spdk_thread	*g_fini_thread = NULL;
103 
104 struct spdk_bdev_mgmt_channel {
105 	bdev_io_stailq_t need_buf_small;
106 	bdev_io_stailq_t need_buf_large;
107 
108 	/*
109 	 * Each thread keeps a cache of bdev_io - this allows
110 	 *  bdev threads which are *not* DPDK threads to still
111 	 *  benefit from a per-thread bdev_io cache.  Without
112 	 *  this, non-DPDK threads fetching from the mempool
113 	 *  incur a cmpxchg on get and put.
114 	 */
115 	bdev_io_stailq_t per_thread_cache;
116 	uint32_t	per_thread_cache_count;
117 };
118 
119 /*
120  * Per-module (or per-io_device) channel. Multiple bdevs built on the same io_device
121  * will queue their I/O awaiting retry here. This makes it possible to retry sending
122  * I/O to one bdev after I/O from another bdev completes.
123  */
124 struct spdk_bdev_module_channel {
125 
126 	/* The bdev management channel */
127 	struct spdk_bdev_mgmt_channel *mgmt_ch;
128 
129 	/*
130 	 * Count of I/O submitted to bdev module and waiting for completion.
131 	 * Incremented before submit_request() is called on an spdk_bdev_io.
132 	 */
133 	uint64_t		io_outstanding;
134 
135 	/*
136 	 * Queue of IO awaiting retry because of a previous NOMEM status returned
137 	 *  on this channel.
138 	 */
139 	bdev_io_tailq_t		nomem_io;
140 
141 	/*
142 	 * Threshold which io_outstanding must drop to before retrying nomem_io.
143 	 */
144 	uint64_t		nomem_threshold;
145 
146 	TAILQ_ENTRY(spdk_bdev_module_channel) link;
147 };
148 
149 #define BDEV_CH_RESET_IN_PROGRESS	(1 << 0)
150 #define BDEV_CH_QOS_ENABLED		(1 << 1)
151 
152 struct spdk_bdev_channel {
153 	struct spdk_bdev	*bdev;
154 
155 	/* The channel for the underlying device */
156 	struct spdk_io_channel	*channel;
157 
158 	/* Channel for the bdev module */
159 	struct spdk_bdev_module_channel	*module_ch;
160 
161 	struct spdk_bdev_io_stat stat;
162 
163 	/*
164 	 * Count of I/O submitted through this channel and waiting for completion.
165 	 * Incremented before submit_request() is called on an spdk_bdev_io.
166 	 */
167 	uint64_t		io_outstanding;
168 
169 	bdev_io_tailq_t		queued_resets;
170 
171 	uint32_t		flags;
172 
173 #ifdef SPDK_CONFIG_VTUNE
174 	uint64_t		start_tsc;
175 	uint64_t		interval_tsc;
176 	__itt_string_handle	*handle;
177 #endif
178 
179 };
180 
181 struct spdk_bdev_desc {
182 	struct spdk_bdev		*bdev;
183 	spdk_bdev_remove_cb_t		remove_cb;
184 	void				*remove_ctx;
185 	bool				write;
186 	TAILQ_ENTRY(spdk_bdev_desc)	link;
187 };
188 
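/*
 * Note: the one-byte offset (presumably) gives the bdev layer an io_device address that
 *  is distinct from the bdev pointer itself, so it cannot collide with other io_device
 *  registrations that use that pointer directly.
 */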
189 #define __bdev_to_io_dev(bdev)		(((char *)bdev) + 1)
190 #define __bdev_from_io_dev(io_dev)	((struct spdk_bdev *)(((char *)io_dev) - 1))
191 
192 static void spdk_bdev_write_zeroes_split(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg);
193 
194 struct spdk_bdev *
195 spdk_bdev_first(void)
196 {
197 	struct spdk_bdev *bdev;
198 
199 	bdev = TAILQ_FIRST(&g_bdev_mgr.bdevs);
200 	if (bdev) {
201 		SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Starting bdev iteration at %s\n", bdev->name);
202 	}
203 
204 	return bdev;
205 }
206 
207 struct spdk_bdev *
208 spdk_bdev_next(struct spdk_bdev *prev)
209 {
210 	struct spdk_bdev *bdev;
211 
212 	bdev = TAILQ_NEXT(prev, link);
213 	if (bdev) {
214 		SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Continuing bdev iteration at %s\n", bdev->name);
215 	}
216 
217 	return bdev;
218 }
219 
220 static struct spdk_bdev *
221 _bdev_next_leaf(struct spdk_bdev *bdev)
222 {
223 	while (bdev != NULL) {
224 		if (bdev->claim_module == NULL) {
225 			return bdev;
226 		} else {
227 			bdev = TAILQ_NEXT(bdev, link);
228 		}
229 	}
230 
231 	return bdev;
232 }
233 
234 struct spdk_bdev *
235 spdk_bdev_first_leaf(void)
236 {
237 	struct spdk_bdev *bdev;
238 
239 	bdev = _bdev_next_leaf(TAILQ_FIRST(&g_bdev_mgr.bdevs));
240 
241 	if (bdev) {
242 		SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Starting bdev iteration at %s\n", bdev->name);
243 	}
244 
245 	return bdev;
246 }
247 
248 struct spdk_bdev *
249 spdk_bdev_next_leaf(struct spdk_bdev *prev)
250 {
251 	struct spdk_bdev *bdev;
252 
253 	bdev = _bdev_next_leaf(TAILQ_NEXT(prev, link));
254 
255 	if (bdev) {
256 		SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Continuing bdev iteration at %s\n", bdev->name);
257 	}
258 
259 	return bdev;
260 }
261 
262 struct spdk_bdev *
263 spdk_bdev_get_by_name(const char *bdev_name)
264 {
265 	struct spdk_bdev_alias *tmp;
266 	struct spdk_bdev *bdev = spdk_bdev_first();
267 
268 	while (bdev != NULL) {
269 		if (strcmp(bdev_name, bdev->name) == 0) {
270 			return bdev;
271 		}
272 
273 		TAILQ_FOREACH(tmp, &bdev->aliases, tailq) {
274 			if (strcmp(bdev_name, tmp->alias) == 0) {
275 				return bdev;
276 			}
277 		}
278 
279 		bdev = spdk_bdev_next(bdev);
280 	}
281 
282 	return NULL;
283 }
284 
285 static void
286 spdk_bdev_io_set_buf(struct spdk_bdev_io *bdev_io, void *buf)
287 {
288 	assert(bdev_io->get_buf_cb != NULL);
289 	assert(buf != NULL);
290 	assert(bdev_io->u.bdev.iovs != NULL);
291 
292 	bdev_io->buf = buf;
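	/*
	 * Round buf up to the next 512-byte boundary.  The buffer pools allocate an extra
	 *  512 bytes per buffer (see spdk_bdev_initialize()) to leave room for this adjustment.
	 */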
293 	bdev_io->u.bdev.iovs[0].iov_base = (void *)((unsigned long)((char *)buf + 512) & ~511UL);
294 	bdev_io->u.bdev.iovs[0].iov_len = bdev_io->buf_len;
295 	bdev_io->get_buf_cb(bdev_io->ch->channel, bdev_io);
296 }
297 
298 static void
299 spdk_bdev_io_put_buf(struct spdk_bdev_io *bdev_io)
300 {
301 	struct spdk_mempool *pool;
302 	struct spdk_bdev_io *tmp;
303 	void *buf;
304 	bdev_io_stailq_t *stailq;
305 	struct spdk_bdev_mgmt_channel *ch;
306 
307 	assert(bdev_io->u.bdev.iovcnt == 1);
308 
309 	buf = bdev_io->buf;
310 	ch = bdev_io->ch->module_ch->mgmt_ch;
311 
312 	if (bdev_io->buf_len <= SPDK_BDEV_SMALL_BUF_MAX_SIZE) {
313 		pool = g_bdev_mgr.buf_small_pool;
314 		stailq = &ch->need_buf_small;
315 	} else {
316 		pool = g_bdev_mgr.buf_large_pool;
317 		stailq = &ch->need_buf_large;
318 	}
319 
320 	if (STAILQ_EMPTY(stailq)) {
321 		spdk_mempool_put(pool, buf);
322 	} else {
323 		tmp = STAILQ_FIRST(stailq);
324 		STAILQ_REMOVE_HEAD(stailq, buf_link);
325 		spdk_bdev_io_set_buf(tmp, buf);
326 	}
327 }
328 
329 void
330 spdk_bdev_io_get_buf(struct spdk_bdev_io *bdev_io, spdk_bdev_io_get_buf_cb cb, uint64_t len)
331 {
332 	struct spdk_mempool *pool;
333 	bdev_io_stailq_t *stailq;
334 	void *buf = NULL;
335 	struct spdk_bdev_mgmt_channel *mgmt_ch;
336 
337 	assert(cb != NULL);
338 	assert(bdev_io->u.bdev.iovs != NULL);
339 
340 	if (spdk_unlikely(bdev_io->u.bdev.iovs[0].iov_base != NULL)) {
341 		/* Buffer already present */
342 		cb(bdev_io->ch->channel, bdev_io);
343 		return;
344 	}
345 
346 	assert(len <= SPDK_BDEV_LARGE_BUF_MAX_SIZE);
347 	mgmt_ch = bdev_io->ch->module_ch->mgmt_ch;
348 
349 	bdev_io->buf_len = len;
350 	bdev_io->get_buf_cb = cb;
351 	if (len <= SPDK_BDEV_SMALL_BUF_MAX_SIZE) {
352 		pool = g_bdev_mgr.buf_small_pool;
353 		stailq = &mgmt_ch->need_buf_small;
354 	} else {
355 		pool = g_bdev_mgr.buf_large_pool;
356 		stailq = &mgmt_ch->need_buf_large;
357 	}
358 
359 	buf = spdk_mempool_get(pool);
360 
361 	if (!buf) {
362 		STAILQ_INSERT_TAIL(stailq, bdev_io, buf_link);
363 	} else {
364 		spdk_bdev_io_set_buf(bdev_io, buf);
365 	}
366 }
367 
368 static int
369 spdk_bdev_module_get_max_ctx_size(void)
370 {
371 	struct spdk_bdev_module *bdev_module;
372 	int max_bdev_module_size = 0;
373 
374 	TAILQ_FOREACH(bdev_module, &g_bdev_mgr.bdev_modules, tailq) {
375 		if (bdev_module->get_ctx_size && bdev_module->get_ctx_size() > max_bdev_module_size) {
376 			max_bdev_module_size = bdev_module->get_ctx_size();
377 		}
378 	}
379 
380 	return max_bdev_module_size;
381 }
382 
383 void
384 spdk_bdev_config_text(FILE *fp)
385 {
386 	struct spdk_bdev_module *bdev_module;
387 
388 	TAILQ_FOREACH(bdev_module, &g_bdev_mgr.bdev_modules, tailq) {
389 		if (bdev_module->config_text) {
390 			bdev_module->config_text(fp);
391 		}
392 	}
393 }
394 
395 void
396 spdk_bdev_subsystem_config_json(struct spdk_json_write_ctx *w)
397 {
398 	struct spdk_bdev_module *bdev_module;
399 	struct spdk_bdev *bdev;
400 
401 	assert(w != NULL);
402 
403 	spdk_json_write_array_begin(w);
404 
405 	TAILQ_FOREACH(bdev_module, &g_bdev_mgr.bdev_modules, tailq) {
406 		if (bdev_module->config_json) {
407 			bdev_module->config_json(w);
408 		}
409 	}
410 
411 	TAILQ_FOREACH(bdev, &g_bdev_mgr.bdevs, link) {
412 		spdk_bdev_config_json(bdev, w);
413 	}
414 
415 	spdk_json_write_array_end(w);
416 }
417 
418 static int
419 spdk_bdev_mgmt_channel_create(void *io_device, void *ctx_buf)
420 {
421 	struct spdk_bdev_mgmt_channel *ch = ctx_buf;
422 
423 	STAILQ_INIT(&ch->need_buf_small);
424 	STAILQ_INIT(&ch->need_buf_large);
425 
426 	STAILQ_INIT(&ch->per_thread_cache);
427 	ch->per_thread_cache_count = 0;
428 
429 	return 0;
430 }
431 
432 static void
433 spdk_bdev_mgmt_channel_destroy(void *io_device, void *ctx_buf)
434 {
435 	struct spdk_bdev_mgmt_channel *ch = ctx_buf;
436 	struct spdk_bdev_io *bdev_io;
437 
438 	if (!STAILQ_EMPTY(&ch->need_buf_small) || !STAILQ_EMPTY(&ch->need_buf_large)) {
439 		SPDK_ERRLOG("Pending I/O list wasn't empty on channel free\n");
440 	}
441 
442 	while (!STAILQ_EMPTY(&ch->per_thread_cache)) {
443 		bdev_io = STAILQ_FIRST(&ch->per_thread_cache);
444 		STAILQ_REMOVE_HEAD(&ch->per_thread_cache, buf_link);
445 		ch->per_thread_cache_count--;
446 		spdk_mempool_put(g_bdev_mgr.bdev_io_pool, (void *)bdev_io);
447 	}
448 
449 	assert(ch->per_thread_cache_count == 0);
450 }
451 
452 static void
453 spdk_bdev_init_complete(int rc)
454 {
455 	spdk_bdev_init_cb cb_fn = g_init_cb_fn;
456 	void *cb_arg = g_init_cb_arg;
457 	struct spdk_bdev_module *m;
458 
459 	g_bdev_mgr.init_complete = true;
460 	g_init_cb_fn = NULL;
461 	g_init_cb_arg = NULL;
462 
463 	/*
464 	 * For modules that need to know when subsystem init is complete,
465 	 * inform them now.
466 	 */
467 	TAILQ_FOREACH(m, &g_bdev_mgr.bdev_modules, tailq) {
468 		if (m->init_complete) {
469 			m->init_complete();
470 		}
471 	}
472 
473 	cb_fn(cb_arg, rc);
474 }
475 
476 static void
477 spdk_bdev_module_action_complete(void)
478 {
479 	struct spdk_bdev_module *m;
480 
481 	/*
482 	 * Don't finish bdev subsystem initialization if
483 	 * module pre-initialization is still in progress, or
484 	 * the subsystem has already been initialized.
485 	 */
486 	if (!g_bdev_mgr.module_init_complete || g_bdev_mgr.init_complete) {
487 		return;
488 	}
489 
490 	/*
491 	 * Check all bdev modules for inits/examinations in progress. If any
492 	 * exist, return immediately since we cannot finish bdev subsystem
493 	 * initialization until all are completed.
494 	 */
495 	TAILQ_FOREACH(m, &g_bdev_mgr.bdev_modules, tailq) {
496 		if (m->action_in_progress > 0) {
497 			return;
498 		}
499 	}
500 
501 	/*
502 	 * Modules already finished initialization - now that all
503 	 * the bdev modules have finished their asynchronous I/O
504 	 * processing, the entire bdev layer can be marked as complete.
505 	 */
506 	spdk_bdev_init_complete(0);
507 }
508 
509 static void
510 spdk_bdev_module_action_done(struct spdk_bdev_module *module)
511 {
512 	assert(module->action_in_progress > 0);
513 	module->action_in_progress--;
514 	spdk_bdev_module_action_complete();
515 }
516 
517 void
518 spdk_bdev_module_init_done(struct spdk_bdev_module *module)
519 {
520 	spdk_bdev_module_action_done(module);
521 }
522 
523 void
524 spdk_bdev_module_examine_done(struct spdk_bdev_module *module)
525 {
526 	spdk_bdev_module_action_done(module);
527 }
528 
529 static int
530 spdk_bdev_module_channel_create(void *io_device, void *ctx_buf)
531 {
532 	struct spdk_bdev_module_channel *ch = ctx_buf;
533 	struct spdk_io_channel *mgmt_ch;
534 
535 	ch->io_outstanding = 0;
536 	TAILQ_INIT(&ch->nomem_io);
537 	ch->nomem_threshold = 0;
538 
539 	mgmt_ch = spdk_get_io_channel(&g_bdev_mgr);
540 	if (!mgmt_ch) {
541 		return -1;
542 	}
543 
544 	ch->mgmt_ch = spdk_io_channel_get_ctx(mgmt_ch);
545 
546 	return 0;
547 }
548 
549 static void
550 spdk_bdev_module_channel_destroy(void *io_device, void *ctx_buf)
551 {
552 	struct spdk_bdev_module_channel *ch = ctx_buf;
553 
554 	assert(ch->io_outstanding == 0);
555 	assert(TAILQ_EMPTY(&ch->nomem_io));
556 
557 	spdk_put_io_channel(spdk_io_channel_from_ctx(ch->mgmt_ch));
558 }
559 
560 static int
561 spdk_bdev_modules_init(void)
562 {
563 	struct spdk_bdev_module *module;
564 	int rc = 0;
565 
566 	TAILQ_FOREACH(module, &g_bdev_mgr.bdev_modules, tailq) {
567 		spdk_io_device_register(module,
568 					spdk_bdev_module_channel_create,
569 					spdk_bdev_module_channel_destroy,
570 					sizeof(struct spdk_bdev_module_channel));
571 		rc = module->module_init();
572 		if (rc != 0) {
573 			break;
574 		}
575 	}
576 
577 	g_bdev_mgr.module_init_complete = true;
578 	return rc;
579 }

580 void
581 spdk_bdev_initialize(spdk_bdev_init_cb cb_fn, void *cb_arg)
582 {
583 	int cache_size;
584 	int rc = 0;
585 	char mempool_name[32];
586 
587 	assert(cb_fn != NULL);
588 
589 	g_init_cb_fn = cb_fn;
590 	g_init_cb_arg = cb_arg;
591 
592 	snprintf(mempool_name, sizeof(mempool_name), "bdev_io_%d", getpid());
593 
594 	g_bdev_mgr.bdev_io_pool = spdk_mempool_create(mempool_name,
595 				  SPDK_BDEV_IO_POOL_SIZE,
596 				  sizeof(struct spdk_bdev_io) +
597 				  spdk_bdev_module_get_max_ctx_size(),
598 				  0,
599 				  SPDK_ENV_SOCKET_ID_ANY);
600 
601 	if (g_bdev_mgr.bdev_io_pool == NULL) {
602 		SPDK_ERRLOG("could not allocate spdk_bdev_io pool\n");
603 		spdk_bdev_init_complete(-1);
604 		return;
605 	}
606 
607 	/**
608 	 * Ensure no more than half of the total buffers end up in local caches, by
609 	 *   using spdk_env_get_core_count() to determine how many local caches we need
610 	 *   to account for.
611 	 */
612 	cache_size = BUF_SMALL_POOL_SIZE / (2 * spdk_env_get_core_count());
613 	snprintf(mempool_name, sizeof(mempool_name), "buf_small_pool_%d", getpid());
614 
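	/*
	 * Each buffer is over-allocated by 512 bytes so that spdk_bdev_io_set_buf() can return
	 *  a 512-byte aligned pointer within it.
	 */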
615 	g_bdev_mgr.buf_small_pool = spdk_mempool_create(mempool_name,
616 				    BUF_SMALL_POOL_SIZE,
617 				    SPDK_BDEV_SMALL_BUF_MAX_SIZE + 512,
618 				    cache_size,
619 				    SPDK_ENV_SOCKET_ID_ANY);
620 	if (!g_bdev_mgr.buf_small_pool) {
621 		SPDK_ERRLOG("create rbuf small pool failed\n");
622 		spdk_bdev_init_complete(-1);
623 		return;
624 	}
625 
626 	cache_size = BUF_LARGE_POOL_SIZE / (2 * spdk_env_get_core_count());
627 	snprintf(mempool_name, sizeof(mempool_name), "buf_large_pool_%d", getpid());
628 
629 	g_bdev_mgr.buf_large_pool = spdk_mempool_create(mempool_name,
630 				    BUF_LARGE_POOL_SIZE,
631 				    SPDK_BDEV_LARGE_BUF_MAX_SIZE + 512,
632 				    cache_size,
633 				    SPDK_ENV_SOCKET_ID_ANY);
634 	if (!g_bdev_mgr.buf_large_pool) {
635 		SPDK_ERRLOG("create rbuf large pool failed\n");
636 		spdk_bdev_init_complete(-1);
637 		return;
638 	}
639 
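	/*
	 * Single shared buffer of zeroes, used to emulate WRITE_ZEROES with regular writes
	 *  on bdevs that do not support it natively (see spdk_bdev_write_zeroes_blocks()).
	 */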
640 	g_bdev_mgr.zero_buffer = spdk_dma_zmalloc(ZERO_BUFFER_SIZE, ZERO_BUFFER_SIZE,
641 				 NULL);
642 	if (!g_bdev_mgr.zero_buffer) {
643 		SPDK_ERRLOG("create bdev zero buffer failed\n");
644 		spdk_bdev_init_complete(-1);
645 		return;
646 	}
647 
648 #ifdef SPDK_CONFIG_VTUNE
649 	g_bdev_mgr.domain = __itt_domain_create("spdk_bdev");
650 #endif
651 
652 	spdk_io_device_register(&g_bdev_mgr, spdk_bdev_mgmt_channel_create,
653 				spdk_bdev_mgmt_channel_destroy,
654 				sizeof(struct spdk_bdev_mgmt_channel));
655 
656 	rc = spdk_bdev_modules_init();
657 	if (rc != 0) {
658 		SPDK_ERRLOG("bdev modules init failed\n");
659 		spdk_bdev_init_complete(-1);
660 		return;
661 	}
662 
663 	spdk_bdev_module_action_complete();
664 }
665 
666 static void
667 spdk_bdev_mgr_unregister_cb(void *io_device)
668 {
669 	spdk_bdev_fini_cb cb_fn = g_fini_cb_fn;
670 
671 	if (spdk_mempool_count(g_bdev_mgr.bdev_io_pool) != SPDK_BDEV_IO_POOL_SIZE) {
672 		SPDK_ERRLOG("bdev IO pool count is %zu but should be %u\n",
673 			    spdk_mempool_count(g_bdev_mgr.bdev_io_pool),
674 			    SPDK_BDEV_IO_POOL_SIZE);
675 	}
676 
677 	if (spdk_mempool_count(g_bdev_mgr.buf_small_pool) != BUF_SMALL_POOL_SIZE) {
678 		SPDK_ERRLOG("Small buffer pool count is %zu but should be %u\n",
679 			    spdk_mempool_count(g_bdev_mgr.buf_small_pool),
680 			    BUF_SMALL_POOL_SIZE);
681 		assert(false);
682 	}
683 
684 	if (spdk_mempool_count(g_bdev_mgr.buf_large_pool) != BUF_LARGE_POOL_SIZE) {
685 		SPDK_ERRLOG("Large buffer pool count is %zu but should be %u\n",
686 			    spdk_mempool_count(g_bdev_mgr.buf_large_pool),
687 			    BUF_LARGE_POOL_SIZE);
688 		assert(false);
689 	}
690 
691 	spdk_mempool_free(g_bdev_mgr.bdev_io_pool);
692 	spdk_mempool_free(g_bdev_mgr.buf_small_pool);
693 	spdk_mempool_free(g_bdev_mgr.buf_large_pool);
694 	spdk_dma_free(g_bdev_mgr.zero_buffer);
695 
696 	cb_fn(g_fini_cb_arg);
697 	g_fini_cb_fn = NULL;
698 	g_fini_cb_arg = NULL;
699 }
700 
701 static struct spdk_bdev_module *g_resume_bdev_module = NULL;
702 
703 static void
704 spdk_bdev_module_finish_iter(void *arg)
705 {
706 	struct spdk_bdev_module *bdev_module;
707 
708 	/* Start iterating from the last touched module */
709 	if (!g_resume_bdev_module) {
710 		bdev_module = TAILQ_FIRST(&g_bdev_mgr.bdev_modules);
711 	} else {
712 		bdev_module = TAILQ_NEXT(g_resume_bdev_module, tailq);
713 	}
714 
715 	if (bdev_module) {
716 		/* Save our place so we can resume later. We must
717 		 * save the variable here, before calling module_fini()
718 		 * below, because in some cases the module may immediately
719 		 * call spdk_bdev_module_finish_done() and re-enter
720 		 * this function to continue iterating. */
721 		g_resume_bdev_module = bdev_module;
722 
723 		if (bdev_module->module_fini) {
724 			bdev_module->module_fini();
725 		}
726 
727 		if (!bdev_module->async_fini) {
728 			spdk_bdev_module_finish_done();
729 		}
730 
731 		return;
732 	}
733 
734 	g_resume_bdev_module = NULL;
735 
736 	spdk_io_device_unregister(&g_bdev_mgr, spdk_bdev_mgr_unregister_cb);
737 }
738 
739 static void
740 spdk_bdev_module_unregister_cb(void *io_device)
741 {
742 	if (spdk_get_thread() != g_fini_thread) {
743 		spdk_thread_send_msg(g_fini_thread, spdk_bdev_module_finish_iter, NULL);
744 	} else {
745 		spdk_bdev_module_finish_iter(NULL);
746 	}
747 }
748 
749 void
750 spdk_bdev_module_finish_done(void)
751 {
752 	spdk_io_device_unregister(g_resume_bdev_module, spdk_bdev_module_unregister_cb);
753 }
754 
755 static void
756 _spdk_bdev_finish_unregister_bdevs_iter(void *cb_arg, int bdeverrno)
757 {
758 	struct spdk_bdev *bdev = cb_arg;
759 
760 	if (bdeverrno && bdev) {
761 		SPDK_WARNLOG("Unable to unregister bdev '%s' during spdk_bdev_finish()\n",
762 			     bdev->name);
763 
764 		/*
765 		 * Since the call to spdk_bdev_unregister() failed, we have no way to free this
766 		 *  bdev; try to continue by manually removing this bdev from the list and continue
767 		 *  with the next bdev in the list.
768 		 */
769 		TAILQ_REMOVE(&g_bdev_mgr.bdevs, bdev, link);
770 	}
771 
772 	if (TAILQ_EMPTY(&g_bdev_mgr.bdevs)) {
773 		SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Done unregistering bdevs\n");
774 		/*
775 		 * Bdev module finish needs to be deferred as we might be in the middle of some context
776 		 * (like bdev part free) that will use this bdev (or private bdev driver ctx data)
777 		 * after returning.
778 		 */
779 		spdk_thread_send_msg(spdk_get_thread(), spdk_bdev_module_finish_iter, NULL);
780 		return;
781 	}
782 
783 	/*
784 	 * Unregister the first bdev in the list.
785 	 *
786 	 * spdk_bdev_unregister() will handle the case where the bdev has open descriptors by
787 	 *  calling the remove_cb of the descriptors first.
788 	 *
789 	 * Once this bdev and all of its open descriptors have been cleaned up, this function
790 	 *  will be called again via the unregister completion callback to continue the cleanup
791 	 *  process with the next bdev.
792 	 */
793 	bdev = TAILQ_FIRST(&g_bdev_mgr.bdevs);
794 	SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Unregistering bdev '%s'\n", bdev->name);
795 	spdk_bdev_unregister(bdev, _spdk_bdev_finish_unregister_bdevs_iter, bdev);
796 }
797 
798 void
799 spdk_bdev_finish(spdk_bdev_fini_cb cb_fn, void *cb_arg)
800 {
801 	assert(cb_fn != NULL);
802 
803 	g_fini_thread = spdk_get_thread();
804 
805 	g_fini_cb_fn = cb_fn;
806 	g_fini_cb_arg = cb_arg;
807 
808 	_spdk_bdev_finish_unregister_bdevs_iter(NULL, 0);
809 }
810 
811 static struct spdk_bdev_io *
812 spdk_bdev_get_io(struct spdk_bdev_channel *channel)
813 {
814 	struct spdk_bdev_mgmt_channel *ch = channel->module_ch->mgmt_ch;
815 	struct spdk_bdev_io *bdev_io;
816 
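	/* Take a bdev_io from the per-thread cache when possible to avoid mempool contention. */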
817 	if (ch->per_thread_cache_count > 0) {
818 		bdev_io = STAILQ_FIRST(&ch->per_thread_cache);
819 		STAILQ_REMOVE_HEAD(&ch->per_thread_cache, buf_link);
820 		ch->per_thread_cache_count--;
821 	} else {
822 		bdev_io = spdk_mempool_get(g_bdev_mgr.bdev_io_pool);
823 		if (!bdev_io) {
824 			SPDK_ERRLOG("Unable to get spdk_bdev_io\n");
825 			return NULL;
826 		}
827 	}
828 
829 	return bdev_io;
830 }
831 
832 static void
833 spdk_bdev_put_io(struct spdk_bdev_io *bdev_io)
834 {
835 	struct spdk_bdev_mgmt_channel *ch = bdev_io->ch->module_ch->mgmt_ch;
836 
837 	if (bdev_io->buf != NULL) {
838 		spdk_bdev_io_put_buf(bdev_io);
839 	}
840 
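	/* Return the bdev_io to the per-thread cache if there is room, otherwise back to the pool. */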
841 	if (ch->per_thread_cache_count < SPDK_BDEV_IO_CACHE_SIZE) {
842 		ch->per_thread_cache_count++;
843 		STAILQ_INSERT_TAIL(&ch->per_thread_cache, bdev_io, buf_link);
844 	} else {
845 		spdk_mempool_put(g_bdev_mgr.bdev_io_pool, (void *)bdev_io);
846 	}
847 }
848 
849 static void
850 _spdk_bdev_qos_io_submit(struct spdk_bdev_channel *ch)
851 {
852 	struct spdk_bdev_io		*bdev_io = NULL;
853 	struct spdk_bdev		*bdev = ch->bdev;
854 	struct spdk_bdev_qos		*qos = &bdev->qos;
855 	struct spdk_bdev_module_channel *module_ch = ch->module_ch;
856 
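	/* Drain queued I/O until this timeslice's budget of max_ios_per_timeslice is exhausted. */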
857 	while (!TAILQ_EMPTY(&qos->queued)) {
858 		if (qos->io_submitted_this_timeslice < qos->max_ios_per_timeslice) {
859 			bdev_io = TAILQ_FIRST(&qos->queued);
860 			TAILQ_REMOVE(&qos->queued, bdev_io, link);
861 			qos->io_submitted_this_timeslice++;
862 			ch->io_outstanding++;
863 			module_ch->io_outstanding++;
864 			bdev->fn_table->submit_request(ch->channel, bdev_io);
865 		} else {
866 			break;
867 		}
868 	}
869 }
870 
871 static void
872 _spdk_bdev_io_submit(void *ctx)
873 {
874 	struct spdk_bdev_io *bdev_io = ctx;
875 	struct spdk_bdev *bdev = bdev_io->bdev;
876 	struct spdk_bdev_channel *bdev_ch = bdev_io->ch;
877 	struct spdk_io_channel *ch = bdev_ch->channel;
878 	struct spdk_bdev_module_channel	*module_ch = bdev_ch->module_ch;
879 
880 	bdev_io->submit_tsc = spdk_get_ticks();
881 	bdev_ch->io_outstanding++;
882 	module_ch->io_outstanding++;
883 	bdev_io->in_submit_request = true;
884 	if (spdk_likely(bdev_ch->flags == 0)) {
885 		if (spdk_likely(TAILQ_EMPTY(&module_ch->nomem_io))) {
886 			bdev->fn_table->submit_request(ch, bdev_io);
887 		} else {
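			/*
			 * An earlier I/O on this module channel got NOMEM.  Queue behind it so
			 *  submission order is preserved when the queue is retried.
			 */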
888 			bdev_ch->io_outstanding--;
889 			module_ch->io_outstanding--;
890 			TAILQ_INSERT_TAIL(&module_ch->nomem_io, bdev_io, link);
891 		}
892 	} else if (bdev_ch->flags & BDEV_CH_RESET_IN_PROGRESS) {
893 		spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
894 	} else if (bdev_ch->flags & BDEV_CH_QOS_ENABLED) {
895 		bdev_ch->io_outstanding--;
896 		module_ch->io_outstanding--;
897 		TAILQ_INSERT_TAIL(&bdev->qos.queued, bdev_io, link);
898 		_spdk_bdev_qos_io_submit(bdev_ch);
899 	} else {
900 		SPDK_ERRLOG("unknown bdev_ch flag %x found\n", bdev_ch->flags);
901 		spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
902 	}
903 	bdev_io->in_submit_request = false;
904 }
905 
906 static void
907 spdk_bdev_io_submit(struct spdk_bdev_io *bdev_io)
908 {
909 	struct spdk_bdev *bdev = bdev_io->bdev;
910 
911 	assert(bdev_io->status == SPDK_BDEV_IO_STATUS_PENDING);
912 
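		/*
		 * Remember the submitting channel, then redirect the I/O to the QoS channel so
		 *  that rate limiting is applied on the single QoS thread.
		 */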
913 	if (bdev_io->ch->flags & BDEV_CH_QOS_ENABLED) {
914 		bdev_io->io_submit_ch = bdev_io->ch;
915 		bdev_io->ch = bdev->qos.ch;
916 		spdk_thread_send_msg(bdev->qos.thread, _spdk_bdev_io_submit, bdev_io);
917 	} else {
918 		_spdk_bdev_io_submit(bdev_io);
919 	}
920 }
921 
922 static void
923 spdk_bdev_io_submit_reset(struct spdk_bdev_io *bdev_io)
924 {
925 	struct spdk_bdev *bdev = bdev_io->bdev;
926 	struct spdk_bdev_channel *bdev_ch = bdev_io->ch;
927 	struct spdk_io_channel *ch = bdev_ch->channel;
928 
929 	assert(bdev_io->status == SPDK_BDEV_IO_STATUS_PENDING);
930 
931 	bdev_io->in_submit_request = true;
932 	bdev->fn_table->submit_request(ch, bdev_io);
933 	bdev_io->in_submit_request = false;
934 }
935 
936 static void
937 spdk_bdev_io_init(struct spdk_bdev_io *bdev_io,
938 		  struct spdk_bdev *bdev, void *cb_arg,
939 		  spdk_bdev_io_completion_cb cb)
940 {
941 	bdev_io->bdev = bdev;
942 	bdev_io->caller_ctx = cb_arg;
943 	bdev_io->cb = cb;
944 	bdev_io->status = SPDK_BDEV_IO_STATUS_PENDING;
945 	bdev_io->in_submit_request = false;
946 	bdev_io->buf = NULL;
947 	bdev_io->io_submit_ch = NULL;
948 }
949 
950 bool
951 spdk_bdev_io_type_supported(struct spdk_bdev *bdev, enum spdk_bdev_io_type io_type)
952 {
953 	return bdev->fn_table->io_type_supported(bdev->ctxt, io_type);
954 }
955 
956 int
957 spdk_bdev_dump_info_json(struct spdk_bdev *bdev, struct spdk_json_write_ctx *w)
958 {
959 	if (bdev->fn_table->dump_info_json) {
960 		return bdev->fn_table->dump_info_json(bdev->ctxt, w);
961 	}
962 
963 	return 0;
964 }
965 
966 void
967 spdk_bdev_config_json(struct spdk_bdev *bdev, struct spdk_json_write_ctx *w)
968 {
969 	assert(bdev != NULL);
970 	assert(w != NULL);
971 
972 	if (bdev->fn_table->write_config_json) {
973 		bdev->fn_table->write_config_json(bdev, w);
974 	} else {
975 		spdk_json_write_object_begin(w);
976 		spdk_json_write_named_string(w, "name", bdev->name);
977 		spdk_json_write_object_end(w);
978 	}
979 }
980 
981 static void
982 spdk_bdev_qos_update_max_ios_per_timeslice(struct spdk_bdev_qos *qos)
983 {
984 	uint64_t max_ios_per_timeslice = 0;
985 
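	/*
	 * rate_limit is in I/O per second; scale it by the timeslice length (in microseconds)
	 *  to get the I/O budget per timeslice, with a small minimum so progress is always made.
	 */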
986 	max_ios_per_timeslice = qos->rate_limit * SPDK_BDEV_QOS_TIMESLICE_IN_USEC /
987 				SPDK_BDEV_SEC_TO_USEC;
988 	qos->max_ios_per_timeslice = spdk_max(max_ios_per_timeslice,
989 					      SPDK_BDEV_QOS_MIN_IO_PER_TIMESLICE);
990 }
991 
992 static int
993 spdk_bdev_channel_poll_qos(void *arg)
994 {
995 	struct spdk_bdev_channel	*ch = arg;
996 
997 	/* Reset for next round of rate limiting */
998 	ch->bdev->qos.io_submitted_this_timeslice = 0;
999 
1000 	_spdk_bdev_qos_io_submit(ch);
1001 
1002 	return -1;
1003 }
1004 
1005 static int
1006 _spdk_bdev_channel_create(struct spdk_bdev_channel *ch, void *io_device)
1007 {
1008 	struct spdk_bdev		*bdev = __bdev_from_io_dev(io_device);
1009 
1010 	ch->bdev = bdev;
1011 	ch->channel = bdev->fn_table->get_io_channel(bdev->ctxt);
1012 	if (!ch->channel) {
1013 		return -1;
1014 	}
1015 
1016 	ch->module_ch = spdk_io_channel_get_ctx(spdk_get_io_channel(bdev->module));
1017 
1018 	memset(&ch->stat, 0, sizeof(ch->stat));
1019 	ch->io_outstanding = 0;
1020 	TAILQ_INIT(&ch->queued_resets);
1021 	ch->flags = 0;
1022 
1023 	return 0;
1024 }
1025 
1026 static void
1027 _spdk_bdev_channel_destroy_resource(struct spdk_bdev_channel *ch)
1028 {
1029 	if (!ch) {
1030 		return;
1031 	}
1032 
1033 	if (ch->channel) {
1034 		spdk_put_io_channel(ch->channel);
1035 	}
1036 
1037 	if (ch->module_ch) {
1038 		spdk_put_io_channel(spdk_io_channel_from_ctx(ch->module_ch));
1039 	}
1040 }
1041 
1042 /* Caller must hold bdev->mutex. */
1043 static int
1044 spdk_bdev_qos_channel_create(struct spdk_bdev *bdev)
1045 {
1046 	assert(bdev->qos.ch == NULL);
1047 	assert(bdev->qos.thread == NULL);
1048 
1049 	bdev->qos.ch = calloc(1, sizeof(struct spdk_bdev_channel));
1050 	if (!bdev->qos.ch) {
1051 		return -1;
1052 	}
1053 
1054 	bdev->qos.thread = spdk_get_thread();
1055 	if (!bdev->qos.thread) {
1056 		free(bdev->qos.ch);
1057 		bdev->qos.ch = NULL;
1058 		return -1;
1059 	}
1060 
1061 	if (_spdk_bdev_channel_create(bdev->qos.ch, __bdev_to_io_dev(bdev)) != 0) {
1062 		free(bdev->qos.ch);
1063 		bdev->qos.ch = NULL;
1064 		bdev->qos.thread = NULL;
1065 		return -1;
1066 	}
1067 
1068 	TAILQ_INIT(&bdev->qos.queued);
1069 
1070 	bdev->qos.ch->flags |= BDEV_CH_QOS_ENABLED;
1071 	spdk_bdev_qos_update_max_ios_per_timeslice(&bdev->qos);
1072 
1073 	bdev->qos.poller = spdk_poller_register(spdk_bdev_channel_poll_qos,
1074 						bdev->qos.ch,
1075 						SPDK_BDEV_QOS_TIMESLICE_IN_USEC);
1076 
1077 	return 0;
1078 }
1079 
1080 /* Caller must hold bdev->mutex */
1081 static int
1082 _spdk_bdev_enable_qos(struct spdk_bdev *bdev, struct spdk_bdev_channel *ch)
1083 {
1084 	/* Rate limiting is enabled on this bdev */
1085 	if (bdev->qos.enabled) {
1086 		if (bdev->qos.ch == NULL) {
1087 			if (spdk_bdev_qos_channel_create(bdev) != 0) {
1088 				return -1;
1089 			}
1090 		}
1091 		ch->flags |= BDEV_CH_QOS_ENABLED;
1092 	}
1093 
1094 	return 0;
1095 }
1096 
1097 static int
1098 spdk_bdev_channel_create(void *io_device, void *ctx_buf)
1099 {
1100 	struct spdk_bdev		*bdev = __bdev_from_io_dev(io_device);
1101 	struct spdk_bdev_channel	*ch = ctx_buf;
1102 
1103 	if (_spdk_bdev_channel_create(ch, io_device) != 0) {
1104 		_spdk_bdev_channel_destroy_resource(ch);
1105 		return -1;
1106 	}
1107 
1108 #ifdef SPDK_CONFIG_VTUNE
1109 	{
1110 		char *name;
1111 		__itt_init_ittlib(NULL, 0);
1112 		name = spdk_sprintf_alloc("spdk_bdev_%s_%p", ch->bdev->name, ch);
1113 		if (!name) {
1114 			_spdk_bdev_channel_destroy_resource(ch);
1115 			return -1;
1116 		}
1117 		ch->handle = __itt_string_handle_create(name);
1118 		free(name);
1119 		ch->start_tsc = spdk_get_ticks();
1120 		ch->interval_tsc = spdk_get_ticks_hz() / 100;
1121 	}
1122 #endif
1123 
1124 	pthread_mutex_lock(&bdev->mutex);
1125 
1126 	if (_spdk_bdev_enable_qos(bdev, ch)) {
1127 		_spdk_bdev_channel_destroy_resource(ch);
1128 		pthread_mutex_unlock(&bdev->mutex);
1129 		return -1;
1130 	}
1131 
1132 	bdev->channel_count++;
1133 
1134 	pthread_mutex_unlock(&bdev->mutex);
1135 
1136 	return 0;
1137 }
1138 
1139 /*
1140  * Abort I/O that are waiting on a data buffer.  These types of I/O are
1141  *  linked using the spdk_bdev_io buf_link TAILQ_ENTRY.
1142  */
1143 static void
1144 _spdk_bdev_abort_buf_io(bdev_io_stailq_t *queue, struct spdk_bdev_channel *ch)
1145 {
1146 	bdev_io_stailq_t tmp;
1147 	struct spdk_bdev_io *bdev_io;
1148 
1149 	STAILQ_INIT(&tmp);
1150 
1151 	while (!STAILQ_EMPTY(queue)) {
1152 		bdev_io = STAILQ_FIRST(queue);
1153 		STAILQ_REMOVE_HEAD(queue, buf_link);
1154 		if (bdev_io->ch == ch) {
1155 			spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
1156 		} else {
1157 			STAILQ_INSERT_TAIL(&tmp, bdev_io, buf_link);
1158 		}
1159 	}
1160 
1161 	STAILQ_SWAP(&tmp, queue, spdk_bdev_io);
1162 }
1163 
1164 /*
1165  * Abort I/O that are queued waiting for submission.  These types of I/O are
1166  *  linked using the spdk_bdev_io link TAILQ_ENTRY.
1167  */
1168 static void
1169 _spdk_bdev_abort_queued_io(bdev_io_tailq_t *queue, struct spdk_bdev_channel *ch)
1170 {
1171 	struct spdk_bdev_io *bdev_io, *tmp;
1172 
1173 	TAILQ_FOREACH_SAFE(bdev_io, queue, link, tmp) {
1174 		if (bdev_io->ch == ch) {
1175 			TAILQ_REMOVE(queue, bdev_io, link);
1176 			/*
1177 			 * spdk_bdev_io_complete() assumes that the completed I/O had
1178 			 *  been submitted to the bdev module.  Since in this case it
1179 			 *  hadn't, bump io_outstanding to account for the decrement
1180 			 *  that spdk_bdev_io_complete() will do.
1181 			 */
1182 			if (bdev_io->type != SPDK_BDEV_IO_TYPE_RESET) {
1183 				ch->io_outstanding++;
1184 				ch->module_ch->io_outstanding++;
1185 			}
1186 			spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
1187 		}
1188 	}
1189 }
1190 
1191 static void
1192 _spdk_bdev_channel_destroy(struct spdk_bdev_channel *ch)
1193 {
1194 	struct spdk_bdev_mgmt_channel	*mgmt_ch;
1195 	struct spdk_bdev_module_channel	*module_ch = ch->module_ch;
1196 
1197 	mgmt_ch = module_ch->mgmt_ch;
1198 
1199 	_spdk_bdev_abort_queued_io(&ch->queued_resets, ch);
1200 	_spdk_bdev_abort_queued_io(&module_ch->nomem_io, ch);
1201 	_spdk_bdev_abort_buf_io(&mgmt_ch->need_buf_small, ch);
1202 	_spdk_bdev_abort_buf_io(&mgmt_ch->need_buf_large, ch);
1203 
1204 	_spdk_bdev_channel_destroy_resource(ch);
1205 }
1206 
1207 struct qos_channel_destroy_ctx {
1208 	struct spdk_bdev_channel *qos_channel;
1209 	struct spdk_poller *poller;
1210 };
1211 
1212 static void
1213 spdk_bdev_qos_channel_destroy(void *cb_arg)
1214 {
1215 	struct qos_channel_destroy_ctx *ctx = cb_arg;
1216 
1217 	_spdk_bdev_channel_destroy(ctx->qos_channel);
1218 
1219 	spdk_poller_unregister(&ctx->poller);
1220 
1221 	free(ctx->qos_channel);
1222 	free(ctx);
1223 }
1224 
1225 static void
1226 spdk_bdev_channel_destroy(void *io_device, void *ctx_buf)
1227 {
1228 	struct spdk_bdev_channel	*ch = ctx_buf;
1229 	struct spdk_bdev		*bdev = ch->bdev;
1230 
1231 	_spdk_bdev_channel_destroy(ch);
1232 
1233 	pthread_mutex_lock(&bdev->mutex);
1234 	bdev->channel_count--;
1235 	if (bdev->channel_count == 0 && bdev->qos.enabled && bdev->qos.ch != NULL) {
1236 		struct qos_channel_destroy_ctx *ctx;
1237 
1238 		/* All I/O channels for this bdev have been destroyed - destroy the QoS channel. */
1239 
1240 		ctx = calloc(1, sizeof(*ctx));
1241 		if (!ctx) {
1242 			/* We can't stop the old QoS thread. Just leave it where it is. */
1243 			pthread_mutex_unlock(&bdev->mutex);
1244 			return;
1245 		}
1246 
1247 		ctx->qos_channel = bdev->qos.ch;
1248 		ctx->poller = bdev->qos.poller;
1249 
1250 		spdk_thread_send_msg(bdev->qos.thread, spdk_bdev_qos_channel_destroy,
1251 				     ctx);
1252 
1253 		/*
1254 		 * Set qos_channel to NULL within the critical section so that
1255 		 * if another channel is created, it will see qos_channel == NULL and
1256 		 * re-create the QoS channel even if the asynchronous qos_channel_destroy
1257 		 * isn't finished yet.
1258 		 */
1259 		bdev->qos.ch = NULL;
1260 		bdev->qos.thread = NULL;
1261 	}
1262 	pthread_mutex_unlock(&bdev->mutex);
1263 }
1264 
1265 int
1266 spdk_bdev_alias_add(struct spdk_bdev *bdev, const char *alias)
1267 {
1268 	struct spdk_bdev_alias *tmp;
1269 
1270 	if (alias == NULL) {
1271 		SPDK_ERRLOG("Empty alias passed\n");
1272 		return -EINVAL;
1273 	}
1274 
1275 	if (spdk_bdev_get_by_name(alias)) {
1276 		SPDK_ERRLOG("Bdev name/alias: %s already exists\n", alias);
1277 		return -EEXIST;
1278 	}
1279 
1280 	tmp = calloc(1, sizeof(*tmp));
1281 	if (tmp == NULL) {
1282 		SPDK_ERRLOG("Unable to allocate alias\n");
1283 		return -ENOMEM;
1284 	}
1285 
1286 	tmp->alias = strdup(alias);
1287 	if (tmp->alias == NULL) {
1288 		free(tmp);
1289 		SPDK_ERRLOG("Unable to allocate alias\n");
1290 		return -ENOMEM;
1291 	}
1292 
1293 	TAILQ_INSERT_TAIL(&bdev->aliases, tmp, tailq);
1294 
1295 	return 0;
1296 }
1297 
1298 int
1299 spdk_bdev_alias_del(struct spdk_bdev *bdev, const char *alias)
1300 {
1301 	struct spdk_bdev_alias *tmp;
1302 
1303 	TAILQ_FOREACH(tmp, &bdev->aliases, tailq) {
1304 		if (strcmp(alias, tmp->alias) == 0) {
1305 			TAILQ_REMOVE(&bdev->aliases, tmp, tailq);
1306 			free(tmp->alias);
1307 			free(tmp);
1308 			return 0;
1309 		}
1310 	}
1311 
1312 	SPDK_INFOLOG(SPDK_LOG_BDEV, "Alias %s does not exist\n", alias);
1313 
1314 	return -ENOENT;
1315 }
1316 
1317 struct spdk_io_channel *
1318 spdk_bdev_get_io_channel(struct spdk_bdev_desc *desc)
1319 {
1320 	return spdk_get_io_channel(__bdev_to_io_dev(desc->bdev));
1321 }
1322 
1323 const char *
1324 spdk_bdev_get_name(const struct spdk_bdev *bdev)
1325 {
1326 	return bdev->name;
1327 }
1328 
1329 const char *
1330 spdk_bdev_get_product_name(const struct spdk_bdev *bdev)
1331 {
1332 	return bdev->product_name;
1333 }
1334 
1335 const struct spdk_bdev_aliases_list *
1336 spdk_bdev_get_aliases(const struct spdk_bdev *bdev)
1337 {
1338 	return &bdev->aliases;
1339 }
1340 
1341 uint32_t
1342 spdk_bdev_get_block_size(const struct spdk_bdev *bdev)
1343 {
1344 	return bdev->blocklen;
1345 }
1346 
1347 uint64_t
1348 spdk_bdev_get_num_blocks(const struct spdk_bdev *bdev)
1349 {
1350 	return bdev->blockcnt;
1351 }
1352 
1353 uint64_t
1354 spdk_bdev_get_qos_ios_per_sec(const struct spdk_bdev *bdev)
1355 {
1356 	return bdev->qos.rate_limit;
1357 }
1358 
1359 size_t
1360 spdk_bdev_get_buf_align(const struct spdk_bdev *bdev)
1361 {
1362 	/* TODO: push this logic down to the bdev modules */
1363 	if (bdev->need_aligned_buffer) {
1364 		return bdev->blocklen;
1365 	}
1366 
1367 	return 1;
1368 }
1369 
1370 uint32_t
1371 spdk_bdev_get_optimal_io_boundary(const struct spdk_bdev *bdev)
1372 {
1373 	return bdev->optimal_io_boundary;
1374 }
1375 
1376 bool
1377 spdk_bdev_has_write_cache(const struct spdk_bdev *bdev)
1378 {
1379 	return bdev->write_cache;
1380 }
1381 
1382 const struct spdk_uuid *
1383 spdk_bdev_get_uuid(const struct spdk_bdev *bdev)
1384 {
1385 	return &bdev->uuid;
1386 }
1387 
1388 int
1389 spdk_bdev_notify_blockcnt_change(struct spdk_bdev *bdev, uint64_t size)
1390 {
1391 	int ret;
1392 
1393 	pthread_mutex_lock(&bdev->mutex);
1394 
1395 	/* bdev has open descriptors */
1396 	if (!TAILQ_EMPTY(&bdev->open_descs) &&
1397 	    bdev->blockcnt > size) {
1398 		ret = -EBUSY;
1399 	} else {
1400 		bdev->blockcnt = size;
1401 		ret = 0;
1402 	}
1403 
1404 	pthread_mutex_unlock(&bdev->mutex);
1405 
1406 	return ret;
1407 }
1408 
1409 /*
1410  * Convert I/O offset and length from bytes to blocks.
1411  *
1412  * Returns zero on success or non-zero if the byte parameters aren't divisible by the block size.
1413  */
1414 static uint64_t
1415 spdk_bdev_bytes_to_blocks(struct spdk_bdev *bdev, uint64_t offset_bytes, uint64_t *offset_blocks,
1416 			  uint64_t num_bytes, uint64_t *num_blocks)
1417 {
1418 	uint32_t block_size = bdev->blocklen;
1419 
1420 	*offset_blocks = offset_bytes / block_size;
1421 	*num_blocks = num_bytes / block_size;
1422 
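	/* The OR of the two remainders is non-zero iff either value is not block-aligned. */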
1423 	return (offset_bytes % block_size) | (num_bytes % block_size);
1424 }
1425 
1426 static bool
1427 spdk_bdev_io_valid_blocks(struct spdk_bdev *bdev, uint64_t offset_blocks, uint64_t num_blocks)
1428 {
1429 	/* Return failure if offset_blocks + num_blocks is less than offset_blocks; indicates there
1430 	 * has been an overflow and hence the offset has been wrapped around */
1431 	if (offset_blocks + num_blocks < offset_blocks) {
1432 		return false;
1433 	}
1434 
1435 	/* Return failure if offset_blocks + num_blocks exceeds the size of the bdev */
1436 	if (offset_blocks + num_blocks > bdev->blockcnt) {
1437 		return false;
1438 	}
1439 
1440 	return true;
1441 }
1442 
1443 int
1444 spdk_bdev_read(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
1445 	       void *buf, uint64_t offset, uint64_t nbytes,
1446 	       spdk_bdev_io_completion_cb cb, void *cb_arg)
1447 {
1448 	uint64_t offset_blocks, num_blocks;
1449 
1450 	if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, nbytes, &num_blocks) != 0) {
1451 		return -EINVAL;
1452 	}
1453 
1454 	return spdk_bdev_read_blocks(desc, ch, buf, offset_blocks, num_blocks, cb, cb_arg);
1455 }
1456 
1457 int
1458 spdk_bdev_read_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
1459 		      void *buf, uint64_t offset_blocks, uint64_t num_blocks,
1460 		      spdk_bdev_io_completion_cb cb, void *cb_arg)
1461 {
1462 	struct spdk_bdev *bdev = desc->bdev;
1463 	struct spdk_bdev_io *bdev_io;
1464 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
1465 
1466 	if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) {
1467 		return -EINVAL;
1468 	}
1469 
1470 	bdev_io = spdk_bdev_get_io(channel);
1471 	if (!bdev_io) {
1472 		SPDK_ERRLOG("spdk_bdev_io memory allocation failed during read\n");
1473 		return -ENOMEM;
1474 	}
1475 
1476 	bdev_io->ch = channel;
1477 	bdev_io->type = SPDK_BDEV_IO_TYPE_READ;
1478 	bdev_io->u.bdev.iov.iov_base = buf;
1479 	bdev_io->u.bdev.iov.iov_len = num_blocks * bdev->blocklen;
1480 	bdev_io->u.bdev.iovs = &bdev_io->u.bdev.iov;
1481 	bdev_io->u.bdev.iovcnt = 1;
1482 	bdev_io->u.bdev.num_blocks = num_blocks;
1483 	bdev_io->u.bdev.offset_blocks = offset_blocks;
1484 	spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);
1485 
1486 	spdk_bdev_io_submit(bdev_io);
1487 	return 0;
1488 }
1489 
1490 int
1491 spdk_bdev_readv(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
1492 		struct iovec *iov, int iovcnt,
1493 		uint64_t offset, uint64_t nbytes,
1494 		spdk_bdev_io_completion_cb cb, void *cb_arg)
1495 {
1496 	uint64_t offset_blocks, num_blocks;
1497 
1498 	if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, nbytes, &num_blocks) != 0) {
1499 		return -EINVAL;
1500 	}
1501 
1502 	return spdk_bdev_readv_blocks(desc, ch, iov, iovcnt, offset_blocks, num_blocks, cb, cb_arg);
1503 }
1504 
1505 int spdk_bdev_readv_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
1506 			   struct iovec *iov, int iovcnt,
1507 			   uint64_t offset_blocks, uint64_t num_blocks,
1508 			   spdk_bdev_io_completion_cb cb, void *cb_arg)
1509 {
1510 	struct spdk_bdev *bdev = desc->bdev;
1511 	struct spdk_bdev_io *bdev_io;
1512 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
1513 
1514 	if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) {
1515 		return -EINVAL;
1516 	}
1517 
1518 	bdev_io = spdk_bdev_get_io(channel);
1519 	if (!bdev_io) {
1520 		SPDK_ERRLOG("spdk_bdev_io memory allocation failed during readv\n");
1521 		return -ENOMEM;
1522 	}
1523 
1524 	bdev_io->ch = channel;
1525 	bdev_io->type = SPDK_BDEV_IO_TYPE_READ;
1526 	bdev_io->u.bdev.iovs = iov;
1527 	bdev_io->u.bdev.iovcnt = iovcnt;
1528 	bdev_io->u.bdev.num_blocks = num_blocks;
1529 	bdev_io->u.bdev.offset_blocks = offset_blocks;
1530 	spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);
1531 
1532 	spdk_bdev_io_submit(bdev_io);
1533 	return 0;
1534 }
1535 
1536 int
1537 spdk_bdev_write(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
1538 		void *buf, uint64_t offset, uint64_t nbytes,
1539 		spdk_bdev_io_completion_cb cb, void *cb_arg)
1540 {
1541 	uint64_t offset_blocks, num_blocks;
1542 
1543 	if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, nbytes, &num_blocks) != 0) {
1544 		return -EINVAL;
1545 	}
1546 
1547 	return spdk_bdev_write_blocks(desc, ch, buf, offset_blocks, num_blocks, cb, cb_arg);
1548 }
1549 
1550 int
1551 spdk_bdev_write_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
1552 		       void *buf, uint64_t offset_blocks, uint64_t num_blocks,
1553 		       spdk_bdev_io_completion_cb cb, void *cb_arg)
1554 {
1555 	struct spdk_bdev *bdev = desc->bdev;
1556 	struct spdk_bdev_io *bdev_io;
1557 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
1558 
1559 	if (!desc->write) {
1560 		return -EBADF;
1561 	}
1562 
1563 	if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) {
1564 		return -EINVAL;
1565 	}
1566 
1567 	bdev_io = spdk_bdev_get_io(channel);
1568 	if (!bdev_io) {
1569 		SPDK_ERRLOG("bdev_io memory allocation failed during write\n");
1570 		return -ENOMEM;
1571 	}
1572 
1573 	bdev_io->ch = channel;
1574 	bdev_io->type = SPDK_BDEV_IO_TYPE_WRITE;
1575 	bdev_io->u.bdev.iov.iov_base = buf;
1576 	bdev_io->u.bdev.iov.iov_len = num_blocks * bdev->blocklen;
1577 	bdev_io->u.bdev.iovs = &bdev_io->u.bdev.iov;
1578 	bdev_io->u.bdev.iovcnt = 1;
1579 	bdev_io->u.bdev.num_blocks = num_blocks;
1580 	bdev_io->u.bdev.offset_blocks = offset_blocks;
1581 	spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);
1582 
1583 	spdk_bdev_io_submit(bdev_io);
1584 	return 0;
1585 }
1586 
1587 int
1588 spdk_bdev_writev(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
1589 		 struct iovec *iov, int iovcnt,
1590 		 uint64_t offset, uint64_t len,
1591 		 spdk_bdev_io_completion_cb cb, void *cb_arg)
1592 {
1593 	uint64_t offset_blocks, num_blocks;
1594 
1595 	if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, len, &num_blocks) != 0) {
1596 		return -EINVAL;
1597 	}
1598 
1599 	return spdk_bdev_writev_blocks(desc, ch, iov, iovcnt, offset_blocks, num_blocks, cb, cb_arg);
1600 }
1601 
1602 int
1603 spdk_bdev_writev_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
1604 			struct iovec *iov, int iovcnt,
1605 			uint64_t offset_blocks, uint64_t num_blocks,
1606 			spdk_bdev_io_completion_cb cb, void *cb_arg)
1607 {
1608 	struct spdk_bdev *bdev = desc->bdev;
1609 	struct spdk_bdev_io *bdev_io;
1610 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
1611 
1612 	if (!desc->write) {
1613 		return -EBADF;
1614 	}
1615 
1616 	if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) {
1617 		return -EINVAL;
1618 	}
1619 
1620 	bdev_io = spdk_bdev_get_io(channel);
1621 	if (!bdev_io) {
1622 		SPDK_ERRLOG("bdev_io memory allocation failed during writev\n");
1623 		return -ENOMEM;
1624 	}
1625 
1626 	bdev_io->ch = channel;
1627 	bdev_io->type = SPDK_BDEV_IO_TYPE_WRITE;
1628 	bdev_io->u.bdev.iovs = iov;
1629 	bdev_io->u.bdev.iovcnt = iovcnt;
1630 	bdev_io->u.bdev.num_blocks = num_blocks;
1631 	bdev_io->u.bdev.offset_blocks = offset_blocks;
1632 	spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);
1633 
1634 	spdk_bdev_io_submit(bdev_io);
1635 	return 0;
1636 }
1637 
1638 int
1639 spdk_bdev_write_zeroes(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
1640 		       uint64_t offset, uint64_t len,
1641 		       spdk_bdev_io_completion_cb cb, void *cb_arg)
1642 {
1643 	uint64_t offset_blocks, num_blocks;
1644 
1645 	if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, len, &num_blocks) != 0) {
1646 		return -EINVAL;
1647 	}
1648 
1649 	return spdk_bdev_write_zeroes_blocks(desc, ch, offset_blocks, num_blocks, cb, cb_arg);
1650 }
1651 
1652 int
1653 spdk_bdev_write_zeroes_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
1654 			      uint64_t offset_blocks, uint64_t num_blocks,
1655 			      spdk_bdev_io_completion_cb cb, void *cb_arg)
1656 {
1657 	struct spdk_bdev *bdev = desc->bdev;
1658 	struct spdk_bdev_io *bdev_io;
1659 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
1660 	uint64_t len;
1661 	bool split_request = false;
1662 
1663 	if (num_blocks > UINT64_MAX / spdk_bdev_get_block_size(bdev)) {
1664 		SPDK_ERRLOG("length argument out of range in write_zeroes\n");
1665 		return -ERANGE;
1666 	}
1667 
1668 	if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) {
1669 		return -EINVAL;
1670 	}
1671 
1672 	bdev_io = spdk_bdev_get_io(channel);
1673 
1674 	if (!bdev_io) {
1675 		SPDK_ERRLOG("bdev_io memory allocation failed during write_zeroes\n");
1676 		return -ENOMEM;
1677 	}
1678 
1679 	bdev_io->ch = channel;
1680 	bdev_io->u.bdev.offset_blocks = offset_blocks;
1681 
1682 	if (spdk_bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_WRITE_ZEROES)) {
1683 		bdev_io->type = SPDK_BDEV_IO_TYPE_WRITE_ZEROES;
1684 		bdev_io->u.bdev.num_blocks = num_blocks;
1685 		bdev_io->u.bdev.iovs = NULL;
1686 		bdev_io->u.bdev.iovcnt = 0;
1687 
1688 	} else {
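		/*
		 * WRITE_ZEROES is not supported natively; emulate it with regular writes from the
		 *  shared zero buffer, splitting into ZERO_BUFFER_SIZE chunks when necessary.
		 */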
1689 		assert(spdk_bdev_get_block_size(bdev) <= ZERO_BUFFER_SIZE);
1690 
1691 		len = spdk_bdev_get_block_size(bdev) * num_blocks;
1692 
1693 		if (len > ZERO_BUFFER_SIZE) {
1694 			split_request = true;
1695 			len = ZERO_BUFFER_SIZE;
1696 		}
1697 
1698 		bdev_io->type = SPDK_BDEV_IO_TYPE_WRITE;
1699 		bdev_io->u.bdev.iov.iov_base = g_bdev_mgr.zero_buffer;
1700 		bdev_io->u.bdev.iov.iov_len = len;
1701 		bdev_io->u.bdev.iovs = &bdev_io->u.bdev.iov;
1702 		bdev_io->u.bdev.iovcnt = 1;
1703 		bdev_io->u.bdev.num_blocks = len / spdk_bdev_get_block_size(bdev);
1704 		bdev_io->u.bdev.split_remaining_num_blocks = num_blocks - bdev_io->u.bdev.num_blocks;
1705 		bdev_io->u.bdev.split_current_offset_blocks = offset_blocks + bdev_io->u.bdev.num_blocks;
1706 	}
1707 
1708 	if (split_request) {
1709 		bdev_io->u.bdev.stored_user_cb = cb;
1710 		spdk_bdev_io_init(bdev_io, bdev, cb_arg, spdk_bdev_write_zeroes_split);
1711 	} else {
1712 		spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);
1713 	}
1714 	spdk_bdev_io_submit(bdev_io);
1715 	return 0;
1716 }
1717 
1718 int
1719 spdk_bdev_unmap(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
1720 		uint64_t offset, uint64_t nbytes,
1721 		spdk_bdev_io_completion_cb cb, void *cb_arg)
1722 {
1723 	uint64_t offset_blocks, num_blocks;
1724 
1725 	if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, nbytes, &num_blocks) != 0) {
1726 		return -EINVAL;
1727 	}
1728 
1729 	return spdk_bdev_unmap_blocks(desc, ch, offset_blocks, num_blocks, cb, cb_arg);
1730 }
1731 
1732 int
1733 spdk_bdev_unmap_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
1734 		       uint64_t offset_blocks, uint64_t num_blocks,
1735 		       spdk_bdev_io_completion_cb cb, void *cb_arg)
1736 {
1737 	struct spdk_bdev *bdev = desc->bdev;
1738 	struct spdk_bdev_io *bdev_io;
1739 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
1740 
1741 	if (!desc->write) {
1742 		return -EBADF;
1743 	}
1744 
1745 	if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) {
1746 		return -EINVAL;
1747 	}
1748 
1749 	if (num_blocks == 0) {
1750 		SPDK_ERRLOG("Can't unmap 0 blocks\n");
1751 		return -EINVAL;
1752 	}
1753 
1754 	bdev_io = spdk_bdev_get_io(channel);
1755 	if (!bdev_io) {
1756 		SPDK_ERRLOG("bdev_io memory allocation failed during unmap\n");
1757 		return -ENOMEM;
1758 	}
1759 
1760 	bdev_io->ch = channel;
1761 	bdev_io->type = SPDK_BDEV_IO_TYPE_UNMAP;
1762 	bdev_io->u.bdev.iov.iov_base = NULL;
1763 	bdev_io->u.bdev.iov.iov_len = 0;
1764 	bdev_io->u.bdev.iovs = &bdev_io->u.bdev.iov;
1765 	bdev_io->u.bdev.iovcnt = 1;
1766 	bdev_io->u.bdev.offset_blocks = offset_blocks;
1767 	bdev_io->u.bdev.num_blocks = num_blocks;
1768 	spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);
1769 
1770 	spdk_bdev_io_submit(bdev_io);
1771 	return 0;
1772 }
1773 
1774 int
1775 spdk_bdev_flush(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
1776 		uint64_t offset, uint64_t length,
1777 		spdk_bdev_io_completion_cb cb, void *cb_arg)
1778 {
1779 	uint64_t offset_blocks, num_blocks;
1780 
1781 	if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, length, &num_blocks) != 0) {
1782 		return -EINVAL;
1783 	}
1784 
1785 	return spdk_bdev_flush_blocks(desc, ch, offset_blocks, num_blocks, cb, cb_arg);
1786 }
1787 
1788 int
1789 spdk_bdev_flush_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
1790 		       uint64_t offset_blocks, uint64_t num_blocks,
1791 		       spdk_bdev_io_completion_cb cb, void *cb_arg)
1792 {
1793 	struct spdk_bdev *bdev = desc->bdev;
1794 	struct spdk_bdev_io *bdev_io;
1795 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
1796 
1797 	if (!desc->write) {
1798 		return -EBADF;
1799 	}
1800 
1801 	if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) {
1802 		return -EINVAL;
1803 	}
1804 
1805 	bdev_io = spdk_bdev_get_io(channel);
1806 	if (!bdev_io) {
1807 		SPDK_ERRLOG("bdev_io memory allocation failed during flush\n");
1808 		return -ENOMEM;
1809 	}
1810 
1811 	bdev_io->ch = channel;
1812 	bdev_io->type = SPDK_BDEV_IO_TYPE_FLUSH;
1813 	bdev_io->u.bdev.iovs = NULL;
1814 	bdev_io->u.bdev.iovcnt = 0;
1815 	bdev_io->u.bdev.offset_blocks = offset_blocks;
1816 	bdev_io->u.bdev.num_blocks = num_blocks;
1817 	spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);
1818 
1819 	spdk_bdev_io_submit(bdev_io);
1820 	return 0;
1821 }
1822 
1823 static void
1824 _spdk_bdev_reset_dev(struct spdk_io_channel_iter *i, int status)
1825 {
1826 	struct spdk_bdev_channel *ch = spdk_io_channel_iter_get_ctx(i);
1827 	struct spdk_bdev_io *bdev_io;
1828 
1829 	bdev_io = TAILQ_FIRST(&ch->queued_resets);
1830 	TAILQ_REMOVE(&ch->queued_resets, bdev_io, link);
1831 	spdk_bdev_io_submit_reset(bdev_io);
1832 }
1833 
1834 static void
1835 _spdk_bdev_reset_freeze_channel(struct spdk_io_channel_iter *i)
1836 {
1837 	struct spdk_io_channel		*ch;
1838 	struct spdk_bdev_channel	*channel;
1839 	struct spdk_bdev_mgmt_channel	*mgmt_channel;
1840 	struct spdk_bdev_module_channel	*module_ch;
1841 
1842 	ch = spdk_io_channel_iter_get_channel(i);
1843 	channel = spdk_io_channel_get_ctx(ch);
1844 	module_ch = channel->module_ch;
1845 	mgmt_channel = module_ch->mgmt_ch;
1846 
1847 	channel->flags |= BDEV_CH_RESET_IN_PROGRESS;
1848 
1849 	_spdk_bdev_abort_queued_io(&module_ch->nomem_io, channel);
1850 	_spdk_bdev_abort_buf_io(&mgmt_channel->need_buf_small, channel);
1851 	_spdk_bdev_abort_buf_io(&mgmt_channel->need_buf_large, channel);
1852 
1853 	spdk_for_each_channel_continue(i, 0);
1854 }
1855 
1856 static void
1857 _spdk_bdev_reset_freeze_qos_channel(void *ctx)
1858 {
1859 	struct spdk_bdev		*bdev = ctx;
1860 	struct spdk_bdev_mgmt_channel	*mgmt_channel = NULL;
1861 	struct spdk_bdev_channel	*qos_channel = bdev->qos.ch;
1862 	struct spdk_bdev_module_channel	*module_ch = NULL;
1863 
1864 	if (qos_channel) {
1865 		module_ch = qos_channel->module_ch;
1866 		mgmt_channel = module_ch->mgmt_ch;
1867 
1868 		qos_channel->flags |= BDEV_CH_RESET_IN_PROGRESS;
1869 
1870 		_spdk_bdev_abort_queued_io(&module_ch->nomem_io, qos_channel);
1871 		_spdk_bdev_abort_queued_io(&bdev->qos.queued, qos_channel);
1872 		_spdk_bdev_abort_buf_io(&mgmt_channel->need_buf_small, qos_channel);
1873 		_spdk_bdev_abort_buf_io(&mgmt_channel->need_buf_large, qos_channel);
1874 	}
1875 }
1876 
1877 static void
1878 _spdk_bdev_start_reset(void *ctx)
1879 {
1880 	struct spdk_bdev_channel *ch = ctx;
1881 
1882 	spdk_for_each_channel(__bdev_to_io_dev(ch->bdev), _spdk_bdev_reset_freeze_channel,
1883 			      ch, _spdk_bdev_reset_dev);
1884 }
1885 
1886 static void
1887 _spdk_bdev_channel_start_reset(struct spdk_bdev_channel *ch)
1888 {
1889 	struct spdk_bdev *bdev = ch->bdev;
1890 
1891 	assert(!TAILQ_EMPTY(&ch->queued_resets));
1892 
1893 	pthread_mutex_lock(&bdev->mutex);
1894 	if (bdev->reset_in_progress == NULL) {
1895 		bdev->reset_in_progress = TAILQ_FIRST(&ch->queued_resets);
1896 		/*
1897 		 * Take a channel reference for the target bdev for the life of this
1898 		 *  reset.  This guards against the channel getting destroyed while
1899 		 *  spdk_for_each_channel() calls related to this reset IO are in
1900 		 *  progress.  We will release the reference when this reset is
1901 		 *  completed.
1902 		 */
1903 		bdev->reset_in_progress->u.reset.ch_ref = spdk_get_io_channel(__bdev_to_io_dev(bdev));
1904 		_spdk_bdev_start_reset(ch);
1905 	}
1906 	pthread_mutex_unlock(&bdev->mutex);
1907 }
1908 
1909 int
1910 spdk_bdev_reset(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
1911 		spdk_bdev_io_completion_cb cb, void *cb_arg)
1912 {
1913 	struct spdk_bdev *bdev = desc->bdev;
1914 	struct spdk_bdev_io *bdev_io;
1915 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
1916 
1917 	bdev_io = spdk_bdev_get_io(channel);
1918 	if (!bdev_io) {
1919 		SPDK_ERRLOG("bdev_io memory allocation failed during reset\n");
1920 		return -ENOMEM;
1921 	}
1922 
1923 	bdev_io->ch = channel;
1924 	bdev_io->type = SPDK_BDEV_IO_TYPE_RESET;
1925 	bdev_io->u.reset.ch_ref = NULL;
1926 	spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);
1927 
1928 	pthread_mutex_lock(&bdev->mutex);
1929 	TAILQ_INSERT_TAIL(&channel->queued_resets, bdev_io, link);
1930 	pthread_mutex_unlock(&bdev->mutex);
1931 
1932 	_spdk_bdev_channel_start_reset(channel);
1933 
1934 	/* Explicitly handle the QoS bdev channel, which has no associated I/O channel */
1935 	if (bdev->qos.enabled && bdev->qos.thread) {
1936 		spdk_thread_send_msg(bdev->qos.thread,
1937 				     _spdk_bdev_reset_freeze_qos_channel, bdev);
1938 	}
1939 
1940 	return 0;
1941 }
1942 
1943 void
1944 spdk_bdev_get_io_stat(struct spdk_bdev *bdev, struct spdk_io_channel *ch,
1945 		      struct spdk_bdev_io_stat *stat)
1946 {
1947 #ifdef SPDK_CONFIG_VTUNE
1948 	SPDK_ERRLOG("Calling spdk_bdev_get_io_stat is not allowed when VTune integration is enabled.\n");
1949 	memset(stat, 0, sizeof(*stat));
1950 	return;
1951 #endif
1952 
1953 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
1954 
1955 	channel->stat.ticks_rate = spdk_get_ticks_hz();
1956 	*stat = channel->stat;
1957 	memset(&channel->stat, 0, sizeof(channel->stat));
1958 }
1959 
1960 int
1961 spdk_bdev_nvme_admin_passthru(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
1962 			      const struct spdk_nvme_cmd *cmd, void *buf, size_t nbytes,
1963 			      spdk_bdev_io_completion_cb cb, void *cb_arg)
1964 {
1965 	struct spdk_bdev *bdev = desc->bdev;
1966 	struct spdk_bdev_io *bdev_io;
1967 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
1968 
1969 	if (!desc->write) {
1970 		return -EBADF;
1971 	}
1972 
1973 	bdev_io = spdk_bdev_get_io(channel);
1974 	if (!bdev_io) {
1975 		SPDK_ERRLOG("bdev_io memory allocation failed during nvme_admin_passthru\n");
1976 		return -ENOMEM;
1977 	}
1978 
1979 	bdev_io->ch = channel;
1980 	bdev_io->type = SPDK_BDEV_IO_TYPE_NVME_ADMIN;
1981 	bdev_io->u.nvme_passthru.cmd = *cmd;
1982 	bdev_io->u.nvme_passthru.buf = buf;
1983 	bdev_io->u.nvme_passthru.nbytes = nbytes;
1984 	bdev_io->u.nvme_passthru.md_buf = NULL;
1985 	bdev_io->u.nvme_passthru.md_len = 0;
1986 
1987 	spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);
1988 
1989 	spdk_bdev_io_submit(bdev_io);
1990 	return 0;
1991 }
1992 
1993 int
1994 spdk_bdev_nvme_io_passthru(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
1995 			   const struct spdk_nvme_cmd *cmd, void *buf, size_t nbytes,
1996 			   spdk_bdev_io_completion_cb cb, void *cb_arg)
1997 {
1998 	struct spdk_bdev *bdev = desc->bdev;
1999 	struct spdk_bdev_io *bdev_io;
2000 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
2001 
2002 	if (!desc->write) {
2003 		/*
2004 		 * Do not try to parse the NVMe command - we could maybe use bits in the opcode
2005 		 *  to easily determine if the command is a read or write, but for now just
2006 		 *  do not allow io_passthru with a read-only descriptor.
2007 		 */
2008 		return -EBADF;
2009 	}
2010 
2011 	bdev_io = spdk_bdev_get_io(channel);
2012 	if (!bdev_io) {
2013 		SPDK_ERRLOG("bdev_io memory allocation failed during nvme_io_passthru\n");
2014 		return -ENOMEM;
2015 	}
2016 
2017 	bdev_io->ch = channel;
2018 	bdev_io->type = SPDK_BDEV_IO_TYPE_NVME_IO;
2019 	bdev_io->u.nvme_passthru.cmd = *cmd;
2020 	bdev_io->u.nvme_passthru.buf = buf;
2021 	bdev_io->u.nvme_passthru.nbytes = nbytes;
2022 	bdev_io->u.nvme_passthru.md_buf = NULL;
2023 	bdev_io->u.nvme_passthru.md_len = 0;
2024 
2025 	spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);
2026 
2027 	spdk_bdev_io_submit(bdev_io);
2028 	return 0;
2029 }
2030 
2031 int
2032 spdk_bdev_nvme_io_passthru_md(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
2033 			      const struct spdk_nvme_cmd *cmd, void *buf, size_t nbytes, void *md_buf, size_t md_len,
2034 			      spdk_bdev_io_completion_cb cb, void *cb_arg)
2035 {
2036 	struct spdk_bdev *bdev = desc->bdev;
2037 	struct spdk_bdev_io *bdev_io;
2038 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
2039 
2040 	if (!desc->write) {
2041 		/*
2042 		 * Do not try to parse the NVMe command - we could maybe use bits in the opcode
2043 		 *  to easily determine if the command is a read or write, but for now just
2044 		 *  do not allow io_passthru with a read-only descriptor.
2045 		 */
2046 		return -EBADF;
2047 	}
2048 
2049 	bdev_io = spdk_bdev_get_io(channel);
2050 	if (!bdev_io) {
2051 		SPDK_ERRLOG("bdev_io memory allocation failed during nvme_io_passthru_md\n");
2052 		return -ENOMEM;
2053 	}
2054 
2055 	bdev_io->ch = channel;
2056 	bdev_io->type = SPDK_BDEV_IO_TYPE_NVME_IO_MD;
2057 	bdev_io->u.nvme_passthru.cmd = *cmd;
2058 	bdev_io->u.nvme_passthru.buf = buf;
2059 	bdev_io->u.nvme_passthru.nbytes = nbytes;
2060 	bdev_io->u.nvme_passthru.md_buf = md_buf;
2061 	bdev_io->u.nvme_passthru.md_len = md_len;
2062 
2063 	spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);
2064 
2065 	spdk_bdev_io_submit(bdev_io);
2066 	return 0;
2067 }
2068 
2069 int
2070 spdk_bdev_free_io(struct spdk_bdev_io *bdev_io)
2071 {
2072 	if (!bdev_io) {
2073 		SPDK_ERRLOG("bdev_io is NULL\n");
2074 		return -1;
2075 	}
2076 
2077 	if (bdev_io->status == SPDK_BDEV_IO_STATUS_PENDING) {
2078 		SPDK_ERRLOG("bdev_io is in pending state\n");
2079 		assert(false);
2080 		return -1;
2081 	}
2082 
2083 	spdk_bdev_put_io(bdev_io);
2084 
2085 	return 0;
2086 }
2087 
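/*
 * Retry I/O that previously failed with -ENOMEM.  Called from the completion
 * path once enough outstanding I/O has drained (see nomem_threshold); stops
 * again as soon as a resubmitted I/O hits NOMEM.
 */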
2088 static void
2089 _spdk_bdev_ch_retry_io(struct spdk_bdev_channel *bdev_ch)
2090 {
2091 	struct spdk_bdev *bdev = bdev_ch->bdev;
2092 	struct spdk_bdev_module_channel	*module_ch = bdev_ch->module_ch;
2093 	struct spdk_bdev_io *bdev_io;
2094 
2095 	if (module_ch->io_outstanding > module_ch->nomem_threshold) {
2096 		/*
2097 		 * Allow some more I/O to complete before retrying the nomem_io queue.
2098 		 *  Some drivers (such as nvme) cannot immediately take a new I/O in
2099 		 *  the context of a completion, because the resources for the I/O are
2100 		 *  not released until control returns to the bdev poller.  Also, we
2101 		 *  may require several small I/O to complete before a larger I/O
2102 		 *  (that requires splitting) can be submitted.
2103 		 */
2104 		return;
2105 	}
2106 
2107 	while (!TAILQ_EMPTY(&module_ch->nomem_io)) {
2108 		bdev_io = TAILQ_FIRST(&module_ch->nomem_io);
2109 		TAILQ_REMOVE(&module_ch->nomem_io, bdev_io, link);
2110 		bdev_io->ch->io_outstanding++;
2111 		module_ch->io_outstanding++;
2112 		bdev_io->status = SPDK_BDEV_IO_STATUS_PENDING;
2113 		bdev->fn_table->submit_request(bdev_io->ch->channel, bdev_io);
2114 		if (bdev_io->status == SPDK_BDEV_IO_STATUS_NOMEM) {
2115 			break;
2116 		}
2117 	}
2118 }
2119 
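/*
 * Final step of I/O completion: hand the result back on the thread that
 * submitted the I/O (deferring via a message when needed), update per-channel
 * statistics, and invoke the user's completion callback.
 */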
2120 static inline void
2121 _spdk_bdev_io_complete(void *ctx)
2122 {
2123 	struct spdk_bdev_io *bdev_io = ctx;
2124 
2125 	if (spdk_unlikely(bdev_io->in_submit_request || bdev_io->io_submit_ch)) {
2126 		/*
2127 		 * Send the completion to the thread that originally submitted the I/O,
2128 		 * which may not be the current thread in the case of QoS.
2129 		 */
2130 		if (bdev_io->io_submit_ch) {
2131 			bdev_io->ch = bdev_io->io_submit_ch;
2132 			bdev_io->io_submit_ch = NULL;
2133 		}
2134 
2135 		/*
2136 		 * Defer completion to avoid potential infinite recursion if the
2137 		 * user's completion callback issues a new I/O.
2138 		 */
2139 		spdk_thread_send_msg(spdk_io_channel_get_thread(bdev_io->ch->channel),
2140 				     _spdk_bdev_io_complete, bdev_io);
2141 		return;
2142 	}
2143 
2144 	if (bdev_io->status == SPDK_BDEV_IO_STATUS_SUCCESS) {
2145 		switch (bdev_io->type) {
2146 		case SPDK_BDEV_IO_TYPE_READ:
2147 			bdev_io->ch->stat.bytes_read += bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen;
2148 			bdev_io->ch->stat.num_read_ops++;
2149 			bdev_io->ch->stat.read_latency_ticks += (spdk_get_ticks() - bdev_io->submit_tsc);
2150 			break;
2151 		case SPDK_BDEV_IO_TYPE_WRITE:
2152 			bdev_io->ch->stat.bytes_written += bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen;
2153 			bdev_io->ch->stat.num_write_ops++;
2154 			bdev_io->ch->stat.write_latency_ticks += (spdk_get_ticks() - bdev_io->submit_tsc);
2155 			break;
2156 		default:
2157 			break;
2158 		}
2159 	}
2160 
2161 #ifdef SPDK_CONFIG_VTUNE
2162 	uint64_t now_tsc = spdk_get_ticks();
2163 	if (now_tsc > (bdev_io->ch->start_tsc + bdev_io->ch->interval_tsc)) {
2164 		uint64_t data[5];
2165 
2166 		data[0] = bdev_io->ch->stat.num_read_ops;
2167 		data[1] = bdev_io->ch->stat.bytes_read;
2168 		data[2] = bdev_io->ch->stat.num_write_ops;
2169 		data[3] = bdev_io->ch->stat.bytes_written;
2170 		data[4] = bdev_io->bdev->fn_table->get_spin_time ?
2171 			  bdev_io->bdev->fn_table->get_spin_time(bdev_io->ch->channel) : 0;
2172 
2173 		__itt_metadata_add(g_bdev_mgr.domain, __itt_null, bdev_io->ch->handle,
2174 				   __itt_metadata_u64, 5, data);
2175 
2176 		memset(&bdev_io->ch->stat, 0, sizeof(bdev_io->ch->stat));
2177 		bdev_io->ch->start_tsc = now_tsc;
2178 	}
2179 #endif
2180 
2181 	assert(bdev_io->cb != NULL);
2182 	assert(spdk_get_thread() == spdk_io_channel_get_thread(bdev_io->ch->channel));
2183 
2184 	bdev_io->cb(bdev_io, bdev_io->status == SPDK_BDEV_IO_STATUS_SUCCESS,
2185 		    bdev_io->caller_ctx);
2186 }
2187 
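/* Clear the reset-in-progress flag on the QoS channel; runs on the QoS thread. */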
2188 static void
2189 _spdk_bdev_unfreeze_qos_channel(void *ctx)
2190 {
2191 	struct spdk_bdev	*bdev = ctx;
2192 
2193 	if (bdev->qos.ch) {
2194 		bdev->qos.ch->flags &= ~BDEV_CH_RESET_IN_PROGRESS;
2195 		assert(TAILQ_EMPTY(&bdev->qos.ch->queued_resets));
2196 	}
2197 }
2198 
2199 static void
2200 _spdk_bdev_reset_complete(struct spdk_io_channel_iter *i, int status)
2201 {
2202 	struct spdk_bdev_io *bdev_io = spdk_io_channel_iter_get_ctx(i);
2203 
2204 	if (bdev_io->u.reset.ch_ref != NULL) {
2205 		spdk_put_io_channel(bdev_io->u.reset.ch_ref);
2206 		bdev_io->u.reset.ch_ref = NULL;
2207 	}
2208 
2209 	_spdk_bdev_io_complete(bdev_io);
2210 }
2211 
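/*
 * Per-channel callback for the unfreeze pass.  Clear the reset flag and, if
 * more resets were queued on this channel while the last one ran, start the
 * next one.
 */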
2212 static void
2213 _spdk_bdev_unfreeze_channel(struct spdk_io_channel_iter *i)
2214 {
2215 	struct spdk_io_channel *_ch = spdk_io_channel_iter_get_channel(i);
2216 	struct spdk_bdev_channel *ch = spdk_io_channel_get_ctx(_ch);
2217 
2218 	ch->flags &= ~BDEV_CH_RESET_IN_PROGRESS;
2219 	if (!TAILQ_EMPTY(&ch->queued_resets)) {
2220 		_spdk_bdev_channel_start_reset(ch);
2221 	}
2222 
2223 	spdk_for_each_channel_continue(i, 0);
2224 }
2225 
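/*
 * Called by bdev modules when an I/O finishes.  A completed reset triggers the
 * channel unfreeze pass; I/O that failed with NOMEM is queued for retry;
 * everything else updates the outstanding counters and completes to the caller.
 */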
2226 void
2227 spdk_bdev_io_complete(struct spdk_bdev_io *bdev_io, enum spdk_bdev_io_status status)
2228 {
2229 	struct spdk_bdev *bdev = bdev_io->bdev;
2230 	struct spdk_bdev_channel *bdev_ch = bdev_io->ch;
2231 	struct spdk_bdev_module_channel	*module_ch = bdev_ch->module_ch;
2232 
2233 	bdev_io->status = status;
2234 
2235 	if (spdk_unlikely(bdev_io->type == SPDK_BDEV_IO_TYPE_RESET)) {
2236 		bool unlock_channels = false;
2237 
2238 		if (status == SPDK_BDEV_IO_STATUS_NOMEM) {
2239 			SPDK_ERRLOG("NOMEM returned for reset\n");
2240 		}
2241 		pthread_mutex_lock(&bdev->mutex);
2242 		if (bdev_io == bdev->reset_in_progress) {
2243 			bdev->reset_in_progress = NULL;
2244 			unlock_channels = true;
2245 		}
2246 		pthread_mutex_unlock(&bdev->mutex);
2247 
2248 		if (unlock_channels) {
2249 			/* Explicitly handle the QoS bdev channel, which has no associated I/O channel */
2250 			if (bdev->qos.enabled && bdev->qos.thread) {
2251 				spdk_thread_send_msg(bdev->qos.thread,
2252 						     _spdk_bdev_unfreeze_qos_channel, bdev);
2253 			}
2254 
2255 			spdk_for_each_channel(__bdev_to_io_dev(bdev), _spdk_bdev_unfreeze_channel,
2256 					      bdev_io, _spdk_bdev_reset_complete);
2257 			return;
2258 		}
2259 	} else {
2260 		assert(bdev_ch->io_outstanding > 0);
2261 		assert(module_ch->io_outstanding > 0);
2262 		bdev_ch->io_outstanding--;
2263 		module_ch->io_outstanding--;
2264 
2265 		if (spdk_unlikely(status == SPDK_BDEV_IO_STATUS_NOMEM)) {
2266 			TAILQ_INSERT_HEAD(&module_ch->nomem_io, bdev_io, link);
2267 			/*
2268 			 * Wait for some of the outstanding I/O to complete before we
2269 			 *  retry any of the nomem_io.  Normally we will wait for
2270 			 *  NOMEM_THRESHOLD_COUNT I/O to complete but for low queue
2271 			 *  depth channels we will instead wait for half to complete.
2272 			 */
2273 			module_ch->nomem_threshold = spdk_max((int64_t)module_ch->io_outstanding / 2,
2274 							      (int64_t)module_ch->io_outstanding - NOMEM_THRESHOLD_COUNT);
2275 			return;
2276 		}
2277 
2278 		if (spdk_unlikely(!TAILQ_EMPTY(&module_ch->nomem_io))) {
2279 			_spdk_bdev_ch_retry_io(bdev_ch);
2280 		}
2281 	}
2282 
2283 	_spdk_bdev_io_complete(bdev_io);
2284 }
2285 
2286 void
2287 spdk_bdev_io_complete_scsi_status(struct spdk_bdev_io *bdev_io, enum spdk_scsi_status sc,
2288 				  enum spdk_scsi_sense sk, uint8_t asc, uint8_t ascq)
2289 {
2290 	if (sc == SPDK_SCSI_STATUS_GOOD) {
2291 		bdev_io->status = SPDK_BDEV_IO_STATUS_SUCCESS;
2292 	} else {
2293 		bdev_io->status = SPDK_BDEV_IO_STATUS_SCSI_ERROR;
2294 		bdev_io->error.scsi.sc = sc;
2295 		bdev_io->error.scsi.sk = sk;
2296 		bdev_io->error.scsi.asc = asc;
2297 		bdev_io->error.scsi.ascq = ascq;
2298 	}
2299 
2300 	spdk_bdev_io_complete(bdev_io, bdev_io->status);
2301 }
2302 
2303 void
2304 spdk_bdev_io_get_scsi_status(const struct spdk_bdev_io *bdev_io,
2305 			     int *sc, int *sk, int *asc, int *ascq)
2306 {
2307 	assert(sc != NULL);
2308 	assert(sk != NULL);
2309 	assert(asc != NULL);
2310 	assert(ascq != NULL);
2311 
2312 	switch (bdev_io->status) {
2313 	case SPDK_BDEV_IO_STATUS_SUCCESS:
2314 		*sc = SPDK_SCSI_STATUS_GOOD;
2315 		*sk = SPDK_SCSI_SENSE_NO_SENSE;
2316 		*asc = SPDK_SCSI_ASC_NO_ADDITIONAL_SENSE;
2317 		*ascq = SPDK_SCSI_ASCQ_CAUSE_NOT_REPORTABLE;
2318 		break;
2319 	case SPDK_BDEV_IO_STATUS_NVME_ERROR:
2320 		spdk_scsi_nvme_translate(bdev_io, sc, sk, asc, ascq);
2321 		break;
2322 	case SPDK_BDEV_IO_STATUS_SCSI_ERROR:
2323 		*sc = bdev_io->error.scsi.sc;
2324 		*sk = bdev_io->error.scsi.sk;
2325 		*asc = bdev_io->error.scsi.asc;
2326 		*ascq = bdev_io->error.scsi.ascq;
2327 		break;
2328 	default:
2329 		*sc = SPDK_SCSI_STATUS_CHECK_CONDITION;
2330 		*sk = SPDK_SCSI_SENSE_ABORTED_COMMAND;
2331 		*asc = SPDK_SCSI_ASC_NO_ADDITIONAL_SENSE;
2332 		*ascq = SPDK_SCSI_ASCQ_CAUSE_NOT_REPORTABLE;
2333 		break;
2334 	}
2335 }
2336 
2337 void
2338 spdk_bdev_io_complete_nvme_status(struct spdk_bdev_io *bdev_io, int sct, int sc)
2339 {
2340 	if (sct == SPDK_NVME_SCT_GENERIC && sc == SPDK_NVME_SC_SUCCESS) {
2341 		bdev_io->status = SPDK_BDEV_IO_STATUS_SUCCESS;
2342 	} else {
2343 		bdev_io->error.nvme.sct = sct;
2344 		bdev_io->error.nvme.sc = sc;
2345 		bdev_io->status = SPDK_BDEV_IO_STATUS_NVME_ERROR;
2346 	}
2347 
2348 	spdk_bdev_io_complete(bdev_io, bdev_io->status);
2349 }
2350 
2351 void
2352 spdk_bdev_io_get_nvme_status(const struct spdk_bdev_io *bdev_io, int *sct, int *sc)
2353 {
2354 	assert(sct != NULL);
2355 	assert(sc != NULL);
2356 
2357 	if (bdev_io->status == SPDK_BDEV_IO_STATUS_NVME_ERROR) {
2358 		*sct = bdev_io->error.nvme.sct;
2359 		*sc = bdev_io->error.nvme.sc;
2360 	} else if (bdev_io->status == SPDK_BDEV_IO_STATUS_SUCCESS) {
2361 		*sct = SPDK_NVME_SCT_GENERIC;
2362 		*sc = SPDK_NVME_SC_SUCCESS;
2363 	} else {
2364 		*sct = SPDK_NVME_SCT_GENERIC;
2365 		*sc = SPDK_NVME_SC_INTERNAL_DEVICE_ERROR;
2366 	}
2367 }
2368 
2369 struct spdk_thread *
2370 spdk_bdev_io_get_thread(struct spdk_bdev_io *bdev_io)
2371 {
2372 	return spdk_io_channel_get_thread(bdev_io->ch->channel);
2373 }
2374 
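/*
 * Apply a rate limit from the [QoS] section of the configuration file, if one
 * is present for this bdev.  Each Limit_IOPS entry names a bdev and an IOPS
 * value that must be a positive multiple of SPDK_BDEV_QOS_MIN_IOS_PER_SEC.
 * For example (bdev name and value are illustrative):
 *
 *   [QoS]
 *     Limit_IOPS Malloc0 20000
 */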
2375 static void
2376 _spdk_bdev_qos_config(struct spdk_bdev *bdev)
2377 {
2378 	struct spdk_conf_section	*sp = NULL;
2379 	const char			*val = NULL;
2380 	uint64_t			ios_per_sec = 0;
2381 	int				i = 0;
2382 
2383 	sp = spdk_conf_find_section(NULL, "QoS");
2384 	if (!sp) {
2385 		return;
2386 	}
2387 
2388 	while (true) {
2389 		val = spdk_conf_section_get_nmval(sp, "Limit_IOPS", i, 0);
2390 		if (!val) {
2391 			break;
2392 		}
2393 
2394 		if (strcmp(bdev->name, val) != 0) {
2395 			i++;
2396 			continue;
2397 		}
2398 
2399 		val = spdk_conf_section_get_nmval(sp, "Limit_IOPS", i, 1);
2400 		if (!val) {
2401 			return;
2402 		}
2403 
2404 		ios_per_sec = strtoull(val, NULL, 10);
2405 		if (ios_per_sec > 0) {
2406 			if (ios_per_sec % SPDK_BDEV_QOS_MIN_IOS_PER_SEC) {
2407 				SPDK_ERRLOG("Assigned IOPS %" PRIu64 " on bdev %s is not a multiple of %u\n",
2408 					    ios_per_sec, bdev->name, SPDK_BDEV_QOS_MIN_IOS_PER_SEC);
2409 				SPDK_ERRLOG("Failed to enable QoS on bdev %s\n", bdev->name);
2410 			} else {
2411 				bdev->qos.enabled = true;
2412 				bdev->qos.rate_limit = ios_per_sec;
2413 				SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Bdev:%s QoS:%" PRIu64 "\n",
2414 					      bdev->name, bdev->qos.rate_limit);
2415 			}
2416 		}
2417 
2418 		return;
2419 	}
2420 }
2421 
2422 static int
2423 spdk_bdev_init(struct spdk_bdev *bdev)
2424 {
2425 	assert(bdev->module != NULL);
2426 
2427 	if (!bdev->name) {
2428 		SPDK_ERRLOG("Bdev name is NULL\n");
2429 		return -EINVAL;
2430 	}
2431 
2432 	if (spdk_bdev_get_by_name(bdev->name)) {
2433 		SPDK_ERRLOG("Bdev name:%s already exists\n", bdev->name);
2434 		return -EEXIST;
2435 	}
2436 
2437 	bdev->status = SPDK_BDEV_STATUS_READY;
2438 
2439 	TAILQ_INIT(&bdev->open_descs);
2440 
2441 	TAILQ_INIT(&bdev->aliases);
2442 
2443 	bdev->reset_in_progress = NULL;
2444 
2445 	_spdk_bdev_qos_config(bdev);
2446 
2447 	spdk_io_device_register(__bdev_to_io_dev(bdev),
2448 				spdk_bdev_channel_create, spdk_bdev_channel_destroy,
2449 				sizeof(struct spdk_bdev_channel));
2450 
2451 	pthread_mutex_init(&bdev->mutex, NULL);
2452 	return 0;
2453 }
2454 
2455 static void
2456 spdk_bdev_destroy_cb(void *io_device)
2457 {
2458 	int			rc;
2459 	struct spdk_bdev	*bdev;
2460 	spdk_bdev_unregister_cb	cb_fn;
2461 	void			*cb_arg;
2462 
2463 	bdev = __bdev_from_io_dev(io_device);
2464 	cb_fn = bdev->unregister_cb;
2465 	cb_arg = bdev->unregister_ctx;
2466 
2467 	rc = bdev->fn_table->destruct(bdev->ctxt);
2468 	if (rc < 0) {
2469 		SPDK_ERRLOG("destruct failed\n");
2470 	}
2471 	if (rc <= 0 && cb_fn != NULL) {
2472 		cb_fn(cb_arg, rc);
2473 	}
2474 }
2475 
2476 
2477 static void
2478 spdk_bdev_fini(struct spdk_bdev *bdev)
2479 {
2480 	pthread_mutex_destroy(&bdev->mutex);
2481 
2482 	spdk_io_device_unregister(__bdev_to_io_dev(bdev), spdk_bdev_destroy_cb);
2483 }
2484 
2485 static void
2486 spdk_bdev_start(struct spdk_bdev *bdev)
2487 {
2488 	struct spdk_bdev_module *module;
2489 
2490 	SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Inserting bdev %s into list\n", bdev->name);
2491 	TAILQ_INSERT_TAIL(&g_bdev_mgr.bdevs, bdev, link);
2492 
2493 	TAILQ_FOREACH(module, &g_bdev_mgr.bdev_modules, tailq) {
2494 		if (module->examine) {
2495 			module->action_in_progress++;
2496 			module->examine(bdev);
2497 		}
2498 	}
2499 }
2500 
2501 int
2502 spdk_bdev_register(struct spdk_bdev *bdev)
2503 {
2504 	int rc = spdk_bdev_init(bdev);
2505 
2506 	if (rc == 0) {
2507 		spdk_bdev_start(bdev);
2508 	}
2509 
2510 	return rc;
2511 }
2512 
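/*
 * Detach a virtual bdev from all of its base bdevs: remove the vbdev pointer
 * from each base's vbdevs array and release the vbdev's own base_bdevs array.
 */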
2513 static void
2514 spdk_vbdev_remove_base_bdevs(struct spdk_bdev *vbdev)
2515 {
2516 	struct spdk_bdev **bdevs;
2517 	struct spdk_bdev *base;
2518 	size_t i, j, k;
2519 	bool found;
2520 
2521 	/* Iterate over base bdevs to remove vbdev from them. */
2522 	for (i = 0; i < vbdev->base_bdevs_cnt; i++) {
2523 		found = false;
2524 		base = vbdev->base_bdevs[i];
2525 
2526 		for (j = 0; j < base->vbdevs_cnt; j++) {
2527 			if (base->vbdevs[j] != vbdev) {
2528 				continue;
2529 			}
2530 
2531 			for (k = j; k + 1 < base->vbdevs_cnt; k++) {
2532 				base->vbdevs[k] = base->vbdevs[k + 1];
2533 			}
2534 
2535 			base->vbdevs_cnt--;
2536 			if (base->vbdevs_cnt > 0) {
2537 				bdevs = realloc(base->vbdevs, base->vbdevs_cnt * sizeof(bdevs[0]));
2538 				/* It would be odd if shrinking a memory block failed. */
2539 				assert(bdevs);
2540 				base->vbdevs = bdevs;
2541 			} else {
2542 				free(base->vbdevs);
2543 				base->vbdevs = NULL;
2544 			}
2545 
2546 			found = true;
2547 			break;
2548 		}
2549 
2550 		if (!found) {
2551 			SPDK_WARNLOG("Bdev '%s' is not a base bdev of '%s'.\n", base->name, vbdev->name);
2552 		}
2553 	}
2554 
2555 	free(vbdev->base_bdevs);
2556 	vbdev->base_bdevs = NULL;
2557 	vbdev->base_bdevs_cnt = 0;
2558 }
2559 
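/*
 * Record the base bdevs backing a virtual bdev and register the vbdev with each
 * of them.  On allocation failure, any links created so far are rolled back.
 */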
2560 static int
2561 spdk_vbdev_set_base_bdevs(struct spdk_bdev *vbdev, struct spdk_bdev **base_bdevs, size_t cnt)
2562 {
2563 	struct spdk_bdev **vbdevs;
2564 	struct spdk_bdev *base;
2565 	size_t i;
2566 
2567 	/* Adding base bdevs isn't supported (yet?). */
2568 	assert(vbdev->base_bdevs_cnt == 0);
2569 
2570 	vbdev->base_bdevs = malloc(cnt * sizeof(vbdev->base_bdevs[0]));
2571 	if (!vbdev->base_bdevs) {
2572 		SPDK_ERRLOG("%s - malloc() failed\n", vbdev->name);
2573 		return -ENOMEM;
2574 	}
2575 
2576 	memcpy(vbdev->base_bdevs, base_bdevs, cnt * sizeof(vbdev->base_bdevs[0]));
2577 	vbdev->base_bdevs_cnt = cnt;
2578 
2579 	/* Iterate over base bdevs to add this vbdev to them. */
2580 	for (i = 0; i < cnt; i++) {
2581 		base = vbdev->base_bdevs[i];
2582 
2583 		assert(base != NULL);
2584 		assert(base->claim_module != NULL);
2585 
2586 		vbdevs = realloc(base->vbdevs, (base->vbdevs_cnt + 1) * sizeof(vbdevs[0]));
2587 		if (!vbdevs) {
2588 			SPDK_ERRLOG("%s - realloc() failed\n", base->name);
2589 			spdk_vbdev_remove_base_bdevs(vbdev);
2590 			return -ENOMEM;
2591 		}
2592 
2593 		vbdevs[base->vbdevs_cnt] = vbdev;
2594 		base->vbdevs = vbdevs;
2595 		base->vbdevs_cnt++;
2596 	}
2597 
2598 	return 0;
2599 }
2600 
2601 int
2602 spdk_vbdev_register(struct spdk_bdev *vbdev, struct spdk_bdev **base_bdevs, int base_bdev_count)
2603 {
2604 	int rc;
2605 
2606 	rc = spdk_bdev_init(vbdev);
2607 	if (rc) {
2608 		return rc;
2609 	}
2610 
2611 	if (base_bdev_count == 0) {
2612 		spdk_bdev_start(vbdev);
2613 		return 0;
2614 	}
2615 
2616 	rc = spdk_vbdev_set_base_bdevs(vbdev, base_bdevs, base_bdev_count);
2617 	if (rc) {
2618 		spdk_bdev_fini(vbdev);
2619 		return rc;
2620 	}
2621 
2622 	spdk_bdev_start(vbdev);
2623 	return 0;
2624 
2625 }
2626 
2627 void
2628 spdk_bdev_destruct_done(struct spdk_bdev *bdev, int bdeverrno)
2629 {
2630 	if (bdev->unregister_cb != NULL) {
2631 		bdev->unregister_cb(bdev->unregister_ctx, bdeverrno);
2632 	}
2633 }
2634 
2635 static void
2636 _remove_notify(void *arg)
2637 {
2638 	struct spdk_bdev_desc *desc = arg;
2639 
2640 	desc->remove_cb(desc->remove_ctx);
2641 }
2642 
2643 void
2644 spdk_bdev_unregister(struct spdk_bdev *bdev, spdk_bdev_unregister_cb cb_fn, void *cb_arg)
2645 {
2646 	struct spdk_bdev_desc	*desc, *tmp;
2647 	bool			do_destruct = true;
2648 	struct spdk_thread	*thread;
2649 
2650 	SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Removing bdev %s from list\n", bdev->name);
2651 
2652 	thread = spdk_get_thread();
2653 	if (!thread) {
2654 		/* The user called this from a non-SPDK thread. */
2655 		cb_fn(cb_arg, -ENOTSUP);
2656 		return;
2657 	}
2658 
2659 	pthread_mutex_lock(&bdev->mutex);
2660 
2661 	spdk_vbdev_remove_base_bdevs(bdev);
2662 
2663 	bdev->status = SPDK_BDEV_STATUS_REMOVING;
2664 	bdev->unregister_cb = cb_fn;
2665 	bdev->unregister_ctx = cb_arg;
2666 
2667 	TAILQ_FOREACH_SAFE(desc, &bdev->open_descs, link, tmp) {
2668 		if (desc->remove_cb) {
2669 			do_destruct = false;
2670 			/*
2671 			 * Defer invocation of the remove_cb to a separate message that will
2672 			 *  run later on this thread.  This ensures this context unwinds and
2673 			 *  we don't recursively unregister this bdev again if the remove_cb
2674 			 *  immediately closes its descriptor.
2675 			 */
2676 			spdk_thread_send_msg(thread, _remove_notify, desc);
2677 		}
2678 	}
2679 
2680 	if (!do_destruct) {
2681 		pthread_mutex_unlock(&bdev->mutex);
2682 		return;
2683 	}
2684 
2685 	TAILQ_REMOVE(&g_bdev_mgr.bdevs, bdev, link);
2686 	pthread_mutex_unlock(&bdev->mutex);
2687 
2688 	spdk_bdev_fini(bdev);
2689 }
2690 
2691 int
2692 spdk_bdev_open(struct spdk_bdev *bdev, bool write, spdk_bdev_remove_cb_t remove_cb,
2693 	       void *remove_ctx, struct spdk_bdev_desc **_desc)
2694 {
2695 	struct spdk_bdev_desc *desc;
2696 
2697 	desc = calloc(1, sizeof(*desc));
2698 	if (desc == NULL) {
2699 		SPDK_ERRLOG("Failed to allocate memory for bdev descriptor\n");
2700 		return -ENOMEM;
2701 	}
2702 
2703 	pthread_mutex_lock(&bdev->mutex);
2704 
2705 	if (write && bdev->claim_module) {
2706 		SPDK_INFOLOG(SPDK_LOG_BDEV, "Could not open %s - already claimed\n", bdev->name);
2707 		free(desc);
2708 		pthread_mutex_unlock(&bdev->mutex);
2709 		return -EPERM;
2710 	}
2711 
2712 	TAILQ_INSERT_TAIL(&bdev->open_descs, desc, link);
2713 
2714 	desc->bdev = bdev;
2715 	desc->remove_cb = remove_cb;
2716 	desc->remove_ctx = remove_ctx;
2717 	desc->write = write;
2718 	*_desc = desc;
2719 
2720 	pthread_mutex_unlock(&bdev->mutex);
2721 
2722 	return 0;
2723 }
2724 
2725 void
2726 spdk_bdev_close(struct spdk_bdev_desc *desc)
2727 {
2728 	struct spdk_bdev *bdev = desc->bdev;
2729 	bool do_unregister = false;
2730 
2731 	pthread_mutex_lock(&bdev->mutex);
2732 
2733 	TAILQ_REMOVE(&bdev->open_descs, desc, link);
2734 	free(desc);
2735 
2736 	if (bdev->status == SPDK_BDEV_STATUS_REMOVING && TAILQ_EMPTY(&bdev->open_descs)) {
2737 		do_unregister = true;
2738 	}
2739 	pthread_mutex_unlock(&bdev->mutex);
2740 
2741 	if (do_unregister == true) {
2742 		spdk_bdev_unregister(bdev, bdev->unregister_cb, bdev->unregister_ctx);
2743 	}
2744 }
2745 
2746 int
2747 spdk_bdev_module_claim_bdev(struct spdk_bdev *bdev, struct spdk_bdev_desc *desc,
2748 			    struct spdk_bdev_module *module)
2749 {
2750 	if (bdev->claim_module != NULL) {
2751 		SPDK_ERRLOG("bdev %s already claimed by module %s\n", bdev->name,
2752 			    bdev->claim_module->name);
2753 		return -EPERM;
2754 	}
2755 
2756 	if (desc && !desc->write) {
2757 		desc->write = true;
2758 	}
2759 
2760 	bdev->claim_module = module;
2761 	return 0;
2762 }
2763 
2764 void
2765 spdk_bdev_module_release_bdev(struct spdk_bdev *bdev)
2766 {
2767 	assert(bdev->claim_module != NULL);
2768 	bdev->claim_module = NULL;
2769 }
2770 
2771 struct spdk_bdev *
2772 spdk_bdev_desc_get_bdev(struct spdk_bdev_desc *desc)
2773 {
2774 	return desc->bdev;
2775 }
2776 
2777 void
2778 spdk_bdev_io_get_iovec(struct spdk_bdev_io *bdev_io, struct iovec **iovp, int *iovcntp)
2779 {
2780 	struct iovec *iovs;
2781 	int iovcnt;
2782 
2783 	if (bdev_io == NULL) {
2784 		return;
2785 	}
2786 
2787 	switch (bdev_io->type) {
2788 	case SPDK_BDEV_IO_TYPE_READ:
2789 		iovs = bdev_io->u.bdev.iovs;
2790 		iovcnt = bdev_io->u.bdev.iovcnt;
2791 		break;
2792 	case SPDK_BDEV_IO_TYPE_WRITE:
2793 		iovs = bdev_io->u.bdev.iovs;
2794 		iovcnt = bdev_io->u.bdev.iovcnt;
2795 		break;
2796 	default:
2797 		iovs = NULL;
2798 		iovcnt = 0;
2799 		break;
2800 	}
2801 
2802 	if (iovp) {
2803 		*iovp = iovs;
2804 	}
2805 	if (iovcntp) {
2806 		*iovcntp = iovcnt;
2807 	}
2808 }
2809 
2810 void
2811 spdk_bdev_module_list_add(struct spdk_bdev_module *bdev_module)
2812 {
2813 
2814 	if (spdk_bdev_module_list_find(bdev_module->name)) {
2815 		SPDK_ERRLOG("module '%s' already registered\n", bdev_module->name);
2816 		assert(false);
2817 	}
2818 
2819 	if (bdev_module->async_init) {
2820 		bdev_module->action_in_progress = 1;
2821 	}
2822 
2823 	/*
2824 	 * Modules with examine callbacks must be initialized first, so they are
2825 	 *  ready to handle examine callbacks from later modules that will
2826 	 *  register physical bdevs.
2827 	 */
2828 	if (bdev_module->examine != NULL) {
2829 		TAILQ_INSERT_HEAD(&g_bdev_mgr.bdev_modules, bdev_module, tailq);
2830 	} else {
2831 		TAILQ_INSERT_TAIL(&g_bdev_mgr.bdev_modules, bdev_module, tailq);
2832 	}
2833 }
2834 
2835 struct spdk_bdev_module *
2836 spdk_bdev_module_list_find(const char *name)
2837 {
2838 	struct spdk_bdev_module *bdev_module;
2839 
2840 	TAILQ_FOREACH(bdev_module, &g_bdev_mgr.bdev_modules, tailq) {
2841 		if (strcmp(name, bdev_module->name) == 0) {
2842 			break;
2843 		}
2844 	}
2845 
2846 	return bdev_module;
2847 }
2848 
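/*
 * Completion callback used when a write_zeroes request is emulated with regular
 * writes of the shared zero buffer.  Each pass writes at most ZERO_BUFFER_SIZE
 * bytes; the user's original callback is restored for the final chunk.
 */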
2849 static void
2850 spdk_bdev_write_zeroes_split(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg)
2851 {
2852 	uint64_t len;
2853 
2854 	if (!success) {
2855 		bdev_io->cb = bdev_io->u.bdev.stored_user_cb;
2856 		_spdk_bdev_io_complete(bdev_io);
2857 		return;
2858 	}
2859 
2860 	/* no need to perform the error checking from write_zeroes_blocks because this request already passed those checks. */
2861 	len = spdk_min(spdk_bdev_get_block_size(bdev_io->bdev) * bdev_io->u.bdev.split_remaining_num_blocks,
2862 		       ZERO_BUFFER_SIZE);
2863 
2864 	bdev_io->u.bdev.offset_blocks = bdev_io->u.bdev.split_current_offset_blocks;
2865 	bdev_io->u.bdev.iov.iov_len = len;
2866 	bdev_io->u.bdev.num_blocks = len / spdk_bdev_get_block_size(bdev_io->bdev);
2867 	bdev_io->u.bdev.split_remaining_num_blocks -= bdev_io->u.bdev.num_blocks;
2868 	bdev_io->u.bdev.split_current_offset_blocks += bdev_io->u.bdev.num_blocks;
2869 
2870 	/* if this round completes the i/o, change the callback to be the original user callback */
2871 	if (bdev_io->u.bdev.split_remaining_num_blocks == 0) {
2872 		spdk_bdev_io_init(bdev_io, bdev_io->bdev, cb_arg, bdev_io->u.bdev.stored_user_cb);
2873 	} else {
2874 		spdk_bdev_io_init(bdev_io, bdev_io->bdev, cb_arg, spdk_bdev_write_zeroes_split);
2875 	}
2876 	spdk_bdev_io_submit(bdev_io);
2877 }
2878 
2879 struct set_qos_limit_ctx {
2880 	void (*cb_fn)(void *cb_arg, int status);
2881 	void *cb_arg;
2882 	struct spdk_bdev *bdev;
2883 };
2884 
2885 static void
2886 _spdk_bdev_set_qos_limit_done(struct set_qos_limit_ctx *ctx, int status)
2887 {
2888 	pthread_mutex_lock(&ctx->bdev->mutex);
2889 	ctx->bdev->qos.mod_in_progress = false;
2890 	pthread_mutex_unlock(&ctx->bdev->mutex);
2891 
2892 	ctx->cb_fn(ctx->cb_arg, status);
2893 	free(ctx);
2894 }
2895 
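/*
 * Runs on the QoS thread once every channel has dropped its QOS_ENABLED flag:
 * abort I/O still queued for rate limiting, tear down the QoS channel and
 * poller, then report completion to the caller.
 */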
2896 static void
2897 _spdk_bdev_disable_qos_done(void *cb_arg)
2898 {
2899 	struct set_qos_limit_ctx *ctx = cb_arg;
2900 	struct spdk_bdev *bdev = ctx->bdev;
2901 	struct spdk_bdev_qos *qos;
2902 
2903 	pthread_mutex_lock(&bdev->mutex);
2904 	qos = &bdev->qos;
2905 
2906 	qos->enabled = false;
2907 	_spdk_bdev_abort_queued_io(&qos->queued, qos->ch);
2908 	_spdk_bdev_channel_destroy(qos->ch);
2909 	free(qos->ch);
2910 	qos->ch = NULL;
2911 	qos->thread = NULL;
2912 	qos->max_ios_per_timeslice = 0;
2913 	qos->io_submitted_this_timeslice = 0;
2914 	spdk_poller_unregister(&qos->poller);
2915 	pthread_mutex_unlock(&bdev->mutex);
2916 
2917 	_spdk_bdev_set_qos_limit_done(ctx, 0);
2918 }
2919 
2920 static void
2921 _spdk_bdev_disable_qos_msg_done(struct spdk_io_channel_iter *i, int status)
2922 {
2923 	void *io_device = spdk_io_channel_iter_get_io_device(i);
2924 	struct spdk_bdev *bdev = __bdev_from_io_dev(io_device);
2925 	struct set_qos_limit_ctx *ctx = spdk_io_channel_iter_get_ctx(i);
2926 	struct spdk_thread *thread;
2927 
2928 	pthread_mutex_lock(&bdev->mutex);
2929 	thread = bdev->qos.thread;
2930 	pthread_mutex_unlock(&bdev->mutex);
2931 
2932 	if (thread) {
2933 		spdk_thread_send_msg(thread, _spdk_bdev_disable_qos_done, ctx);
2934 	}
2935 }
2936 
2937 static void
2938 _spdk_bdev_disable_qos_msg(struct spdk_io_channel_iter *i)
2939 {
2940 	struct spdk_io_channel *ch = spdk_io_channel_iter_get_channel(i);
2941 	struct spdk_bdev_channel *bdev_ch = spdk_io_channel_get_ctx(ch);
2942 
2943 	bdev_ch->flags &= ~BDEV_CH_QOS_ENABLED;
2944 
2945 	spdk_for_each_channel_continue(i, 0);
2946 }
2947 
2948 static void
2949 _spdk_bdev_update_qos_limit_iops_msg(void *cb_arg)
2950 {
2951 	struct set_qos_limit_ctx *ctx = cb_arg;
2952 	struct spdk_bdev *bdev = ctx->bdev;
2953 
2954 	pthread_mutex_lock(&bdev->mutex);
2955 	spdk_bdev_qos_update_max_ios_per_timeslice(&bdev->qos);
2956 	pthread_mutex_unlock(&bdev->mutex);
2957 
2958 	_spdk_bdev_set_qos_limit_done(ctx, 0);
2959 }
2960 
2961 static void
2962 _spdk_bdev_enable_qos_msg(struct spdk_io_channel_iter *i)
2963 {
2964 	void *io_device = spdk_io_channel_iter_get_io_device(i);
2965 	struct spdk_bdev *bdev = __bdev_from_io_dev(io_device);
2966 	struct spdk_io_channel *ch = spdk_io_channel_iter_get_channel(i);
2967 	struct spdk_bdev_channel *bdev_ch = spdk_io_channel_get_ctx(ch);
2968 	int rc;
2969 
2970 	pthread_mutex_lock(&bdev->mutex);
2971 	rc = _spdk_bdev_enable_qos(bdev, bdev_ch);
2972 	pthread_mutex_unlock(&bdev->mutex);
2973 
2974 	spdk_for_each_channel_continue(i, rc);
2975 }
2976 
2977 static void
2978 _spdk_bdev_enable_qos_done(struct spdk_io_channel_iter *i, int status)
2979 {
2980 	struct set_qos_limit_ctx *ctx = spdk_io_channel_iter_get_ctx(i);
2981 
2982 	_spdk_bdev_set_qos_limit_done(ctx, status);
2983 }
2984 
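/*
 * Set, update, or disable (ios_per_sec == 0) the per-bdev IOPS rate limit at
 * runtime.  The work happens on the QoS thread and on each I/O channel via
 * spdk_for_each_channel(); cb_fn is invoked once the change has been applied.
 */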
2985 void
2986 spdk_bdev_set_qos_limit_iops(struct spdk_bdev *bdev, uint64_t ios_per_sec,
2987 			     void (*cb_fn)(void *cb_arg, int status), void *cb_arg)
2988 {
2989 	struct set_qos_limit_ctx *ctx;
2990 	struct spdk_thread *thread;
2991 
2992 	if (ios_per_sec > 0 && ios_per_sec % SPDK_BDEV_QOS_MIN_IOS_PER_SEC) {
2993 		SPDK_ERRLOG("Requested ios_per_sec limit %" PRIu64 " is not a multiple of %u\n",
2994 			    ios_per_sec, SPDK_BDEV_QOS_MIN_IOS_PER_SEC);
2995 		cb_fn(cb_arg, -EINVAL);
2996 		return;
2997 	}
2998 
2999 	ctx = calloc(1, sizeof(*ctx));
3000 	if (ctx == NULL) {
3001 		cb_fn(cb_arg, -ENOMEM);
3002 		return;
3003 	}
3004 
3005 	ctx->cb_fn = cb_fn;
3006 	ctx->cb_arg = cb_arg;
3007 	ctx->bdev = bdev;
3008 
3009 	pthread_mutex_lock(&bdev->mutex);
3010 	if (bdev->qos.mod_in_progress) {
3011 		pthread_mutex_unlock(&bdev->mutex);
3012 		free(ctx);
3013 		cb_fn(cb_arg, -EAGAIN);
3014 		return;
3015 	}
3016 	thread = bdev->qos.thread;
3017 	/* QoS not enabled on this bdev */
3018 	if (!thread && ios_per_sec == 0) {
3019 		pthread_mutex_unlock(&bdev->mutex);
3020 		free(ctx);
3021 		cb_fn(cb_arg, 0);
3022 		return;
3023 	}
3024 	bdev->qos.enabled = true;
3025 	bdev->qos.mod_in_progress = true;
3026 	bdev->qos.rate_limit = ios_per_sec;
3027 	pthread_mutex_unlock(&bdev->mutex);
3028 
3029 	if (thread) {
3030 		if (ios_per_sec == 0) {
3031 			/* Disabling */
3032 			spdk_for_each_channel(__bdev_to_io_dev(bdev),
3033 					      _spdk_bdev_disable_qos_msg, ctx,
3034 					      _spdk_bdev_disable_qos_msg_done);
3035 		} else {
3036 			/* Updating */
3037 			spdk_thread_send_msg(thread, _spdk_bdev_update_qos_limit_iops_msg, ctx);
3038 		}
3039 	} else {
3040 		/* Enabling */
3041 		spdk_for_each_channel(__bdev_to_io_dev(bdev),
3042 				      _spdk_bdev_enable_qos_msg, ctx,
3043 				      _spdk_bdev_enable_qos_done);
3044 	}
3045 }
3046 
3047 SPDK_LOG_REGISTER_COMPONENT("bdev", SPDK_LOG_BDEV)
3048